summaryrefslogtreecommitdiff
path: root/internal/stats/stats.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/stats/stats.go')
-rw-r--r--internal/stats/stats.go38
1 files changed, 25 insertions, 13 deletions
diff --git a/internal/stats/stats.go b/internal/stats/stats.go
index 3a9a9ab..a8390ef 100644
--- a/internal/stats/stats.go
+++ b/internal/stats/stats.go
@@ -14,7 +14,6 @@ import (
"path/filepath"
"strconv"
"sync/atomic"
- "syscall"
"time"
)
@@ -27,6 +26,8 @@ const (
var windowSeconds int64 = int64(defaultWindow.Seconds())
+var errLockWouldBlock = errors.New("stats: lock would block")
+
// SetWindow sets the sliding window used for pruning and aggregation.
func SetWindow(d time.Duration) {
if d < time.Second {
@@ -88,19 +89,11 @@ func Update(ctx context.Context, provider, model string, sentBytes, recvBytes in
return err
}
defer f.Close()
- // Acquire exclusive flock; best-effort ctx support via polling
- for {
- if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err == nil {
- defer syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
- break
- }
- // Wait a bit or exit if context canceled
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-time.After(5 * time.Millisecond):
- }
+ unlock, err := acquireFileLock(ctx, f)
+ if err != nil {
+ return err
}
+ defer func() { _ = unlock() }()
// Read existing file (if any)
path := filepath.Join(dir, fileName)
var sf File
@@ -158,6 +151,25 @@ func Update(ctx context.Context, provider, model string, sentBytes, recvBytes in
return nil
}
+func acquireFileLock(ctx context.Context, f *os.File) (func() error, error) {
+ fd := f.Fd()
+ for {
+ err := tryLockFile(fd)
+ if err == nil {
+ return func() error { return unlockFile(fd) }, nil
+ }
+ if errors.Is(err, errLockWouldBlock) {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case <-time.After(5 * time.Millisecond):
+ }
+ continue
+ }
+ return nil, err
+ }
+}
+
// Snapshot reads and aggregates events within the configured window.
func TakeSnapshot() (Snapshot, error) {
dir, err := CacheDir()