diff options
| author | Paul Buetow <paul@buetow.org> | 2026-05-18 19:13:59 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-05-18 19:13:59 +0300 |
| commit | 65599ad9b87b1c61cb6d8232200da88952370e96 (patch) | |
| tree | 862e20468835255ed06544a2df2470678d3b97dc /internal/statsengine | |
| parent | a92cb0283b1ba8735a6697a8f94911397534131f (diff) | |
t6 add syscall family dashboard aggregation
Diffstat (limited to 'internal/statsengine')
| -rw-r--r-- | internal/statsengine/engine.go | 18 | ||||
| -rw-r--r-- | internal/statsengine/engine_test.go | 51 | ||||
| -rw-r--r-- | internal/statsengine/family.go | 121 | ||||
| -rw-r--r-- | internal/statsengine/snapshot.go | 70 | ||||
| -rw-r--r-- | internal/statsengine/snapshot_test.go | 55 |
5 files changed, 309 insertions, 6 deletions
diff --git a/internal/statsengine/engine.go b/internal/statsengine/engine.go index b7d93fa..7d85e96 100644 --- a/internal/statsengine/engine.go +++ b/internal/statsengine/engine.go @@ -56,6 +56,7 @@ type Engine struct { totalGap uint64 syscalls *syscallAccumulator + families *familyAccumulator files *fileRanker processes *processAccumulator latencyHist *histogram @@ -82,6 +83,7 @@ type snapshotInputs struct { throughputSeries []float64 syscalls []syscallSnapshotInput + families []familySnapshotInput files []fileSnapshotInput processes []processSnapshotInput @@ -104,6 +106,7 @@ func newEngineWithClock(topN int, now func() time.Time) *Engine { startedAt: now(), topN: topN, syscalls: newSyscallAccumulator(), + families: newFamilyAccumulator(), files: newFileRankerWithConfig(topN), processes: newProcessAccumulatorWithConfig(topN), latencyHist: newHistogram(), @@ -132,6 +135,7 @@ func (e *Engine) Reset() { e.totalLatency = 0 e.totalGap = 0 e.syscalls = newSyscallAccumulator() + e.families = newFamilyAccumulator() e.files = newFileRankerWithConfig(e.topN) e.processes = newProcessAccumulatorWithConfig(e.topN) e.latencyHist = newHistogram() @@ -158,6 +162,7 @@ func (e *Engine) Ingest(pair *event.Pair) { e.updateErrorAndByteClasses(pair) e.syscalls.Add(pair) + e.families.Add(pair) e.files.Add(pair) e.processes.Add(pair) e.latencyHist.Increment(pair.Duration) @@ -190,6 +195,7 @@ func (e *Engine) updateErrorAndByteClasses(pair *event.Pair) { // subSnapshots holds the concurrently built per-category snapshot slices. type subSnapshots struct { syscalls []SyscallSnapshot + families []FamilySnapshot files []FileSnapshot processes []ProcessSnapshot latencyHist HistogramSnapshot @@ -216,6 +222,7 @@ func (e *Engine) captureSnapshotInputs() snapshotInputs { gapSeries: e.gapSeries.Values(), throughputSeries: e.throughputSeries.Values(), syscalls: e.syscalls.snapshotInputs(), + families: e.families.snapshotInputs(), files: e.files.snapshotInputs(), processes: e.processes.snapshotInputs(), latencyHist: e.latencyHist.snapshotInputs(), @@ -223,7 +230,7 @@ func (e *Engine) captureSnapshotInputs() snapshotInputs { } } -// buildSubSnapshots runs all five per-category snapshot builders concurrently +// buildSubSnapshots runs all per-category snapshot builders concurrently // using errgroup so that any error from a sub-builder is captured and returned // to the caller instead of being silently dropped. func buildSubSnapshots(in snapshotInputs, elapsed time.Duration) (subSnapshots, error) { @@ -239,6 +246,11 @@ func buildSubSnapshots(in snapshotInputs, elapsed time.Duration) (subSnapshots, }) eg.Go(func() error { var err error + ss.families, err = buildFamilySnapshots(in.families, elapsed) + return err + }) + eg.Go(func() error { + var err error ss.files, err = buildFileSnapshots(in.files) return err }) @@ -303,9 +315,9 @@ func (e *Engine) Snapshot() (*Snapshot, error) { return nil, err } - snap := NewSnapshot( + snap := NewSnapshotWithFamilies( in.latencySeries, in.gapSeries, in.throughputSeries, - ss.syscalls, ss.files, ss.processes, + ss.syscalls, ss.families, ss.files, ss.processes, ss.latencyHist, ss.gapHist, ) populateSnapshotFields(&snap, in, elapsed) diff --git a/internal/statsengine/engine_test.go b/internal/statsengine/engine_test.go index f714844..9543405 100644 --- a/internal/statsengine/engine_test.go +++ b/internal/statsengine/engine_test.go @@ -67,6 +67,13 @@ func TestEngineIngestAndSnapshotIntegration(t *testing.T) { if len(snap.Syscalls()) != 3 { t.Fatalf("expected 3 syscall rows, got %d", len(snap.Syscalls())) } + families := familyRowsByName(snap.Families()) + if len(families) != 1 { + t.Fatalf("expected 1 family row, got %d", len(families)) + } + if fs := families["FS"]; fs.Count != 3 || fs.Errors != 1 || fs.Bytes != 170 { + t.Fatalf("FS family = %+v, want count=3 errors=1 bytes=170", fs) + } if len(snap.Files()) != 2 { t.Fatalf("expected top 2 files due to topN=2, got %d", len(snap.Files())) } @@ -78,6 +85,50 @@ func TestEngineIngestAndSnapshotIntegration(t *testing.T) { } } +func TestEngineAggregatesSyscallFamilies(t *testing.T) { + clock := &fakeClock{now: time.Unix(3000, 0)} + engine := newEngineWithClock(10, clock.Now) + + engine.Ingest(newEnginePair(types.SYS_ENTER_EPOLL_WAIT, 0, types.UNCLASSIFIED, "poller", 1, "", 0, 100, 1)) + engine.Ingest(newEnginePair(types.SYS_ENTER_POLL, -1, types.UNCLASSIFIED, "poller", 1, "", 0, 300, 2)) + engine.Ingest(newEnginePair(types.SYS_ENTER_GETPID, 0, types.UNCLASSIFIED, "proc", 2, "", 0, 50, 3)) + engine.Ingest(newEnginePair(types.SYS_ENTER_READ, 4, types.READ_CLASSIFIED, "reader", 3, "/tmp/a", 4, 25, 4)) + clock.Advance(time.Second) + + snap, err := engine.Snapshot() + if err != nil { + t.Fatalf("Snapshot() error = %v", err) + } + + families := familyRowsByName(snap.Families()) + polling := families["Polling"] + if polling.Count != 2 || polling.Errors != 1 || polling.LatencyMeanNs != 200 { + t.Fatalf("Polling family = %+v, want count=2 errors=1 mean=200ns", polling) + } + if families["Process"].Count != 1 { + t.Fatalf("Process family = %+v, want count=1", families["Process"]) + } + if families["FS"].Count != 1 || families["FS"].Bytes != 4 { + t.Fatalf("FS family = %+v, want count=1 bytes=4", families["FS"]) + } + + nonIO := familyRowsByName(snap.NonIOFamilies()) + if _, ok := nonIO["FS"]; ok { + t.Fatalf("NonIOFamilies should not include FS: %+v", nonIO["FS"]) + } + if nonIO["Polling"].Count != 2 || nonIO["Process"].Count != 1 { + t.Fatalf("NonIOFamilies missing expected rows: %+v", nonIO) + } +} + +func familyRowsByName(rows []FamilySnapshot) map[string]FamilySnapshot { + result := make(map[string]FamilySnapshot, len(rows)) + for _, row := range rows { + result[row.Name] = row + } + return result +} + func TestEngineSnapshotWithNoEvents(t *testing.T) { clock := &fakeClock{now: time.Unix(2000, 0)} engine := newEngineWithClock(10, clock.Now) diff --git a/internal/statsengine/family.go b/internal/statsengine/family.go new file mode 100644 index 0000000..3206d57 --- /dev/null +++ b/internal/statsengine/family.go @@ -0,0 +1,121 @@ +package statsengine + +import ( + "cmp" + "slices" + "time" + + "ior/internal/event" + "ior/internal/types" +) + +type familyAccumulator struct { + byFamily map[types.SyscallFamily]*familyStats +} + +type familyStats struct { + family types.SyscallFamily + count uint64 + errorCount uint64 + totalBytes uint64 + totalLatency uint64 + minLatency uint64 + maxLatency uint64 +} + +type familySnapshotInput struct { + family types.SyscallFamily + count uint64 + errorCount uint64 + totalBytes uint64 + totalLatency uint64 + minLatency uint64 + maxLatency uint64 +} + +func newFamilyAccumulator() *familyAccumulator { + return &familyAccumulator{byFamily: make(map[types.SyscallFamily]*familyStats)} +} + +func (a *familyAccumulator) Add(pair *event.Pair) { + if a == nil || pair == nil || pair.EnterEv == nil { + return + } + + family := pair.EnterEv.GetTraceId().Family() + stats := a.byFamily[family] + if stats == nil { + stats = &familyStats{family: family} + a.byFamily[family] = stats + } + + stats.count++ + stats.totalBytes += pair.Bytes + stats.totalLatency += pair.Duration + stats.updateMinMax(pair.Duration) + + if retEv, ok := pair.ExitEv.(*types.RetEvent); ok && retEv.Ret < 0 { + stats.errorCount++ + } +} + +func (a *familyAccumulator) snapshotInputs() []familySnapshotInput { + if a == nil { + return nil + } + + inputs := make([]familySnapshotInput, 0, len(a.byFamily)) + for _, stats := range a.byFamily { + inputs = append(inputs, familySnapshotInput{ + family: stats.family, + count: stats.count, + errorCount: stats.errorCount, + totalBytes: stats.totalBytes, + totalLatency: stats.totalLatency, + minLatency: stats.minLatency, + maxLatency: stats.maxLatency, + }) + } + return inputs +} + +func buildFamilySnapshots(inputs []familySnapshotInput, elapsed time.Duration) ([]FamilySnapshot, error) { + rateDiv := elapsed.Seconds() + result := make([]FamilySnapshot, 0, len(inputs)) + for _, in := range inputs { + result = append(result, in.toSnapshot(rateDiv)) + } + slices.SortFunc(result, compareFamilyDefault) + return result, nil +} + +func (s *familyStats) updateMinMax(duration uint64) { + if s.count == 1 || duration < s.minLatency { + s.minLatency = duration + } + if duration > s.maxLatency { + s.maxLatency = duration + } +} + +func (s familySnapshotInput) toSnapshot(rateDiv float64) FamilySnapshot { + return FamilySnapshot{ + Family: s.family, + Name: string(s.family), + Count: s.count, + RatePerSec: safeRate(s.count, rateDiv), + Errors: s.errorCount, + Bytes: s.totalBytes, + LatencyMinNs: s.minLatency, + LatencyMaxNs: s.maxLatency, + LatencyMeanNs: float64(s.totalLatency) / float64(maxU64(s.count, 1)), + TotalLatencyNs: s.totalLatency, + } +} + +func compareFamilyDefault(left, right FamilySnapshot) int { + if left.Count != right.Count { + return cmp.Compare(right.Count, left.Count) + } + return cmp.Compare(types.SyscallFamilyRank(left.Family), types.SyscallFamilyRank(right.Family)) +} diff --git a/internal/statsengine/snapshot.go b/internal/statsengine/snapshot.go index 7a95ab8..859cd2e 100644 --- a/internal/statsengine/snapshot.go +++ b/internal/statsengine/snapshot.go @@ -51,6 +51,7 @@ type Snapshot struct { throughputSeriesB []float64 syscalls []SyscallSnapshot + families []FamilySnapshot files []FileSnapshot processes []ProcessSnapshot @@ -77,6 +78,22 @@ type SyscallSnapshot struct { LatencyP99Ns uint64 } +// FamilySnapshot is an aggregated syscall-family row. +type FamilySnapshot struct { + Family types.SyscallFamily + Name string + + Count uint64 + RatePerSec float64 + Errors uint64 + Bytes uint64 + + LatencyMinNs uint64 + LatencyMaxNs uint64 + LatencyMeanNs float64 + TotalLatencyNs uint64 +} + // FileSnapshot is an aggregated per-file ranking entry. type FileSnapshot struct { Path string @@ -129,11 +146,31 @@ func NewSnapshot( latencyHistogram HistogramSnapshot, gapHistogram HistogramSnapshot, ) Snapshot { + return NewSnapshotWithFamilies( + latencySeriesNs, gapSeriesNs, throughputSeriesB, + syscalls, nil, files, processes, + latencyHistogram, gapHistogram, + ) +} + +// NewSnapshotWithFamilies creates a snapshot including family aggregate rows. +func NewSnapshotWithFamilies( + latencySeriesNs []float64, + gapSeriesNs []float64, + throughputSeriesB []float64, + syscalls []SyscallSnapshot, + families []FamilySnapshot, + files []FileSnapshot, + processes []ProcessSnapshot, + latencyHistogram HistogramSnapshot, + gapHistogram HistogramSnapshot, +) Snapshot { return Snapshot{ latencySeriesNs: slices.Clone(latencySeriesNs), gapSeriesNs: slices.Clone(gapSeriesNs), throughputSeriesB: slices.Clone(throughputSeriesB), syscalls: slices.Clone(syscalls), + families: slices.Clone(families), files: slices.Clone(files), processes: slices.Clone(processes), LatencyHistogram: latencyHistogram.Clone(), @@ -187,6 +224,39 @@ func (s Snapshot) SyscallsCount() int { return len(s.syscalls) } +// Families returns per-syscall-family snapshot rows. +// Callers must treat returned data as read-only. +func (s Snapshot) Families() []FamilySnapshot { + return s.families +} + +// FamiliesCount returns number of syscall-family rows without cloning backing slices. +func (s Snapshot) FamiliesCount() int { + return len(s.families) +} + +// NonIOFamilies returns family rows outside the file-system/fd-focused family. +func (s Snapshot) NonIOFamilies() []FamilySnapshot { + rows := make([]FamilySnapshot, 0, len(s.families)) + for _, row := range s.families { + if types.IsNonIOSyscallFamily(row.Family) { + rows = append(rows, row) + } + } + return rows +} + +// NonIOFamiliesCount returns number of non-FS syscall-family rows. +func (s Snapshot) NonIOFamiliesCount() int { + count := 0 + for _, row := range s.families { + if types.IsNonIOSyscallFamily(row.Family) { + count++ + } + } + return count +} + // TopNSyscalls returns at most n per-syscall rows in ranking order. // Callers must treat returned data as read-only. func (s Snapshot) TopNSyscalls(n int) []SyscallSnapshot { diff --git a/internal/statsengine/snapshot_test.go b/internal/statsengine/snapshot_test.go index 277fa0d..9b54409 100644 --- a/internal/statsengine/snapshot_test.go +++ b/internal/statsengine/snapshot_test.go @@ -1,22 +1,28 @@ package statsengine -import "testing" +import ( + "testing" + + "ior/internal/types" +) func TestNewSnapshotDefensivelyCopiesSlices(t *testing.T) { latency := []float64{1, 2, 3} gap := []float64{4, 5, 6} throughput := []float64{7, 8, 9} syscalls := []SyscallSnapshot{{Name: "read", Count: 1}} + families := []FamilySnapshot{{Family: types.FamilyPolling, Name: "Polling", Count: 3}} files := []FileSnapshot{{Path: "/tmp/a", Accesses: 2}} processes := []ProcessSnapshot{{PID: 10, Comm: "cmd"}} latencyBuckets := []HistogramBucketSnapshot{{Label: "[0,1)", Count: 3}} gapBuckets := []HistogramBucketSnapshot{{Label: "[1,10)", Count: 4}} - s := NewSnapshot( + s := NewSnapshotWithFamilies( latency, gap, throughput, syscalls, + families, files, processes, NewHistogramSnapshot(3, latencyBuckets), @@ -27,6 +33,7 @@ func TestNewSnapshotDefensivelyCopiesSlices(t *testing.T) { gap[0] = 99 throughput[0] = 99 syscalls[0].Name = "write" + families[0].Name = "FS" files[0].Path = "/tmp/b" processes[0].Comm = "mutated" latencyBuckets[0].Count = 99 @@ -44,6 +51,9 @@ func TestNewSnapshotDefensivelyCopiesSlices(t *testing.T) { if got := s.Syscalls()[0].Name; got != "read" { t.Fatalf("syscalls mutated through input slice: got %q", got) } + if got := s.Families()[0].Name; got != "Polling" { + t.Fatalf("families mutated through input slice: got %q", got) + } if got := s.Files()[0].Path; got != "/tmp/a" { t.Fatalf("files mutated through input slice: got %q", got) } @@ -59,11 +69,12 @@ func TestNewSnapshotDefensivelyCopiesSlices(t *testing.T) { } func TestSnapshotAccessorsReturnReadOnlyViews(t *testing.T) { - s := NewSnapshot( + s := NewSnapshotWithFamilies( []float64{1}, []float64{2}, []float64{3}, []SyscallSnapshot{{Name: "read"}}, + []FamilySnapshot{{Family: types.FamilyPolling, Name: "Polling"}}, []FileSnapshot{{Path: "/tmp/a"}}, []ProcessSnapshot{{Comm: "cmd"}}, NewHistogramSnapshot(1, []HistogramBucketSnapshot{{Label: "a", Count: 1}}), @@ -82,6 +93,12 @@ func TestSnapshotAccessorsReturnReadOnlyViews(t *testing.T) { t.Fatalf("expected accessor to return backing slice view, got %q", got) } + families := s.Families() + families[0].Name = "Process" + if got := s.Families()[0].Name; got != "Process" { + t.Fatalf("expected family accessor to return backing slice view, got %q", got) + } + buckets := s.LatencyHistogram.Buckets() buckets[0].Count = 99 if got := s.LatencyHistogram.Buckets()[0].Count; got != 99 { @@ -97,6 +114,9 @@ func TestNilAccessorsRemainNil(t *testing.T) { if got := s.Syscalls(); got != nil { t.Fatalf("expected nil syscalls, got %#v", got) } + if got := s.Families(); got != nil { + t.Fatalf("expected nil families, got %#v", got) + } h := HistogramSnapshot{} if got := h.Buckets(); got != nil { @@ -104,6 +124,35 @@ func TestNilAccessorsRemainNil(t *testing.T) { } } +func TestSnapshotNonIOFamilies(t *testing.T) { + s := NewSnapshotWithFamilies( + nil, + nil, + nil, + nil, + []FamilySnapshot{ + {Family: types.FamilyFS, Name: "FS"}, + {Family: types.FamilyPolling, Name: "Polling"}, + {Family: types.FamilyProcess, Name: "Process"}, + }, + nil, + nil, + HistogramSnapshot{}, + HistogramSnapshot{}, + ) + + rows := s.NonIOFamilies() + if len(rows) != 2 { + t.Fatalf("NonIOFamilies len = %d, want 2", len(rows)) + } + if rows[0].Family == types.FamilyFS || rows[1].Family == types.FamilyFS { + t.Fatalf("NonIOFamilies included FS: %+v", rows) + } + if got := s.NonIOFamiliesCount(); got != 2 { + t.Fatalf("NonIOFamiliesCount = %d, want 2", got) + } +} + func TestTopNAccessors(t *testing.T) { s := NewSnapshot( nil, |
