summaryrefslogtreecommitdiff
path: root/internal/ior.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-12 23:54:44 +0200
committerPaul Buetow <paul@buetow.org>2026-03-12 23:54:44 +0200
commit2e401326d7abf687f2f67537cfe1b7f93d548305 (patch)
tree027547b0958d1ef1f236e507ae89dee414af204b /internal/ior.go
parent767c2b54779cbf81b68217c6e83868cffb6a0965 (diff)
feat: persist parquet recording across TUI restarts
Diffstat (limited to 'internal/ior.go')
-rw-r--r--internal/ior.go23
1 files changed, 21 insertions, 2 deletions
diff --git a/internal/ior.go b/internal/ior.go
index ea44fea..12aab7c 100644
--- a/internal/ior.go
+++ b/internal/ior.go
@@ -20,6 +20,7 @@ import (
"ior/internal/flags"
"ior/internal/flamegraph"
"ior/internal/globalfilter"
+ "ior/internal/parquet"
"ior/internal/probemanager"
"ior/internal/statsengine"
"ior/internal/tracepoints"
@@ -197,7 +198,11 @@ func tuiTraceStarterFromRunTrace(
engine := statsengine.NewEngine(64)
streamBuf := streamEventSink(eventstream.NewRingBuffer())
streamSource := eventstream.Source(streamBuf)
+ streamSeq := eventstream.NewSequencer(0)
liveTrie := flamegraph.NewLiveTrie(cfg.CollapsedFields, cfg.CountField)
+ filterEpoch := uint64(0)
+ var recorderWarningOnce sync.Once
+ var recorder *parquet.Recorder
if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok {
if persistent := bindings.StreamBuffer(); persistent != nil {
streamSource = persistent
@@ -207,12 +212,16 @@ func tuiTraceStarterFromRunTrace(
return fmt.Errorf("runtime stream source does not support event pushes")
}
}
+ if persistentSeq := bindings.StreamSequencer(); persistentSeq != nil {
+ streamSeq = persistentSeq
+ }
+ recorder = bindings.Recorder()
+ filterEpoch = bindings.FilterEpoch()
bindings.SetDashboardSnapshotSource(engine)
bindings.SetEventStreamSource(streamSource)
bindings.SetLiveTrie(liveTrie)
}
streamEvents := make(chan eventstream.StreamEvent, appconfig.DefaultChannelBufferSize)
- streamSeq := eventstream.NewSequencer(0)
go func() {
for ev := range streamEvents {
@@ -230,8 +239,18 @@ func tuiTraceStarterFromRunTrace(
ep.Recycle()
return
}
+ row := eventstream.NewStreamEvent(streamSeq.Next(), ep)
engine.Ingest(ep)
- streamEvents <- eventstream.NewStreamEvent(streamSeq.Next(), ep)
+ streamEvents <- row
+ if recorder != nil {
+ if err := recorder.Record(row, filterEpoch); err != nil {
+ recorderWarningOnce.Do(func() {
+ if el.warningCb != nil {
+ el.warningCb(fmt.Sprintf("Parquet recorder failed: %v", err))
+ }
+ })
+ }
+ }
liveTrie.Ingest(ep)
// Both downstream consumers snapshot the pair synchronously, so
// the pooled pair can be recycled immediately afterwards.