summaryrefslogtreecommitdiff
path: root/internal/statsengine
diff options
context:
space:
mode:
Diffstat (limited to 'internal/statsengine')
-rw-r--r--internal/statsengine/engine.go62
-rw-r--r--internal/statsengine/engine_reset_test.go10
-rw-r--r--internal/statsengine/engine_test.go10
-rw-r--r--internal/statsengine/filerank.go15
-rw-r--r--internal/statsengine/histogram.go15
-rw-r--r--internal/statsengine/process.go15
-rw-r--r--internal/statsengine/syscall.go15
7 files changed, 110 insertions, 32 deletions
diff --git a/internal/statsengine/engine.go b/internal/statsengine/engine.go
index 02aee37..f9602a9 100644
--- a/internal/statsengine/engine.go
+++ b/internal/statsengine/engine.go
@@ -5,6 +5,8 @@ import (
"sync"
"time"
+ "golang.org/x/sync/errgroup"
+
"ior/internal/event"
"ior/internal/types"
)
@@ -206,22 +208,45 @@ func (e *Engine) captureSnapshotInputs() snapshotInputs {
}
// buildSubSnapshots runs all five per-category snapshot builders concurrently
-// and returns their results bundled together.
-func buildSubSnapshots(in snapshotInputs, elapsed time.Duration) subSnapshots {
+// using errgroup so that any error from a sub-builder is captured and returned
+// to the caller instead of being silently dropped.
+func buildSubSnapshots(in snapshotInputs, elapsed time.Duration) (subSnapshots, error) {
var (
ss subSnapshots
- wg sync.WaitGroup
+ eg errgroup.Group
)
- wg.Add(5)
- go func() { defer wg.Done(); ss.syscalls = buildSyscallSnapshots(in.syscalls, elapsed) }()
- go func() { defer wg.Done(); ss.files = buildFileSnapshots(in.files) }()
- go func() { defer wg.Done(); ss.processes = buildProcessSnapshots(in.processes, elapsed) }()
- go func() { defer wg.Done(); ss.latencyHist = buildHistogramSnapshot(in.latencyHist) }()
- go func() { defer wg.Done(); ss.gapHist = buildHistogramSnapshot(in.gapHist) }()
- wg.Wait()
+ eg.Go(func() error {
+ var err error
+ ss.syscalls, err = buildSyscallSnapshots(in.syscalls, elapsed)
+ return err
+ })
+ eg.Go(func() error {
+ var err error
+ ss.files, err = buildFileSnapshots(in.files)
+ return err
+ })
+ eg.Go(func() error {
+ var err error
+ ss.processes, err = buildProcessSnapshots(in.processes, elapsed)
+ return err
+ })
+ eg.Go(func() error {
+ var err error
+ ss.latencyHist, err = buildHistogramSnapshot(in.latencyHist)
+ return err
+ })
+ eg.Go(func() error {
+ var err error
+ ss.gapHist, err = buildHistogramSnapshot(in.gapHist)
+ return err
+ })
+
+ if err := eg.Wait(); err != nil {
+ return subSnapshots{}, err
+ }
- return ss
+ return ss, nil
}
// populateSnapshotFields fills in the scalar fields of the snapshot from the
@@ -247,15 +272,20 @@ func populateSnapshotFields(snap *Snapshot, in snapshotInputs, elapsed time.Dura
// Snapshot returns an immutable point-in-time view of all stats.
// It captures engine state under the lock, then builds sub-snapshots
-// concurrently, and finally assembles the result without holding the lock.
-func (e *Engine) Snapshot() *Snapshot {
+// concurrently via errgroup, and finally assembles the result without holding
+// the lock. An error is returned if any sub-builder fails.
+func (e *Engine) Snapshot() (*Snapshot, error) {
if e == nil {
- return nil
+ return nil, nil
}
in := e.captureSnapshotInputs()
elapsed := nonNegativeDuration(in.now.Sub(in.startedAt))
- ss := buildSubSnapshots(in, elapsed)
+
+ ss, err := buildSubSnapshots(in, elapsed)
+ if err != nil {
+ return nil, err
+ }
snap := NewSnapshot(
in.latencySeries, in.gapSeries, in.throughputSeries,
@@ -264,7 +294,7 @@ func (e *Engine) Snapshot() *Snapshot {
)
populateSnapshotFields(&snap, in, elapsed)
- return &snap
+ return &snap, nil
}
func safeMean(total uint64, count uint64) float64 {
diff --git a/internal/statsengine/engine_reset_test.go b/internal/statsengine/engine_reset_test.go
index e798686..c09c059 100644
--- a/internal/statsengine/engine_reset_test.go
+++ b/internal/statsengine/engine_reset_test.go
@@ -10,13 +10,19 @@ import (
func TestEngineResetClearsAccumulatedStats(t *testing.T) {
e := NewEngine(8)
e.Ingest(newEnginePair(types.SYS_ENTER_READ, 7, types.READ_CLASSIFIED, "test", 1, "/tmp/a", 7, 1000, 50))
- before := e.Snapshot()
+ before, err := e.Snapshot()
+ if err != nil {
+ t.Fatalf("unexpected snapshot error: %v", err)
+ }
if before.TotalSyscalls == 0 {
t.Fatalf("expected non-zero totals before reset")
}
e.Reset()
- after := e.Snapshot()
+ after, err := e.Snapshot()
+ if err != nil {
+ t.Fatalf("unexpected snapshot error after reset: %v", err)
+ }
if after.TotalSyscalls != 0 || after.TotalBytes != 0 || after.TotalErrors != 0 {
t.Fatalf("expected totals cleared after reset, got %+v", after)
}
diff --git a/internal/statsengine/engine_test.go b/internal/statsengine/engine_test.go
index 7ba8c3a..f714844 100644
--- a/internal/statsengine/engine_test.go
+++ b/internal/statsengine/engine_test.go
@@ -33,7 +33,10 @@ func TestEngineIngestAndSnapshotIntegration(t *testing.T) {
engine.Ingest(newEnginePair(types.SYS_ENTER_COPY_FILE_RANGE, 80, types.TRANSFER_CLASSIFIED, "proc-b", 2, "/tmp/b", 20, 40, 8))
clock.Advance(1 * time.Second)
- snap := engine.Snapshot()
+ snap, err := engine.Snapshot()
+ if err != nil {
+ t.Fatalf("unexpected snapshot error: %v", err)
+ }
if snap == nil {
t.Fatalf("expected snapshot")
}
@@ -79,7 +82,10 @@ func TestEngineSnapshotWithNoEvents(t *testing.T) {
clock := &fakeClock{now: time.Unix(2000, 0)}
engine := newEngineWithClock(10, clock.Now)
- snap := engine.Snapshot()
+ snap, err := engine.Snapshot()
+ if err != nil {
+ t.Fatalf("unexpected snapshot error: %v", err)
+ }
if snap == nil {
t.Fatalf("expected snapshot")
}
diff --git a/internal/statsengine/filerank.go b/internal/statsengine/filerank.go
index ef43c6e..a397381 100644
--- a/internal/statsengine/filerank.go
+++ b/internal/statsengine/filerank.go
@@ -93,12 +93,18 @@ func (r *fileRanker) Add(pair *event.Pair) {
r.compactIfNeeded()
}
+// Snapshot returns a slice of FileSnapshots for all tracked files.
+// It panics on build error, which should never happen for a valid ranker.
func (r *fileRanker) Snapshot() []FileSnapshot {
if r == nil {
return nil
}
- return buildFileSnapshots(r.snapshotInputs())
+ snap, err := buildFileSnapshots(r.snapshotInputs())
+ if err != nil {
+ panic("buildFileSnapshots: " + err.Error())
+ }
+ return snap
}
func (r *fileRanker) snapshotInputs() []fileSnapshotInput {
@@ -119,7 +125,10 @@ func (r *fileRanker) snapshotInputs() []fileSnapshotInput {
return inputs
}
-func buildFileSnapshots(inputs []fileSnapshotInput) []FileSnapshot {
+// buildFileSnapshots converts raw file ranker inputs into sorted FileSnapshot
+// slices. The error return is reserved for future validation; currently this
+// function always succeeds.
+func buildFileSnapshots(inputs []fileSnapshotInput) ([]FileSnapshot, error) {
out := make([]FileSnapshot, 0, len(inputs))
for _, in := range inputs {
out = append(out, in.toSnapshot())
@@ -130,7 +139,7 @@ func buildFileSnapshots(inputs []fileSnapshotInput) []FileSnapshot {
}
return cmp.Compare(a.Path, b.Path)
})
- return out
+ return out, nil
}
func (r *fileRanker) addBytes(stats *fileRankStats, pair *event.Pair) {
diff --git a/internal/statsengine/histogram.go b/internal/statsengine/histogram.go
index 42460ea..4a5e3b4 100644
--- a/internal/statsengine/histogram.go
+++ b/internal/statsengine/histogram.go
@@ -47,12 +47,18 @@ func (h *histogram) Increment(durationNs uint64) {
h.total++
}
+// Snapshot returns a HistogramSnapshot of the current histogram state.
+// It panics on build error, which should never happen for a valid histogram.
func (h *histogram) Snapshot() HistogramSnapshot {
if h == nil {
return NewHistogramSnapshot(0, nil)
}
- return buildHistogramSnapshot(h.snapshotInputs())
+ snap, err := buildHistogramSnapshot(h.snapshotInputs())
+ if err != nil {
+ panic("buildHistogramSnapshot: " + err.Error())
+ }
+ return snap
}
func (h *histogram) snapshotInputs() histogramSnapshotInput {
@@ -65,7 +71,10 @@ func (h *histogram) snapshotInputs() histogramSnapshotInput {
}
}
-func buildHistogramSnapshot(in histogramSnapshotInput) HistogramSnapshot {
+// buildHistogramSnapshot converts a histogramSnapshotInput into a
+// HistogramSnapshot. The error return is reserved for future validation;
+// currently this function always succeeds.
+func buildHistogramSnapshot(in histogramSnapshotInput) (HistogramSnapshot, error) {
buckets := make([]HistogramBucketSnapshot, 0, histogramBucketCount)
for i := 0; i < histogramBucketCount; i++ {
lower, upper := histogramBucketRange(i)
@@ -77,7 +86,7 @@ func buildHistogramSnapshot(in histogramSnapshotInput) HistogramSnapshot {
})
}
- return NewHistogramSnapshot(in.total, buckets)
+ return NewHistogramSnapshot(in.total, buckets), nil
}
func histogramBucketIndex(durationNs uint64) int {
diff --git a/internal/statsengine/process.go b/internal/statsengine/process.go
index b7eb6e7..a52be38 100644
--- a/internal/statsengine/process.go
+++ b/internal/statsengine/process.go
@@ -84,12 +84,18 @@ func (a *processAccumulator) Add(pair *event.Pair) {
a.compactIfNeeded()
}
+// Snapshot returns a slice of ProcessSnapshots for all tracked processes.
+// It panics on build error, which should never happen for a valid accumulator.
func (a *processAccumulator) Snapshot(elapsed time.Duration) []ProcessSnapshot {
if a == nil {
return nil
}
- return buildProcessSnapshots(a.snapshotInputs(), elapsed)
+ snap, err := buildProcessSnapshots(a.snapshotInputs(), elapsed)
+ if err != nil {
+ panic("buildProcessSnapshots: " + err.Error())
+ }
+ return snap
}
func (a *processAccumulator) snapshotInputs() []processSnapshotInput {
@@ -110,7 +116,10 @@ func (a *processAccumulator) snapshotInputs() []processSnapshotInput {
return inputs
}
-func buildProcessSnapshots(inputs []processSnapshotInput, elapsed time.Duration) []ProcessSnapshot {
+// buildProcessSnapshots converts raw process accumulator inputs into sorted
+// ProcessSnapshot slices. The error return is reserved for future validation;
+// currently this function always succeeds.
+func buildProcessSnapshots(inputs []processSnapshotInput, elapsed time.Duration) ([]ProcessSnapshot, error) {
rateDiv := elapsed.Seconds()
result := make([]ProcessSnapshot, 0, len(inputs))
for _, in := range inputs {
@@ -125,7 +134,7 @@ func buildProcessSnapshots(inputs []processSnapshotInput, elapsed time.Duration)
}
return cmp.Compare(a.PID, b.PID)
})
- return result
+ return result, nil
}
func (a *processAccumulator) compactIfNeeded() {
diff --git a/internal/statsengine/syscall.go b/internal/statsengine/syscall.go
index 2ef929a..d58e8c9 100644
--- a/internal/statsengine/syscall.go
+++ b/internal/statsengine/syscall.go
@@ -99,12 +99,18 @@ func (a *syscallAccumulator) Add(pair *event.Pair) {
}
}
+// Snapshot returns a slice of SyscallSnapshots for all tracked syscalls.
+// It panics on build error, which should never happen for a valid accumulator.
func (a *syscallAccumulator) Snapshot(elapsed time.Duration) []SyscallSnapshot {
if a == nil {
return nil
}
- return buildSyscallSnapshots(a.snapshotInputs(), elapsed)
+ snap, err := buildSyscallSnapshots(a.snapshotInputs(), elapsed)
+ if err != nil {
+ panic("buildSyscallSnapshots: " + err.Error())
+ }
+ return snap
}
func (a *syscallAccumulator) snapshotInputs() []syscallSnapshotInput {
@@ -132,7 +138,10 @@ func (a *syscallAccumulator) snapshotInputs() []syscallSnapshotInput {
return inputs
}
-func buildSyscallSnapshots(inputs []syscallSnapshotInput, elapsed time.Duration) []SyscallSnapshot {
+// buildSyscallSnapshots converts raw syscall accumulator inputs into sorted
+// SyscallSnapshot slices. The error return is reserved for future validation;
+// currently this function always succeeds.
+func buildSyscallSnapshots(inputs []syscallSnapshotInput, elapsed time.Duration) ([]SyscallSnapshot, error) {
rateDiv := elapsed.Seconds()
result := make([]SyscallSnapshot, 0, len(inputs))
for _, in := range inputs {
@@ -144,7 +153,7 @@ func buildSyscallSnapshots(inputs []syscallSnapshotInput, elapsed time.Duration)
}
return cmp.Compare(a.Name, b.Name)
})
- return result
+ return result, nil
}
func (s *syscallStats) updateMinMax(duration uint64) {