diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-12 23:54:44 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-12 23:54:44 +0200 |
| commit | 2e401326d7abf687f2f67537cfe1b7f93d548305 (patch) | |
| tree | 027547b0958d1ef1f236e507ae89dee414af204b /internal/ior.go | |
| parent | 767c2b54779cbf81b68217c6e83868cffb6a0965 (diff) | |
feat: persist parquet recording across TUI restarts
Diffstat (limited to 'internal/ior.go')
| -rw-r--r-- | internal/ior.go | 23 |
1 files changed, 21 insertions, 2 deletions
diff --git a/internal/ior.go b/internal/ior.go index ea44fea..12aab7c 100644 --- a/internal/ior.go +++ b/internal/ior.go @@ -20,6 +20,7 @@ import ( "ior/internal/flags" "ior/internal/flamegraph" "ior/internal/globalfilter" + "ior/internal/parquet" "ior/internal/probemanager" "ior/internal/statsengine" "ior/internal/tracepoints" @@ -197,7 +198,11 @@ func tuiTraceStarterFromRunTrace( engine := statsengine.NewEngine(64) streamBuf := streamEventSink(eventstream.NewRingBuffer()) streamSource := eventstream.Source(streamBuf) + streamSeq := eventstream.NewSequencer(0) liveTrie := flamegraph.NewLiveTrie(cfg.CollapsedFields, cfg.CountField) + filterEpoch := uint64(0) + var recorderWarningOnce sync.Once + var recorder *parquet.Recorder if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok { if persistent := bindings.StreamBuffer(); persistent != nil { streamSource = persistent @@ -207,12 +212,16 @@ func tuiTraceStarterFromRunTrace( return fmt.Errorf("runtime stream source does not support event pushes") } } + if persistentSeq := bindings.StreamSequencer(); persistentSeq != nil { + streamSeq = persistentSeq + } + recorder = bindings.Recorder() + filterEpoch = bindings.FilterEpoch() bindings.SetDashboardSnapshotSource(engine) bindings.SetEventStreamSource(streamSource) bindings.SetLiveTrie(liveTrie) } streamEvents := make(chan eventstream.StreamEvent, appconfig.DefaultChannelBufferSize) - streamSeq := eventstream.NewSequencer(0) go func() { for ev := range streamEvents { @@ -230,8 +239,18 @@ func tuiTraceStarterFromRunTrace( ep.Recycle() return } + row := eventstream.NewStreamEvent(streamSeq.Next(), ep) engine.Ingest(ep) - streamEvents <- eventstream.NewStreamEvent(streamSeq.Next(), ep) + streamEvents <- row + if recorder != nil { + if err := recorder.Record(row, filterEpoch); err != nil { + recorderWarningOnce.Do(func() { + if el.warningCb != nil { + el.warningCb(fmt.Sprintf("Parquet recorder failed: %v", err)) + } + }) + } + } liveTrie.Ingest(ep) // Both downstream consumers snapshot the pair synchronously, so // the pooled pair can be recycled immediately afterwards. |
