From bd3c53086a3fe3ac177f4c656d1e521a2f0595b1 Mon Sep 17 00:00:00 2001
From: Paul Buetow
Date: Tue, 24 Feb 2026 12:18:54 +0200
Subject: statsengine: compact process accumulator at high cardinality

---
 internal/statsengine/process.go | 63 +++++++++++++++++++++++++++++++++++++++--
 1 file changed, 61 insertions(+), 2 deletions(-)

(limited to 'internal/statsengine/process.go')

diff --git a/internal/statsengine/process.go b/internal/statsengine/process.go
index a0d29c1..e677744 100644
--- a/internal/statsengine/process.go
+++ b/internal/statsengine/process.go
@@ -6,8 +6,12 @@ import (
 	"time"
 )
 
+const processRankTopNDefault = 20
+
 type processAccumulator struct {
-	byPID map[uint32]*processStats
+	topN    int
+	maxSeen int
+	byPID   map[uint32]*processStats
 }
 
 type processStats struct {
@@ -27,7 +31,28 @@ type processSnapshotInput struct {
 }
 
 func newProcessAccumulator() *processAccumulator {
-	return &processAccumulator{byPID: make(map[uint32]*processStats)}
+	return newProcessAccumulatorWithConfig(processRankTopNDefault)
+}
+
+func newProcessAccumulatorWithConfig(topN int) *processAccumulator {
+	if topN <= 0 {
+		topN = processRankTopNDefault
+	}
+	return newProcessAccumulatorWithLimits(topN, topN*32)
+}
+
+func newProcessAccumulatorWithLimits(topN int, maxSeen int) *processAccumulator {
+	if topN <= 0 {
+		topN = processRankTopNDefault
+	}
+	if maxSeen < topN {
+		maxSeen = topN
+	}
+	return &processAccumulator{
+		topN:    topN,
+		maxSeen: maxSeen,
+		byPID:   make(map[uint32]*processStats),
+	}
 }
 
 func (a *processAccumulator) Add(pair *event.Pair) {
@@ -54,6 +79,7 @@ func (a *processAccumulator) Add(pair *event.Pair) {
 	if pair.Comm != "" {
 		stats.comm = pair.Comm
 	}
+	a.compactIfNeeded()
 }
 
 func (a *processAccumulator) Snapshot(elapsed time.Duration) []ProcessSnapshot {
@@ -100,6 +126,39 @@ func buildProcessSnapshots(inputs []processSnapshotInput, elapsed time.Duration)
 	return result
 }
 
+func (a *processAccumulator) compactIfNeeded() {
+	if len(a.byPID) <= a.maxSeen {
+		return
+	}
+
+	ordered := make([]*processStats, 0, len(a.byPID))
+	for _, stats := range a.byPID {
+		ordered = append(ordered, stats)
+	}
+	sort.Slice(ordered, func(i, j int) bool {
+		return betterProcessRank(ordered[i], ordered[j])
+	})
+	if len(ordered) > a.topN {
+		ordered = ordered[:a.topN]
+	}
+
+	kept := make(map[uint32]*processStats, len(ordered))
+	for _, stats := range ordered {
+		kept[stats.pid] = stats
+	}
+	a.byPID = kept
+}
+
+func betterProcessRank(a, b *processStats) bool {
+	if a.count != b.count {
+		return a.count > b.count
+	}
+	if a.totalBytes != b.totalBytes {
+		return a.totalBytes > b.totalBytes
+	}
+	return a.pid < b.pid
+}
+
 func (s processSnapshotInput) toSnapshot(rateDiv float64) ProcessSnapshot {
 	avg := 0.0
 	if s.count > 0 {
-- 
cgit v1.2.3