summaryrefslogtreecommitdiff
path: root/internal/statsengine
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-02-23 23:22:54 +0200
committerPaul Buetow <paul@buetow.org>2026-02-23 23:22:54 +0200
commit0bfeccbed6edb106670024373e46b3891463920c (patch)
tree49a760dc17913a7359a13e37cbfbc156c4e6e3b0 /internal/statsengine
parenta8a4f82675ed2df538e9fbf95504d5674a732c52 (diff)
task 302: add stats engine orchestration and snapshots
Diffstat (limited to 'internal/statsengine')
-rw-r--r--internal/statsengine/engine.go203
-rw-r--r--internal/statsengine/engine_test.go130
2 files changed, 333 insertions, 0 deletions
diff --git a/internal/statsengine/engine.go b/internal/statsengine/engine.go
new file mode 100644
index 0000000..79123a5
--- /dev/null
+++ b/internal/statsengine/engine.go
@@ -0,0 +1,203 @@
+package statsengine
+
+import (
+ "ior/internal/event"
+ "ior/internal/types"
+ "math"
+ "sync"
+ "time"
+)
+
+// trendWindowSlots is the number of trailing series values in each of the two
+// adjacent windows (previous and recent) that detectTrend averages and compares.
+const trendWindowSlots = 20
+
+// Engine folds streaming syscall event pairs into running aggregates and
+// renders them on demand as Snapshot values. Ingest and Snapshot both hold mu
+// while they read or update the fields below.
+type Engine struct {
+	mu sync.Mutex
+
+	// now is the clock used for series timestamps and elapsed-time maths.
+	now       func() time.Time
+	startedAt time.Time // sampled from now when the engine is constructed
+
+	// Running totals accumulated by Ingest.
+	totalSyscalls, totalErrors                  uint64
+	totalBytes, totalReadBytes, totalWriteBytes uint64
+	totalLatency, totalGap                      uint64
+
+	// Per-syscall, per-file and per-process aggregators.
+	syscalls  *syscallAccumulator
+	files     *fileRanker
+	processes *processAccumulator
+
+	// Histograms and time series fed with each pair's duration, gap to the
+	// previous event, and byte count.
+	latencyHist, gapHist                       *histogram
+	latencySeries, gapSeries, throughputSeries *ringTimeSeries
+}
+
+// NewEngine returns an Engine that reads time from time.Now. topN is forwarded
+// unchanged to newEngineWithClock.
+func NewEngine(topN int) *Engine {
+	engine := newEngineWithClock(topN, time.Now)
+	return engine
+}
+
+// newEngineWithClock builds an Engine whose notion of time comes from now,
+// which lets tests drive it with a fake clock. A nil now falls back to
+// time.Now. The start time is sampled once, here, and topN is passed to the
+// file ranker's constructor.
+func newEngineWithClock(topN int, now func() time.Time) *Engine {
+	clock := now
+	if clock == nil {
+		clock = time.Now
+	}
+
+	e := &Engine{now: clock, startedAt: clock()}
+
+	e.syscalls = newSyscallAccumulator()
+	e.files = newFileRankerWithConfig(topN)
+	e.processes = newProcessAccumulator()
+
+	e.latencyHist = newHistogram()
+	e.gapHist = newHistogram()
+
+	e.latencySeries = newRingTimeSeries()
+	e.gapSeries = newRingTimeSeries()
+	e.throughputSeries = newRingTimeSeries()
+
+	return e
+}
+
+// Ingest folds one event pair into every aggregate the engine tracks: the
+// running totals, the per-syscall/file/process aggregators, both histograms and
+// the three time series. A nil engine or a nil pair makes the call a no-op.
+func (e *Engine) Ingest(pair *event.Pair) {
+	if pair == nil || e == nil {
+		return
+	}
+
+	e.mu.Lock()
+	defer e.mu.Unlock()
+
+	// One timestamp is shared by all three series samples of this pair.
+	stamp := e.now()
+
+	// Running totals.
+	e.totalGap += pair.DurationToPrev
+	e.totalLatency += pair.Duration
+	e.totalBytes += pair.Bytes
+	e.totalSyscalls++
+	e.updateErrorAndByteClasses(pair)
+
+	// Keyed aggregators.
+	e.syscalls.Add(pair)
+	e.files.Add(pair)
+	e.processes.Add(pair)
+
+	// Distributions.
+	e.latencyHist.Increment(pair.Duration)
+	e.gapHist.Increment(pair.DurationToPrev)
+
+	// Time series.
+	e.latencySeries.Add(float64(pair.Duration), stamp)
+	e.gapSeries.Add(float64(pair.DurationToPrev), stamp)
+	e.throughputSeries.Add(float64(pair.Bytes), stamp)
+}
+
+// updateErrorAndByteClasses updates the error counter and the read/write byte
+// totals from the pair's exit event. Pairs whose exit event is not a
+// *types.RetEvent are left uncounted. A negative return value counts as one
+// error. Bytes are credited to the read total, the write total, or both (for
+// the transfer class) according to the exit event's RetType; any other RetType
+// credits neither. The caller must hold e.mu.
+func (e *Engine) updateErrorAndByteClasses(pair *event.Pair) {
+	exit, isRet := pair.ExitEv.(*types.RetEvent)
+	if !isRet {
+		return
+	}
+
+	if exit.Ret < 0 {
+		e.totalErrors++
+	}
+
+	class := exit.RetType
+	countsAsRead := class == types.READ_CLASSIFIED || class == types.TRANSFER_CLASSIFIED
+	countsAsWrite := class == types.WRITE_CLASSIFIED || class == types.TRANSFER_CLASSIFIED
+
+	if countsAsRead {
+		e.totalReadBytes += pair.Bytes
+	}
+	if countsAsWrite {
+		e.totalWriteBytes += pair.Bytes
+	}
+}
+
+// Snapshot builds a point-in-time view of everything the engine has
+// aggregated so far. It returns nil when called on a nil engine.
+//
+// Per-second rates divide the running totals by the time elapsed since the
+// engine was constructed (clamped so it is never negative). Means divide by the
+// number of ingested pairs. Trends are derived from the same series values that
+// are handed to NewSnapshot.
+func (e *Engine) Snapshot() *Snapshot {
+	if e == nil {
+		return nil
+	}
+
+	e.mu.Lock()
+	defer e.mu.Unlock()
+
+	takenAt := e.now()
+	uptime := nonNegativeDuration(takenAt.Sub(e.startedAt))
+	seconds := uptime.Seconds()
+
+	latency := e.latencySeries.Values()
+	gap := e.gapSeries.Values()
+	throughput := e.throughputSeries.Values()
+
+	view := NewSnapshot(
+		latency,
+		gap,
+		throughput,
+		e.syscalls.Snapshot(uptime),
+		e.files.Snapshot(),
+		e.processes.Snapshot(uptime),
+		e.latencyHist.Snapshot(),
+		e.gapHist.Snapshot(),
+	)
+
+	// Timing and headline counters.
+	view.GeneratedAt, view.Elapsed = takenAt, uptime
+	view.TotalSyscalls, view.TotalErrors, view.TotalBytes = e.totalSyscalls, e.totalErrors, e.totalBytes
+
+	// Rates over the engine's lifetime.
+	view.SyscallRatePerSec = safeRate(e.totalSyscalls, seconds)
+	view.ErrorRatePerSec = safeRate(e.totalErrors, seconds)
+	view.ReadBytesPerSec = safeRate(e.totalReadBytes, seconds)
+	view.WriteBytesPerSec = safeRate(e.totalWriteBytes, seconds)
+
+	// Per-pair means.
+	view.LatencyMeanNs = safeMean(e.totalLatency, e.totalSyscalls)
+	view.GapMeanNs = safeMean(e.totalGap, e.totalSyscalls)
+
+	// Trends over the series windows.
+	view.LatencyTrend = detectTrend(latency)
+	view.GapTrend = detectTrend(gap)
+	view.ThroughputTrend = detectTrend(throughput)
+
+	return &view
+}
+
+// safeMean returns total/count as a float64, or 0 when count is zero so the
+// caller never divides by zero.
+func safeMean(total uint64, count uint64) float64 {
+	if count != 0 {
+		return float64(total) / float64(count)
+	}
+	return 0
+}
+
+// nonNegativeDuration returns d unchanged when it is zero or positive and
+// clamps negative durations to 0.
+func nonNegativeDuration(d time.Duration) time.Duration {
+	if d >= 0 {
+		return d
+	}
+	return 0
+}
+
+func detectTrend(series []float64) Trend {
+ if len(series) < trendWindowSlots*2 {
+ return Trend{Direction: TrendStable}
+ }
+
+ prev := average(series[len(series)-trendWindowSlots*2 : len(series)-trendWindowSlots])
+ recent := average(series[len(series)-trendWindowSlots:])
+ delta := recent - prev
+
+ if prev == 0 {
+ if recent == 0 {
+ return Trend{Direction: TrendStable}
+ }
+ return Trend{Direction: TrendRising, DeltaPercent: 100}
+ }
+
+ deltaPercent := delta / math.Abs(prev) * 100
+ if math.Abs(deltaPercent) < 5 {
+ return Trend{Direction: TrendStable, DeltaPercent: deltaPercent}
+ }
+ if deltaPercent > 0 {
+ return Trend{Direction: TrendRising, DeltaPercent: deltaPercent}
+ }
+ return Trend{Direction: TrendFalling, DeltaPercent: deltaPercent}
+}
+
+func average(values []float64) float64 {
+ if len(values) == 0 {
+ return 0
+ }
+ sum := 0.0
+ for _, v := range values {
+ sum += v
+ }
+ return sum / float64(len(values))
+}
diff --git a/internal/statsengine/engine_test.go b/internal/statsengine/engine_test.go
new file mode 100644
index 0000000..943fe9c
--- /dev/null
+++ b/internal/statsengine/engine_test.go
@@ -0,0 +1,130 @@
+package statsengine
+
+import (
+ "ior/internal/event"
+ "ior/internal/file"
+ "ior/internal/types"
+ "math"
+ "testing"
+ "time"
+)
+
+// fakeClock is a manually advanced time source for tests; its Now method can be
+// passed wherever a func() time.Time is expected.
+type fakeClock struct {
+	now time.Time
+}
+
+// Now reports the clock's current instant without advancing it.
+func (c *fakeClock) Now() time.Time {
+	current := c.now
+	return current
+}
+
+// Advance moves the clock by d.
+func (c *fakeClock) Advance(d time.Duration) {
+	moved := c.now.Add(d)
+	c.now = moved
+}
+
+// TestEngineIngestAndSnapshotIntegration drives an engine on a fake clock with
+// three pairs (a read, a failing write and a transfer) spread over two seconds
+// and verifies the snapshot's totals, means, per-second rates, row counts and
+// histogram totals.
+func TestEngineIngestAndSnapshotIntegration(t *testing.T) {
+	clock := &fakeClock{now: time.Unix(1000, 0)}
+	engine := newEngineWithClock(2, clock.Now)
+
+	// differs reports whether got is outside the tolerance around want.
+	differs := func(got, want float64) bool {
+		return math.Abs(got-want) > 1e-9
+	}
+
+	readPair := newEnginePair(types.SYS_ENTER_READ, 100, types.READ_CLASSIFIED, "proc-a", 1, "/tmp/a", 100, 10, 3)
+	failedWritePair := newEnginePair(types.SYS_ENTER_WRITE, -1, types.WRITE_CLASSIFIED, "proc-a", 1, "/tmp/a", 50, 20, 5)
+	transferPair := newEnginePair(types.SYS_ENTER_COPY_FILE_RANGE, 80, types.TRANSFER_CLASSIFIED, "proc-b", 2, "/tmp/b", 20, 40, 8)
+
+	engine.Ingest(readPair)
+	clock.Advance(500 * time.Millisecond)
+	engine.Ingest(failedWritePair)
+	clock.Advance(500 * time.Millisecond)
+	engine.Ingest(transferPair)
+	clock.Advance(1 * time.Second)
+
+	snap := engine.Snapshot()
+	if snap == nil {
+		t.Fatalf("expected snapshot")
+	}
+
+	// Totals: three pairs, one negative return, 100+50+20 bytes.
+	if !(snap.TotalSyscalls == 3 && snap.TotalErrors == 1 && snap.TotalBytes == 170) {
+		t.Fatalf("unexpected totals: syscalls=%d errors=%d bytes=%d", snap.TotalSyscalls, snap.TotalErrors, snap.TotalBytes)
+	}
+
+	// Means over the three pairs.
+	if snap.LatencyMeanNs != (10+20+40)/3.0 {
+		t.Fatalf("unexpected latency mean: %v", snap.LatencyMeanNs)
+	}
+	if snap.GapMeanNs != (3+5+8)/3.0 {
+		t.Fatalf("unexpected gap mean: %v", snap.GapMeanNs)
+	}
+
+	// Rates over the 2s the fake clock has advanced. The transfer's 20 bytes
+	// count towards both the read and the write rate.
+	if differs(snap.SyscallRatePerSec, 1.5) {
+		t.Fatalf("unexpected syscall rate: %v", snap.SyscallRatePerSec)
+	}
+	if differs(snap.ErrorRatePerSec, 0.5) {
+		t.Fatalf("unexpected error rate: %v", snap.ErrorRatePerSec)
+	}
+	if differs(snap.ReadBytesPerSec, 60.0) {
+		t.Fatalf("unexpected read bytes rate: %v", snap.ReadBytesPerSec)
+	}
+	if differs(snap.WriteBytesPerSec, 35.0) {
+		t.Fatalf("unexpected write bytes rate: %v", snap.WriteBytesPerSec)
+	}
+
+	// Row counts.
+	if n := len(snap.Syscalls()); n != 3 {
+		t.Fatalf("expected 3 syscall rows, got %d", n)
+	}
+	if n := len(snap.Files()); n != 2 {
+		t.Fatalf("expected top 2 files due to topN=2, got %d", n)
+	}
+	if n := len(snap.Processes()); n != 2 {
+		t.Fatalf("expected 2 process rows, got %d", n)
+	}
+
+	// Each histogram saw every pair once.
+	if !(snap.LatencyHistogram.Total == 3 && snap.GapHistogram.Total == 3) {
+		t.Fatalf("unexpected histogram totals: latency=%d gap=%d", snap.LatencyHistogram.Total, snap.GapHistogram.Total)
+	}
+}
+
+// TestEngineSnapshotWithNoEvents checks that a freshly built engine yields a
+// non-nil snapshot whose totals are zero and whose row lists are empty.
+func TestEngineSnapshotWithNoEvents(t *testing.T) {
+	clock := &fakeClock{now: time.Unix(2000, 0)}
+
+	snap := newEngineWithClock(10, clock.Now).Snapshot()
+	if snap == nil {
+		t.Fatalf("expected snapshot")
+	}
+
+	totalsAreZero := snap.TotalSyscalls == 0 && snap.TotalErrors == 0 && snap.TotalBytes == 0
+	if !totalsAreZero {
+		t.Fatalf("expected zero totals, got %+v", snap)
+	}
+
+	rowsAreEmpty := len(snap.Syscalls()) == 0 && len(snap.Files()) == 0 && len(snap.Processes()) == 0
+	if !rowsAreEmpty {
+		t.Fatalf("expected empty rows in zero snapshot")
+	}
+}
+
+// TestEngineTrendDetection exercises detectTrend with exactly two windows of
+// data: all zeros (stable), a step up from 10 to 30 (rising) and a step down
+// from 40 to 10 (falling).
+func TestEngineTrendDetection(t *testing.T) {
+	// step builds a two-window series: the first window holds before, the
+	// second holds after.
+	step := func(before, after float64) []float64 {
+		out := make([]float64, 0, trendWindowSlots*2)
+		for len(out) < trendWindowSlots {
+			out = append(out, before)
+		}
+		for len(out) < trendWindowSlots*2 {
+			out = append(out, after)
+		}
+		return out
+	}
+
+	flat := make([]float64, trendWindowSlots*2)
+	if got := detectTrend(flat); got.Direction != TrendStable {
+		t.Fatalf("expected stable for flat data, got %+v", got)
+	}
+
+	if got := detectTrend(step(10, 30)); got.Direction != TrendRising {
+		t.Fatalf("expected rising trend, got %+v", got)
+	}
+
+	if got := detectTrend(step(40, 10)); got.Direction != TrendFalling {
+		t.Fatalf("expected falling trend, got %+v", got)
+	}
+}
+
+// newEnginePair assembles an event.Pair for tests. The enter and exit events
+// share traceID and pid; the exit event additionally carries ret and retType.
+// comm, bytes, duration and gap populate the pair's Comm, Bytes, Duration and
+// DurationToPrev fields, and path is handed to file.NewFd to build the File.
+func newEnginePair(traceID types.TraceId, ret int64, retType uint32, comm string, pid uint32, path string, bytes uint64, duration uint64, gap uint64) *event.Pair {
+	enter := &types.RetEvent{TraceId: traceID, Pid: pid}
+	exit := &types.RetEvent{TraceId: traceID, Pid: pid, Ret: ret, RetType: retType}
+
+	pair := &event.Pair{EnterEv: enter, ExitEv: exit}
+	pair.Comm = comm
+	pair.Duration = duration
+	pair.DurationToPrev = gap
+	pair.Bytes = bytes
+	pair.File = file.NewFd(3, path, -1)
+
+	return pair
+}