diff options
Diffstat (limited to 'internal/ior.go')
| -rw-r--r-- | internal/ior.go | 103 |
1 files changed, 67 insertions, 36 deletions
diff --git a/internal/ior.go b/internal/ior.go index 3341a82..b6070dc 100644 --- a/internal/ior.go +++ b/internal/ior.go @@ -16,30 +16,52 @@ import ( "ior/internal/globalfilter" "ior/internal/parquet" "ior/internal/probemanager" + "ior/internal/runtime" "ior/internal/statsengine" - "ior/internal/tui" - "ior/internal/tui/eventstream" + "ior/internal/streamrow" bpf "github.com/aquasecurity/libbpfgo" ) +// tuiRunFunc is the function type for launching the TUI with a given config +// and trace starter. Concrete implementations live in the tui layer; they are +// injected at startup via SetTUIRunners so that the core package (internal) +// never imports the TUI layer. +type tuiRunFunc func(flags.Config, runtime.TraceStarter) error + var ( - runTraceFn = runTrace - runParquetFn = runHeadlessParquet - runTraceWithContextFn = runTraceWithContext - runTUIFn = tui.RunWithTraceStarterConfig - runTUITestFlamesFn = tui.RunTestFlamesWithTraceStarterConfig - runTUITestLiveFlamesFn = tui.RunTestFlamesWithTraceStarterConfig // same runner; starter differs (static vs live) + runTraceFn = runTrace + runParquetFn = runHeadlessParquet + runTraceWithContextFn = runTraceWithContext + // runTUIFn, runTUITestFlamesFn, runTUITestLiveFlamesFn are injected by + // main (via SetTUIRunners) before Run is called. They default to nil so + // that test files can replace individual runners without importing tui. + runTUIFn tuiRunFunc + runTUITestFlamesFn tuiRunFunc + runTUITestLiveFlamesFn tuiRunFunc getEUID = os.Geteuid errRootPrivilegesRequired = errors.New("tracing requires root privileges (run with sudo)") ) -type streamEventSink interface { - eventstream.Source - Push(eventstream.StreamEvent) +// SetTUIRunners injects the concrete TUI runner functions from the cmd layer +// so the core internal package does not need to import the TUI packages. +// This must be called before Run when running in TUI mode. +func SetTUIRunners( + runTUI tuiRunFunc, + runTUITestFlames tuiRunFunc, + runTUITestLiveFlames tuiRunFunc, +) { + runTUIFn = runTUI + runTUITestFlamesFn = runTUITestFlames + runTUITestLiveFlamesFn = runTUITestLiveFlames } +// streamEventSink is the write-side contract for the stream ring buffer used +// by the TUI trace starter. It is identical to runtime.EventSink but defined +// here to avoid a second import alias at the call sites below. +type streamEventSink = runtime.EventSink + // Run is the main entry point for the ior binary. // cfg must be provided by the caller; it should not be fetched from the global singleton here. func Run(cfg flags.Config) error { @@ -110,11 +132,13 @@ func validateRunConfig(cfg flags.Config) error { return nil } -func tuiTestFlamesStarter(cfg flags.Config) tui.TraceStarter { +// tuiTestFlamesStarter returns a TraceStarter that seeds static test flame data +// into the runtime bindings without starting BPF tracing. +func tuiTestFlamesStarter(cfg flags.Config) runtime.TraceStarter { return func(ctx context.Context) error { engine, streamBuf, liveTrie := buildTestFlamesRuntime(cfg) // Only setter methods are needed here; use the narrower publisher interface. - if bindings, ok := tui.RuntimePublisherFromContext(ctx); ok { + if bindings, ok := runtime.RuntimePublisherFromContext(ctx); ok { bindings.SetDashboardSnapshotSource(engine) bindings.SetEventStreamSource(streamBuf) bindings.SetLiveTrie(liveTrie) @@ -123,11 +147,13 @@ func tuiTestFlamesStarter(cfg flags.Config) tui.TraceStarter { } } -func tuiTestLiveFlamesStarter(cfg flags.Config) tui.TraceStarter { +// tuiTestLiveFlamesStarter returns a TraceStarter that seeds a continuously +// updating synthetic flame data source into the runtime bindings. +func tuiTestLiveFlamesStarter(cfg flags.Config) runtime.TraceStarter { return func(ctx context.Context) error { engine, streamBuf, liveTrie := buildTestLiveFlamesRuntime(ctx, cfg) // Only setter methods are needed here; use the narrower publisher interface. - if bindings, ok := tui.RuntimePublisherFromContext(ctx); ok { + if bindings, ok := runtime.RuntimePublisherFromContext(ctx); ok { bindings.SetDashboardSnapshotSource(engine) bindings.SetEventStreamSource(streamBuf) bindings.SetLiveTrie(liveTrie) @@ -136,17 +162,21 @@ func tuiTestLiveFlamesStarter(cfg flags.Config) tui.TraceStarter { } } -func buildTestFlamesRuntime(cfg flags.Config) (*statsengine.Engine, *eventstream.RingBuffer, *flamegraph.LiveTrie) { +// buildTestFlamesRuntime allocates a stats engine, stream buffer, and seeded +// live trie for static test-flames mode. +func buildTestFlamesRuntime(cfg flags.Config) (*statsengine.Engine, *streamrow.RingBuffer, *flamegraph.LiveTrie) { engine := statsengine.NewEngine(64) - streamBuf := eventstream.NewRingBuffer() + streamBuf := streamrow.NewRingBuffer() liveTrie := flamegraph.NewLiveTrie(cfg.CollapsedFields, cfg.CountField) flamegraph.SeedTestFlameData(liveTrie) return engine, streamBuf, liveTrie } -func buildTestLiveFlamesRuntime(ctx context.Context, cfg flags.Config) (*statsengine.Engine, *eventstream.RingBuffer, *flamegraph.LiveTrie) { +// buildTestLiveFlamesRuntime allocates a stats engine, stream buffer, and live +// trie for live test-flames mode, then launches a goroutine to update the trie. +func buildTestLiveFlamesRuntime(ctx context.Context, cfg flags.Config) (*statsengine.Engine, *streamrow.RingBuffer, *flamegraph.LiveTrie) { engine := statsengine.NewEngine(64) - streamBuf := eventstream.NewRingBuffer() + streamBuf := streamrow.NewRingBuffer() liveTrie := flamegraph.NewLiveTrie(cfg.CollapsedFields, cfg.CountField) flamegraph.SeedTestLiveFlameData(liveTrie, 0) @@ -188,8 +218,8 @@ func shouldRunTraceMode(cfg flags.Config) bool { type tuiRuntime struct { engine *statsengine.Engine streamBuf streamEventSink - streamSrc eventstream.Source - streamSeq *eventstream.Sequencer + streamSrc runtime.StreamSource + streamSeq *streamrow.Sequencer liveTrie *flamegraph.LiveTrie recorder *parquet.Recorder filterEpoch uint64 @@ -201,14 +231,14 @@ type tuiRuntime struct { func buildTUIRuntime(ctx context.Context, cfg flags.Config) (*tuiRuntime, error) { rt := &tuiRuntime{ engine: statsengine.NewEngine(64), - streamSeq: eventstream.NewSequencer(0), + streamSeq: streamrow.NewSequencer(0), liveTrie: flamegraph.NewLiveTrie(cfg.CollapsedFields, cfg.CountField), } - buf := streamEventSink(eventstream.NewRingBuffer()) + buf := streamrow.NewRingBuffer() rt.streamBuf = buf - rt.streamSrc = eventstream.Source(buf) + rt.streamSrc = buf - if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok { + if bindings, ok := runtime.RuntimeBindingsFromContext(ctx); ok { if persistent := bindings.StreamBuffer(); persistent != nil { rt.streamSrc = persistent sink, ok := persistent.(streamEventSink) @@ -244,7 +274,7 @@ func makeTUIEventLoopConfigurer(ctx context.Context, cfg flags.Config, rt *tuiRu ep.Recycle() return } - row := eventstream.NewStreamEvent(rt.streamSeq.Next(), ep) + row := streamrow.New(rt.streamSeq.Next(), ep) rt.engine.Ingest(ep) rt.streamBuf.Push(row) if rt.recorder != nil { @@ -262,28 +292,29 @@ func makeTUIEventLoopConfigurer(ctx context.Context, cfg flags.Config, rt *tuiRu ep.Recycle() } el.warningCb = func(message string) { - rt.streamBuf.Push(eventstream.NewWarningEvent(rt.streamSeq.Next(), message)) + rt.streamBuf.Push(streamrow.NewWarning(rt.streamSeq.Next(), message)) } - if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok { + if bindings, ok := runtime.RuntimeBindingsFromContext(ctx); ok { bindings.SetLiveFilterSetter(el.SetFilter) } } } -// tuiTraceStarterFromRunTrace returns a tui.TraceStarter that drives a full -// BPF trace session from within the TUI lifecycle. It allocates per-restart -// state via buildTUIRuntime, wires the event loop via makeTUIEventLoopConfigurer, -// and starts the trace in a goroutine, signalling the TUI once BPF probes are -// attached (via startedCh) or returning an error if startup fails. +// tuiTraceStarterFromRunTrace returns a runtime.TraceStarter that drives a +// full BPF trace session from within the TUI lifecycle. It allocates +// per-restart state via buildTUIRuntime, wires the event loop via +// makeTUIEventLoopConfigurer, and starts the trace in a goroutine, signalling +// the TUI once BPF probes are attached (via startedCh) or returning an error +// if startup fails. func tuiTraceStarterFromRunTrace( baseCfg flags.Config, startTrace func(context.Context, flags.Config, chan<- struct{}, func(*eventLoop)) error, -) tui.TraceStarter { +) runtime.TraceStarter { return func(ctx context.Context) error { bpf.SetLoggerCbs(bpf.Callbacks{Log: func(int, string) {}}) cfg := baseCfg - if filter, ok := tui.TraceFiltersFromContext(ctx); ok { + if filter, ok := runtime.TraceFiltersFromContext(ctx); ok { cfg.GlobalFilter = filter.Clone() applyTraceScopeFromGlobalFilter(&cfg, filter) } @@ -298,7 +329,7 @@ func tuiTraceStarterFromRunTrace( errCh := make(chan error, 1) go func() { err := startTrace(ctx, cfg, startedCh, configureEl) - if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok { + if bindings, ok := runtime.RuntimeBindingsFromContext(ctx); ok { bindings.SetLiveFilterSetter(nil) } errCh <- err |
