diff options
| author | Paul Buetow <paul@buetow.org> | 2026-02-23 23:08:40 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-02-23 23:08:40 +0200 |
| commit | 9c04c55b443e5a22cc34cc24e09f10fe84d5e999 (patch) | |
| tree | c13497da30c0465aaed55b636ffb0ee99ca709a2 /internal/statsengine | |
| parent | 4b00bd30a8c1247f5dfee77277fdd4b438750bd0 (diff) | |
task 305: add top-N file ranker with bounded cardinality
Diffstat (limited to 'internal/statsengine')
| -rw-r--r-- | internal/statsengine/filerank.go | 207 | ||||
| -rw-r--r-- | internal/statsengine/filerank_test.go | 125 |
2 files changed, 332 insertions, 0 deletions
diff --git a/internal/statsengine/filerank.go b/internal/statsengine/filerank.go new file mode 100644 index 0000000..9054ff7 --- /dev/null +++ b/internal/statsengine/filerank.go @@ -0,0 +1,207 @@ +package statsengine + +import ( + "container/heap" + "ior/internal/event" + "ior/internal/types" + "sort" +) + +const fileRankTopNDefault = 20 + +type fileRanker struct { + topN int + maxSeen int + byPath map[string]*fileRankStats + topHeap fileRankHeap +} + +type fileRankStats struct { + path string + accesses uint64 + bytesRead uint64 + bytesWritten uint64 + totalLatency uint64 + maxLatency uint64 + heapIndex int +} + +type fileRankHeap []*fileRankStats + +func newFileRanker() *fileRanker { + return newFileRankerWithConfig(fileRankTopNDefault) +} + +func newFileRankerWithConfig(topN int) *fileRanker { + if topN <= 0 { + topN = fileRankTopNDefault + } + return newFileRankerWithLimits(topN, topN*32) +} + +func newFileRankerWithLimits(topN int, maxSeen int) *fileRanker { + if topN <= 0 { + topN = fileRankTopNDefault + } + if maxSeen < topN { + maxSeen = topN + } + + r := &fileRanker{ + topN: topN, + maxSeen: maxSeen, + byPath: make(map[string]*fileRankStats), + } + heap.Init(&r.topHeap) + return r +} + +func (r *fileRanker) Add(pair *event.Pair) { + if r == nil || pair == nil || pair.File == nil { + return + } + + path := pair.File.Name() + if path == "" || path == "N:file" { + return + } + + stats := r.byPath[path] + if stats == nil { + stats = &fileRankStats{path: path, heapIndex: -1} + r.byPath[path] = stats + } + + stats.accesses++ + stats.totalLatency += pair.Duration + if pair.Duration > stats.maxLatency { + stats.maxLatency = pair.Duration + } + r.addBytes(stats, pair) + r.updateHeap(stats) + r.compactIfNeeded() +} + +func (r *fileRanker) Snapshot() []FileSnapshot { + if r == nil { + return nil + } + + out := make([]FileSnapshot, 0, len(r.topHeap)) + for _, stats := range r.topHeap { + out = append(out, stats.snapshot()) + } + + sort.Slice(out, func(i, j int) bool { + if out[i].Accesses != out[j].Accesses { + return out[i].Accesses > out[j].Accesses + } + return out[i].Path < out[j].Path + }) + + return out +} + +func (r *fileRanker) addBytes(stats *fileRankStats, pair *event.Pair) { + retEv, ok := pair.ExitEv.(*types.RetEvent) + if !ok { + return + } + + switch retEv.RetType { + case types.READ_CLASSIFIED: + stats.bytesRead += pair.Bytes + case types.WRITE_CLASSIFIED: + stats.bytesWritten += pair.Bytes + case types.TRANSFER_CLASSIFIED: + // Transfer syscalls move bytes in both directions from a file-centric view. + stats.bytesRead += pair.Bytes + stats.bytesWritten += pair.Bytes + } +} + +func (r *fileRanker) updateHeap(stats *fileRankStats) { + if stats.heapIndex >= 0 { + heap.Fix(&r.topHeap, stats.heapIndex) + return + } + + if len(r.topHeap) < r.topN { + heap.Push(&r.topHeap, stats) + return + } + + worst := r.topHeap[0] + if !betterFileRank(stats, worst) { + return + } + + heap.Pop(&r.topHeap) + heap.Push(&r.topHeap, stats) +} + +func (r *fileRanker) compactIfNeeded() { + if len(r.byPath) <= r.maxSeen { + return + } + + // Keep only currently top-ranked paths once cardinality crosses the guard. + kept := make(map[string]*fileRankStats, len(r.topHeap)) + for _, stats := range r.topHeap { + kept[stats.path] = stats + } + r.byPath = kept +} + +func (s *fileRankStats) snapshot() FileSnapshot { + avg := 0.0 + if s.accesses > 0 { + avg = float64(s.totalLatency) / float64(s.accesses) + } + + return FileSnapshot{ + Path: s.path, + Accesses: s.accesses, + BytesRead: s.bytesRead, + BytesWritten: s.bytesWritten, + AvgLatencyNs: avg, + MaxLatencyNs: s.maxLatency, + } +} + +func betterFileRank(a, b *fileRankStats) bool { + if a.accesses != b.accesses { + return a.accesses > b.accesses + } + return a.path < b.path +} + +func (h fileRankHeap) Len() int { + return len(h) +} + +func (h fileRankHeap) Less(i, j int) bool { + // Keep the worst-ranked item at root for O(log N) eviction. + return betterFileRank(h[j], h[i]) +} + +func (h fileRankHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] + h[i].heapIndex = i + h[j].heapIndex = j +} + +func (h *fileRankHeap) Push(x any) { + stats := x.(*fileRankStats) + stats.heapIndex = len(*h) + *h = append(*h, stats) +} + +func (h *fileRankHeap) Pop() any { + old := *h + n := len(old) + stats := old[n-1] + stats.heapIndex = -1 + *h = old[:n-1] + return stats +} diff --git a/internal/statsengine/filerank_test.go b/internal/statsengine/filerank_test.go new file mode 100644 index 0000000..26a0b23 --- /dev/null +++ b/internal/statsengine/filerank_test.go @@ -0,0 +1,125 @@ +package statsengine + +import ( + "fmt" + "ior/internal/event" + "ior/internal/file" + "ior/internal/types" + "reflect" + "testing" +) + +func TestFileRankerHeapEviction(t *testing.T) { + r := newFileRankerWithConfig(2) + + r.Add(newFilePair("/a", 10, 1, types.READ_CLASSIFIED)) + r.Add(newFilePair("/b", 10, 1, types.READ_CLASSIFIED)) + r.Add(newFilePair("/b", 10, 1, types.READ_CLASSIFIED)) + r.Add(newFilePair("/c", 10, 1, types.READ_CLASSIFIED)) + r.Add(newFilePair("/c", 10, 1, types.READ_CLASSIFIED)) + r.Add(newFilePair("/c", 10, 1, types.READ_CLASSIFIED)) + + got := paths(r.Snapshot()) + want := []string{"/c", "/b"} + if !reflect.DeepEqual(got, want) { + t.Fatalf("unexpected top-n paths: got %v want %v", got, want) + } +} + +func TestFileRankerRankingCorrectness(t *testing.T) { + r := newFileRankerWithConfig(3) + + r.Add(newFilePair("/read", 20, 11, types.READ_CLASSIFIED)) + r.Add(newFilePair("/read", 10, 9, types.READ_CLASSIFIED)) + r.Add(newFilePair("/write", 25, 7, types.WRITE_CLASSIFIED)) + r.Add(newFilePair("/write", 15, 3, types.WRITE_CLASSIFIED)) + r.Add(newFilePair("/xfer", 40, 5, types.TRANSFER_CLASSIFIED)) + r.Add(newFilePair("/xfer", 60, 6, types.TRANSFER_CLASSIFIED)) + + snap := r.Snapshot() + if len(snap) != 3 { + t.Fatalf("expected 3 rows, got %d", len(snap)) + } + + gotOrder := paths(snap) + wantOrder := []string{"/read", "/write", "/xfer"} + if !reflect.DeepEqual(gotOrder, wantOrder) { + t.Fatalf("unexpected ranking order: got %v want %v", gotOrder, wantOrder) + } + + byPath := make(map[string]FileSnapshot, len(snap)) + for _, row := range snap { + byPath[row.Path] = row + } + + readRow := byPath["/read"] + if readRow.Accesses != 2 || readRow.BytesRead != 20 || readRow.BytesWritten != 0 || readRow.MaxLatencyNs != 20 { + t.Fatalf("unexpected /read stats: %+v", readRow) + } + if readRow.AvgLatencyNs != 15 { + t.Fatalf("unexpected /read avg latency: %v", readRow.AvgLatencyNs) + } + + writeRow := byPath["/write"] + if writeRow.BytesRead != 0 || writeRow.BytesWritten != 10 { + t.Fatalf("unexpected /write bytes split: %+v", writeRow) + } + + xferRow := byPath["/xfer"] + if xferRow.BytesRead != 11 || xferRow.BytesWritten != 11 { + t.Fatalf("unexpected /xfer bytes split: %+v", xferRow) + } + if xferRow.AvgLatencyNs != 50 { + t.Fatalf("unexpected /xfer avg latency: %v", xferRow.AvgLatencyNs) + } +} + +func TestFileRankerIgnoresInvalidFileData(t *testing.T) { + r := newFileRankerWithConfig(3) + + r.Add(nil) + r.Add(&event.Pair{}) + r.Add(&event.Pair{File: file.NewFd(1, "", -1), Duration: 10, Bytes: 1, ExitEv: &types.RetEvent{RetType: types.READ_CLASSIFIED}}) + + if got := r.Snapshot(); len(got) != 0 { + t.Fatalf("expected empty snapshot for invalid rows, got %+v", got) + } +} + +func TestFileRankerCompactsHighCardinality(t *testing.T) { + r := newFileRankerWithLimits(3, 5) + + // Make one file clearly hot so it must survive compaction. + for i := 0; i < 10; i++ { + r.Add(newFilePair("/hot", 10, 1, types.READ_CLASSIFIED)) + } + for i := 0; i < 200; i++ { + r.Add(newFilePair(fmt.Sprintf("/tmp/file-%d", i), 1, 1, types.READ_CLASSIFIED)) + } + + if len(r.byPath) > r.maxSeen { + t.Fatalf("cardinality guard failed: byPath=%d maxSeen=%d", len(r.byPath), r.maxSeen) + } + + snap := r.Snapshot() + if len(snap) == 0 || snap[0].Path != "/hot" { + t.Fatalf("expected hot file to remain top-ranked after compaction, got %+v", snap) + } +} + +func newFilePair(path string, duration uint64, bytes uint64, retType uint32) *event.Pair { + return &event.Pair{ + File: file.NewFd(3, path, -1), + Duration: duration, + Bytes: bytes, + ExitEv: &types.RetEvent{RetType: retType}, + } +} + +func paths(rows []FileSnapshot) []string { + out := make([]string, 0, len(rows)) + for _, row := range rows { + out = append(out, row.Path) + } + return out +} |
