summaryrefslogtreecommitdiff
path: root/internal/statsengine
diff options
context:
space:
mode:
Diffstat (limited to 'internal/statsengine')
-rw-r--r--internal/statsengine/aggregate.go46
-rw-r--r--internal/statsengine/aggregate_test.go57
-rw-r--r--internal/statsengine/family.go24
-rw-r--r--internal/statsengine/histogram.go10
-rw-r--r--internal/statsengine/syscall.go23
5 files changed, 160 insertions, 0 deletions
diff --git a/internal/statsengine/aggregate.go b/internal/statsengine/aggregate.go
new file mode 100644
index 0000000..03dad5d
--- /dev/null
+++ b/internal/statsengine/aggregate.go
@@ -0,0 +1,46 @@
+package statsengine
+
+import "ior/internal/types"
+
+// SyscallAggregate is the kernel-side aggregate for one sys_enter trace ID.
+type SyscallAggregate struct {
+ TraceID types.TraceId
+ Count uint64
+ Errors uint64
+ TotalLatencyNs uint64
+ MinLatencyNs uint64
+ MaxLatencyNs uint64
+ LatencyHistogramNs [8]uint64
+}
+
+// IngestSyscallAggregates folds kernel aggregate rows into the engine.
+func (e *Engine) IngestSyscallAggregates(rows []SyscallAggregate) {
+ if e == nil || len(rows) == 0 {
+ return
+ }
+
+ e.mu.Lock()
+ defer e.mu.Unlock()
+
+ now := e.now()
+ var batchLatency uint64
+ var batchCount uint64
+ for _, row := range rows {
+ if row.Count == 0 {
+ continue
+ }
+
+ e.totalSyscalls += row.Count
+ e.totalErrors += row.Errors
+ e.totalLatency += row.TotalLatencyNs
+ e.syscalls.AddAggregate(row)
+ e.families.AddAggregate(row)
+ e.latencyHist.AddBucketCounts(row.LatencyHistogramNs)
+
+ batchLatency += row.TotalLatencyNs
+ batchCount += row.Count
+ }
+ if batchCount > 0 {
+ e.latencySeries.Add(float64(batchLatency)/float64(batchCount), now)
+ }
+}
diff --git a/internal/statsengine/aggregate_test.go b/internal/statsengine/aggregate_test.go
new file mode 100644
index 0000000..0d11214
--- /dev/null
+++ b/internal/statsengine/aggregate_test.go
@@ -0,0 +1,57 @@
+package statsengine
+
+import (
+ "testing"
+
+ "ior/internal/types"
+)
+
+func TestIngestSyscallAggregatesUpdatesSnapshot(t *testing.T) {
+ engine := NewEngine(DefaultTopN)
+ engine.IngestSyscallAggregates([]SyscallAggregate{
+ {
+ TraceID: types.SYS_ENTER_FUTEX,
+ Count: 3,
+ Errors: 1,
+ TotalLatencyNs: 90,
+ MinLatencyNs: 10,
+ MaxLatencyNs: 50,
+ LatencyHistogramNs: [8]uint64{
+ 1, 1, 1, 0, 0, 0, 0, 0,
+ },
+ },
+ })
+
+ snap, err := engine.Snapshot()
+ if err != nil {
+ t.Fatalf("snapshot error: %v", err)
+ }
+ if snap.TotalSyscalls != 3 {
+ t.Fatalf("TotalSyscalls = %d, want 3", snap.TotalSyscalls)
+ }
+ if snap.TotalErrors != 1 {
+ t.Fatalf("TotalErrors = %d, want 1", snap.TotalErrors)
+ }
+ if snap.LatencyHistogram.Total != 3 {
+ t.Fatalf("LatencyHistogram.Total = %d, want 3", snap.LatencyHistogram.Total)
+ }
+
+ syscalls := snap.Syscalls()
+ var futexRow *SyscallSnapshot
+ for i := range syscalls {
+ row := &syscalls[i]
+ if row.TraceID == types.SYS_ENTER_FUTEX {
+ futexRow = row
+ break
+ }
+ }
+ if futexRow == nil {
+ t.Fatal("expected futex syscall row")
+ }
+ if futexRow.Count != 3 || futexRow.Errors != 1 {
+ t.Fatalf("futex row = %+v, want count=3 errors=1", *futexRow)
+ }
+ if futexRow.LatencyMinNs != 10 || futexRow.LatencyMaxNs != 50 {
+ t.Fatalf("futex min/max = %d/%d, want 10/50", futexRow.LatencyMinNs, futexRow.LatencyMaxNs)
+ }
+}
diff --git a/internal/statsengine/family.go b/internal/statsengine/family.go
index 3206d57..a97332d 100644
--- a/internal/statsengine/family.go
+++ b/internal/statsengine/family.go
@@ -59,6 +59,30 @@ func (a *familyAccumulator) Add(pair *event.Pair) {
}
}
+func (a *familyAccumulator) AddAggregate(row SyscallAggregate) {
+ if a == nil || row.TraceID == 0 || row.Count == 0 {
+ return
+ }
+
+ family := row.TraceID.Family()
+ stats := a.byFamily[family]
+ if stats == nil {
+ stats = &familyStats{family: family}
+ a.byFamily[family] = stats
+ }
+
+ prevCount := stats.count
+ stats.count += row.Count
+ stats.errorCount += row.Errors
+ stats.totalLatency += row.TotalLatencyNs
+ if prevCount == 0 || row.MinLatencyNs < stats.minLatency {
+ stats.minLatency = row.MinLatencyNs
+ }
+ if row.MaxLatencyNs > stats.maxLatency {
+ stats.maxLatency = row.MaxLatencyNs
+ }
+}
+
func (a *familyAccumulator) snapshotInputs() []familySnapshotInput {
if a == nil {
return nil
diff --git a/internal/statsengine/histogram.go b/internal/statsengine/histogram.go
index 4a5e3b4..27d3285 100644
--- a/internal/statsengine/histogram.go
+++ b/internal/statsengine/histogram.go
@@ -47,6 +47,16 @@ func (h *histogram) Increment(durationNs uint64) {
h.total++
}
+func (h *histogram) AddBucketCounts(counts [histogramBucketCount]uint64) {
+ if h == nil {
+ return
+ }
+ for i, count := range counts {
+ h.counts[i] += count
+ h.total += count
+ }
+}
+
// Snapshot returns a HistogramSnapshot of the current histogram state.
// It panics on build error, which should never happen for a valid histogram.
func (h *histogram) Snapshot() HistogramSnapshot {
diff --git a/internal/statsengine/syscall.go b/internal/statsengine/syscall.go
index d58e8c9..5c85b2a 100644
--- a/internal/statsengine/syscall.go
+++ b/internal/statsengine/syscall.go
@@ -99,6 +99,29 @@ func (a *syscallAccumulator) Add(pair *event.Pair) {
}
}
+func (a *syscallAccumulator) AddAggregate(row SyscallAggregate) {
+ if a == nil || row.TraceID == 0 || row.Count == 0 {
+ return
+ }
+
+ stats := a.byID[row.TraceID]
+ if stats == nil {
+ stats = &syscallStats{traceID: row.TraceID, name: row.TraceID.Name()}
+ a.byID[row.TraceID] = stats
+ }
+
+ prevCount := stats.count
+ stats.count += row.Count
+ stats.errorCount += row.Errors
+ stats.totalLatency += row.TotalLatencyNs
+ if prevCount == 0 || row.MinLatencyNs < stats.minLatency {
+ stats.minLatency = row.MinLatencyNs
+ }
+ if row.MaxLatencyNs > stats.maxLatency {
+ stats.maxLatency = row.MaxLatencyNs
+ }
+}
+
// Snapshot returns a slice of SyscallSnapshots for all tracked syscalls.
// It panics on build error, which should never happen for a valid accumulator.
func (a *syscallAccumulator) Snapshot(elapsed time.Duration) []SyscallSnapshot {