diff options
| -rw-r--r-- | cmd/ior/main.go | 10 | ||||
| -rw-r--r-- | internal/bench_pipeline_test.go | 8 | ||||
| -rw-r--r-- | internal/ior.go | 103 | ||||
| -rw-r--r-- | internal/ior_bpfsetup.go | 4 | ||||
| -rw-r--r-- | internal/ior_mode_test.go | 82 | ||||
| -rw-r--r-- | internal/runtime/runtime.go | 161 | ||||
| -rw-r--r-- | internal/streamrow/ringbuffer.go | 79 | ||||
| -rw-r--r-- | internal/tui/eventstream/ringbuffer.go | 72 | ||||
| -rw-r--r-- | internal/tui/tui.go | 154 |
9 files changed, 449 insertions, 224 deletions
diff --git a/cmd/ior/main.go b/cmd/ior/main.go index ebd3ef3..9c5bac1 100644 --- a/cmd/ior/main.go +++ b/cmd/ior/main.go @@ -7,6 +7,7 @@ import ( "ior/internal" "ior/internal/flags" + "ior/internal/tui" ) // main is the entry point for the application. It checks if the OS is Linux, @@ -32,6 +33,15 @@ func main() { return } + // Wire the concrete TUI runner functions into the core internal package. + // This is the only place that imports both internal and internal/tui, which + // breaks the cycle: internal no longer imports internal/tui. + internal.SetTUIRunners( + tui.RunWithTraceStarterConfig, + tui.RunTestFlamesWithTraceStarterConfig, + tui.RunTestFlamesWithTraceStarterConfig, // same runner; starter differs (static vs live) + ) + // Run the internal logic of the application. if err := internal.Run(cfg); err != nil { fmt.Printf("Failed to run: %v\n", err) diff --git a/internal/bench_pipeline_test.go b/internal/bench_pipeline_test.go index b520c9c..7967e3a 100644 --- a/internal/bench_pipeline_test.go +++ b/internal/bench_pipeline_test.go @@ -11,7 +11,7 @@ import ( "ior/internal/flamegraph" "ior/internal/parquet" "ior/internal/statsengine" - "ior/internal/tui/eventstream" + "ior/internal/streamrow" ) const ( @@ -198,8 +198,8 @@ func benchmarkPipelineTUIParquet(b *testing.B, mix benchutil.EventMix, events, n preseedBenchComms(el, numThreads) engine := statsengine.NewEngine(64) - streamBuf := eventstream.NewRingBuffer() - streamSeq := eventstream.NewSequencer(0) + streamBuf := streamrow.NewRingBuffer() + streamSeq := streamrow.NewSequencer(0) liveTrie := flamegraph.NewLiveTrie([]string{"comm", "tracepoint", "path"}, "count") recorder := parquet.NewRecorder(parquet.RecorderConfig{}) @@ -212,7 +212,7 @@ func benchmarkPipelineTUIParquet(b *testing.B, mix benchutil.EventMix, events, n var recordErr error el.printCb = func(ep *event.Pair) { - row := eventstream.NewStreamEvent(streamSeq.Next(), ep) + row := streamrow.New(streamSeq.Next(), ep) engine.Ingest(ep) streamBuf.Push(row) if recordErr == nil { diff --git a/internal/ior.go b/internal/ior.go index 3341a82..b6070dc 100644 --- a/internal/ior.go +++ b/internal/ior.go @@ -16,30 +16,52 @@ import ( "ior/internal/globalfilter" "ior/internal/parquet" "ior/internal/probemanager" + "ior/internal/runtime" "ior/internal/statsengine" - "ior/internal/tui" - "ior/internal/tui/eventstream" + "ior/internal/streamrow" bpf "github.com/aquasecurity/libbpfgo" ) +// tuiRunFunc is the function type for launching the TUI with a given config +// and trace starter. Concrete implementations live in the tui layer; they are +// injected at startup via SetTUIRunners so that the core package (internal) +// never imports the TUI layer. +type tuiRunFunc func(flags.Config, runtime.TraceStarter) error + var ( - runTraceFn = runTrace - runParquetFn = runHeadlessParquet - runTraceWithContextFn = runTraceWithContext - runTUIFn = tui.RunWithTraceStarterConfig - runTUITestFlamesFn = tui.RunTestFlamesWithTraceStarterConfig - runTUITestLiveFlamesFn = tui.RunTestFlamesWithTraceStarterConfig // same runner; starter differs (static vs live) + runTraceFn = runTrace + runParquetFn = runHeadlessParquet + runTraceWithContextFn = runTraceWithContext + // runTUIFn, runTUITestFlamesFn, runTUITestLiveFlamesFn are injected by + // main (via SetTUIRunners) before Run is called. They default to nil so + // that test files can replace individual runners without importing tui. + runTUIFn tuiRunFunc + runTUITestFlamesFn tuiRunFunc + runTUITestLiveFlamesFn tuiRunFunc getEUID = os.Geteuid errRootPrivilegesRequired = errors.New("tracing requires root privileges (run with sudo)") ) -type streamEventSink interface { - eventstream.Source - Push(eventstream.StreamEvent) +// SetTUIRunners injects the concrete TUI runner functions from the cmd layer +// so the core internal package does not need to import the TUI packages. +// This must be called before Run when running in TUI mode. +func SetTUIRunners( + runTUI tuiRunFunc, + runTUITestFlames tuiRunFunc, + runTUITestLiveFlames tuiRunFunc, +) { + runTUIFn = runTUI + runTUITestFlamesFn = runTUITestFlames + runTUITestLiveFlamesFn = runTUITestLiveFlames } +// streamEventSink is the write-side contract for the stream ring buffer used +// by the TUI trace starter. It is identical to runtime.EventSink but defined +// here to avoid a second import alias at the call sites below. +type streamEventSink = runtime.EventSink + // Run is the main entry point for the ior binary. // cfg must be provided by the caller; it should not be fetched from the global singleton here. func Run(cfg flags.Config) error { @@ -110,11 +132,13 @@ func validateRunConfig(cfg flags.Config) error { return nil } -func tuiTestFlamesStarter(cfg flags.Config) tui.TraceStarter { +// tuiTestFlamesStarter returns a TraceStarter that seeds static test flame data +// into the runtime bindings without starting BPF tracing. +func tuiTestFlamesStarter(cfg flags.Config) runtime.TraceStarter { return func(ctx context.Context) error { engine, streamBuf, liveTrie := buildTestFlamesRuntime(cfg) // Only setter methods are needed here; use the narrower publisher interface. - if bindings, ok := tui.RuntimePublisherFromContext(ctx); ok { + if bindings, ok := runtime.RuntimePublisherFromContext(ctx); ok { bindings.SetDashboardSnapshotSource(engine) bindings.SetEventStreamSource(streamBuf) bindings.SetLiveTrie(liveTrie) @@ -123,11 +147,13 @@ func tuiTestFlamesStarter(cfg flags.Config) tui.TraceStarter { } } -func tuiTestLiveFlamesStarter(cfg flags.Config) tui.TraceStarter { +// tuiTestLiveFlamesStarter returns a TraceStarter that seeds a continuously +// updating synthetic flame data source into the runtime bindings. +func tuiTestLiveFlamesStarter(cfg flags.Config) runtime.TraceStarter { return func(ctx context.Context) error { engine, streamBuf, liveTrie := buildTestLiveFlamesRuntime(ctx, cfg) // Only setter methods are needed here; use the narrower publisher interface. - if bindings, ok := tui.RuntimePublisherFromContext(ctx); ok { + if bindings, ok := runtime.RuntimePublisherFromContext(ctx); ok { bindings.SetDashboardSnapshotSource(engine) bindings.SetEventStreamSource(streamBuf) bindings.SetLiveTrie(liveTrie) @@ -136,17 +162,21 @@ func tuiTestLiveFlamesStarter(cfg flags.Config) tui.TraceStarter { } } -func buildTestFlamesRuntime(cfg flags.Config) (*statsengine.Engine, *eventstream.RingBuffer, *flamegraph.LiveTrie) { +// buildTestFlamesRuntime allocates a stats engine, stream buffer, and seeded +// live trie for static test-flames mode. +func buildTestFlamesRuntime(cfg flags.Config) (*statsengine.Engine, *streamrow.RingBuffer, *flamegraph.LiveTrie) { engine := statsengine.NewEngine(64) - streamBuf := eventstream.NewRingBuffer() + streamBuf := streamrow.NewRingBuffer() liveTrie := flamegraph.NewLiveTrie(cfg.CollapsedFields, cfg.CountField) flamegraph.SeedTestFlameData(liveTrie) return engine, streamBuf, liveTrie } -func buildTestLiveFlamesRuntime(ctx context.Context, cfg flags.Config) (*statsengine.Engine, *eventstream.RingBuffer, *flamegraph.LiveTrie) { +// buildTestLiveFlamesRuntime allocates a stats engine, stream buffer, and live +// trie for live test-flames mode, then launches a goroutine to update the trie. +func buildTestLiveFlamesRuntime(ctx context.Context, cfg flags.Config) (*statsengine.Engine, *streamrow.RingBuffer, *flamegraph.LiveTrie) { engine := statsengine.NewEngine(64) - streamBuf := eventstream.NewRingBuffer() + streamBuf := streamrow.NewRingBuffer() liveTrie := flamegraph.NewLiveTrie(cfg.CollapsedFields, cfg.CountField) flamegraph.SeedTestLiveFlameData(liveTrie, 0) @@ -188,8 +218,8 @@ func shouldRunTraceMode(cfg flags.Config) bool { type tuiRuntime struct { engine *statsengine.Engine streamBuf streamEventSink - streamSrc eventstream.Source - streamSeq *eventstream.Sequencer + streamSrc runtime.StreamSource + streamSeq *streamrow.Sequencer liveTrie *flamegraph.LiveTrie recorder *parquet.Recorder filterEpoch uint64 @@ -201,14 +231,14 @@ type tuiRuntime struct { func buildTUIRuntime(ctx context.Context, cfg flags.Config) (*tuiRuntime, error) { rt := &tuiRuntime{ engine: statsengine.NewEngine(64), - streamSeq: eventstream.NewSequencer(0), + streamSeq: streamrow.NewSequencer(0), liveTrie: flamegraph.NewLiveTrie(cfg.CollapsedFields, cfg.CountField), } - buf := streamEventSink(eventstream.NewRingBuffer()) + buf := streamrow.NewRingBuffer() rt.streamBuf = buf - rt.streamSrc = eventstream.Source(buf) + rt.streamSrc = buf - if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok { + if bindings, ok := runtime.RuntimeBindingsFromContext(ctx); ok { if persistent := bindings.StreamBuffer(); persistent != nil { rt.streamSrc = persistent sink, ok := persistent.(streamEventSink) @@ -244,7 +274,7 @@ func makeTUIEventLoopConfigurer(ctx context.Context, cfg flags.Config, rt *tuiRu ep.Recycle() return } - row := eventstream.NewStreamEvent(rt.streamSeq.Next(), ep) + row := streamrow.New(rt.streamSeq.Next(), ep) rt.engine.Ingest(ep) rt.streamBuf.Push(row) if rt.recorder != nil { @@ -262,28 +292,29 @@ func makeTUIEventLoopConfigurer(ctx context.Context, cfg flags.Config, rt *tuiRu ep.Recycle() } el.warningCb = func(message string) { - rt.streamBuf.Push(eventstream.NewWarningEvent(rt.streamSeq.Next(), message)) + rt.streamBuf.Push(streamrow.NewWarning(rt.streamSeq.Next(), message)) } - if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok { + if bindings, ok := runtime.RuntimeBindingsFromContext(ctx); ok { bindings.SetLiveFilterSetter(el.SetFilter) } } } -// tuiTraceStarterFromRunTrace returns a tui.TraceStarter that drives a full -// BPF trace session from within the TUI lifecycle. It allocates per-restart -// state via buildTUIRuntime, wires the event loop via makeTUIEventLoopConfigurer, -// and starts the trace in a goroutine, signalling the TUI once BPF probes are -// attached (via startedCh) or returning an error if startup fails. +// tuiTraceStarterFromRunTrace returns a runtime.TraceStarter that drives a +// full BPF trace session from within the TUI lifecycle. It allocates +// per-restart state via buildTUIRuntime, wires the event loop via +// makeTUIEventLoopConfigurer, and starts the trace in a goroutine, signalling +// the TUI once BPF probes are attached (via startedCh) or returning an error +// if startup fails. func tuiTraceStarterFromRunTrace( baseCfg flags.Config, startTrace func(context.Context, flags.Config, chan<- struct{}, func(*eventLoop)) error, -) tui.TraceStarter { +) runtime.TraceStarter { return func(ctx context.Context) error { bpf.SetLoggerCbs(bpf.Callbacks{Log: func(int, string) {}}) cfg := baseCfg - if filter, ok := tui.TraceFiltersFromContext(ctx); ok { + if filter, ok := runtime.TraceFiltersFromContext(ctx); ok { cfg.GlobalFilter = filter.Clone() applyTraceScopeFromGlobalFilter(&cfg, filter) } @@ -298,7 +329,7 @@ func tuiTraceStarterFromRunTrace( errCh := make(chan error, 1) go func() { err := startTrace(ctx, cfg, startedCh, configureEl) - if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok { + if bindings, ok := runtime.RuntimeBindingsFromContext(ctx); ok { bindings.SetLiveFilterSetter(nil) } errCh <- err diff --git a/internal/ior_bpfsetup.go b/internal/ior_bpfsetup.go index 9ab9186..d5f3be8 100644 --- a/internal/ior_bpfsetup.go +++ b/internal/ior_bpfsetup.go @@ -8,8 +8,8 @@ import ( appconfig "ior/internal/config" "ior/internal/flags" "ior/internal/probemanager" + "ior/internal/runtime" "ior/internal/tracepoints" - "ior/internal/tui" bpf "github.com/aquasecurity/libbpfgo" ) @@ -80,7 +80,7 @@ func setupBPFModule(parentCtx context.Context, cfg flags.Config) (*bpf.Module, * } // setupBPFModule only injects the probe manager; it does not read TUI state, // so RuntimePublisher is the correct narrower interface to use here. - if bindings, ok := tui.RuntimePublisherFromContext(parentCtx); ok { + if bindings, ok := runtime.RuntimePublisherFromContext(parentCtx); ok { bindings.SetProbeManager(mgr) releaseBindings = func() { bindings.SetProbeManager(nil) } } diff --git a/internal/ior_mode_test.go b/internal/ior_mode_test.go index 876b374..0697ada 100644 --- a/internal/ior_mode_test.go +++ b/internal/ior_mode_test.go @@ -18,9 +18,8 @@ import ( "ior/internal/flags" "ior/internal/globalfilter" "ior/internal/parquet" - "ior/internal/tui" - "ior/internal/tui/eventstream" - flamegraphtui "ior/internal/tui/flamegraph" + "ior/internal/runtime" + "ior/internal/streamrow" "ior/internal/types" parquetgo "github.com/parquet-go/parquet-go" @@ -117,15 +116,15 @@ func TestDispatchRunUsesTraceModeWhenRequested(t *testing.T) { t.Fatalf("runParquetFn should not be called in plain trace mode") return nil } - runTUIFn = func(flags.Config, tui.TraceStarter) error { + runTUIFn = func(flags.Config, runtime.TraceStarter) error { tuiCalled = true return nil } - runTUITestFlamesFn = func(flags.Config, tui.TraceStarter) error { + runTUITestFlamesFn = func(flags.Config, runtime.TraceStarter) error { t.Fatalf("runTUITestFlamesFn should not be called in trace mode") return nil } - runTUITestLiveFlamesFn = func(flags.Config, tui.TraceStarter) error { + runTUITestLiveFlamesFn = func(flags.Config, runtime.TraceStarter) error { t.Fatalf("runTUITestLiveFlamesFn should not be called in trace mode") return nil } @@ -170,15 +169,15 @@ func TestDispatchRunUsesHeadlessParquetModeWhenRequested(t *testing.T) { parquetCalled = true return nil } - runTUIFn = func(flags.Config, tui.TraceStarter) error { + runTUIFn = func(flags.Config, runtime.TraceStarter) error { tuiCalled = true return nil } - runTUITestFlamesFn = func(flags.Config, tui.TraceStarter) error { + runTUITestFlamesFn = func(flags.Config, runtime.TraceStarter) error { t.Fatalf("runTUITestFlamesFn should not be called in parquet mode") return nil } - runTUITestLiveFlamesFn = func(flags.Config, tui.TraceStarter) error { + runTUITestLiveFlamesFn = func(flags.Config, runtime.TraceStarter) error { t.Fatalf("runTUITestLiveFlamesFn should not be called in parquet mode") return nil } @@ -219,15 +218,15 @@ func TestDispatchRunUsesTUIWhenOnlyPprofEnabled(t *testing.T) { traceCalled = true return nil } - runTUIFn = func(flags.Config, tui.TraceStarter) error { + runTUIFn = func(flags.Config, runtime.TraceStarter) error { tuiCalled = true return nil } - runTUITestFlamesFn = func(flags.Config, tui.TraceStarter) error { + runTUITestFlamesFn = func(flags.Config, runtime.TraceStarter) error { t.Fatalf("runTUITestFlamesFn should not be called for regular TUI mode") return nil } - runTUITestLiveFlamesFn = func(flags.Config, tui.TraceStarter) error { + runTUITestLiveFlamesFn = func(flags.Config, runtime.TraceStarter) error { t.Fatalf("runTUITestLiveFlamesFn should not be called for regular TUI mode") return nil } @@ -270,7 +269,7 @@ func TestDispatchRunUsesTUIStarterWhenNotPlain(t *testing.T) { } tuiCalled := false - runTUIFn = func(_ flags.Config, starter tui.TraceStarter) error { + runTUIFn = func(_ flags.Config, starter runtime.TraceStarter) error { tuiCalled = true if starter == nil { t.Fatalf("expected non-nil starter") @@ -280,11 +279,11 @@ func TestDispatchRunUsesTUIStarterWhenNotPlain(t *testing.T) { } return nil } - runTUITestFlamesFn = func(flags.Config, tui.TraceStarter) error { + runTUITestFlamesFn = func(flags.Config, runtime.TraceStarter) error { t.Fatalf("runTUITestFlamesFn should not be called for normal starter path") return nil } - runTUITestLiveFlamesFn = func(flags.Config, tui.TraceStarter) error { + runTUITestLiveFlamesFn = func(flags.Config, runtime.TraceStarter) error { t.Fatalf("runTUITestLiveFlamesFn should not be called for normal starter path") return nil } @@ -323,18 +322,18 @@ func TestDispatchRunUsesTestFlamesModeWhenRequested(t *testing.T) { traceCalled = true return nil } - runTUIFn = func(flags.Config, tui.TraceStarter) error { + runTUIFn = func(flags.Config, runtime.TraceStarter) error { regularTUICalled = true return nil } - runTUITestFlamesFn = func(_ flags.Config, starter tui.TraceStarter) error { + runTUITestFlamesFn = func(_ flags.Config, starter runtime.TraceStarter) error { testFlamesCalled = true if starter == nil { t.Fatalf("expected non-nil starter for test flames mode") } return starter(context.Background()) } - runTUITestLiveFlamesFn = func(flags.Config, tui.TraceStarter) error { + runTUITestLiveFlamesFn = func(flags.Config, runtime.TraceStarter) error { t.Fatalf("runTUITestLiveFlamesFn should not be called for --testflames") return nil } @@ -373,15 +372,15 @@ func TestDispatchRunUsesTestLiveFlamesModeWhenRequested(t *testing.T) { traceCalled = true return nil } - runTUIFn = func(flags.Config, tui.TraceStarter) error { + runTUIFn = func(flags.Config, runtime.TraceStarter) error { regularTUICalled = true return nil } - runTUITestFlamesFn = func(flags.Config, tui.TraceStarter) error { + runTUITestFlamesFn = func(flags.Config, runtime.TraceStarter) error { t.Fatalf("runTUITestFlamesFn should not be called for --testliveflames") return nil } - runTUITestLiveFlamesFn = func(_ flags.Config, starter tui.TraceStarter) error { + runTUITestLiveFlamesFn = func(_ flags.Config, starter runtime.TraceStarter) error { testLiveFlamesCalled = true if starter == nil { t.Fatalf("expected non-nil starter for test live flames mode") @@ -577,7 +576,7 @@ func TestTuiTraceStarterFromRunTraceUsesContextFilters(t *testing.T) { }, ) - ctx := tui.ContextWithTraceFilters(context.Background(), globalfilter.Filter{ + ctx := runtime.ContextWithTraceFilters(context.Background(), globalfilter.Filter{ PID: &globalfilter.NumericFilter{Op: globalfilter.OpEq, Value: 2222}, TID: &globalfilter.NumericFilter{Op: globalfilter.OpEq, Value: 3333}, Comm: &globalfilter.StringFilter{Pattern: "nginx"}, @@ -766,8 +765,8 @@ func TestTuiTraceStarterFromRunTracePersistsRecorderAcrossRestarts(t *testing.T) } bindings := &traceRuntimeBindingsStub{ - streamBuffer: eventstream.NewRingBuffer(), - streamSeq: eventstream.NewSequencer(0), + streamBuffer: streamrow.NewRingBuffer(), + streamSeq: streamrow.NewSequencer(0), recorder: recorder, } base := flags.NewFlags() @@ -797,7 +796,7 @@ func TestTuiTraceStarterFromRunTracePersistsRecorderAcrossRestarts(t *testing.T) }, ) - ctx := tui.ContextWithRuntimeBindings(context.Background(), bindings) + ctx := runtime.ContextWithRuntimeBindings(context.Background(), bindings) if err := starter(ctx); err != nil { t.Fatalf("first starter() error = %v", err) } @@ -839,8 +838,8 @@ func TestTuiTraceStarterFromRunTracePersistsRecorderAcrossRestarts(t *testing.T) // printCb admits, without any restart of the trace pipeline. func TestTuiTraceStarterAppliesLiveFilterSwapInPlace(t *testing.T) { bindings := &traceRuntimeBindingsStub{ - streamBuffer: eventstream.NewRingBuffer(), - streamSeq: eventstream.NewSequencer(0), + streamBuffer: streamrow.NewRingBuffer(), + streamSeq: streamrow.NewSequencer(0), } base := flags.NewFlags() base.GlobalFilter = globalfilter.Filter{Comm: &globalfilter.StringFilter{Pattern: "keep"}} @@ -864,7 +863,7 @@ func TestTuiTraceStarterAppliesLiveFilterSwapInPlace(t *testing.T) { }, ) - ctx := tui.ContextWithRuntimeBindings(context.Background(), bindings) + ctx := runtime.ContextWithRuntimeBindings(context.Background(), bindings) starterErr := make(chan error, 1) go func() { starterErr <- starter(ctx) }() @@ -898,28 +897,31 @@ func TestTuiTraceStarterAppliesLiveFilterSwapInPlace(t *testing.T) { } } +// traceRuntimeBindingsStub is a test double for runtime.TraceRuntimeBindings +// that records injected stream sources and exposes the live-filter setter for +// assertions. type traceRuntimeBindingsStub struct { - streamBuffer *eventstream.RingBuffer - streamSource eventstream.Source - streamSeq *eventstream.Sequencer + streamBuffer *streamrow.RingBuffer + streamSource runtime.StreamSource + streamSeq *streamrow.Sequencer recorder *parquet.Recorder filterEpoch uint64 // mu guards liveFilterSetter, which is mutated from the trace-starter - // goroutine (via SetLiveFilterSetter) and read from the test - // goroutine when invoking the in-place swap. + // goroutine (via SetLiveFilterSetter) and read from the test goroutine + // when invoking the in-place swap. mu sync.Mutex liveFilterSetter func(globalfilter.Filter) } -func (b *traceRuntimeBindingsStub) SetDashboardSnapshotSource(tui.SnapshotSource) {} +func (b *traceRuntimeBindingsStub) SetDashboardSnapshotSource(runtime.SnapshotSource) {} -func (b *traceRuntimeBindingsStub) SetEventStreamSource(source eventstream.Source) { +func (b *traceRuntimeBindingsStub) SetEventStreamSource(source runtime.StreamSource) { b.streamSource = source } -func (b *traceRuntimeBindingsStub) SetLiveTrie(flamegraphtui.LiveTrieSource) {} +func (b *traceRuntimeBindingsStub) SetLiveTrie(runtime.LiveTrieSource) {} -func (b *traceRuntimeBindingsStub) SetProbeManager(tui.ProbeManager) {} +func (b *traceRuntimeBindingsStub) SetProbeManager(runtime.ProbeManager) {} func (b *traceRuntimeBindingsStub) SetLiveFilterSetter(setter func(globalfilter.Filter)) { b.mu.Lock() @@ -933,7 +935,7 @@ func (b *traceRuntimeBindingsStub) currentLiveFilterSetter() func(globalfilter.F return b.liveFilterSetter } -func (b *traceRuntimeBindingsStub) StreamBuffer() eventstream.Source { +func (b *traceRuntimeBindingsStub) StreamBuffer() runtime.StreamSource { return b.streamBuffer } @@ -941,7 +943,7 @@ func (b *traceRuntimeBindingsStub) Recorder() *parquet.Recorder { return b.recorder } -func (b *traceRuntimeBindingsStub) StreamSequencer() *eventstream.Sequencer { +func (b *traceRuntimeBindingsStub) StreamSequencer() *streamrow.Sequencer { return b.streamSeq } @@ -962,7 +964,7 @@ func testTracePair(seq uint64, comm string) *event.Pair { return pair } -func waitForStreamRows(t *testing.T, buffer *eventstream.RingBuffer, want int) { +func waitForStreamRows(t *testing.T, buffer *streamrow.RingBuffer, want int) { t.Helper() deadline := time.Now().Add(2 * time.Second) for time.Now().Before(deadline) { diff --git a/internal/runtime/runtime.go b/internal/runtime/runtime.go new file mode 100644 index 0000000..a9e959a --- /dev/null +++ b/internal/runtime/runtime.go @@ -0,0 +1,161 @@ +// Package runtime defines the shared interface contract between the core tracing +// engine (internal) and the TUI layer (internal/tui). By placing these +// interfaces in a neutral sub-package, neither layer imports the other; instead +// both depend on runtime. +package runtime + +import ( + "context" + + "ior/internal/flamegraph" + "ior/internal/globalfilter" + "ior/internal/parquet" + "ior/internal/probemanager" + "ior/internal/statsengine" + "ior/internal/streamrow" +) + +// TraceStarter starts tracing and returns when startup succeeds or fails. +// Long-lived tracing work must continue in background goroutines. +type TraceStarter func(context.Context) error + +// StreamSource is the minimal stream-buffer contract needed by the tracing +// engine and the TUI stream view. It mirrors eventstream.Source but is defined +// here so the core package need not import internal/tui/eventstream. +type StreamSource interface { + Len() int + Snapshot() []streamrow.Row +} + +// EventSink is the write side of the stream buffer: the tracing engine pushes +// events, the TUI reads them via StreamSource. Embedding StreamSource keeps the +// two sides co-located while allowing callers to hold only the read interface. +type EventSink interface { + StreamSource + Push(streamrow.Row) +} + +// SnapshotSource provides statsengine snapshots for the TUI dashboard. +// The core tracing engine passes a *statsengine.Engine; the TUI stores it +// behind this interface so the dashboard can retrieve live snapshots. +type SnapshotSource interface { + Snapshot() *statsengine.Snapshot +} + +// LiveTrieSource is the minimal flamegraph-trie contract needed by the tracing +// engine and the flamegraph TUI model. It mirrors the interface defined in +// internal/tui/flamegraph but lives here so the core package need not import +// that TUI sub-package. Both interfaces are satisfied by *flamegraph.LiveTrie. +type LiveTrieSource interface { + Fields() []string + CountField() string + Reconfigure([]string) error + SetCountField(string) error + Reset() + Version() uint64 + SnapshotJSON() ([]byte, uint64) + SnapshotTree() (*flamegraph.SnapshotNode, uint64) +} + +// ProbeManager exposes runtime probe controls to the TUI probes modal. +// *probemanager.Manager implements this interface. +type ProbeManager interface { + States() []probemanager.ProbeState + Toggle(syscall string) error + ActiveCount() (int, int) +} + +// RuntimePublisher is the write side of the TUI runtime contract. +// A trace starter calls these methods to inject live data into the active TUI. +type RuntimePublisher interface { + // SetDashboardSnapshotSource wires the stats engine into the dashboard. + SetDashboardSnapshotSource(source SnapshotSource) + // SetEventStreamSource wires the stream buffer into the TUI stream view. + SetEventStreamSource(source StreamSource) + // SetLiveTrie wires the live flamegraph trie into the TUI flamegraph view. + SetLiveTrie(liveTrie LiveTrieSource) + // SetProbeManager wires the BPF probe manager into the TUI probes modal. + SetProbeManager(manager ProbeManager) + // SetLiveFilterSetter registers (or, with nil, unregisters) a callback that + // applies a new global filter to the running trace pipeline in-place without + // restarting BPF probes. The trace starter passes its eventloop's SetFilter; + // the TUI calls it on every filter change. + SetLiveFilterSetter(setter func(globalfilter.Filter)) +} + +// RuntimeState is the read side of the TUI runtime contract. +// A trace starter calls these methods to obtain persistent state owned by the TUI. +type RuntimeState interface { + // StreamBuffer returns the TUI-owned ring buffer used for stream events. + StreamBuffer() StreamSource + // Recorder returns the parquet recorder for optional stream recording. + Recorder() *parquet.Recorder + // StreamSequencer returns the shared monotonic sequence counter for stream rows. + StreamSequencer() *streamrow.Sequencer + // FilterEpoch returns the current filter epoch used for parquet recording. + FilterEpoch() uint64 +} + +// TraceRuntimeBindings composes RuntimePublisher and RuntimeState so a trace +// starter can both inject live data and read persistent TUI-owned state. +type TraceRuntimeBindings interface { + RuntimePublisher + RuntimeState +} + +// --- context key types and helpers --- + +// runtimeBindingsKey is an unexported context key for runtime bindings. +type runtimeBindingsKey struct{} + +// traceFiltersKey is an unexported context key for trace filter values. +type traceFiltersKey struct{} + +// traceFilters wraps a cloned filter stored on the context by the TUI model. +type traceFilters struct { + filter globalfilter.Filter +} + +// ContextWithRuntimeBindings stores trace runtime bindings on the context so +// a trace starter can retrieve them via RuntimeBindingsFromContext. +func ContextWithRuntimeBindings(ctx context.Context, bindings TraceRuntimeBindings) context.Context { + return context.WithValue(ctx, runtimeBindingsKey{}, bindings) +} + +// RuntimeBindingsFromContext returns the full TraceRuntimeBindings when the +// context was created by the TUI. Use RuntimePublisherFromContext when only +// write access is needed. +func RuntimeBindingsFromContext(ctx context.Context) (TraceRuntimeBindings, bool) { + bindings, ok := ctx.Value(runtimeBindingsKey{}).(TraceRuntimeBindings) + if !ok || bindings == nil { + return nil, false + } + return bindings, true +} + +// RuntimePublisherFromContext returns only the RuntimePublisher side of the TUI +// bindings. Use this when the caller only injects data and does not need to +// read persistent TUI state. +func RuntimePublisherFromContext(ctx context.Context) (RuntimePublisher, bool) { + bindings, ok := ctx.Value(runtimeBindingsKey{}).(RuntimePublisher) + if !ok || bindings == nil { + return nil, false + } + return bindings, true +} + +// ContextWithTraceFilters stores the active trace filters on the context so +// a trace starter can retrieve them via TraceFiltersFromContext. +func ContextWithTraceFilters(ctx context.Context, filter globalfilter.Filter) context.Context { + filters := traceFilters{filter: filter.Clone()} + return context.WithValue(ctx, traceFiltersKey{}, filters) +} + +// TraceFiltersFromContext returns the active trace filters when provided by the TUI model. +func TraceFiltersFromContext(ctx context.Context) (globalfilter.Filter, bool) { + filters, ok := ctx.Value(traceFiltersKey{}).(traceFilters) + if !ok { + return globalfilter.Filter{}, false + } + return filters.filter.Clone(), true +} diff --git a/internal/streamrow/ringbuffer.go b/internal/streamrow/ringbuffer.go new file mode 100644 index 0000000..7820502 --- /dev/null +++ b/internal/streamrow/ringbuffer.go @@ -0,0 +1,79 @@ +package streamrow + +import "sync" + +const RingBufferCapacity = 10000 + +// RingBuffer is a fixed-capacity circular buffer of stream rows used by the +// tracing engine (write side) and the TUI stream view (read side). It +// satisfies the runtime.EventSink interface (Push + Len + Snapshot). +type RingBuffer struct { + mu sync.RWMutex + buf []Row + start int + size int + totalPushed uint64 +} + +// NewRingBuffer allocates an empty RingBuffer with the default capacity. +func NewRingBuffer() *RingBuffer { + return &RingBuffer{buf: make([]Row, RingBufferCapacity)} +} + +// Push appends a row to the ring buffer, overwriting the oldest entry when full. +func (r *RingBuffer) Push(ev Row) { + r.mu.Lock() + defer r.mu.Unlock() + + if r.size < RingBufferCapacity { + idx := (r.start + r.size) % RingBufferCapacity + r.buf[idx] = ev + r.size++ + } else { + r.buf[r.start] = ev + r.start = (r.start + 1) % RingBufferCapacity + } + r.totalPushed++ +} + +// Snapshot returns a copy of all rows in insertion order. +func (r *RingBuffer) Snapshot() []Row { + r.mu.RLock() + defer r.mu.RUnlock() + + if r.size == 0 { + return make([]Row, 0) + } + + out := make([]Row, r.size) + for i := 0; i < r.size; i++ { + out[i] = r.buf[(r.start+i)%RingBufferCapacity] + } + return out +} + +// Len returns the current number of rows in the buffer. +func (r *RingBuffer) Len() int { + r.mu.RLock() + defer r.mu.RUnlock() + return r.size +} + +// TotalPushed returns the total number of rows pushed since construction or +// the last Reset, including those that have been overwritten. +func (r *RingBuffer) TotalPushed() uint64 { + r.mu.RLock() + defer r.mu.RUnlock() + return r.totalPushed +} + +// Reset clears all rows and resets the total-pushed counter. +func (r *RingBuffer) Reset() { + r.mu.Lock() + defer r.mu.Unlock() + + clear(r.buf) + r.start = 0 + r.size = 0 + r.totalPushed = 0 +} diff --git a/internal/tui/eventstream/ringbuffer.go b/internal/tui/eventstream/ringbuffer.go index 87dacae..8644b42 100644 --- a/internal/tui/eventstream/ringbuffer.go +++ b/internal/tui/eventstream/ringbuffer.go @@ -1,69 +1,17 @@ package eventstream -import "sync" +import "ior/internal/streamrow" -const ringBufferCapacity = 10000 +// RingBuffer is a type alias for streamrow.RingBuffer. The concrete +// implementation lives in the lower-level streamrow package so the core +// tracing engine can use it without importing internal/tui/eventstream. +type RingBuffer = streamrow.RingBuffer -type RingBuffer struct { - mu sync.RWMutex - buf []StreamEvent - start int - size int - totalPushed uint64 -} +// ringBufferCapacity mirrors the capacity constant for use in this package +// (e.g. stream table rendering). +const ringBufferCapacity = streamrow.RingBufferCapacity +// NewRingBuffer allocates an empty RingBuffer with the default capacity. func NewRingBuffer() *RingBuffer { - return &RingBuffer{buf: make([]StreamEvent, ringBufferCapacity)} -} - -func (r *RingBuffer) Push(ev StreamEvent) { - r.mu.Lock() - defer r.mu.Unlock() - - if r.size < ringBufferCapacity { - idx := (r.start + r.size) % ringBufferCapacity - r.buf[idx] = ev - r.size++ - } else { - r.buf[r.start] = ev - r.start = (r.start + 1) % ringBufferCapacity - } - r.totalPushed++ -} - -func (r *RingBuffer) Snapshot() []StreamEvent { - r.mu.RLock() - defer r.mu.RUnlock() - - if r.size == 0 { - return make([]StreamEvent, 0) - } - - out := make([]StreamEvent, r.size) - for i := 0; i < r.size; i++ { - out[i] = r.buf[(r.start+i)%ringBufferCapacity] - } - return out -} - -func (r *RingBuffer) Len() int { - r.mu.RLock() - defer r.mu.RUnlock() - return r.size -} - -func (r *RingBuffer) TotalPushed() uint64 { - r.mu.RLock() - defer r.mu.RUnlock() - return r.totalPushed -} - -func (r *RingBuffer) Reset() { - r.mu.Lock() - defer r.mu.Unlock() - - clear(r.buf) - r.start = 0 - r.size = 0 - r.totalPushed = 0 + return streamrow.NewRingBuffer() } diff --git a/internal/tui/tui.go b/internal/tui/tui.go index 997fe22..06fafea 100644 --- a/internal/tui/tui.go +++ b/internal/tui/tui.go @@ -14,13 +14,12 @@ import ( "ior/internal/flags" "ior/internal/globalfilter" "ior/internal/parquet" - "ior/internal/probemanager" + "ior/internal/runtime" "ior/internal/statsengine" common "ior/internal/tui/common" dashboardui "ior/internal/tui/dashboard" "ior/internal/tui/eventstream" tuiexport "ior/internal/tui/export" - flamegraphtui "ior/internal/tui/flamegraph" "ior/internal/tui/messages" "ior/internal/tui/pidpicker" "ior/internal/tui/probes" @@ -43,70 +42,61 @@ const ( ) // TraceStarter starts tracing and returns when startup succeeds or fails. +// It is a type alias for runtime.TraceStarter so TUI callers need not import +// the runtime package directly. // Long-lived tracing work should continue in background goroutines. -type TraceStarter func(context.Context) error +type TraceStarter = runtime.TraceStarter // SnapshotSource provides dashboard snapshots for TUI rendering. -type SnapshotSource interface { - Snapshot() *statsengine.Snapshot -} +// It is a type alias for runtime.SnapshotSource. +type SnapshotSource = runtime.SnapshotSource // ProbeManager exposes runtime probe controls to TUI layers. -type ProbeManager interface { - States() []probemanager.ProbeState - Toggle(syscall string) error - ActiveCount() (int, int) -} +// It is a type alias for runtime.ProbeManager. +type ProbeManager = runtime.ProbeManager // RuntimePublisher is the write side of the TUI runtime contract. -// A trace starter calls these methods to inject live data into the active TUI. -type RuntimePublisher interface { - SetDashboardSnapshotSource(source SnapshotSource) - SetEventStreamSource(source eventstream.Source) - SetLiveTrie(liveTrie flamegraphtui.LiveTrieSource) - SetProbeManager(manager ProbeManager) - // SetLiveFilterSetter registers (or, with nil, unregisters) a callback that - // applies a new global filter to the running trace pipeline in place. The - // trace starter passes its eventloop's SetFilter; the TUI calls it on every - // filter change to avoid restarting the BPF probes. - SetLiveFilterSetter(setter func(globalfilter.Filter)) -} +// It is a type alias for runtime.RuntimePublisher; the runtime package owns +// the canonical definition so the core tracing layer can depend on it without +// importing internal/tui. +type RuntimePublisher = runtime.RuntimePublisher // RuntimeState is the read side of the TUI runtime contract. -// A trace starter calls these methods to obtain persistent state owned by the TUI. -type RuntimeState interface { - StreamBuffer() eventstream.Source - Recorder() *parquet.Recorder - StreamSequencer() *eventstream.Sequencer - FilterEpoch() uint64 -} +// It is a type alias for runtime.RuntimeState. +type RuntimeState = runtime.RuntimeState // TraceRuntimeBindings composes RuntimePublisher and RuntimeState so a trace // starter can both inject live data and read persistent TUI-owned state. -type TraceRuntimeBindings interface { - RuntimePublisher - RuntimeState -} - -type runtimeBindingsContextKey struct{} -type traceFiltersContextKey struct{} +// It is a type alias for runtime.TraceRuntimeBindings. +type TraceRuntimeBindings = runtime.TraceRuntimeBindings +// runtimeBindings is the TUI-owned concrete implementation of +// runtime.TraceRuntimeBindings. It guards all fields with a read-write mutex so +// the trace starter goroutine and the Bubble Tea update loop can safely exchange +// live data. type runtimeBindings struct { mu sync.RWMutex - snapshotSource SnapshotSource - streamSource eventstream.Source - streamBuffer *eventstream.RingBuffer - streamSeq *eventstream.Sequencer - recorder *parquet.Recorder - liveTrieSource flamegraphtui.LiveTrieSource - probeManager ProbeManager + // snapshotSource is the stats engine injected by the trace starter. + snapshotSource runtime.SnapshotSource + // streamSource is the active read-side source (may be swapped on reset). + streamSource runtime.StreamSource + // streamBuffer is the TUI-owned ring buffer; it always satisfies both + // runtime.StreamSource (Len/Snapshot) and runtime.EventSink (Push). + streamBuffer *eventstream.RingBuffer + // streamSeq is the shared monotonic counter for stream row sequencing. + streamSeq *eventstream.Sequencer + // recorder handles optional parquet stream recording. + recorder *parquet.Recorder + // liveTrieSource is the flamegraph trie injected by the trace starter. + liveTrieSource runtime.LiveTrieSource + // probeManager is the BPF probe manager injected by the trace starter. + probeManager runtime.ProbeManager + // liveFilterSetter, when non-nil, applies filter changes to the running + // event loop in-place so BPF probes need not be restarted. liveFilterSetter func(globalfilter.Filter) - filterEpoch atomic.Uint64 -} - -type traceFilters struct { - filter globalfilter.Filter + // filterEpoch increments on every filter change and is stored in parquet rows. + filterEpoch atomic.Uint64 } func newRuntimeBindings() *runtimeBindings { @@ -119,52 +109,62 @@ func newRuntimeBindings() *runtimeBindings { } } -func (r *runtimeBindings) SetDashboardSnapshotSource(source SnapshotSource) { +// SetDashboardSnapshotSource wires the stats engine into the dashboard. +func (r *runtimeBindings) SetDashboardSnapshotSource(source runtime.SnapshotSource) { r.mu.Lock() r.snapshotSource = source r.mu.Unlock() } -func (r *runtimeBindings) SetEventStreamSource(source eventstream.Source) { +// SetEventStreamSource wires the stream buffer into the TUI stream view. +func (r *runtimeBindings) SetEventStreamSource(source runtime.StreamSource) { r.mu.Lock() r.streamSource = source r.mu.Unlock() } -func (r *runtimeBindings) StreamBuffer() eventstream.Source { +// StreamBuffer returns the TUI-owned ring buffer, which satisfies runtime.StreamSource. +func (r *runtimeBindings) StreamBuffer() runtime.StreamSource { r.mu.RLock() defer r.mu.RUnlock() return r.streamBuffer } +// Recorder returns the parquet recorder for optional stream recording. func (r *runtimeBindings) Recorder() *parquet.Recorder { r.mu.RLock() defer r.mu.RUnlock() return r.recorder } +// StreamSequencer returns the shared monotonic counter for stream row sequencing. func (r *runtimeBindings) StreamSequencer() *eventstream.Sequencer { r.mu.RLock() defer r.mu.RUnlock() return r.streamSeq } +// FilterEpoch returns the current filter epoch used for parquet recording. func (r *runtimeBindings) FilterEpoch() uint64 { return r.filterEpoch.Load() } -func (r *runtimeBindings) SetLiveTrie(liveTrie flamegraphtui.LiveTrieSource) { +// SetLiveTrie wires the live flamegraph trie into the TUI flamegraph view. +func (r *runtimeBindings) SetLiveTrie(liveTrie runtime.LiveTrieSource) { r.mu.Lock() r.liveTrieSource = liveTrie r.mu.Unlock() } -func (r *runtimeBindings) SetProbeManager(manager ProbeManager) { +// SetProbeManager wires the BPF probe manager into the TUI probes modal. +func (r *runtimeBindings) SetProbeManager(manager runtime.ProbeManager) { r.mu.Lock() r.probeManager = manager r.mu.Unlock() } +// SetLiveFilterSetter registers (or, with nil, unregisters) the live filter +// callback so the TUI can update the running trace pipeline in-place. func (r *runtimeBindings) SetLiveFilterSetter(setter func(globalfilter.Filter)) { r.mu.Lock() r.liveFilterSetter = setter @@ -186,25 +186,29 @@ func (r *runtimeBindings) applyLiveFilter(filter globalfilter.Filter) bool { return true } -func (r *runtimeBindings) dashboardSnapshotSource() SnapshotSource { +// dashboardSnapshotSource returns the currently wired stats engine source. +func (r *runtimeBindings) dashboardSnapshotSource() runtime.SnapshotSource { r.mu.RLock() defer r.mu.RUnlock() return r.snapshotSource } -func (r *runtimeBindings) eventStreamSource() eventstream.Source { +// eventStreamSource returns the currently active stream read source. +func (r *runtimeBindings) eventStreamSource() runtime.StreamSource { r.mu.RLock() defer r.mu.RUnlock() return r.streamSource } -func (r *runtimeBindings) liveTrie() flamegraphtui.LiveTrieSource { +// liveTrie returns the currently wired flamegraph trie source. +func (r *runtimeBindings) liveTrie() runtime.LiveTrieSource { r.mu.RLock() defer r.mu.RUnlock() return r.liveTrieSource } -func (r *runtimeBindings) currentProbeManager() ProbeManager { +// currentProbeManager returns the currently wired probe manager. +func (r *runtimeBindings) currentProbeManager() runtime.ProbeManager { r.mu.RLock() defer r.mu.RUnlock() return r.probeManager @@ -241,44 +245,34 @@ func (r *runtimeBindings) resetDashboardSnapshotSource() *statsengine.Snapshot { // RuntimeBindingsFromContext returns the full TraceRuntimeBindings when the // context was created by the TUI. Use RuntimePublisherFromContext when only -// write access is needed. +// write access is needed. Delegates to runtime.RuntimeBindingsFromContext. func RuntimeBindingsFromContext(ctx context.Context) (TraceRuntimeBindings, bool) { - bindings, ok := ctx.Value(runtimeBindingsContextKey{}).(TraceRuntimeBindings) - if !ok || bindings == nil { - return nil, false - } - return bindings, true + return runtime.RuntimeBindingsFromContext(ctx) } -// RuntimePublisherFromContext returns only the RuntimePublisher side of the -// TUI bindings. Use this when the caller only injects data and does not need -// to read persistent TUI state. +// RuntimePublisherFromContext returns only the RuntimePublisher side of the TUI +// bindings. Use this when the caller only injects data and does not need to +// read persistent TUI state. Delegates to runtime.RuntimePublisherFromContext. func RuntimePublisherFromContext(ctx context.Context) (RuntimePublisher, bool) { - bindings, ok := ctx.Value(runtimeBindingsContextKey{}).(RuntimePublisher) - if !ok || bindings == nil { - return nil, false - } - return bindings, true + return runtime.RuntimePublisherFromContext(ctx) } // ContextWithRuntimeBindings stores trace runtime bindings on the context. +// Delegates to runtime.ContextWithRuntimeBindings. func ContextWithRuntimeBindings(ctx context.Context, bindings TraceRuntimeBindings) context.Context { - return context.WithValue(ctx, runtimeBindingsContextKey{}, bindings) + return runtime.ContextWithRuntimeBindings(ctx, bindings) } // ContextWithTraceFilters stores the active trace filters for the trace starter. +// Delegates to runtime.ContextWithTraceFilters. func ContextWithTraceFilters(ctx context.Context, filter globalfilter.Filter) context.Context { - filters := traceFilters{filter: filter.Clone()} - return context.WithValue(ctx, traceFiltersContextKey{}, filters) + return runtime.ContextWithTraceFilters(ctx, filter) } // TraceFiltersFromContext returns the active trace filters when provided by the TUI model. +// Delegates to runtime.TraceFiltersFromContext. func TraceFiltersFromContext(ctx context.Context) (globalfilter.Filter, bool) { - filters, ok := ctx.Value(traceFiltersContextKey{}).(traceFilters) - if !ok { - return globalfilter.Filter{}, false - } - return filters.filter.Clone(), true + return runtime.TraceFiltersFromContext(ctx) } // RunWithTraceStarterConfig starts the TUI with explicit runtime flags. |
