diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-12 23:54:44 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-12 23:54:44 +0200 |
| commit | 2e401326d7abf687f2f67537cfe1b7f93d548305 (patch) | |
| tree | 027547b0958d1ef1f236e507ae89dee414af204b /internal/ior_mode_test.go | |
| parent | 767c2b54779cbf81b68217c6e83868cffb6a0965 (diff) | |
feat: persist parquet recording across TUI restarts
Diffstat (limited to 'internal/ior_mode_test.go')
| -rw-r--r-- | internal/ior_mode_test.go | 175 |
1 files changed, 175 insertions, 0 deletions
diff --git a/internal/ior_mode_test.go b/internal/ior_mode_test.go index 8fbc79c..08452bb 100644 --- a/internal/ior_mode_test.go +++ b/internal/ior_mode_test.go @@ -5,6 +5,9 @@ import ( "context" "encoding/json" "errors" + "io" + "os" + "path/filepath" "testing" "testing/synctest" "time" @@ -13,8 +16,13 @@ import ( "ior/internal/file" "ior/internal/flags" "ior/internal/globalfilter" + "ior/internal/parquet" "ior/internal/tui" + "ior/internal/tui/eventstream" + flamegraphtui "ior/internal/tui/flamegraph" "ior/internal/types" + + parquetgo "github.com/parquet-go/parquet-go" ) func TestShouldRunTraceMode(t *testing.T) { @@ -552,3 +560,170 @@ func TestTuiTraceStarterFromRunTraceRespectsCancel(t *testing.T) { t.Fatalf("expected context canceled, got %v", err) } } + +func TestTuiTraceStarterFromRunTracePersistsRecorderAcrossRestarts(t *testing.T) { + recorder := parquet.NewRecorder(parquet.RecorderConfig{ + BatchSize: 1, + FlushInterval: time.Hour, + }) + if err := recorder.Start(filepath.Join(t.TempDir(), "trace"), parquet.StartOptions{ + Metadata: parquet.FileMetadata{Mode: "tui"}, + }); err != nil { + t.Fatalf("recorder.Start() error = %v", err) + } + + bindings := &traceRuntimeBindingsStub{ + streamBuffer: eventstream.NewRingBuffer(), + streamSeq: eventstream.NewSequencer(0), + recorder: recorder, + } + base := flags.NewFlags() + base.GlobalFilter = globalfilter.Filter{Comm: &globalfilter.StringFilter{Pattern: "keep"}} + + runs := [][]*event.Pair{ + { + testTracePair(1, "keep"), + testTracePair(99, "drop"), + }, + { + testTracePair(2, "keep"), + }, + } + runIndex := 0 + starter := tuiTraceStarterFromRunTrace( + base, + func(_ context.Context, _ flags.Config, started chan<- struct{}, configure func(*eventLoop)) error { + el := &eventLoop{} + configure(el) + for _, pair := range runs[runIndex] { + el.printCb(pair) + } + runIndex++ + close(started) + return nil + }, + ) + + ctx := tui.ContextWithRuntimeBindings(context.Background(), bindings) + if err := starter(ctx); err != nil { + t.Fatalf("first starter() error = %v", err) + } + waitForStreamRows(t, bindings.streamBuffer, 1) + + bindings.filterEpoch = 1 + if err := starter(ctx); err != nil { + t.Fatalf("second starter() error = %v", err) + } + waitForStreamRows(t, bindings.streamBuffer, 2) + + if err := recorder.Stop(); err != nil { + t.Fatalf("recorder.Stop() error = %v", err) + } + + status := recorder.Status() + if status.LastError != nil { + t.Fatalf("recorder status error = %v, want nil", status.LastError) + } + + got := readRecordedParquet(t, status.Path) + if len(got) != 2 { + t.Fatalf("recorded rows = %d, want 2", len(got)) + } + if got[0].Seq != 1 || got[1].Seq != 2 { + t.Fatalf("recorded seq = %d,%d, want 1,2", got[0].Seq, got[1].Seq) + } + if got[0].FilterEpoch != 0 || got[1].FilterEpoch != 1 { + t.Fatalf("recorded filter epochs = %d,%d, want 0,1", got[0].FilterEpoch, got[1].FilterEpoch) + } + if snapshot := bindings.streamBuffer.Snapshot(); len(snapshot) != 2 { + t.Fatalf("stream buffer rows = %d, want 2", len(snapshot)) + } +} + +type traceRuntimeBindingsStub struct { + streamBuffer *eventstream.RingBuffer + streamSource eventstream.Source + streamSeq *eventstream.Sequencer + recorder *parquet.Recorder + filterEpoch uint64 +} + +func (b *traceRuntimeBindingsStub) SetDashboardSnapshotSource(tui.SnapshotSource) {} + +func (b *traceRuntimeBindingsStub) SetEventStreamSource(source eventstream.Source) { + b.streamSource = source +} + +func (b *traceRuntimeBindingsStub) SetLiveTrie(flamegraphtui.LiveTrieSource) {} + +func (b *traceRuntimeBindingsStub) SetProbeManager(tui.ProbeManager) {} + +func (b *traceRuntimeBindingsStub) StreamBuffer() eventstream.Source { + return b.streamBuffer +} + +func (b *traceRuntimeBindingsStub) Recorder() *parquet.Recorder { + return b.recorder +} + +func (b *traceRuntimeBindingsStub) StreamSequencer() *eventstream.Sequencer { + return b.streamSeq +} + +func (b *traceRuntimeBindingsStub) FilterEpoch() uint64 { + return b.filterEpoch +} + +func testTracePair(seq uint64, comm string) *event.Pair { + enter := &types.OpenEvent{TraceId: types.SYS_ENTER_OPENAT, Time: seq * 10, Pid: 42, Tid: 84} + exit := &types.RetEvent{TraceId: types.SYS_EXIT_OPENAT, Time: seq*10 + 1, Ret: int64(seq), Pid: 42, Tid: 84} + pair := event.NewPair(enter) + pair.ExitEv = exit + pair.File = file.NewFd(int32(seq), "/tmp/test", 0) + pair.Comm = comm + pair.Duration = seq + pair.DurationToPrev = seq + 1 + pair.Bytes = seq + 2 + return pair +} + +func waitForStreamRows(t *testing.T, buffer *eventstream.RingBuffer, want int) { + t.Helper() + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if buffer.Len() == want { + return + } + time.Sleep(time.Millisecond) + } + t.Fatalf("stream buffer len = %d, want %d", buffer.Len(), want) +} + +func readRecordedParquet(t *testing.T, path string) []parquet.Record { + t.Helper() + + f, err := os.Open(path) + if err != nil { + t.Fatalf("open parquet %q: %v", path, err) + } + defer f.Close() + + reader := parquetgo.NewGenericReader[parquet.Record](f) + defer reader.Close() + + var rows []parquet.Record + buf := make([]parquet.Record, 4) + for { + n, err := reader.Read(buf) + if n > 0 { + rows = append(rows, buf[:n]...) + } + if err == nil { + continue + } + if err == io.EOF { + return rows + } + t.Fatalf("read parquet rows: %v", err) + } +} |
