summary | refs | log | tree | commit | diff
path: root/internal/statsengine
diff options
context:
space:
mode:
Diffstat (limited to 'internal/statsengine')
-rw-r--r-- internal/statsengine/engine.go 18
-rw-r--r-- internal/statsengine/engine_test.go 51
-rw-r--r-- internal/statsengine/family.go 121
-rw-r--r-- internal/statsengine/snapshot.go 70
-rw-r--r-- internal/statsengine/snapshot_test.go 55
5 files changed, 309 insertions, 6 deletions
diff --git a/internal/statsengine/engine.go b/internal/statsengine/engine.go
index b7d93fa..7d85e96 100644
--- a/internal/statsengine/engine.go
+++ b/internal/statsengine/engine.go
@@ -56,6 +56,7 @@ type Engine struct {
totalGap uint64
syscalls *syscallAccumulator
+ families *familyAccumulator
files *fileRanker
processes *processAccumulator
latencyHist *histogram
@@ -82,6 +83,7 @@ type snapshotInputs struct {
throughputSeries []float64
syscalls []syscallSnapshotInput
+ families []familySnapshotInput
files []fileSnapshotInput
processes []processSnapshotInput
@@ -104,6 +106,7 @@ func newEngineWithClock(topN int, now func() time.Time) *Engine {
startedAt: now(),
topN: topN,
syscalls: newSyscallAccumulator(),
+ families: newFamilyAccumulator(),
files: newFileRankerWithConfig(topN),
processes: newProcessAccumulatorWithConfig(topN),
latencyHist: newHistogram(),
@@ -132,6 +135,7 @@ func (e *Engine) Reset() {
e.totalLatency = 0
e.totalGap = 0
e.syscalls = newSyscallAccumulator()
+ e.families = newFamilyAccumulator()
e.files = newFileRankerWithConfig(e.topN)
e.processes = newProcessAccumulatorWithConfig(e.topN)
e.latencyHist = newHistogram()
@@ -158,6 +162,7 @@ func (e *Engine) Ingest(pair *event.Pair) {
e.updateErrorAndByteClasses(pair)
e.syscalls.Add(pair)
+ e.families.Add(pair)
e.files.Add(pair)
e.processes.Add(pair)
e.latencyHist.Increment(pair.Duration)
@@ -190,6 +195,7 @@ func (e *Engine) updateErrorAndByteClasses(pair *event.Pair) {
// subSnapshots holds the concurrently built per-category snapshot slices.
type subSnapshots struct {
syscalls []SyscallSnapshot
+ families []FamilySnapshot
files []FileSnapshot
processes []ProcessSnapshot
latencyHist HistogramSnapshot
@@ -216,6 +222,7 @@ func (e *Engine) captureSnapshotInputs() snapshotInputs {
gapSeries: e.gapSeries.Values(),
throughputSeries: e.throughputSeries.Values(),
syscalls: e.syscalls.snapshotInputs(),
+ families: e.families.snapshotInputs(),
files: e.files.snapshotInputs(),
processes: e.processes.snapshotInputs(),
latencyHist: e.latencyHist.snapshotInputs(),
@@ -223,7 +230,7 @@ func (e *Engine) captureSnapshotInputs() snapshotInputs {
}
}
-// buildSubSnapshots runs all five per-category snapshot builders concurrently
+// buildSubSnapshots runs all per-category snapshot builders concurrently
// using errgroup so that any error from a sub-builder is captured and returned
// to the caller instead of being silently dropped.
func buildSubSnapshots(in snapshotInputs, elapsed time.Duration) (subSnapshots, error) {
@@ -239,6 +246,11 @@ func buildSubSnapshots(in snapshotInputs, elapsed time.Duration) (subSnapshots,
})
eg.Go(func() error {
var err error
+ ss.families, err = buildFamilySnapshots(in.families, elapsed)
+ return err
+ })
+ eg.Go(func() error {
+ var err error
ss.files, err = buildFileSnapshots(in.files)
return err
})
@@ -303,9 +315,9 @@ func (e *Engine) Snapshot() (*Snapshot, error) {
return nil, err
}
- snap := NewSnapshot(
+ snap := NewSnapshotWithFamilies(
in.latencySeries, in.gapSeries, in.throughputSeries,
- ss.syscalls, ss.files, ss.processes,
+ ss.syscalls, ss.families, ss.files, ss.processes,
ss.latencyHist, ss.gapHist,
)
populateSnapshotFields(&snap, in, elapsed)
diff --git a/internal/statsengine/engine_test.go b/internal/statsengine/engine_test.go
index f714844..9543405 100644
--- a/internal/statsengine/engine_test.go
+++ b/internal/statsengine/engine_test.go
@@ -67,6 +67,13 @@ func TestEngineIngestAndSnapshotIntegration(t *testing.T) {
if len(snap.Syscalls()) != 3 {
t.Fatalf("expected 3 syscall rows, got %d", len(snap.Syscalls()))
}
+ families := familyRowsByName(snap.Families())
+ if len(families) != 1 {
+ t.Fatalf("expected 1 family row, got %d", len(families))
+ }
+ if fs := families["FS"]; fs.Count != 3 || fs.Errors != 1 || fs.Bytes != 170 {
+ t.Fatalf("FS family = %+v, want count=3 errors=1 bytes=170", fs)
+ }
if len(snap.Files()) != 2 {
t.Fatalf("expected top 2 files due to topN=2, got %d", len(snap.Files()))
}
@@ -78,6 +85,50 @@ func TestEngineIngestAndSnapshotIntegration(t *testing.T) {
}
}
+// TestEngineAggregatesSyscallFamilies ingests four pairs (epoll_wait, poll,
+// getpid, read) and verifies that the snapshot groups them into the
+// "Polling", "Process" and "FS" family rows, and that NonIOFamilies omits FS.
+func TestEngineAggregatesSyscallFamilies(t *testing.T) {
+ clock := &fakeClock{now: time.Unix(3000, 0)}
+ engine := newEngineWithClock(10, clock.Now)
+
+ // NOTE(review): newEnginePair is defined elsewhere; judging by the
+ // assertions below its second argument looks like the return value and
+ // its eighth like the duration in ns (100 and 300 average to the expected
+ // Polling mean of 200) — confirm against the helper.
+ engine.Ingest(newEnginePair(types.SYS_ENTER_EPOLL_WAIT, 0, types.UNCLASSIFIED, "poller", 1, "", 0, 100, 1))
+ engine.Ingest(newEnginePair(types.SYS_ENTER_POLL, -1, types.UNCLASSIFIED, "poller", 1, "", 0, 300, 2))
+ engine.Ingest(newEnginePair(types.SYS_ENTER_GETPID, 0, types.UNCLASSIFIED, "proc", 2, "", 0, 50, 3))
+ engine.Ingest(newEnginePair(types.SYS_ENTER_READ, 4, types.READ_CLASSIFIED, "reader", 3, "/tmp/a", 4, 25, 4))
+ // Move the fake clock forward before taking the snapshot.
+ clock.Advance(time.Second)
+
+ snap, err := engine.Snapshot()
+ if err != nil {
+ t.Fatalf("Snapshot() error = %v", err)
+ }
+
+ // Index rows by name so the assertions do not depend on row ordering.
+ families := familyRowsByName(snap.Families())
+ polling := families["Polling"]
+ if polling.Count != 2 || polling.Errors != 1 || polling.LatencyMeanNs != 200 {
+ t.Fatalf("Polling family = %+v, want count=2 errors=1 mean=200ns", polling)
+ }
+ if families["Process"].Count != 1 {
+ t.Fatalf("Process family = %+v, want count=1", families["Process"])
+ }
+ if families["FS"].Count != 1 || families["FS"].Bytes != 4 {
+ t.Fatalf("FS family = %+v, want count=1 bytes=4", families["FS"])
+ }
+
+ // The non-IO view must drop the FS row but keep the other two.
+ nonIO := familyRowsByName(snap.NonIOFamilies())
+ if _, ok := nonIO["FS"]; ok {
+ t.Fatalf("NonIOFamilies should not include FS: %+v", nonIO["FS"])
+ }
+ if nonIO["Polling"].Count != 2 || nonIO["Process"].Count != 1 {
+ t.Fatalf("NonIOFamilies missing expected rows: %+v", nonIO)
+ }
+}
+
+// familyRowsByName indexes family snapshot rows by their Name field so tests
+// can look a row up directly. When two rows share a name, the later one wins.
+func familyRowsByName(rows []FamilySnapshot) map[string]FamilySnapshot {
+	byName := make(map[string]FamilySnapshot, len(rows))
+	for i := range rows {
+		byName[rows[i].Name] = rows[i]
+	}
+	return byName
+}
+}
+
func TestEngineSnapshotWithNoEvents(t *testing.T) {
clock := &fakeClock{now: time.Unix(2000, 0)}
engine := newEngineWithClock(10, clock.Now)
diff --git a/internal/statsengine/family.go b/internal/statsengine/family.go
new file mode 100644
index 0000000..3206d57
--- /dev/null
+++ b/internal/statsengine/family.go
@@ -0,0 +1,121 @@
+package statsengine
+
+import (
+ "cmp"
+ "slices"
+ "time"
+
+ "ior/internal/event"
+ "ior/internal/types"
+)
+
+// familyAccumulator aggregates statistics per syscall family. Entries are
+// keyed by the family reported for each ingested pair's enter event.
+type familyAccumulator struct {
+ // byFamily holds one mutable stats entry per family seen so far; Add
+ // creates entries lazily on first use.
+ byFamily map[types.SyscallFamily]*familyStats
+}
+
+// familyStats is the running aggregate for a single syscall family. It is
+// updated in place by familyAccumulator.Add.
+type familyStats struct {
+ family types.SyscallFamily // family this entry aggregates
+ count uint64 // number of pairs added
+ errorCount uint64 // pairs whose exit event is a *types.RetEvent with Ret < 0
+ totalBytes uint64 // sum of pair.Bytes
+ totalLatency uint64 // sum of pair.Duration
+ minLatency uint64 // smallest pair.Duration seen
+ maxLatency uint64 // largest pair.Duration seen
+}
+
+// familySnapshotInput is a by-value copy of one familyStats entry, produced
+// by familyAccumulator.snapshotInputs and consumed by buildFamilySnapshots.
+// Its fields mirror familyStats one for one.
+type familySnapshotInput struct {
+ family types.SyscallFamily // family the copied entry belongs to
+ count uint64 // number of pairs added
+ errorCount uint64 // pairs counted as errors
+ totalBytes uint64 // summed bytes
+ totalLatency uint64 // summed duration
+ minLatency uint64 // smallest duration seen
+ maxLatency uint64 // largest duration seen
+}
+
+// newFamilyAccumulator returns an accumulator whose per-family map is
+// allocated and empty, ready for Add.
+func newFamilyAccumulator() *familyAccumulator {
+	acc := new(familyAccumulator)
+	acc.byFamily = map[types.SyscallFamily]*familyStats{}
+	return acc
+}
+
+// Add folds one enter/exit pair into the aggregate for the pair's syscall
+// family. It is a no-op when the receiver, the pair, or the pair's enter
+// event is nil.
+func (a *familyAccumulator) Add(pair *event.Pair) {
+	if a == nil || pair == nil {
+		return
+	}
+	if pair.EnterEv == nil {
+		return
+	}
+
+	key := pair.EnterEv.GetTraceId().Family()
+	entry := a.byFamily[key]
+	if entry == nil {
+		// First pair for this family: create its entry lazily.
+		entry = &familyStats{family: key}
+		a.byFamily[key] = entry
+	}
+
+	// count must be bumped before updateMinMax, which uses count == 1 to
+	// detect the first sample.
+	elapsed := pair.Duration
+	entry.count++
+	entry.totalLatency += elapsed
+	entry.totalBytes += pair.Bytes
+	entry.updateMinMax(elapsed)
+
+	// Only a *types.RetEvent exit with a negative return counts as an error.
+	ret, isRet := pair.ExitEv.(*types.RetEvent)
+	if isRet && ret.Ret < 0 {
+		entry.errorCount++
+	}
+}
+
+// snapshotInputs copies every per-family entry into a by-value slice so that
+// snapshot building does not touch the accumulator's live entries. The order
+// follows map iteration and is therefore unspecified. A nil receiver yields
+// nil.
+func (a *familyAccumulator) snapshotInputs() []familySnapshotInput {
+	if a == nil {
+		return nil
+	}
+
+	captured := make([]familySnapshotInput, 0, len(a.byFamily))
+	for _, entry := range a.byFamily {
+		// familySnapshotInput has exactly the same field sequence as
+		// familyStats, so a struct conversion copies every field.
+		captured = append(captured, familySnapshotInput(*entry))
+	}
+	return captured
+}
+
+// buildFamilySnapshots converts captured family inputs into snapshot rows,
+// using elapsed (in seconds) as the rate divisor, and sorts them with
+// compareFamilyDefault. The returned error is always nil.
+func buildFamilySnapshots(inputs []familySnapshotInput, elapsed time.Duration) ([]FamilySnapshot, error) {
+	seconds := elapsed.Seconds()
+	rows := make([]FamilySnapshot, len(inputs))
+	for i := range inputs {
+		rows[i] = inputs[i].toSnapshot(seconds)
+	}
+	slices.SortFunc(rows, compareFamilyDefault)
+	return rows, nil
+}
+
+// updateMinMax folds duration into the running minimum and maximum. It must
+// be called after count has been incremented for the sample: count == 1 marks
+// the first sample, which always seeds the minimum.
+func (s *familyStats) updateMinMax(duration uint64) {
+	switch {
+	case s.count == 1:
+		s.minLatency = duration
+	case duration < s.minLatency:
+		s.minLatency = duration
+	}
+	if s.maxLatency < duration {
+		s.maxLatency = duration
+	}
+}
+
+// toSnapshot renders the captured aggregate as a FamilySnapshot row. rateDiv
+// is the divisor handed to safeRate for the per-second rate. The mean latency
+// divides by at least 1, so an empty aggregate yields a mean of 0.
+func (s familySnapshotInput) toSnapshot(rateDiv float64) FamilySnapshot {
+	samples := maxU64(s.count, 1)
+
+	row := FamilySnapshot{Family: s.family, Name: string(s.family)}
+	row.Count = s.count
+	row.RatePerSec = safeRate(s.count, rateDiv)
+	row.Errors = s.errorCount
+	row.Bytes = s.totalBytes
+	row.LatencyMinNs = s.minLatency
+	row.LatencyMaxNs = s.maxLatency
+	row.LatencyMeanNs = float64(s.totalLatency) / float64(samples)
+	row.TotalLatencyNs = s.totalLatency
+	return row
+}
+
+// compareFamilyDefault orders family rows by Count descending, breaking ties
+// by ascending types.SyscallFamilyRank.
+func compareFamilyDefault(left, right FamilySnapshot) int {
+	// Operands are swapped so that the larger count sorts first.
+	if byCount := cmp.Compare(right.Count, left.Count); byCount != 0 {
+		return byCount
+	}
+	leftRank := types.SyscallFamilyRank(left.Family)
+	rightRank := types.SyscallFamilyRank(right.Family)
+	return cmp.Compare(leftRank, rightRank)
+}
diff --git a/internal/statsengine/snapshot.go b/internal/statsengine/snapshot.go
index 7a95ab8..859cd2e 100644
--- a/internal/statsengine/snapshot.go
+++ b/internal/statsengine/snapshot.go
@@ -51,6 +51,7 @@ type Snapshot struct {
throughputSeriesB []float64
syscalls []SyscallSnapshot
+ families []FamilySnapshot
files []FileSnapshot
processes []ProcessSnapshot
@@ -77,6 +78,22 @@ type SyscallSnapshot struct {
LatencyP99Ns uint64
}
+// FamilySnapshot is one aggregated row of statistics for a syscall family.
+type FamilySnapshot struct {
+ // Family identifies the syscall family; Name is its string form.
+ Family types.SyscallFamily
+ Name string
+
+ // Count is the number of pairs aggregated into this row and RatePerSec
+ // is that count expressed per second of elapsed time.
+ Count uint64
+ RatePerSec float64
+ // Errors counts pairs recorded as failed; Bytes is the summed byte count.
+ Errors uint64
+ Bytes uint64
+
+ // Latency aggregates over the pairs in this row. The Ns suffix indicates
+ // nanoseconds — NOTE(review): confirm the unit of event.Pair.Duration.
+ LatencyMinNs uint64
+ LatencyMaxNs uint64
+ LatencyMeanNs float64
+ TotalLatencyNs uint64
+}
+
// FileSnapshot is an aggregated per-file ranking entry.
type FileSnapshot struct {
Path string
@@ -129,11 +146,31 @@ func NewSnapshot(
latencyHistogram HistogramSnapshot,
gapHistogram HistogramSnapshot,
) Snapshot {
+ return NewSnapshotWithFamilies(
+ latencySeriesNs, gapSeriesNs, throughputSeriesB,
+ syscalls, nil, files, processes,
+ latencyHistogram, gapHistogram,
+ )
+}
+
+// NewSnapshotWithFamilies creates a snapshot including family aggregate rows.
+func NewSnapshotWithFamilies(
+ latencySeriesNs []float64,
+ gapSeriesNs []float64,
+ throughputSeriesB []float64,
+ syscalls []SyscallSnapshot,
+ families []FamilySnapshot,
+ files []FileSnapshot,
+ processes []ProcessSnapshot,
+ latencyHistogram HistogramSnapshot,
+ gapHistogram HistogramSnapshot,
+) Snapshot {
return Snapshot{
latencySeriesNs: slices.Clone(latencySeriesNs),
gapSeriesNs: slices.Clone(gapSeriesNs),
throughputSeriesB: slices.Clone(throughputSeriesB),
syscalls: slices.Clone(syscalls),
+ families: slices.Clone(families),
files: slices.Clone(files),
processes: slices.Clone(processes),
LatencyHistogram: latencyHistogram.Clone(),
@@ -187,6 +224,39 @@ func (s Snapshot) SyscallsCount() int {
return len(s.syscalls)
}
+// Families returns the per-syscall-family snapshot rows.
+// The result is the snapshot's own backing slice, not a copy, so callers
+// must treat the returned data as read-only.
+func (s Snapshot) Families() []FamilySnapshot {
+ return s.families
+}
+
+// FamiliesCount returns the number of syscall-family rows held by the
+// snapshot. It only reads the slice length and copies nothing.
+func (s Snapshot) FamiliesCount() int {
+ return len(s.families)
+}
+
+// NonIOFamilies returns the family rows for which types.IsNonIOSyscallFamily
+// reports true, in their stored order. The result is a newly allocated slice
+// and is never nil.
+func (s Snapshot) NonIOFamilies() []FamilySnapshot {
+	kept := make([]FamilySnapshot, 0, len(s.families))
+	for i := range s.families {
+		if !types.IsNonIOSyscallFamily(s.families[i].Family) {
+			continue
+		}
+		kept = append(kept, s.families[i])
+	}
+	return kept
+}
+
+// NonIOFamiliesCount returns how many family rows satisfy
+// types.IsNonIOSyscallFamily, without allocating a filtered slice.
+func (s Snapshot) NonIOFamiliesCount() int {
+	var n int
+	for i := range s.families {
+		if !types.IsNonIOSyscallFamily(s.families[i].Family) {
+			continue
+		}
+		n++
+	}
+	return n
+}
+
// TopNSyscalls returns at most n per-syscall rows in ranking order.
// Callers must treat returned data as read-only.
func (s Snapshot) TopNSyscalls(n int) []SyscallSnapshot {
diff --git a/internal/statsengine/snapshot_test.go b/internal/statsengine/snapshot_test.go
index 277fa0d..9b54409 100644
--- a/internal/statsengine/snapshot_test.go
+++ b/internal/statsengine/snapshot_test.go
@@ -1,22 +1,28 @@
package statsengine
-import "testing"
+import (
+ "testing"
+
+ "ior/internal/types"
+)
func TestNewSnapshotDefensivelyCopiesSlices(t *testing.T) {
latency := []float64{1, 2, 3}
gap := []float64{4, 5, 6}
throughput := []float64{7, 8, 9}
syscalls := []SyscallSnapshot{{Name: "read", Count: 1}}
+ families := []FamilySnapshot{{Family: types.FamilyPolling, Name: "Polling", Count: 3}}
files := []FileSnapshot{{Path: "/tmp/a", Accesses: 2}}
processes := []ProcessSnapshot{{PID: 10, Comm: "cmd"}}
latencyBuckets := []HistogramBucketSnapshot{{Label: "[0,1)", Count: 3}}
gapBuckets := []HistogramBucketSnapshot{{Label: "[1,10)", Count: 4}}
- s := NewSnapshot(
+ s := NewSnapshotWithFamilies(
latency,
gap,
throughput,
syscalls,
+ families,
files,
processes,
NewHistogramSnapshot(3, latencyBuckets),
@@ -27,6 +33,7 @@ func TestNewSnapshotDefensivelyCopiesSlices(t *testing.T) {
gap[0] = 99
throughput[0] = 99
syscalls[0].Name = "write"
+ families[0].Name = "FS"
files[0].Path = "/tmp/b"
processes[0].Comm = "mutated"
latencyBuckets[0].Count = 99
@@ -44,6 +51,9 @@ func TestNewSnapshotDefensivelyCopiesSlices(t *testing.T) {
if got := s.Syscalls()[0].Name; got != "read" {
t.Fatalf("syscalls mutated through input slice: got %q", got)
}
+ if got := s.Families()[0].Name; got != "Polling" {
+ t.Fatalf("families mutated through input slice: got %q", got)
+ }
if got := s.Files()[0].Path; got != "/tmp/a" {
t.Fatalf("files mutated through input slice: got %q", got)
}
@@ -59,11 +69,12 @@ func TestNewSnapshotDefensivelyCopiesSlices(t *testing.T) {
}
func TestSnapshotAccessorsReturnReadOnlyViews(t *testing.T) {
- s := NewSnapshot(
+ s := NewSnapshotWithFamilies(
[]float64{1},
[]float64{2},
[]float64{3},
[]SyscallSnapshot{{Name: "read"}},
+ []FamilySnapshot{{Family: types.FamilyPolling, Name: "Polling"}},
[]FileSnapshot{{Path: "/tmp/a"}},
[]ProcessSnapshot{{Comm: "cmd"}},
NewHistogramSnapshot(1, []HistogramBucketSnapshot{{Label: "a", Count: 1}}),
@@ -82,6 +93,12 @@ func TestSnapshotAccessorsReturnReadOnlyViews(t *testing.T) {
t.Fatalf("expected accessor to return backing slice view, got %q", got)
}
+ families := s.Families()
+ families[0].Name = "Process"
+ if got := s.Families()[0].Name; got != "Process" {
+ t.Fatalf("expected family accessor to return backing slice view, got %q", got)
+ }
+
buckets := s.LatencyHistogram.Buckets()
buckets[0].Count = 99
if got := s.LatencyHistogram.Buckets()[0].Count; got != 99 {
@@ -97,6 +114,9 @@ func TestNilAccessorsRemainNil(t *testing.T) {
if got := s.Syscalls(); got != nil {
t.Fatalf("expected nil syscalls, got %#v", got)
}
+ if got := s.Families(); got != nil {
+ t.Fatalf("expected nil families, got %#v", got)
+ }
h := HistogramSnapshot{}
if got := h.Buckets(); got != nil {
@@ -104,6 +124,35 @@ func TestNilAccessorsRemainNil(t *testing.T) {
}
}
+// TestSnapshotNonIOFamilies builds a snapshot holding FS, Polling and Process
+// family rows and checks that NonIOFamilies and NonIOFamiliesCount both
+// exclude the FS row.
+func TestSnapshotNonIOFamilies(t *testing.T) {
+	input := []FamilySnapshot{
+		{Family: types.FamilyFS, Name: "FS"},
+		{Family: types.FamilyPolling, Name: "Polling"},
+		{Family: types.FamilyProcess, Name: "Process"},
+	}
+	var emptyHist HistogramSnapshot
+	s := NewSnapshotWithFamilies(nil, nil, nil, nil, input, nil, nil, emptyHist, emptyHist)
+
+	rows := s.NonIOFamilies()
+	if got := len(rows); got != 2 {
+		t.Fatalf("NonIOFamilies len = %d, want 2", got)
+	}
+	for _, row := range rows {
+		if row.Family == types.FamilyFS {
+			t.Fatalf("NonIOFamilies included FS: %+v", rows)
+		}
+	}
+	if got := s.NonIOFamiliesCount(); got != 2 {
+		t.Fatalf("NonIOFamiliesCount = %d, want 2", got)
+	}
+}
+
func TestTopNAccessors(t *testing.T) {
s := NewSnapshot(
nil,