summaryrefslogtreecommitdiff
path: root/internal/statsengine
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-02-24 12:18:54 +0200
committerPaul Buetow <paul@buetow.org>2026-02-24 12:18:54 +0200
commitbd3c53086a3fe3ac177f4c656d1e521a2f0595b1 (patch)
tree4a1ecc4b62842c6483a939c157c6a4fb5e440cf7 /internal/statsengine
parent76468727f1ef06caefd8fc8c48cf1a12aa414035 (diff)
statsengine: compact process accumulator at high cardinality
Diffstat (limited to 'internal/statsengine')
-rw-r--r--internal/statsengine/engine.go2
-rw-r--r--internal/statsengine/process.go63
-rw-r--r--internal/statsengine/process_test.go29
3 files changed, 91 insertions, 3 deletions
diff --git a/internal/statsengine/engine.go b/internal/statsengine/engine.go
index 18f83dd..6b20a14 100644
--- a/internal/statsengine/engine.go
+++ b/internal/statsengine/engine.go
@@ -74,7 +74,7 @@ func newEngineWithClock(topN int, now func() time.Time) *Engine {
startedAt: now(),
syscalls: newSyscallAccumulator(),
files: newFileRankerWithConfig(topN),
- processes: newProcessAccumulator(),
+ processes: newProcessAccumulatorWithConfig(topN),
latencyHist: newHistogram(),
gapHist: newHistogram(),
latencySeries: newRingTimeSeries(),
diff --git a/internal/statsengine/process.go b/internal/statsengine/process.go
index a0d29c1..e677744 100644
--- a/internal/statsengine/process.go
+++ b/internal/statsengine/process.go
@@ -6,8 +6,12 @@ import (
"time"
)
+const processRankTopNDefault = 20
+
type processAccumulator struct {
- byPID map[uint32]*processStats
+ topN int
+ maxSeen int
+ byPID map[uint32]*processStats
}
type processStats struct {
@@ -27,7 +31,28 @@ type processSnapshotInput struct {
}
func newProcessAccumulator() *processAccumulator {
- return &processAccumulator{byPID: make(map[uint32]*processStats)}
+ return newProcessAccumulatorWithConfig(processRankTopNDefault)
+}
+
+func newProcessAccumulatorWithConfig(topN int) *processAccumulator {
+ if topN <= 0 {
+ topN = processRankTopNDefault
+ }
+ return newProcessAccumulatorWithLimits(topN, topN*32)
+}
+
+func newProcessAccumulatorWithLimits(topN int, maxSeen int) *processAccumulator {
+ if topN <= 0 {
+ topN = processRankTopNDefault
+ }
+ if maxSeen < topN {
+ maxSeen = topN
+ }
+ return &processAccumulator{
+ topN: topN,
+ maxSeen: maxSeen,
+ byPID: make(map[uint32]*processStats),
+ }
}
func (a *processAccumulator) Add(pair *event.Pair) {
@@ -54,6 +79,7 @@ func (a *processAccumulator) Add(pair *event.Pair) {
if pair.Comm != "" {
stats.comm = pair.Comm
}
+ a.compactIfNeeded()
}
func (a *processAccumulator) Snapshot(elapsed time.Duration) []ProcessSnapshot {
@@ -100,6 +126,39 @@ func buildProcessSnapshots(inputs []processSnapshotInput, elapsed time.Duration)
return result
}
+func (a *processAccumulator) compactIfNeeded() {
+ if len(a.byPID) <= a.maxSeen {
+ return
+ }
+
+ ordered := make([]*processStats, 0, len(a.byPID))
+ for _, stats := range a.byPID {
+ ordered = append(ordered, stats)
+ }
+ sort.Slice(ordered, func(i, j int) bool {
+ return betterProcessRank(ordered[i], ordered[j])
+ })
+ if len(ordered) > a.topN {
+ ordered = ordered[:a.topN]
+ }
+
+ kept := make(map[uint32]*processStats, len(ordered))
+ for _, stats := range ordered {
+ kept[stats.pid] = stats
+ }
+ a.byPID = kept
+}
+
+func betterProcessRank(a, b *processStats) bool {
+ if a.count != b.count {
+ return a.count > b.count
+ }
+ if a.totalBytes != b.totalBytes {
+ return a.totalBytes > b.totalBytes
+ }
+ return a.pid < b.pid
+}
+
func (s processSnapshotInput) toSnapshot(rateDiv float64) ProcessSnapshot {
avg := 0.0
if s.count > 0 {
diff --git a/internal/statsengine/process_test.go b/internal/statsengine/process_test.go
index add5ae1..aa3c5d2 100644
--- a/internal/statsengine/process_test.go
+++ b/internal/statsengine/process_test.go
@@ -115,6 +115,35 @@ func TestProcessAccumulatorNilInputs(t *testing.T) {
}
}
+func TestProcessAccumulatorCompactsHighCardinality(t *testing.T) {
+ acc := newProcessAccumulatorWithLimits(2, 4)
+
+ for i := 0; i < 5; i++ {
+ acc.Add(newProcessPair(10, "hot-a", 10, 1))
+ }
+ for i := 0; i < 4; i++ {
+ acc.Add(newProcessPair(20, "hot-b", 10, 1))
+ }
+ acc.Add(newProcessPair(1, "cold-1", 10, 1))
+ acc.Add(newProcessPair(2, "cold-2", 10, 1))
+ acc.Add(newProcessPair(3, "cold-3", 10, 1))
+
+ if got := len(acc.byPID); got != 2 {
+ t.Fatalf("expected compaction to keep topN processes, got %d entries", got)
+ }
+ if acc.byPID[10] == nil || acc.byPID[20] == nil {
+ t.Fatalf("expected hot pids to survive compaction")
+ }
+
+ snap := acc.Snapshot(time.Second)
+ if len(snap) != 2 {
+ t.Fatalf("expected 2 rows after compaction, got %d", len(snap))
+ }
+ if snap[0].PID != 10 || snap[1].PID != 20 {
+ t.Fatalf("unexpected rank order after compaction: %+v", snap)
+ }
+}
+
func newProcessPair(pid uint32, comm string, duration uint64, bytes uint64) *event.Pair {
return &event.Pair{
EnterEv: &types.RetEvent{Pid: pid},