diff options
| author | Paul Buetow <paul@buetow.org> | 2026-02-23 23:22:54 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-02-23 23:22:54 +0200 |
| commit | 0bfeccbed6edb106670024373e46b3891463920c (patch) | |
| tree | 49a760dc17913a7359a13e37cbfbc156c4e6e3b0 /internal/statsengine | |
| parent | a8a4f82675ed2df538e9fbf95504d5674a732c52 (diff) | |
task 302: add stats engine orchestration and snapshots
Diffstat (limited to 'internal/statsengine')
| -rw-r--r-- | internal/statsengine/engine.go | 203 | ||||
| -rw-r--r-- | internal/statsengine/engine_test.go | 130 |
2 files changed, 333 insertions, 0 deletions
diff --git a/internal/statsengine/engine.go b/internal/statsengine/engine.go
new file mode 100644
index 0000000..79123a5
--- /dev/null
+++ b/internal/statsengine/engine.go
@@ -0,0 +1,203 @@
+package statsengine
+
+import (
+	"ior/internal/event"
+	"ior/internal/types"
+	"math"
+	"sync"
+	"time"
+)
+
+// trendWindowSlots is the length, in series slots, of each of the two adjacent
+// trailing windows whose averages detectTrend compares.
+const trendWindowSlots = 20
+
+// Engine folds a stream of syscall event pairs into running aggregates and
+// exposes them through Snapshot. Ingest and Snapshot both hold mu for their
+// whole duration.
+type Engine struct {
+	mu sync.Mutex
+
+	// now is the clock used for every timestamp; startedAt is its reading at
+	// construction time and the origin of the elapsed time in a snapshot.
+	now       func() time.Time
+	startedAt time.Time
+
+	// Running totals accumulated by Ingest.
+	totalSyscalls   uint64
+	totalErrors     uint64
+	totalBytes      uint64
+	totalReadBytes  uint64
+	totalWriteBytes uint64
+	totalLatency    uint64
+	totalGap        uint64
+
+	// Per-dimension aggregators, histograms and time series fed by Ingest.
+	syscalls         *syscallAccumulator
+	files            *fileRanker
+	processes        *processAccumulator
+	latencyHist      *histogram
+	gapHist          *histogram
+	latencySeries    *ringTimeSeries
+	gapSeries        *ringTimeSeries
+	throughputSeries *ringTimeSeries
+}
+
+// NewEngine builds a stats engine driven by the wall clock. topN is handed to
+// the file ranker.
+func NewEngine(topN int) *Engine {
+	return newEngineWithClock(topN, time.Now)
+}
+
+// newEngineWithClock builds an engine that reads time from now, which lets
+// tests inject a deterministic clock. A nil now falls back to time.Now. The
+// clock is read once here to record the start time.
+func newEngineWithClock(topN int, now func() time.Time) *Engine {
+	clock := now
+	if clock == nil {
+		clock = time.Now
+	}
+
+	engine := &Engine{now: clock, startedAt: clock()}
+	engine.syscalls = newSyscallAccumulator()
+	engine.files = newFileRankerWithConfig(topN)
+	engine.processes = newProcessAccumulator()
+	engine.latencyHist = newHistogram()
+	engine.gapHist = newHistogram()
+	engine.latencySeries = newRingTimeSeries()
+	engine.gapSeries = newRingTimeSeries()
+	engine.throughputSeries = newRingTimeSeries()
+	return engine
+}
+
+// Ingest updates all aggregates for one event pair.
+func (e *Engine) Ingest(pair *event.Pair) {
+	if pair == nil || e == nil {
+		return
+	}
+
+	e.mu.Lock()
+	defer e.mu.Unlock()
+
+	ts := e.now()
+	latency, gap, size := pair.Duration, pair.DurationToPrev, pair.Bytes
+
+	// Engine-wide running totals.
+	e.totalSyscalls++
+	e.totalBytes += size
+	e.totalLatency += latency
+	e.totalGap += gap
+	e.updateErrorAndByteClasses(pair)
+
+	// Per-syscall, per-file and per-process aggregators.
+	e.syscalls.Add(pair)
+	e.files.Add(pair)
+	e.processes.Add(pair)
+
+	// Distributions.
+	e.latencyHist.Increment(latency)
+	e.gapHist.Increment(gap)
+
+	// Time series, all stamped with the same clock reading.
+	e.latencySeries.Add(float64(latency), ts)
+	e.gapSeries.Add(float64(gap), ts)
+	e.throughputSeries.Add(float64(size), ts)
+}
+
+// updateErrorAndByteClasses bumps the error counter and the read/write byte
+// totals from the pair's exit event. Pairs whose exit event is not a
+// *types.RetEvent are left out of these counters. The caller must hold e.mu.
+func (e *Engine) updateErrorAndByteClasses(pair *event.Pair) {
+	exit, isRet := pair.ExitEv.(*types.RetEvent)
+	if !isRet {
+		return
+	}
+
+	// A negative return value is counted as a failed syscall.
+	if exit.Ret < 0 {
+		e.totalErrors++
+	}
+
+	// A transfer-classified syscall is counted on both sides.
+	kind := exit.RetType
+	if kind == types.READ_CLASSIFIED || kind == types.TRANSFER_CLASSIFIED {
+		e.totalReadBytes += pair.Bytes
+	}
+	if kind == types.WRITE_CLASSIFIED || kind == types.TRANSFER_CLASSIFIED {
+		e.totalWriteBytes += pair.Bytes
+	}
+}
+
+// Snapshot returns an immutable point-in-time view of all stats.
+func (e *Engine) Snapshot() *Snapshot {
+	if e == nil {
+		return nil
+	}
+
+	e.mu.Lock()
+	defer e.mu.Unlock()
+
+	generatedAt := e.now()
+	elapsed := nonNegativeDuration(generatedAt.Sub(e.startedAt))
+	seconds := elapsed.Seconds()
+
+	latencyValues := e.latencySeries.Values()
+	gapValues := e.gapSeries.Values()
+	throughputValues := e.throughputSeries.Values()
+
+	snap := NewSnapshot(
+		latencyValues,
+		gapValues,
+		throughputValues,
+		e.syscalls.Snapshot(elapsed),
+		e.files.Snapshot(),
+		e.processes.Snapshot(elapsed),
+		e.latencyHist.Snapshot(),
+		e.gapHist.Snapshot(),
+	)
+
+	// Timing.
+	snap.GeneratedAt = generatedAt
+	snap.Elapsed = elapsed
+
+	// Raw totals.
+	snap.TotalSyscalls = e.totalSyscalls
+	snap.TotalErrors = e.totalErrors
+	snap.TotalBytes = e.totalBytes
+
+	// Rates over the whole lifetime of the engine.
+	snap.SyscallRatePerSec = safeRate(e.totalSyscalls, seconds)
+	snap.ErrorRatePerSec = safeRate(e.totalErrors, seconds)
+	snap.ReadBytesPerSec = safeRate(e.totalReadBytes, seconds)
+	snap.WriteBytesPerSec = safeRate(e.totalWriteBytes, seconds)
+
+	// Per-syscall means.
+	snap.LatencyMeanNs = safeMean(e.totalLatency, e.totalSyscalls)
+	snap.GapMeanNs = safeMean(e.totalGap, e.totalSyscalls)
+
+	// Trends derived from the same series values handed to NewSnapshot.
+	snap.LatencyTrend = detectTrend(latencyValues)
+	snap.GapTrend = detectTrend(gapValues)
+	snap.ThroughputTrend = detectTrend(throughputValues)
+
+	return &snap
+}
+
+// safeMean returns total/count as a float64, or 0 when count is 0.
+func safeMean(total uint64, count uint64) float64 {
+	if count > 0 {
+		return float64(total) / float64(count)
+	}
+	return 0
+}
+
+// nonNegativeDuration returns d, with negative durations clamped to 0.
+func nonNegativeDuration(d time.Duration) time.Duration {
+	if d >= 0 {
+		return d
+	}
+	return 0
+}
+
+// detectTrend compares the average of the last trendWindowSlots values with
+// the average of the trendWindowSlots values before them.
+//
+// It reports TrendStable when the series is shorter than two windows, when
+// both averages are 0, or when the relative change is below 5%. When only the
+// older average is 0 it reports TrendRising with a nominal 100% change.
+func detectTrend(series []float64) Trend {
+	const window = trendWindowSlots
+
+	n := len(series)
+	if n < 2*window {
+		return Trend{Direction: TrendStable}
+	}
+
+	older := average(series[n-2*window : n-window])
+	newer := average(series[n-window:])
+
+	if older == 0 {
+		if newer == 0 {
+			return Trend{Direction: TrendStable}
+		}
+		// No baseline to divide by, so a fixed 100% rise is reported.
+		return Trend{Direction: TrendRising, DeltaPercent: 100}
+	}
+
+	pct := (newer - older) / math.Abs(older) * 100
+	switch {
+	case math.Abs(pct) < 5:
+		return Trend{Direction: TrendStable, DeltaPercent: pct}
+	case pct > 0:
+		return Trend{Direction: TrendRising, DeltaPercent: pct}
+	default:
+		return Trend{Direction: TrendFalling, DeltaPercent: pct}
+	}
+}
+
+// average returns the arithmetic mean of values, or 0 for an empty slice.
+func average(values []float64) float64 {
+	n := len(values)
+	if n == 0 {
+		return 0
+	}
+
+	var total float64
+	for i := 0; i < n; i++ {
+		total += values[i]
+	}
+	return total / float64(n)
+}
diff --git a/internal/statsengine/engine_test.go b/internal/statsengine/engine_test.go
new file mode 100644
index 0000000..943fe9c
--- /dev/null
+++ b/internal/statsengine/engine_test.go
@@ -0,0 +1,130 @@
+package statsengine
+
+import (
+	"ior/internal/event"
+	"ior/internal/file"
+	"ior/internal/types"
+	"math"
+	"testing"
+	"time"
+)
+
+// fakeClock is a manually advanced clock for deterministic engine tests.
+type fakeClock struct {
+	now time.Time
+}
+
+// Now returns the clock's current reading.
+func (c *fakeClock) Now() time.Time {
+	return c.now
+}
+
+// Advance moves the clock forward by d.
+func (c *fakeClock) Advance(d time.Duration) {
+	c.now = c.now.Add(d)
+}
+
+// TestEngineIngestAndSnapshotIntegration ingests a read, a failed write and a
+// transfer over two seconds of fake time and checks the resulting snapshot.
+func TestEngineIngestAndSnapshotIntegration(t *testing.T) {
+	clock := &fakeClock{now: time.Unix(1000, 0)}
+	engine := newEngineWithClock(2, clock.Now)
+
+	steps := []struct {
+		pair    *event.Pair
+		advance time.Duration
+	}{
+		{newEnginePair(types.SYS_ENTER_READ, 100, types.READ_CLASSIFIED, "proc-a", 1, "/tmp/a", 100, 10, 3), 500 * time.Millisecond},
+		{newEnginePair(types.SYS_ENTER_WRITE, -1, types.WRITE_CLASSIFIED, "proc-a", 1, "/tmp/a", 50, 20, 5), 500 * time.Millisecond},
+		{newEnginePair(types.SYS_ENTER_COPY_FILE_RANGE, 80, types.TRANSFER_CLASSIFIED, "proc-b", 2, "/tmp/b", 20, 40, 8), 1 * time.Second},
+	}
+	for _, step := range steps {
+		engine.Ingest(step.pair)
+		clock.Advance(step.advance)
+	}
+
+	snap := engine.Snapshot()
+	if snap == nil {
+		t.Fatalf("expected snapshot")
+	}
+
+	if !(snap.TotalSyscalls == 3 && snap.TotalErrors == 1 && snap.TotalBytes == 170) {
+		t.Fatalf("unexpected totals: syscalls=%d errors=%d bytes=%d", snap.TotalSyscalls, snap.TotalErrors, snap.TotalBytes)
+	}
+
+	const (
+		wantLatencyMean = (10 + 20 + 40) / 3.0
+		wantGapMean     = (3 + 5 + 8) / 3.0
+	)
+	if snap.LatencyMeanNs != wantLatencyMean {
+		t.Fatalf("unexpected latency mean: %v", snap.LatencyMeanNs)
+	}
+	if snap.GapMeanNs != wantGapMean {
+		t.Fatalf("unexpected gap mean: %v", snap.GapMeanNs)
+	}
+
+	// Rates are totals divided by the two seconds of elapsed fake time.
+	rateChecks := []struct {
+		got, want float64
+		format    string
+	}{
+		{snap.SyscallRatePerSec, 1.5, "unexpected syscall rate: %v"},
+		{snap.ErrorRatePerSec, 0.5, "unexpected error rate: %v"},
+		{snap.ReadBytesPerSec, 60.0, "unexpected read bytes rate: %v"},
+		{snap.WriteBytesPerSec, 35.0, "unexpected write bytes rate: %v"},
+	}
+	for _, check := range rateChecks {
+		if math.Abs(check.got-check.want) > 1e-9 {
+			t.Fatalf(check.format, check.got)
+		}
+	}
+
+	if n := len(snap.Syscalls()); n != 3 {
+		t.Fatalf("expected 3 syscall rows, got %d", n)
+	}
+	if n := len(snap.Files()); n != 2 {
+		t.Fatalf("expected top 2 files due to topN=2, got %d", n)
+	}
+	if n := len(snap.Processes()); n != 2 {
+		t.Fatalf("expected 2 process rows, got %d", n)
+	}
+
+	latencyTotal, gapTotal := snap.LatencyHistogram.Total, snap.GapHistogram.Total
+	if latencyTotal != 3 || gapTotal != 3 {
+		t.Fatalf("unexpected histogram totals: latency=%d gap=%d", latencyTotal, gapTotal)
+	}
+}
+
+// TestEngineSnapshotWithNoEvents checks that a fresh engine yields a non-nil
+// snapshot with zero totals and no rows.
+func TestEngineSnapshotWithNoEvents(t *testing.T) {
+	clock := &fakeClock{now: time.Unix(2000, 0)}
+
+	snap := newEngineWithClock(10, clock.Now).Snapshot()
+	if snap == nil {
+		t.Fatalf("expected snapshot")
+	}
+	if !(snap.TotalSyscalls == 0 && snap.TotalErrors == 0 && snap.TotalBytes == 0) {
+		t.Fatalf("expected zero totals, got %+v", snap)
+	}
+	if rows := len(snap.Syscalls()) + len(snap.Files()) + len(snap.Processes()); rows != 0 {
+		t.Fatalf("expected empty rows in zero snapshot")
+	}
+}
+
+// TestEngineTrendDetection feeds detectTrend exactly two windows of data:
+// flat, stepping up, and stepping down.
+func TestEngineTrendDetection(t *testing.T) {
+	// twoWindows builds a series whose older window is filled with older and
+	// whose recent window is filled with newer.
+	twoWindows := func(older, newer float64) []float64 {
+		series := make([]float64, 0, trendWindowSlots*2)
+		for i := 0; i < trendWindowSlots; i++ {
+			series = append(series, older)
+		}
+		for i := 0; i < trendWindowSlots; i++ {
+			series = append(series, newer)
+		}
+		return series
+	}
+
+	if got := detectTrend(twoWindows(0, 0)); got.Direction != TrendStable {
+		t.Fatalf("expected stable for flat data, got %+v", got)
+	}
+	if got := detectTrend(twoWindows(10, 30)); got.Direction != TrendRising {
+		t.Fatalf("expected rising trend, got %+v", got)
+	}
+	if got := detectTrend(twoWindows(40, 10)); got.Direction != TrendFalling {
+		t.Fatalf("expected falling trend, got %+v", got)
+	}
+}
+
+// newEnginePair builds an event pair whose enter and exit events share the
+// given trace ID and PID, with the exit event carrying ret and retType, and
+// which is attached to a file descriptor entry for path.
+func newEnginePair(traceID types.TraceId, ret int64, retType uint32, comm string, pid uint32, path string, bytes uint64, duration uint64, gap uint64) *event.Pair {
+	enter := &types.RetEvent{TraceId: traceID, Pid: pid}
+	exit := &types.RetEvent{TraceId: traceID, Pid: pid, Ret: ret, RetType: retType}
+
+	pair := &event.Pair{EnterEv: enter, ExitEv: exit}
+	pair.Comm = comm
+	pair.Duration = duration
+	pair.DurationToPrev = gap
+	pair.Bytes = bytes
+	pair.File = file.NewFd(3, path, -1)
+	return pair
+}
