summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r--internal/statsengine/process.go89
-rw-r--r--internal/statsengine/process_test.go125
2 files changed, 214 insertions, 0 deletions
diff --git a/internal/statsengine/process.go b/internal/statsengine/process.go
new file mode 100644
index 0000000..296312b
--- /dev/null
+++ b/internal/statsengine/process.go
@@ -0,0 +1,89 @@
+package statsengine
+
+import (
+ "ior/internal/event"
+ "sort"
+ "time"
+)
+
+type processAccumulator struct {
+ byPID map[uint32]*processStats
+}
+
+type processStats struct {
+ pid uint32
+ comm string
+ count uint64
+ totalBytes uint64
+ totalLatency uint64
+}
+
+func newProcessAccumulator() *processAccumulator {
+ return &processAccumulator{byPID: make(map[uint32]*processStats)}
+}
+
+func (a *processAccumulator) Add(pair *event.Pair) {
+ if a == nil || pair == nil || pair.EnterEv == nil {
+ return
+ }
+
+ pid := pair.EnterEv.GetPid()
+ stats := a.byPID[pid]
+ if stats == nil {
+ stats = &processStats{pid: pid}
+ a.byPID[pid] = stats
+ }
+ if pair.Comm != "" && stats.comm != "" && stats.comm != pair.Comm {
+ // Best-effort PID reuse handling: when command name changes for an
+ // existing PID, treat it as a new process lifetime and reset counters.
+ stats = &processStats{pid: pid}
+ a.byPID[pid] = stats
+ }
+
+ stats.count++
+ stats.totalBytes += pair.Bytes
+ stats.totalLatency += pair.Duration
+ if pair.Comm != "" {
+ stats.comm = pair.Comm
+ }
+}
+
+func (a *processAccumulator) Snapshot(elapsed time.Duration) []ProcessSnapshot {
+ if a == nil {
+ return nil
+ }
+
+ rateDiv := elapsed.Seconds()
+ result := make([]ProcessSnapshot, 0, len(a.byPID))
+ for _, stats := range a.byPID {
+ result = append(result, stats.toSnapshot(rateDiv))
+ }
+
+ sort.Slice(result, func(i, j int) bool {
+ if result[i].Syscalls != result[j].Syscalls {
+ return result[i].Syscalls > result[j].Syscalls
+ }
+ if result[i].Bytes != result[j].Bytes {
+ return result[i].Bytes > result[j].Bytes
+ }
+ return result[i].PID < result[j].PID
+ })
+
+ return result
+}
+
+func (s *processStats) toSnapshot(rateDiv float64) ProcessSnapshot {
+ avg := 0.0
+ if s.count > 0 {
+ avg = float64(s.totalLatency) / float64(s.count)
+ }
+
+ return ProcessSnapshot{
+ PID: s.pid,
+ Comm: s.comm,
+ Syscalls: s.count,
+ RatePerSec: safeRate(s.count, rateDiv),
+ Bytes: s.totalBytes,
+ AvgLatencyNs: avg,
+ }
+}
diff --git a/internal/statsengine/process_test.go b/internal/statsengine/process_test.go
new file mode 100644
index 0000000..add5ae1
--- /dev/null
+++ b/internal/statsengine/process_test.go
@@ -0,0 +1,125 @@
+package statsengine
+
+import (
+ "ior/internal/event"
+ "ior/internal/types"
+ "math"
+ "testing"
+ "time"
+)
+
+func TestProcessAccumulatorBasicStats(t *testing.T) {
+ acc := newProcessAccumulator()
+
+ acc.Add(newProcessPair(2000, "proc-a", 10, 100))
+ acc.Add(newProcessPair(2000, "proc-a", 30, 50))
+ acc.Add(newProcessPair(3000, "proc-b", 20, 25))
+
+ snap := acc.Snapshot(2 * time.Second)
+ if len(snap) != 2 {
+ t.Fatalf("expected 2 process snapshots, got %d", len(snap))
+ }
+
+ p0 := snap[0]
+ if p0.PID != 2000 {
+ t.Fatalf("expected pid 2000 first, got %d", p0.PID)
+ }
+ if p0.Comm != "proc-a" {
+ t.Fatalf("unexpected comm: got %q", p0.Comm)
+ }
+ if p0.Syscalls != 2 || p0.Bytes != 150 {
+ t.Fatalf("unexpected count/bytes: %+v", p0)
+ }
+ if p0.AvgLatencyNs != 20 {
+ t.Fatalf("unexpected avg latency: got %v", p0.AvgLatencyNs)
+ }
+ if math.Abs(p0.RatePerSec-1.0) > 1e-9 {
+ t.Fatalf("unexpected rate: got %v", p0.RatePerSec)
+ }
+
+ p1 := snap[1]
+ if p1.PID != 3000 || p1.Comm != "proc-b" || p1.Syscalls != 1 {
+ t.Fatalf("unexpected second row: %+v", p1)
+ }
+}
+
+func TestProcessAccumulatorSortsBySyscallsBytesPid(t *testing.T) {
+ acc := newProcessAccumulator()
+
+ acc.Add(newProcessPair(20, "b", 10, 2))
+ acc.Add(newProcessPair(10, "a", 10, 3))
+ acc.Add(newProcessPair(10, "a", 10, 3))
+ acc.Add(newProcessPair(20, "b", 10, 2))
+
+ snap := acc.Snapshot(1 * time.Second)
+ if len(snap) != 2 {
+ t.Fatalf("expected 2 process snapshots, got %d", len(snap))
+ }
+
+ if snap[0].PID != 10 {
+ t.Fatalf("expected pid 10 first by bytes tie-breaker, got %d", snap[0].PID)
+ }
+ if snap[1].PID != 20 {
+ t.Fatalf("expected pid 20 second, got %d", snap[1].PID)
+ }
+}
+
+func TestProcessAccumulatorCommUpdateAndZeroRate(t *testing.T) {
+ acc := newProcessAccumulator()
+
+ acc.Add(newProcessPair(7, "", 10, 1))
+ acc.Add(newProcessPair(7, "worker", 20, 2))
+ snap := acc.Snapshot(0)
+
+ if len(snap) != 1 {
+ t.Fatalf("expected 1 snapshot row, got %d", len(snap))
+ }
+ if snap[0].Comm != "worker" {
+ t.Fatalf("expected comm to be updated, got %q", snap[0].Comm)
+ }
+ if snap[0].RatePerSec != 0 {
+ t.Fatalf("expected zero rate on zero elapsed, got %v", snap[0].RatePerSec)
+ }
+}
+
+func TestProcessAccumulatorResetsOnCommChangeForSamePID(t *testing.T) {
+ acc := newProcessAccumulator()
+
+ acc.Add(newProcessPair(42, "old", 100, 10))
+ acc.Add(newProcessPair(42, "new", 200, 20))
+
+ snap := acc.Snapshot(time.Second)
+ if len(snap) != 1 {
+ t.Fatalf("expected 1 snapshot row, got %d", len(snap))
+ }
+ if snap[0].Comm != "new" {
+ t.Fatalf("expected new comm after reset, got %q", snap[0].Comm)
+ }
+ if snap[0].Syscalls != 1 || snap[0].Bytes != 20 || snap[0].AvgLatencyNs != 200 {
+ t.Fatalf("expected counters to reset on comm change, got %+v", snap[0])
+ }
+}
+
+func TestProcessAccumulatorNilInputs(t *testing.T) {
+ var acc *processAccumulator
+ acc.Add(nil)
+ if got := acc.Snapshot(time.Second); got != nil {
+ t.Fatalf("expected nil snapshot from nil accumulator, got %#v", got)
+ }
+
+ acc = newProcessAccumulator()
+ acc.Add(nil)
+ acc.Add(&event.Pair{})
+ if got := acc.Snapshot(time.Second); len(got) != 0 {
+ t.Fatalf("expected empty snapshot, got %#v", got)
+ }
+}
+
+func newProcessPair(pid uint32, comm string, duration uint64, bytes uint64) *event.Pair {
+ return &event.Pair{
+ EnterEv: &types.RetEvent{Pid: pid},
+ Comm: comm,
+ Duration: duration,
+ Bytes: bytes,
+ }
+}