diff options
| author | Paul Buetow <paul@buetow.org> | 2026-05-20 14:22:43 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-05-20 14:22:43 +0300 |
| commit | f063e626a28339735da583142e5af864a60c2111 (patch) | |
| tree | f5b541d3021cab5a72aeb8a1aee7ef247313ac46 /internal | |
| parent | 13d2c3ec8deb759308a8f0a28374470ca9bb1e9d (diff) | |
test(task-17): assert aggregate-only stats surfacing path
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/ior_mode_test.go | 103 |
1 files changed, 95 insertions, 8 deletions
diff --git a/internal/ior_mode_test.go b/internal/ior_mode_test.go index 8a152cc..bd0adf1 100644 --- a/internal/ior_mode_test.go +++ b/internal/ior_mode_test.go @@ -19,6 +19,7 @@ import ( "ior/internal/globalfilter" "ior/internal/parquet" "ior/internal/runtime" + "ior/internal/statsengine" "ior/internal/streamrow" "ior/internal/types" @@ -886,15 +887,95 @@ func TestTuiTraceStarterAppliesLiveFilterSwapInPlace(t *testing.T) { } } +func TestTuiTraceStarterSurfacesAggregateOnlySyscallInSnapshot(t *testing.T) { + bindings := &traceRuntimeBindingsStub{ + streamBuffer: streamrow.NewRingBuffer(), + streamSeq: streamrow.NewSequencer(0), + } + base := flags.NewFlags() + base.SyscallSamplingRates["openat"] = 0 + + starter := tuiTraceStarterFromRunTrace( + base, + func(_ context.Context, _ flags.Config, started chan<- struct{}, configure func(*eventLoop)) error { + el := &eventLoop{} + configure(el) + if el.aggregateSink == nil { + return errors.New("aggregate sink not wired") + } + + // Simulate a sampled ring-buffer path where openat=0 suppresses + // openat pair rows but aggregate drains still report openat counts. + el.printCb(testTracePairWithTraceIDs(1, "ioworkload", types.SYS_ENTER_CLOSE, types.SYS_EXIT_CLOSE)) + el.aggregateSink.IngestSyscallAggregates([]statsengine.SyscallAggregate{ + { + TraceID: types.SYS_ENTER_OPENAT, + Count: 7, + TotalLatencyNs: 70, + MinLatencyNs: 10, + MaxLatencyNs: 10, + }, + }) + close(started) + return nil + }, + ) + + ctx := runtime.ContextWithRuntimeBindings(context.Background(), bindings) + if err := starter(ctx); err != nil { + t.Fatalf("starter() error = %v", err) + } + + waitForStreamRows(t, bindings.streamBuffer, 1) + rows := bindings.streamBuffer.Snapshot() + if rows[0].Syscall != "close" { + t.Fatalf("stream syscall = %q, want close", rows[0].Syscall) + } + for _, row := range rows { + if row.Syscall == "openat" { + t.Fatalf("did not expect openat in stream rows under aggregate-only path") + } + } + + if bindings.snapshotSource == nil { + t.Fatalf("expected dashboard snapshot source to be wired") + } + snap, err := bindings.snapshotSource.Snapshot() + if err != nil { + t.Fatalf("snapshot() error = %v", err) + } + if snap == nil { + t.Fatalf("expected non-nil snapshot") + } + + openatCount := uint64(0) + closeCount := uint64(0) + for _, row := range snap.Syscalls() { + switch row.TraceID { + case types.SYS_ENTER_OPENAT: + openatCount = row.Count + case types.SYS_ENTER_CLOSE: + closeCount = row.Count + } + } + if openatCount != 7 { + t.Fatalf("snapshot openat count = %d, want 7 from aggregate ingest", openatCount) + } + if closeCount != 1 { + t.Fatalf("snapshot close count = %d, want 1 from stream pair", closeCount) + } +} + // traceRuntimeBindingsStub is a test double for runtime.TraceRuntimeBindings // that records injected stream sources and exposes the live-filter setter for // assertions. type traceRuntimeBindingsStub struct { - streamBuffer *streamrow.RingBuffer - streamSource runtime.StreamSource - streamSeq *streamrow.Sequencer - recorder *parquet.Recorder - filterEpoch uint64 + streamBuffer *streamrow.RingBuffer + streamSource runtime.StreamSource + snapshotSource runtime.SnapshotSource + streamSeq *streamrow.Sequencer + recorder *parquet.Recorder + filterEpoch uint64 // mu guards liveFilterSetter, which is mutated from the trace-starter // goroutine (via SetLiveFilterSetter) and read from the test goroutine // when invoking the in-place swap. @@ -902,7 +983,9 @@ type traceRuntimeBindingsStub struct { liveFilterSetter func(globalfilter.Filter) } -func (b *traceRuntimeBindingsStub) SetDashboardSnapshotSource(runtime.SnapshotSource) {} +func (b *traceRuntimeBindingsStub) SetDashboardSnapshotSource(source runtime.SnapshotSource) { + b.snapshotSource = source +} func (b *traceRuntimeBindingsStub) SetEventStreamSource(source runtime.StreamSource) { b.streamSource = source @@ -941,8 +1024,12 @@ func (b *traceRuntimeBindingsStub) FilterEpoch() uint64 { } func testTracePair(seq uint64, comm string) *event.Pair { - enter := &types.OpenEvent{TraceId: types.SYS_ENTER_OPENAT, Time: seq * 10, Pid: 42, Tid: 84} - exit := &types.RetEvent{TraceId: types.SYS_EXIT_OPENAT, Time: seq*10 + 1, Ret: int64(seq), Pid: 42, Tid: 84} + return testTracePairWithTraceIDs(seq, comm, types.SYS_ENTER_OPENAT, types.SYS_EXIT_OPENAT) +} + +func testTracePairWithTraceIDs(seq uint64, comm string, enterID types.TraceId, exitID types.TraceId) *event.Pair { + enter := &types.OpenEvent{TraceId: enterID, Time: seq * 10, Pid: 42, Tid: 84} + exit := &types.RetEvent{TraceId: exitID, Time: seq*10 + 1, Ret: int64(seq), Pid: 42, Tid: 84} pair := event.NewPair(enter) pair.ExitEv = exit pair.File = file.NewFd(int32(seq), "/tmp/test", 0) |
