diff options
| author | Paul Buetow <paul@buetow.org> | 2026-05-12 00:07:43 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-05-12 00:07:43 +0300 |
| commit | b0b7a4278f5f4a5c4d0e7dc159e826f8c542237f (patch) | |
| tree | 3f010d4369b8603ff28381974bc6e914ad9190d4 | |
| parent | b5bc739c96195d059f5120c61b294f2f33eeada0 (diff) | |
refactor tuiTraceStarterFromRunTrace and runTraceWithContext to comply with 50-line limit
Extract buildTUIRuntime and makeTUIEventLoopConfigurer from the monolithic
tuiTraceStarterFromRunTrace closure, and extract maybePrependFlamegraphConfigure
and finaliseTrace from runTraceWithContext. All functions are now <= 50 lines.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
| -rw-r--r-- | internal/ior.go | 221 |
1 files changed, 132 insertions, 89 deletions
diff --git a/internal/ior.go b/internal/ior.go index 354267b..3341a82 100644 --- a/internal/ior.go +++ b/internal/ior.go @@ -183,88 +183,121 @@ func shouldRunTraceMode(cfg flags.Config) bool { return cfg.PlainMode || cfg.FlamegraphOutput || isHeadlessParquetMode(cfg) } +// tuiRuntime holds all the per-restart state that the TUI trace starter +// allocates and wires into the runtime bindings before each trace goroutine. +type tuiRuntime struct { + engine *statsengine.Engine + streamBuf streamEventSink + streamSrc eventstream.Source + streamSeq *eventstream.Sequencer + liveTrie *flamegraph.LiveTrie + recorder *parquet.Recorder + filterEpoch uint64 +} + +// buildTUIRuntime allocates fresh trace-session state and, when persistent +// runtime bindings exist in ctx, wires them in (reusing the existing stream +// buffer / sequencer and reading the parquet recorder and filter epoch). +func buildTUIRuntime(ctx context.Context, cfg flags.Config) (*tuiRuntime, error) { + rt := &tuiRuntime{ + engine: statsengine.NewEngine(64), + streamSeq: eventstream.NewSequencer(0), + liveTrie: flamegraph.NewLiveTrie(cfg.CollapsedFields, cfg.CountField), + } + buf := streamEventSink(eventstream.NewRingBuffer()) + rt.streamBuf = buf + rt.streamSrc = eventstream.Source(buf) + + if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok { + if persistent := bindings.StreamBuffer(); persistent != nil { + rt.streamSrc = persistent + sink, ok := persistent.(streamEventSink) + if !ok { + return nil, fmt.Errorf("runtime stream source does not support event pushes") + } + rt.streamBuf = sink + } + if persistentSeq := bindings.StreamSequencer(); persistentSeq != nil { + rt.streamSeq = persistentSeq + } + rt.recorder = bindings.Recorder() + rt.filterEpoch = bindings.FilterEpoch() + bindings.SetDashboardSnapshotSource(rt.engine) + bindings.SetEventStreamSource(rt.streamSrc) + bindings.SetLiveTrie(rt.liveTrie) + } + return rt, nil +} + +// makeTUIEventLoopConfigurer returns the func(*eventLoop) callback that wires +// the event loop into the TUI runtime: it sets the initial filter, installs +// the print callback that fans out to engine/stream/trie, and registers the +// live-filter setter so the TUI can swap filters without restarting BPF probes. +func makeTUIEventLoopConfigurer(ctx context.Context, cfg flags.Config, rt *tuiRuntime) func(*eventLoop) { + var recorderWarningOnce sync.Once + return func(el *eventLoop) { + // Seed the event loop's filter from config so subsequent reads via + // el.Filter() see the same filter the trace was started with. + el.SetFilter(cfg.GlobalFilter) + el.printCb = func(ep *event.Pair) { + if !shouldIngestTracePair(el.Filter(), ep) { + ep.Recycle() + return + } + row := eventstream.NewStreamEvent(rt.streamSeq.Next(), ep) + rt.engine.Ingest(ep) + rt.streamBuf.Push(row) + if rt.recorder != nil { + if err := rt.recorder.Record(row, rt.filterEpoch); err != nil { + recorderWarningOnce.Do(func() { + if el.warningCb != nil { + el.warningCb(fmt.Sprintf("Parquet recorder failed: %v", err)) + } + }) + } + } + rt.liveTrie.Ingest(ep) + // Both downstream consumers snapshot the pair synchronously, so + // the pooled pair can be recycled immediately afterwards. + ep.Recycle() + } + el.warningCb = func(message string) { + rt.streamBuf.Push(eventstream.NewWarningEvent(rt.streamSeq.Next(), message)) + } + if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok { + bindings.SetLiveFilterSetter(el.SetFilter) + } + } +} + +// tuiTraceStarterFromRunTrace returns a tui.TraceStarter that drives a full +// BPF trace session from within the TUI lifecycle. It allocates per-restart +// state via buildTUIRuntime, wires the event loop via makeTUIEventLoopConfigurer, +// and starts the trace in a goroutine, signalling the TUI once BPF probes are +// attached (via startedCh) or returning an error if startup fails. func tuiTraceStarterFromRunTrace( baseCfg flags.Config, startTrace func(context.Context, flags.Config, chan<- struct{}, func(*eventLoop)) error, ) tui.TraceStarter { return func(ctx context.Context) error { - bpf.SetLoggerCbs(bpf.Callbacks{ - Log: func(int, string) {}, - }) + bpf.SetLoggerCbs(bpf.Callbacks{Log: func(int, string) {}}) cfg := baseCfg if filter, ok := tui.TraceFiltersFromContext(ctx); ok { cfg.GlobalFilter = filter.Clone() applyTraceScopeFromGlobalFilter(&cfg, filter) } - engine := statsengine.NewEngine(64) - streamBuf := streamEventSink(eventstream.NewRingBuffer()) - streamSource := eventstream.Source(streamBuf) - streamSeq := eventstream.NewSequencer(0) - liveTrie := flamegraph.NewLiveTrie(cfg.CollapsedFields, cfg.CountField) - filterEpoch := uint64(0) - var recorderWarningOnce sync.Once - var recorder *parquet.Recorder - if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok { - if persistent := bindings.StreamBuffer(); persistent != nil { - streamSource = persistent - if sink, ok := persistent.(streamEventSink); ok { - streamBuf = sink - } else { - return fmt.Errorf("runtime stream source does not support event pushes") - } - } - if persistentSeq := bindings.StreamSequencer(); persistentSeq != nil { - streamSeq = persistentSeq - } - recorder = bindings.Recorder() - filterEpoch = bindings.FilterEpoch() - bindings.SetDashboardSnapshotSource(engine) - bindings.SetEventStreamSource(streamSource) - bindings.SetLiveTrie(liveTrie) + + rt, err := buildTUIRuntime(ctx, cfg) + if err != nil { + return err } + configureEl := makeTUIEventLoopConfigurer(ctx, cfg, rt) + startedCh := make(chan struct{}) errCh := make(chan error, 1) - go func() { - err := startTrace(ctx, cfg, startedCh, func(el *eventLoop) { - // Seed the eventloop's filter from the config so subsequent - // reads via el.Filter() see the same filter the trace was - // started with. The TUI can later replace it in place via - // runtimeBindings.applyLiveFilter, which calls el.SetFilter. - el.SetFilter(cfg.GlobalFilter) - // Read the live filter on each event so the TUI can swap - // filters in place via runtimeBindings.applyLiveFilter - // without restarting the BPF probes. - el.printCb = func(ep *event.Pair) { - if !shouldIngestTracePair(el.Filter(), ep) { - ep.Recycle() - return - } - row := eventstream.NewStreamEvent(streamSeq.Next(), ep) - engine.Ingest(ep) - streamBuf.Push(row) - if recorder != nil { - if err := recorder.Record(row, filterEpoch); err != nil { - recorderWarningOnce.Do(func() { - if el.warningCb != nil { - el.warningCb(fmt.Sprintf("Parquet recorder failed: %v", err)) - } - }) - } - } - liveTrie.Ingest(ep) - // Both downstream consumers snapshot the pair synchronously, so - // the pooled pair can be recycled immediately afterwards. - ep.Recycle() - } - el.warningCb = func(message string) { - streamBuf.Push(eventstream.NewWarningEvent(streamSeq.Next(), message)) - } - if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok { - bindings.SetLiveFilterSetter(el.SetFilter) - } - }) + err := startTrace(ctx, cfg, startedCh, configureEl) if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok { bindings.SetLiveFilterSetter(nil) } @@ -387,6 +420,36 @@ func startTraceShutdownWatcher(ctx context.Context, verbose bool, el *eventLoop, }() } +// maybePrependFlamegraphConfigure wraps configure so that, when flamegraph +// output is requested, each event pair is also forwarded to the recorder. +// Returns the (possibly wrapped) configure func and the recorder (or nil). +func maybePrependFlamegraphConfigure(cfg flags.Config, configure func(*eventLoop)) (func(*eventLoop), *flamegraph.Recorder) { + if !cfg.FlamegraphOutput { + return configure, nil + } + recorder := flamegraph.NewRecorder(cfg.OutputName) + recordOutput := func(el *eventLoop) { + el.printCb = func(ep *event.Pair) { + recorder.AddPair(ep) + ep.Recycle() + } + } + return chainEventLoopConfigure(recordOutput, configure), recorder +} + +// finaliseTrace waits for profiling to finish, flushes the flamegraph recorder +// if one was created, and logs the total run duration. +func finaliseTrace(recorder *flamegraph.Recorder, profiling *profilingControl, totalDuration time.Duration, logln func(...any)) error { + <-profiling.done + if recorder != nil { + if err := recorder.Write(); err != nil { + return err + } + } + logln("Good bye... (unloading BPF tracepoints will take a few seconds...) after", totalDuration) + return nil +} + func runTraceWithContext(parentCtx context.Context, cfg flags.Config, started chan<- struct{}, configure func(*eventLoop)) error { if getEUID() != 0 { return errRootPrivilegesRequired @@ -394,10 +457,7 @@ func runTraceWithContext(parentCtx context.Context, cfg flags.Config, started ch verbose := started == nil logln := newLogger(verbose) - var recorder *flamegraph.Recorder - if cfg.FlamegraphOutput { - recorder = flamegraph.NewRecorder(cfg.OutputName) - } + configure, recorder := maybePrependFlamegraphConfigure(cfg, configure) bpfModule, mgr, releaseBindings, err := setupBPFModule(parentCtx, cfg) if err != nil { @@ -431,29 +491,12 @@ func runTraceWithContext(parentCtx context.Context, cfg flags.Config, started ch if err != nil { return err } - if recorder != nil { - recordOutput := func(el *eventLoop) { - el.printCb = func(ep *event.Pair) { - recorder.AddPair(ep) - ep.Recycle() - } - } - configure = chainEventLoopConfigure(recordOutput, configure) - } configureEventLoopOutput(el, mgr, configure) startTraceShutdownWatcher(ctx, verbose, el, profiling, logln) startTime := time.Now() el.run(ctx, ch) - totalDuration := time.Since(startTime) - <-profiling.done - if recorder != nil { - if err := recorder.Write(); err != nil { - return err - } - } - logln("Good bye... (unloading BPF tracepoints will take a few seconds...) after", totalDuration) - return nil + return finaliseTrace(recorder, profiling, time.Since(startTime), logln) } func chainEventLoopConfigure(fns ...func(*eventLoop)) func(*eventLoop) { |
