summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/ior.go221
1 files changed, 132 insertions, 89 deletions
diff --git a/internal/ior.go b/internal/ior.go
index 354267b..3341a82 100644
--- a/internal/ior.go
+++ b/internal/ior.go
@@ -183,88 +183,121 @@ func shouldRunTraceMode(cfg flags.Config) bool {
return cfg.PlainMode || cfg.FlamegraphOutput || isHeadlessParquetMode(cfg)
}
+// tuiRuntime holds all the per-restart state that the TUI trace starter
+// allocates and wires into the runtime bindings before each trace goroutine.
+type tuiRuntime struct {
+ engine *statsengine.Engine
+ streamBuf streamEventSink
+ streamSrc eventstream.Source
+ streamSeq *eventstream.Sequencer
+ liveTrie *flamegraph.LiveTrie
+ recorder *parquet.Recorder
+ filterEpoch uint64
+}
+
+// buildTUIRuntime allocates fresh trace-session state and, when persistent
+// runtime bindings exist in ctx, wires them in (reusing the existing stream
+// buffer / sequencer and reading the parquet recorder and filter epoch).
+func buildTUIRuntime(ctx context.Context, cfg flags.Config) (*tuiRuntime, error) {
+ rt := &tuiRuntime{
+ engine: statsengine.NewEngine(64),
+ streamSeq: eventstream.NewSequencer(0),
+ liveTrie: flamegraph.NewLiveTrie(cfg.CollapsedFields, cfg.CountField),
+ }
+ buf := streamEventSink(eventstream.NewRingBuffer())
+ rt.streamBuf = buf
+ rt.streamSrc = eventstream.Source(buf)
+
+ if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok {
+ if persistent := bindings.StreamBuffer(); persistent != nil {
+ rt.streamSrc = persistent
+ sink, ok := persistent.(streamEventSink)
+ if !ok {
+ return nil, fmt.Errorf("runtime stream source does not support event pushes")
+ }
+ rt.streamBuf = sink
+ }
+ if persistentSeq := bindings.StreamSequencer(); persistentSeq != nil {
+ rt.streamSeq = persistentSeq
+ }
+ rt.recorder = bindings.Recorder()
+ rt.filterEpoch = bindings.FilterEpoch()
+ bindings.SetDashboardSnapshotSource(rt.engine)
+ bindings.SetEventStreamSource(rt.streamSrc)
+ bindings.SetLiveTrie(rt.liveTrie)
+ }
+ return rt, nil
+}
+
+// makeTUIEventLoopConfigurer returns the func(*eventLoop) callback that wires
+// the event loop into the TUI runtime: it sets the initial filter, installs
+// the print callback that fans out to engine/stream/trie, and registers the
+// live-filter setter so the TUI can swap filters without restarting BPF probes.
+func makeTUIEventLoopConfigurer(ctx context.Context, cfg flags.Config, rt *tuiRuntime) func(*eventLoop) {
+ var recorderWarningOnce sync.Once
+ return func(el *eventLoop) {
+ // Seed the event loop's filter from config so subsequent reads via
+ // el.Filter() see the same filter the trace was started with.
+ el.SetFilter(cfg.GlobalFilter)
+ el.printCb = func(ep *event.Pair) {
+ if !shouldIngestTracePair(el.Filter(), ep) {
+ ep.Recycle()
+ return
+ }
+ row := eventstream.NewStreamEvent(rt.streamSeq.Next(), ep)
+ rt.engine.Ingest(ep)
+ rt.streamBuf.Push(row)
+ if rt.recorder != nil {
+ if err := rt.recorder.Record(row, rt.filterEpoch); err != nil {
+ recorderWarningOnce.Do(func() {
+ if el.warningCb != nil {
+ el.warningCb(fmt.Sprintf("Parquet recorder failed: %v", err))
+ }
+ })
+ }
+ }
+ rt.liveTrie.Ingest(ep)
+ // Both downstream consumers snapshot the pair synchronously, so
+ // the pooled pair can be recycled immediately afterwards.
+ ep.Recycle()
+ }
+ el.warningCb = func(message string) {
+ rt.streamBuf.Push(eventstream.NewWarningEvent(rt.streamSeq.Next(), message))
+ }
+ if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok {
+ bindings.SetLiveFilterSetter(el.SetFilter)
+ }
+ }
+}
+
+// tuiTraceStarterFromRunTrace returns a tui.TraceStarter that drives a full
+// BPF trace session from within the TUI lifecycle. It allocates per-restart
+// state via buildTUIRuntime, wires the event loop via makeTUIEventLoopConfigurer,
+// and starts the trace in a goroutine, signalling the TUI once BPF probes are
+// attached (via startedCh) or returning an error if startup fails.
func tuiTraceStarterFromRunTrace(
baseCfg flags.Config,
startTrace func(context.Context, flags.Config, chan<- struct{}, func(*eventLoop)) error,
) tui.TraceStarter {
return func(ctx context.Context) error {
- bpf.SetLoggerCbs(bpf.Callbacks{
- Log: func(int, string) {},
- })
+ bpf.SetLoggerCbs(bpf.Callbacks{Log: func(int, string) {}})
cfg := baseCfg
if filter, ok := tui.TraceFiltersFromContext(ctx); ok {
cfg.GlobalFilter = filter.Clone()
applyTraceScopeFromGlobalFilter(&cfg, filter)
}
- engine := statsengine.NewEngine(64)
- streamBuf := streamEventSink(eventstream.NewRingBuffer())
- streamSource := eventstream.Source(streamBuf)
- streamSeq := eventstream.NewSequencer(0)
- liveTrie := flamegraph.NewLiveTrie(cfg.CollapsedFields, cfg.CountField)
- filterEpoch := uint64(0)
- var recorderWarningOnce sync.Once
- var recorder *parquet.Recorder
- if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok {
- if persistent := bindings.StreamBuffer(); persistent != nil {
- streamSource = persistent
- if sink, ok := persistent.(streamEventSink); ok {
- streamBuf = sink
- } else {
- return fmt.Errorf("runtime stream source does not support event pushes")
- }
- }
- if persistentSeq := bindings.StreamSequencer(); persistentSeq != nil {
- streamSeq = persistentSeq
- }
- recorder = bindings.Recorder()
- filterEpoch = bindings.FilterEpoch()
- bindings.SetDashboardSnapshotSource(engine)
- bindings.SetEventStreamSource(streamSource)
- bindings.SetLiveTrie(liveTrie)
+
+ rt, err := buildTUIRuntime(ctx, cfg)
+ if err != nil {
+ return err
}
+ configureEl := makeTUIEventLoopConfigurer(ctx, cfg, rt)
+
startedCh := make(chan struct{})
errCh := make(chan error, 1)
-
go func() {
- err := startTrace(ctx, cfg, startedCh, func(el *eventLoop) {
- // Seed the eventloop's filter from the config so subsequent
- // reads via el.Filter() see the same filter the trace was
- // started with. The TUI can later replace it in place via
- // runtimeBindings.applyLiveFilter, which calls el.SetFilter.
- el.SetFilter(cfg.GlobalFilter)
- // Read the live filter on each event so the TUI can swap
- // filters in place via runtimeBindings.applyLiveFilter
- // without restarting the BPF probes.
- el.printCb = func(ep *event.Pair) {
- if !shouldIngestTracePair(el.Filter(), ep) {
- ep.Recycle()
- return
- }
- row := eventstream.NewStreamEvent(streamSeq.Next(), ep)
- engine.Ingest(ep)
- streamBuf.Push(row)
- if recorder != nil {
- if err := recorder.Record(row, filterEpoch); err != nil {
- recorderWarningOnce.Do(func() {
- if el.warningCb != nil {
- el.warningCb(fmt.Sprintf("Parquet recorder failed: %v", err))
- }
- })
- }
- }
- liveTrie.Ingest(ep)
- // Both downstream consumers snapshot the pair synchronously, so
- // the pooled pair can be recycled immediately afterwards.
- ep.Recycle()
- }
- el.warningCb = func(message string) {
- streamBuf.Push(eventstream.NewWarningEvent(streamSeq.Next(), message))
- }
- if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok {
- bindings.SetLiveFilterSetter(el.SetFilter)
- }
- })
+ err := startTrace(ctx, cfg, startedCh, configureEl)
if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok {
bindings.SetLiveFilterSetter(nil)
}
@@ -387,6 +420,36 @@ func startTraceShutdownWatcher(ctx context.Context, verbose bool, el *eventLoop,
}()
}
+// maybePrependFlamegraphConfigure wraps configure so that, when flamegraph
+// output is requested, each event pair is also forwarded to the recorder.
+// Returns the (possibly wrapped) configure func and the recorder (or nil).
+func maybePrependFlamegraphConfigure(cfg flags.Config, configure func(*eventLoop)) (func(*eventLoop), *flamegraph.Recorder) {
+ if !cfg.FlamegraphOutput {
+ return configure, nil
+ }
+ recorder := flamegraph.NewRecorder(cfg.OutputName)
+ recordOutput := func(el *eventLoop) {
+ el.printCb = func(ep *event.Pair) {
+ recorder.AddPair(ep)
+ ep.Recycle()
+ }
+ }
+ return chainEventLoopConfigure(recordOutput, configure), recorder
+}
+
+// finaliseTrace waits for profiling to finish, flushes the flamegraph recorder
+// if one was created, and logs the total run duration.
+func finaliseTrace(recorder *flamegraph.Recorder, profiling *profilingControl, totalDuration time.Duration, logln func(...any)) error {
+ <-profiling.done
+ if recorder != nil {
+ if err := recorder.Write(); err != nil {
+ return err
+ }
+ }
+ logln("Good bye... (unloading BPF tracepoints will take a few seconds...) after", totalDuration)
+ return nil
+}
+
func runTraceWithContext(parentCtx context.Context, cfg flags.Config, started chan<- struct{}, configure func(*eventLoop)) error {
if getEUID() != 0 {
return errRootPrivilegesRequired
@@ -394,10 +457,7 @@ func runTraceWithContext(parentCtx context.Context, cfg flags.Config, started ch
verbose := started == nil
logln := newLogger(verbose)
- var recorder *flamegraph.Recorder
- if cfg.FlamegraphOutput {
- recorder = flamegraph.NewRecorder(cfg.OutputName)
- }
+ configure, recorder := maybePrependFlamegraphConfigure(cfg, configure)
bpfModule, mgr, releaseBindings, err := setupBPFModule(parentCtx, cfg)
if err != nil {
@@ -431,29 +491,12 @@ func runTraceWithContext(parentCtx context.Context, cfg flags.Config, started ch
if err != nil {
return err
}
- if recorder != nil {
- recordOutput := func(el *eventLoop) {
- el.printCb = func(ep *event.Pair) {
- recorder.AddPair(ep)
- ep.Recycle()
- }
- }
- configure = chainEventLoopConfigure(recordOutput, configure)
- }
configureEventLoopOutput(el, mgr, configure)
startTraceShutdownWatcher(ctx, verbose, el, profiling, logln)
startTime := time.Now()
el.run(ctx, ch)
- totalDuration := time.Since(startTime)
- <-profiling.done
- if recorder != nil {
- if err := recorder.Write(); err != nil {
- return err
- }
- }
- logln("Good bye... (unloading BPF tracepoints will take a few seconds...) after", totalDuration)
- return nil
+ return finaliseTrace(recorder, profiling, time.Since(startTime), logln)
}
func chainEventLoopConfigure(fns ...func(*eventLoop)) func(*eventLoop) {