diff options
| author | Paul Buetow <paul@buetow.org> | 2026-02-24 12:18:54 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-02-24 12:18:54 +0200 |
| commit | bd3c53086a3fe3ac177f4c656d1e521a2f0595b1 (patch) | |
| tree | 4a1ecc4b62842c6483a939c157c6a4fb5e440cf7 /internal | |
| parent | 76468727f1ef06caefd8fc8c48cf1a12aa414035 (diff) | |
statsengine: compact process accumulator at high cardinality
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/statsengine/engine.go | 2 | ||||
| -rw-r--r-- | internal/statsengine/process.go | 63 | ||||
| -rw-r--r-- | internal/statsengine/process_test.go | 29 |
3 files changed, 91 insertions, 3 deletions
diff --git a/internal/statsengine/engine.go b/internal/statsengine/engine.go index 18f83dd..6b20a14 100644 --- a/internal/statsengine/engine.go +++ b/internal/statsengine/engine.go @@ -74,7 +74,7 @@ func newEngineWithClock(topN int, now func() time.Time) *Engine { startedAt: now(), syscalls: newSyscallAccumulator(), files: newFileRankerWithConfig(topN), - processes: newProcessAccumulator(), + processes: newProcessAccumulatorWithConfig(topN), latencyHist: newHistogram(), gapHist: newHistogram(), latencySeries: newRingTimeSeries(), diff --git a/internal/statsengine/process.go b/internal/statsengine/process.go index a0d29c1..e677744 100644 --- a/internal/statsengine/process.go +++ b/internal/statsengine/process.go @@ -6,8 +6,12 @@ import ( "time" ) +const processRankTopNDefault = 20 + type processAccumulator struct { - byPID map[uint32]*processStats + topN int + maxSeen int + byPID map[uint32]*processStats } type processStats struct { @@ -27,7 +31,28 @@ type processSnapshotInput struct { } func newProcessAccumulator() *processAccumulator { - return &processAccumulator{byPID: make(map[uint32]*processStats)} + return newProcessAccumulatorWithConfig(processRankTopNDefault) +} + +func newProcessAccumulatorWithConfig(topN int) *processAccumulator { + if topN <= 0 { + topN = processRankTopNDefault + } + return newProcessAccumulatorWithLimits(topN, topN*32) +} + +func newProcessAccumulatorWithLimits(topN int, maxSeen int) *processAccumulator { + if topN <= 0 { + topN = processRankTopNDefault + } + if maxSeen < topN { + maxSeen = topN + } + return &processAccumulator{ + topN: topN, + maxSeen: maxSeen, + byPID: make(map[uint32]*processStats), + } } func (a *processAccumulator) Add(pair *event.Pair) { @@ -54,6 +79,7 @@ func (a *processAccumulator) Add(pair *event.Pair) { if pair.Comm != "" { stats.comm = pair.Comm } + a.compactIfNeeded() } func (a *processAccumulator) Snapshot(elapsed time.Duration) []ProcessSnapshot { @@ -100,6 +126,39 @@ func buildProcessSnapshots(inputs []processSnapshotInput, elapsed time.Duration) return result } +func (a *processAccumulator) compactIfNeeded() { + if len(a.byPID) <= a.maxSeen { + return + } + + ordered := make([]*processStats, 0, len(a.byPID)) + for _, stats := range a.byPID { + ordered = append(ordered, stats) + } + sort.Slice(ordered, func(i, j int) bool { + return betterProcessRank(ordered[i], ordered[j]) + }) + if len(ordered) > a.topN { + ordered = ordered[:a.topN] + } + + kept := make(map[uint32]*processStats, len(ordered)) + for _, stats := range ordered { + kept[stats.pid] = stats + } + a.byPID = kept +} + +func betterProcessRank(a, b *processStats) bool { + if a.count != b.count { + return a.count > b.count + } + if a.totalBytes != b.totalBytes { + return a.totalBytes > b.totalBytes + } + return a.pid < b.pid +} + func (s processSnapshotInput) toSnapshot(rateDiv float64) ProcessSnapshot { avg := 0.0 if s.count > 0 { diff --git a/internal/statsengine/process_test.go b/internal/statsengine/process_test.go index add5ae1..aa3c5d2 100644 --- a/internal/statsengine/process_test.go +++ b/internal/statsengine/process_test.go @@ -115,6 +115,35 @@ func TestProcessAccumulatorNilInputs(t *testing.T) { } } +func TestProcessAccumulatorCompactsHighCardinality(t *testing.T) { + acc := newProcessAccumulatorWithLimits(2, 4) + + for i := 0; i < 5; i++ { + acc.Add(newProcessPair(10, "hot-a", 10, 1)) + } + for i := 0; i < 4; i++ { + acc.Add(newProcessPair(20, "hot-b", 10, 1)) + } + acc.Add(newProcessPair(1, "cold-1", 10, 1)) + acc.Add(newProcessPair(2, "cold-2", 10, 1)) + acc.Add(newProcessPair(3, "cold-3", 10, 1)) + + if got := len(acc.byPID); got != 2 { + t.Fatalf("expected compaction to keep topN processes, got %d entries", got) + } + if acc.byPID[10] == nil || acc.byPID[20] == nil { + t.Fatalf("expected hot pids to survive compaction") + } + + snap := acc.Snapshot(time.Second) + if len(snap) != 2 { + t.Fatalf("expected 2 rows after compaction, got %d", len(snap)) + } + if snap[0].PID != 10 || snap[1].PID != 20 { + t.Fatalf("unexpected rank order after compaction: %+v", snap) + } +} + func newProcessPair(pid uint32, comm string, duration uint64, bytes uint64) *event.Pair { return &event.Pair{ EnterEv: &types.RetEvent{Pid: pid}, |
