summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Paul Buetow <paul@buetow.org> 2026-02-23 23:08:40 +0200
committer Paul Buetow <paul@buetow.org> 2026-02-23 23:08:40 +0200
commit 9c04c55b443e5a22cc34cc24e09f10fe84d5e999 (patch)
tree c13497da30c0465aaed55b636ffb0ee99ca709a2
parent 4b00bd30a8c1247f5dfee77277fdd4b438750bd0 (diff)
task 305: add top-N file ranker with bounded cardinality
-rw-r--r-- internal/statsengine/filerank.go 207
-rw-r--r-- internal/statsengine/filerank_test.go 125
2 files changed, 332 insertions, 0 deletions
diff --git a/internal/statsengine/filerank.go b/internal/statsengine/filerank.go
new file mode 100644
index 0000000..9054ff7
--- /dev/null
+++ b/internal/statsengine/filerank.go
@@ -0,0 +1,207 @@
+package statsengine
+
+import (
+ "container/heap"
+ "ior/internal/event"
+ "ior/internal/types"
+ "sort"
+)
+
+const fileRankTopNDefault = 20 // top-N size used by newFileRanker and substituted whenever a caller passes topN <= 0
+
+type fileRanker struct { // fileRanker accumulates per-path stats and keeps the best-ranked ones in a heap
+ topN int // maximum number of entries held in topHeap
+ maxSeen int // size of byPath above which compactIfNeeded drops every path that is not in topHeap
+ byPath map[string]*fileRankStats // stats for each path currently tracked, keyed by path
+ topHeap fileRankHeap // current top-ranked entries; the heap root is the worst-ranked of them
+}
+
+type fileRankStats struct { // fileRankStats holds the counters accumulated for one file path
+ path string // path as returned by pair.File.Name()
+ accesses uint64 // number of pairs added for this path; primary ranking key
+ bytesRead uint64 // sum of pair.Bytes for read- and transfer-classified pairs
+ bytesWritten uint64 // sum of pair.Bytes for write- and transfer-classified pairs
+ totalLatency uint64 // sum of pair.Duration; exported via AvgLatencyNs, so presumably nanoseconds (confirm against event.Pair)
+ maxLatency uint64 // largest pair.Duration seen; same unit as totalLatency
+ heapIndex int // position inside the ranker's topHeap, or -1 when the entry is not in the heap
+}
+
+type fileRankHeap []*fileRankStats // heap.Interface implementation whose Less keeps the worst-ranked entry at index 0
+
+// newFileRanker builds a ranker that keeps fileRankTopNDefault entries.
+func newFileRanker() *fileRanker {
+	ranker := newFileRankerWithConfig(fileRankTopNDefault)
+	return ranker
+}
+
+// newFileRankerWithConfig builds a ranker that keeps at most topN entries and
+// tracks up to 32*topN distinct paths before compaction. A non-positive topN
+// is replaced by fileRankTopNDefault before the path limit is derived.
+func newFileRankerWithConfig(topN int) *fileRanker {
+	limit := topN
+	if limit <= 0 {
+		limit = fileRankTopNDefault
+	}
+	seenCap := limit * 32
+	return newFileRankerWithLimits(limit, seenCap)
+}
+
+// newFileRankerWithLimits builds a ranker with an explicit heap size (topN)
+// and an explicit cardinality guard (maxSeen).
+//
+// A non-positive topN is replaced by fileRankTopNDefault, and maxSeen is
+// raised to topN when it is smaller.
+func newFileRankerWithLimits(topN int, maxSeen int) *fileRanker {
+	if topN <= 0 {
+		topN = fileRankTopNDefault
+	}
+	// The path guard is never allowed to be tighter than the heap itself.
+	if topN > maxSeen {
+		maxSeen = topN
+	}
+
+	ranker := new(fileRanker)
+	ranker.topN = topN
+	ranker.maxSeen = maxSeen
+	ranker.byPath = map[string]*fileRankStats{}
+	heap.Init(&ranker.topHeap)
+	return ranker
+}
+
+// Add folds one event pair into the stats of the file it refers to and then
+// refreshes the top-N heap and the cardinality guard.
+//
+// The call is a no-op for a nil receiver, a nil pair, a pair without a file,
+// and for files whose name is empty or the placeholder "N:file".
+func (r *fileRanker) Add(pair *event.Pair) {
+	switch {
+	case r == nil, pair == nil:
+		return
+	case pair.File == nil:
+		return
+	}
+
+	name := pair.File.Name()
+	switch name {
+	case "", "N:file":
+		return
+	}
+
+	entry := r.byPath[name]
+	if entry == nil {
+		// First sighting: start tracking the path outside the heap.
+		entry = &fileRankStats{path: name, heapIndex: -1}
+		r.byPath[name] = entry
+	}
+
+	elapsed := pair.Duration
+	if elapsed > entry.maxLatency {
+		entry.maxLatency = elapsed
+	}
+	entry.totalLatency += elapsed
+	entry.accesses++
+
+	r.addBytes(entry, pair)
+	r.updateHeap(entry)
+	r.compactIfNeeded()
+}
+
+// Snapshot returns one FileSnapshot per entry currently in the top-N heap,
+// ordered by descending access count with ties broken by ascending path.
+// A nil receiver yields nil.
+func (r *fileRanker) Snapshot() []FileSnapshot {
+	if r == nil {
+		return nil
+	}
+
+	rows := make([]FileSnapshot, len(r.topHeap))
+	for i, entry := range r.topHeap {
+		rows[i] = entry.snapshot()
+	}
+
+	// Heap order is only partially sorted, so rank the copies explicitly.
+	sort.Slice(rows, func(a, b int) bool {
+		left, right := rows[a], rows[b]
+		if left.Accesses == right.Accesses {
+			return left.Path < right.Path
+		}
+		return left.Accesses > right.Accesses
+	})
+	return rows
+}
+
+// addBytes attributes pair.Bytes to the read and/or write counters of stats,
+// based on the return type carried by the pair's exit event.
+//
+// The counters are left untouched when the exit event is not a
+// *types.RetEvent, when it is a nil *types.RetEvent, or when its return type
+// is none of the three classifications handled below.
+func (r *fileRanker) addBytes(stats *fileRankStats, pair *event.Pair) {
+	retEv, ok := pair.ExitEv.(*types.RetEvent)
+	// A typed nil pointer stored in the interface still satisfies the
+	// assertion, so check the pointer itself before dereferencing it.
+	if !ok || retEv == nil {
+		return
+	}
+
+	switch retEv.RetType {
+	case types.READ_CLASSIFIED:
+		stats.bytesRead += pair.Bytes
+	case types.WRITE_CLASSIFIED:
+		stats.bytesWritten += pair.Bytes
+	case types.TRANSFER_CLASSIFIED:
+		// Transfer syscalls move bytes in both directions from a file-centric view.
+		stats.bytesRead += pair.Bytes
+		stats.bytesWritten += pair.Bytes
+	}
+}
+
+// updateHeap brings the top-N heap back in line after stats has changed.
+//
+// An entry already in the heap is re-sifted in place. An entry outside the
+// heap is pushed while there is room; once the heap is full it replaces the
+// root (the worst-ranked entry) only when it ranks strictly better.
+func (r *fileRanker) updateHeap(stats *fileRankStats) {
+	switch {
+	case stats.heapIndex >= 0:
+		heap.Fix(&r.topHeap, stats.heapIndex)
+	case len(r.topHeap) < r.topN:
+		heap.Push(&r.topHeap, stats)
+	case betterFileRank(stats, r.topHeap[0]):
+		// Evict the current worst entry to make room for the newcomer.
+		heap.Pop(&r.topHeap)
+		heap.Push(&r.topHeap, stats)
+	}
+}
+
+// compactIfNeeded bounds the number of tracked paths. Once byPath holds more
+// than maxSeen entries it is replaced by a map containing only the paths that
+// are currently in the top-N heap; stats of every other path are discarded.
+func (r *fileRanker) compactIfNeeded() {
+	if r.maxSeen >= len(r.byPath) {
+		return
+	}
+
+	survivors := make(map[string]*fileRankStats, len(r.topHeap))
+	for i := range r.topHeap {
+		entry := r.topHeap[i]
+		survivors[entry.path] = entry
+	}
+	r.byPath = survivors
+}
+
+// snapshot copies the counters into a FileSnapshot. The average latency is
+// totalLatency divided by accesses, or 0 when nothing has been recorded.
+func (s *fileRankStats) snapshot() FileSnapshot {
+	var mean float64
+	if s.accesses != 0 {
+		mean = float64(s.totalLatency) / float64(s.accesses)
+	}
+
+	var row FileSnapshot
+	row.Path = s.path
+	row.Accesses = s.accesses
+	row.BytesRead = s.bytesRead
+	row.BytesWritten = s.bytesWritten
+	row.AvgLatencyNs = mean
+	row.MaxLatencyNs = s.maxLatency
+	return row
+}
+
+// betterFileRank reports whether a ranks strictly ahead of b: more accesses
+// wins, and on equal access counts the lexicographically smaller path wins.
+func betterFileRank(a, b *fileRankStats) bool {
+	if a.accesses == b.accesses {
+		return a.path < b.path
+	}
+	return a.accesses > b.accesses
+}
+
+// Len returns the number of entries in the heap (part of heap.Interface).
+func (h fileRankHeap) Len() int {
+	size := len(h)
+	return size
+}
+
+// Less inverts the ranking order: entry i sorts first when entry j ranks
+// better, which puts the worst-ranked entry at the root so it can be evicted
+// in O(log N).
+func (h fileRankHeap) Less(i, j int) bool {
+	candidate, other := h[i], h[j]
+	return betterFileRank(other, candidate)
+}
+
+// Swap exchanges the entries at i and j and records each entry's new
+// position in its heapIndex field (part of heap.Interface).
+func (h fileRankHeap) Swap(i, j int) {
+	first, second := h[i], h[j]
+	h[i], h[j] = second, first
+	second.heapIndex = i
+	first.heapIndex = j
+}
+
+// Push appends x, which must be a *fileRankStats, to the end of the slice and
+// stores that position in its heapIndex. It is meant to be called through
+// heap.Push, which then restores heap order.
+func (h *fileRankHeap) Push(x any) {
+	entry := x.(*fileRankStats)
+	items := *h
+	entry.heapIndex = len(items)
+	*h = append(items, entry)
+}
+
+// Pop removes and returns the last element of the slice as a *fileRankStats,
+// marking it as no longer in the heap by setting heapIndex to -1. It is meant
+// to be called through heap.Pop, which first moves the root to the end.
+func (h *fileRankHeap) Pop() any {
+	old := *h
+	last := len(old) - 1
+	stats := old[last]
+	// Clear the vacated slot so the backing array does not keep the removed
+	// entry reachable after the slice is shortened.
+	old[last] = nil
+	stats.heapIndex = -1
+	*h = old[:last]
+	return stats
+}
diff --git a/internal/statsengine/filerank_test.go b/internal/statsengine/filerank_test.go
new file mode 100644
index 0000000..26a0b23
--- /dev/null
+++ b/internal/statsengine/filerank_test.go
@@ -0,0 +1,125 @@
+package statsengine
+
+import (
+ "fmt"
+ "ior/internal/event"
+ "ior/internal/file"
+ "ior/internal/types"
+ "reflect"
+ "testing"
+)
+
+// TestFileRankerHeapEviction checks that with topN=2 the least-accessed path
+// is evicted once two busier paths have been seen.
+func TestFileRankerHeapEviction(t *testing.T) {
+	r := newFileRankerWithConfig(2)
+
+	// /a is accessed once, /b twice and /c three times, in that order.
+	for _, p := range []string{"/a", "/b", "/b", "/c", "/c", "/c"} {
+		r.Add(newFilePair(p, 10, 1, types.READ_CLASSIFIED))
+	}
+
+	want := []string{"/c", "/b"}
+	if got := paths(r.Snapshot()); !reflect.DeepEqual(got, want) {
+		t.Fatalf("unexpected top-n paths: got %v want %v", got, want)
+	}
+}
+
+// TestFileRankerRankingCorrectness feeds two samples each for a read, a write
+// and a transfer path, then verifies tie-break ordering by path, the
+// read/write byte split per classification, and the latency aggregates.
+func TestFileRankerRankingCorrectness(t *testing.T) {
+	r := newFileRankerWithConfig(3)
+
+	samples := []struct {
+		path     string
+		duration uint64
+		bytes    uint64
+		retType  uint32
+	}{
+		{"/read", 20, 11, types.READ_CLASSIFIED},
+		{"/read", 10, 9, types.READ_CLASSIFIED},
+		{"/write", 25, 7, types.WRITE_CLASSIFIED},
+		{"/write", 15, 3, types.WRITE_CLASSIFIED},
+		{"/xfer", 40, 5, types.TRANSFER_CLASSIFIED},
+		{"/xfer", 60, 6, types.TRANSFER_CLASSIFIED},
+	}
+	for _, s := range samples {
+		r.Add(newFilePair(s.path, s.duration, s.bytes, s.retType))
+	}
+
+	snap := r.Snapshot()
+	if n := len(snap); n != 3 {
+		t.Fatalf("expected 3 rows, got %d", n)
+	}
+
+	// All three paths have two accesses, so order falls back to path.
+	wantOrder := []string{"/read", "/write", "/xfer"}
+	if gotOrder := paths(snap); !reflect.DeepEqual(gotOrder, wantOrder) {
+		t.Fatalf("unexpected ranking order: got %v want %v", gotOrder, wantOrder)
+	}
+
+	rowFor := make(map[string]FileSnapshot, len(snap))
+	for i := range snap {
+		rowFor[snap[i].Path] = snap[i]
+	}
+
+	readRow := rowFor["/read"]
+	readCountersOK := readRow.Accesses == 2 && readRow.BytesRead == 20 &&
+		readRow.BytesWritten == 0 && readRow.MaxLatencyNs == 20
+	if !readCountersOK {
+		t.Fatalf("unexpected /read stats: %+v", readRow)
+	}
+	if readRow.AvgLatencyNs != 15 {
+		t.Fatalf("unexpected /read avg latency: %v", readRow.AvgLatencyNs)
+	}
+
+	writeRow := rowFor["/write"]
+	if !(writeRow.BytesRead == 0 && writeRow.BytesWritten == 10) {
+		t.Fatalf("unexpected /write bytes split: %+v", writeRow)
+	}
+
+	// Transfer bytes are expected on both the read and the write side.
+	xferRow := rowFor["/xfer"]
+	if !(xferRow.BytesRead == 11 && xferRow.BytesWritten == 11) {
+		t.Fatalf("unexpected /xfer bytes split: %+v", xferRow)
+	}
+	if xferRow.AvgLatencyNs != 50 {
+		t.Fatalf("unexpected /xfer avg latency: %v", xferRow.AvgLatencyNs)
+	}
+}
+
+// TestFileRankerIgnoresInvalidFileData checks that a nil pair, a pair without
+// a file, and a pair whose file was created with an empty name all leave the
+// ranker empty.
+func TestFileRankerIgnoresInvalidFileData(t *testing.T) {
+	r := newFileRankerWithConfig(3)
+
+	rejected := []*event.Pair{
+		nil,
+		&event.Pair{},
+		&event.Pair{File: file.NewFd(1, "", -1), Duration: 10, Bytes: 1, ExitEv: &types.RetEvent{RetType: types.READ_CLASSIFIED}},
+	}
+	for _, pair := range rejected {
+		r.Add(pair)
+	}
+
+	got := r.Snapshot()
+	if len(got) != 0 {
+		t.Fatalf("expected empty snapshot for invalid rows, got %+v", got)
+	}
+}
+
+// TestFileRankerCompactsHighCardinality floods a ranker (topN=3, maxSeen=5)
+// with 200 single-access paths after one path has been accessed 10 times. It
+// verifies that byPath stays within maxSeen and the hot path is still ranked
+// first.
+func TestFileRankerCompactsHighCardinality(t *testing.T) {
+	r := newFileRankerWithLimits(3, 5)
+
+	// Give one file a clear lead so compaction must not drop it.
+	for remaining := 10; remaining > 0; remaining-- {
+		r.Add(newFilePair("/hot", 10, 1, types.READ_CLASSIFIED))
+	}
+	for i := 0; i < 200; i++ {
+		cold := fmt.Sprintf("/tmp/file-%d", i)
+		r.Add(newFilePair(cold, 1, 1, types.READ_CLASSIFIED))
+	}
+
+	if tracked := len(r.byPath); tracked > r.maxSeen {
+		t.Fatalf("cardinality guard failed: byPath=%d maxSeen=%d", tracked, r.maxSeen)
+	}
+
+	snap := r.Snapshot()
+	hotOnTop := len(snap) != 0 && snap[0].Path == "/hot"
+	if !hotOnTop {
+		t.Fatalf("expected hot file to remain top-ranked after compaction, got %+v", snap)
+	}
+}
+
+// newFilePair builds a test pair for path with the given duration and byte
+// count, whose exit event is a RetEvent carrying retType. The file is created
+// with file.NewFd(3, path, -1).
+func newFilePair(path string, duration uint64, bytes uint64, retType uint32) *event.Pair {
+	pair := new(event.Pair)
+	pair.File = file.NewFd(3, path, -1)
+	pair.Duration = duration
+	pair.Bytes = bytes
+	pair.ExitEv = &types.RetEvent{RetType: retType}
+	return pair
+}
+
+// paths extracts the Path of every row, preserving order.
+func paths(rows []FileSnapshot) []string {
+	names := make([]string, len(rows))
+	for i := range rows {
+		names[i] = rows[i].Path
+	}
+	return names
+}