summaryrefslogtreecommitdiff
path: root/internal/ior_mode_test.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-12 23:54:44 +0200
committerPaul Buetow <paul@buetow.org>2026-03-12 23:54:44 +0200
commit2e401326d7abf687f2f67537cfe1b7f93d548305 (patch)
tree027547b0958d1ef1f236e507ae89dee414af204b /internal/ior_mode_test.go
parent767c2b54779cbf81b68217c6e83868cffb6a0965 (diff)
feat: persist parquet recording across TUI restarts
Diffstat (limited to 'internal/ior_mode_test.go')
-rw-r--r--internal/ior_mode_test.go175
1 files changed, 175 insertions, 0 deletions
diff --git a/internal/ior_mode_test.go b/internal/ior_mode_test.go
index 8fbc79c..08452bb 100644
--- a/internal/ior_mode_test.go
+++ b/internal/ior_mode_test.go
@@ -5,6 +5,9 @@ import (
"context"
"encoding/json"
"errors"
+ "io"
+ "os"
+ "path/filepath"
"testing"
"testing/synctest"
"time"
@@ -13,8 +16,13 @@ import (
"ior/internal/file"
"ior/internal/flags"
"ior/internal/globalfilter"
+ "ior/internal/parquet"
"ior/internal/tui"
+ "ior/internal/tui/eventstream"
+ flamegraphtui "ior/internal/tui/flamegraph"
"ior/internal/types"
+
+ parquetgo "github.com/parquet-go/parquet-go"
)
func TestShouldRunTraceMode(t *testing.T) {
@@ -552,3 +560,170 @@ func TestTuiTraceStarterFromRunTraceRespectsCancel(t *testing.T) {
t.Fatalf("expected context canceled, got %v", err)
}
}
+
+func TestTuiTraceStarterFromRunTracePersistsRecorderAcrossRestarts(t *testing.T) {
+ recorder := parquet.NewRecorder(parquet.RecorderConfig{
+ BatchSize: 1,
+ FlushInterval: time.Hour,
+ })
+ if err := recorder.Start(filepath.Join(t.TempDir(), "trace"), parquet.StartOptions{
+ Metadata: parquet.FileMetadata{Mode: "tui"},
+ }); err != nil {
+ t.Fatalf("recorder.Start() error = %v", err)
+ }
+
+ bindings := &traceRuntimeBindingsStub{
+ streamBuffer: eventstream.NewRingBuffer(),
+ streamSeq: eventstream.NewSequencer(0),
+ recorder: recorder,
+ }
+ base := flags.NewFlags()
+ base.GlobalFilter = globalfilter.Filter{Comm: &globalfilter.StringFilter{Pattern: "keep"}}
+
+ runs := [][]*event.Pair{
+ {
+ testTracePair(1, "keep"),
+ testTracePair(99, "drop"),
+ },
+ {
+ testTracePair(2, "keep"),
+ },
+ }
+ runIndex := 0
+ starter := tuiTraceStarterFromRunTrace(
+ base,
+ func(_ context.Context, _ flags.Config, started chan<- struct{}, configure func(*eventLoop)) error {
+ el := &eventLoop{}
+ configure(el)
+ for _, pair := range runs[runIndex] {
+ el.printCb(pair)
+ }
+ runIndex++
+ close(started)
+ return nil
+ },
+ )
+
+ ctx := tui.ContextWithRuntimeBindings(context.Background(), bindings)
+ if err := starter(ctx); err != nil {
+ t.Fatalf("first starter() error = %v", err)
+ }
+ waitForStreamRows(t, bindings.streamBuffer, 1)
+
+ bindings.filterEpoch = 1
+ if err := starter(ctx); err != nil {
+ t.Fatalf("second starter() error = %v", err)
+ }
+ waitForStreamRows(t, bindings.streamBuffer, 2)
+
+ if err := recorder.Stop(); err != nil {
+ t.Fatalf("recorder.Stop() error = %v", err)
+ }
+
+ status := recorder.Status()
+ if status.LastError != nil {
+ t.Fatalf("recorder status error = %v, want nil", status.LastError)
+ }
+
+ got := readRecordedParquet(t, status.Path)
+ if len(got) != 2 {
+ t.Fatalf("recorded rows = %d, want 2", len(got))
+ }
+ if got[0].Seq != 1 || got[1].Seq != 2 {
+ t.Fatalf("recorded seq = %d,%d, want 1,2", got[0].Seq, got[1].Seq)
+ }
+ if got[0].FilterEpoch != 0 || got[1].FilterEpoch != 1 {
+ t.Fatalf("recorded filter epochs = %d,%d, want 0,1", got[0].FilterEpoch, got[1].FilterEpoch)
+ }
+ if snapshot := bindings.streamBuffer.Snapshot(); len(snapshot) != 2 {
+ t.Fatalf("stream buffer rows = %d, want 2", len(snapshot))
+ }
+}
+
+type traceRuntimeBindingsStub struct {
+ streamBuffer *eventstream.RingBuffer
+ streamSource eventstream.Source
+ streamSeq *eventstream.Sequencer
+ recorder *parquet.Recorder
+ filterEpoch uint64
+}
+
+func (b *traceRuntimeBindingsStub) SetDashboardSnapshotSource(tui.SnapshotSource) {}
+
+func (b *traceRuntimeBindingsStub) SetEventStreamSource(source eventstream.Source) {
+ b.streamSource = source
+}
+
+func (b *traceRuntimeBindingsStub) SetLiveTrie(flamegraphtui.LiveTrieSource) {}
+
+func (b *traceRuntimeBindingsStub) SetProbeManager(tui.ProbeManager) {}
+
+func (b *traceRuntimeBindingsStub) StreamBuffer() eventstream.Source {
+ return b.streamBuffer
+}
+
+func (b *traceRuntimeBindingsStub) Recorder() *parquet.Recorder {
+ return b.recorder
+}
+
+func (b *traceRuntimeBindingsStub) StreamSequencer() *eventstream.Sequencer {
+ return b.streamSeq
+}
+
+func (b *traceRuntimeBindingsStub) FilterEpoch() uint64 {
+ return b.filterEpoch
+}
+
+func testTracePair(seq uint64, comm string) *event.Pair {
+ enter := &types.OpenEvent{TraceId: types.SYS_ENTER_OPENAT, Time: seq * 10, Pid: 42, Tid: 84}
+ exit := &types.RetEvent{TraceId: types.SYS_EXIT_OPENAT, Time: seq*10 + 1, Ret: int64(seq), Pid: 42, Tid: 84}
+ pair := event.NewPair(enter)
+ pair.ExitEv = exit
+ pair.File = file.NewFd(int32(seq), "/tmp/test", 0)
+ pair.Comm = comm
+ pair.Duration = seq
+ pair.DurationToPrev = seq + 1
+ pair.Bytes = seq + 2
+ return pair
+}
+
+func waitForStreamRows(t *testing.T, buffer *eventstream.RingBuffer, want int) {
+ t.Helper()
+ deadline := time.Now().Add(2 * time.Second)
+ for time.Now().Before(deadline) {
+ if buffer.Len() == want {
+ return
+ }
+ time.Sleep(time.Millisecond)
+ }
+ t.Fatalf("stream buffer len = %d, want %d", buffer.Len(), want)
+}
+
+func readRecordedParquet(t *testing.T, path string) []parquet.Record {
+ t.Helper()
+
+ f, err := os.Open(path)
+ if err != nil {
+ t.Fatalf("open parquet %q: %v", path, err)
+ }
+ defer f.Close()
+
+ reader := parquetgo.NewGenericReader[parquet.Record](f)
+ defer reader.Close()
+
+ var rows []parquet.Record
+ buf := make([]parquet.Record, 4)
+ for {
+ n, err := reader.Read(buf)
+ if n > 0 {
+ rows = append(rows, buf[:n]...)
+ }
+ if err == nil {
+ continue
+ }
+ if err == io.EOF {
+ return rows
+ }
+ t.Fatalf("read parquet rows: %v", err)
+ }
+}