diff options
| author | Paul Buetow <paul@buetow.org> | 2026-05-20 11:38:19 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-05-20 11:38:19 +0300 |
| commit | 9310b54d439d4a1a8d4d337987aa63884df0af76 (patch) | |
| tree | c6fb38085891a04ce81672f977af316a2e96b2fd /internal/statsengine | |
| parent | 5fd613562e2aa2ab3aac3349f44db88330046c1c (diff) | |
feat: add syscall aggregate sampling infrastructure (task 17)
Diffstat (limited to 'internal/statsengine')
| -rw-r--r-- | internal/statsengine/aggregate.go | 46 | ||||
| -rw-r--r-- | internal/statsengine/aggregate_test.go | 57 | ||||
| -rw-r--r-- | internal/statsengine/family.go | 24 | ||||
| -rw-r--r-- | internal/statsengine/histogram.go | 10 | ||||
| -rw-r--r-- | internal/statsengine/syscall.go | 23 |
5 files changed, 160 insertions, 0 deletions
diff --git a/internal/statsengine/aggregate.go b/internal/statsengine/aggregate.go new file mode 100644 index 0000000..03dad5d --- /dev/null +++ b/internal/statsengine/aggregate.go @@ -0,0 +1,46 @@ +package statsengine + +import "ior/internal/types" + +// SyscallAggregate is the kernel-side aggregate for one sys_enter trace ID. +type SyscallAggregate struct { + TraceID types.TraceId + Count uint64 + Errors uint64 + TotalLatencyNs uint64 + MinLatencyNs uint64 + MaxLatencyNs uint64 + LatencyHistogramNs [8]uint64 +} + +// IngestSyscallAggregates folds kernel aggregate rows into the engine. +func (e *Engine) IngestSyscallAggregates(rows []SyscallAggregate) { + if e == nil || len(rows) == 0 { + return + } + + e.mu.Lock() + defer e.mu.Unlock() + + now := e.now() + var batchLatency uint64 + var batchCount uint64 + for _, row := range rows { + if row.Count == 0 { + continue + } + + e.totalSyscalls += row.Count + e.totalErrors += row.Errors + e.totalLatency += row.TotalLatencyNs + e.syscalls.AddAggregate(row) + e.families.AddAggregate(row) + e.latencyHist.AddBucketCounts(row.LatencyHistogramNs) + + batchLatency += row.TotalLatencyNs + batchCount += row.Count + } + if batchCount > 0 { + e.latencySeries.Add(float64(batchLatency)/float64(batchCount), now) + } +} diff --git a/internal/statsengine/aggregate_test.go b/internal/statsengine/aggregate_test.go new file mode 100644 index 0000000..0d11214 --- /dev/null +++ b/internal/statsengine/aggregate_test.go @@ -0,0 +1,57 @@ +package statsengine + +import ( + "testing" + + "ior/internal/types" +) + +func TestIngestSyscallAggregatesUpdatesSnapshot(t *testing.T) { + engine := NewEngine(DefaultTopN) + engine.IngestSyscallAggregates([]SyscallAggregate{ + { + TraceID: types.SYS_ENTER_FUTEX, + Count: 3, + Errors: 1, + TotalLatencyNs: 90, + MinLatencyNs: 10, + MaxLatencyNs: 50, + LatencyHistogramNs: [8]uint64{ + 1, 1, 1, 0, 0, 0, 0, 0, + }, + }, + }) + + snap, err := engine.Snapshot() + if err != nil { + t.Fatalf("snapshot error: %v", err) + } + if snap.TotalSyscalls != 3 { + t.Fatalf("TotalSyscalls = %d, want 3", snap.TotalSyscalls) + } + if snap.TotalErrors != 1 { + t.Fatalf("TotalErrors = %d, want 1", snap.TotalErrors) + } + if snap.LatencyHistogram.Total != 3 { + t.Fatalf("LatencyHistogram.Total = %d, want 3", snap.LatencyHistogram.Total) + } + + syscalls := snap.Syscalls() + var futexRow *SyscallSnapshot + for i := range syscalls { + row := &syscalls[i] + if row.TraceID == types.SYS_ENTER_FUTEX { + futexRow = row + break + } + } + if futexRow == nil { + t.Fatal("expected futex syscall row") + } + if futexRow.Count != 3 || futexRow.Errors != 1 { + t.Fatalf("futex row = %+v, want count=3 errors=1", *futexRow) + } + if futexRow.LatencyMinNs != 10 || futexRow.LatencyMaxNs != 50 { + t.Fatalf("futex min/max = %d/%d, want 10/50", futexRow.LatencyMinNs, futexRow.LatencyMaxNs) + } +} diff --git a/internal/statsengine/family.go b/internal/statsengine/family.go index 3206d57..a97332d 100644 --- a/internal/statsengine/family.go +++ b/internal/statsengine/family.go @@ -59,6 +59,30 @@ func (a *familyAccumulator) Add(pair *event.Pair) { } } +func (a *familyAccumulator) AddAggregate(row SyscallAggregate) { + if a == nil || row.TraceID == 0 || row.Count == 0 { + return + } + + family := row.TraceID.Family() + stats := a.byFamily[family] + if stats == nil { + stats = &familyStats{family: family} + a.byFamily[family] = stats + } + + prevCount := stats.count + stats.count += row.Count + stats.errorCount += row.Errors + stats.totalLatency += row.TotalLatencyNs + if prevCount == 0 || row.MinLatencyNs < stats.minLatency { + stats.minLatency = row.MinLatencyNs + } + if row.MaxLatencyNs > stats.maxLatency { + stats.maxLatency = row.MaxLatencyNs + } +} + func (a *familyAccumulator) snapshotInputs() []familySnapshotInput { if a == nil { return nil diff --git a/internal/statsengine/histogram.go b/internal/statsengine/histogram.go index 4a5e3b4..27d3285 100644 --- a/internal/statsengine/histogram.go +++ b/internal/statsengine/histogram.go @@ -47,6 +47,16 @@ func (h *histogram) Increment(durationNs uint64) { h.total++ } +func (h *histogram) AddBucketCounts(counts [histogramBucketCount]uint64) { + if h == nil { + return + } + for i, count := range counts { + h.counts[i] += count + h.total += count + } +} + // Snapshot returns a HistogramSnapshot of the current histogram state. // It panics on build error, which should never happen for a valid histogram. func (h *histogram) Snapshot() HistogramSnapshot { diff --git a/internal/statsengine/syscall.go b/internal/statsengine/syscall.go index d58e8c9..5c85b2a 100644 --- a/internal/statsengine/syscall.go +++ b/internal/statsengine/syscall.go @@ -99,6 +99,29 @@ func (a *syscallAccumulator) Add(pair *event.Pair) { } } +func (a *syscallAccumulator) AddAggregate(row SyscallAggregate) { + if a == nil || row.TraceID == 0 || row.Count == 0 { + return + } + + stats := a.byID[row.TraceID] + if stats == nil { + stats = &syscallStats{traceID: row.TraceID, name: row.TraceID.Name()} + a.byID[row.TraceID] = stats + } + + prevCount := stats.count + stats.count += row.Count + stats.errorCount += row.Errors + stats.totalLatency += row.TotalLatencyNs + if prevCount == 0 || row.MinLatencyNs < stats.minLatency { + stats.minLatency = row.MinLatencyNs + } + if row.MaxLatencyNs > stats.maxLatency { + stats.maxLatency = row.MaxLatencyNs + } +} + // Snapshot returns a slice of SyscallSnapshots for all tracked syscalls. // It panics on build error, which should never happen for a valid accumulator. func (a *syscallAccumulator) Snapshot(elapsed time.Duration) []SyscallSnapshot { |
