diff options
| -rw-r--r-- | internal/eventloop.go | 27 | ||||
| -rw-r--r-- | internal/eventloop_exit.go | 8 | ||||
| -rw-r--r-- | internal/eventloop_filter_test.go | 10 | ||||
| -rw-r--r-- | internal/eventloop_runtime.go | 8 | ||||
| -rw-r--r-- | internal/ior.go | 16 | ||||
| -rw-r--r-- | internal/ior_mode_test.go | 83 | ||||
| -rw-r--r-- | internal/tui/tui.go | 64 |
7 files changed, 192 insertions, 24 deletions
diff --git a/internal/eventloop.go b/internal/eventloop.go index 87f99ed..fc82ee4 100644 --- a/internal/eventloop.go +++ b/internal/eventloop.go @@ -2,6 +2,7 @@ package internal import ( "fmt" + "sync/atomic" "time" "ior/internal/event" @@ -38,7 +39,11 @@ type eventLoopConfig struct { type rawEventHandler func(raw []byte, ch chan<- *event.Pair) type eventLoop struct { - filter globalfilter.Filter + // filterPtr holds the active global filter. Stored as atomic.Pointer so + // the TUI can swap filters in place via SetFilter without tearing down + // and reattaching the BPF probes (the previous behavior caused a multi- + // second 'Attaching tracepoints' overlay every time the filter changed). + filterPtr atomic.Pointer[globalfilter.Filter] pairs pairTracker // enter/exit pairing state and inter-syscall duration tracking pendingHandles *pendingHandleTracker // TID → pathname from name_to_handle_at, for open_by_handle_at correlation fdTracker *fdTracker // fd table and procfs resolution cache @@ -57,6 +62,24 @@ type eventLoop struct { done chan struct{} } +// Filter returns a snapshot of the currently active global filter. Each call +// loads a single atomic pointer and returns the underlying value, so the +// caller observes a consistent filter even if SetFilter races concurrently. +func (e *eventLoop) Filter() globalfilter.Filter { + if p := e.filterPtr.Load(); p != nil { + return *p + } + return globalfilter.Filter{} +} + +// SetFilter atomically replaces the active global filter. The replacement is +// cloned so the caller can keep mutating its own filter without affecting +// what the eventloop sees. +func (e *eventLoop) SetFilter(filter globalfilter.Filter) { + cloned := filter.Clone() + e.filterPtr.Store(&cloned) +} + func newEventLoop(cfg eventLoopConfig) (*eventLoop, error) { fdState := configuredFDTracker(cfg.fdTracker) commState := configuredCommResolver(cfg.commResolver) @@ -65,7 +88,6 @@ func newEventLoop(cfg eventLoopConfig) (*eventLoop, error) { } el := &eventLoop{ - filter: cfg.filter.Clone(), pairs: newPairTracker(), pendingHandles: newPendingHandleTracker(), fdTracker: fdState, @@ -75,6 +97,7 @@ func newEventLoop(cfg eventLoopConfig) (*eventLoop, error) { cfg: cfg, done: make(chan struct{}), } + el.SetFilter(cfg.filter) el.initRawHandlers() el.configureOutputCallback() el.seedTrackedPidComm() diff --git a/internal/eventloop_exit.go b/internal/eventloop_exit.go index e97688a..5f6442c 100644 --- a/internal/eventloop_exit.go +++ b/internal/eventloop_exit.go @@ -111,7 +111,7 @@ func (e *eventLoop) handleFdExit(ep *event.Pair, fdEv *types.FdEvent) bool { } } ep.Comm = e.comm(fdEv.GetTid()) - if !e.filter.MatchPair(ep) { + if !e.Filter().MatchPair(ep) { ep.Recycle() return false } @@ -152,7 +152,7 @@ func (e *eventLoop) handleDup3Exit(ep *event.Pair, dup3Ev *types.Dup3Event) bool fd := int32(dup3Ev.Fd) ep.File = e.fdState().resolve(fd, dup3Ev.Pid) ep.Comm = e.comm(dup3Ev.GetTid()) - if !e.filter.MatchPair(ep) { + if !e.Filter().MatchPair(ep) { ep.Recycle() return false } @@ -233,7 +233,7 @@ func (e *eventLoop) handleNullExit(ep *event.Pair, nullEv *types.NullEvent) bool } } ep.Comm = e.comm(nullEv.GetTid()) - if !e.filter.MatchPair(ep) { + if !e.Filter().MatchPair(ep) { ep.Recycle() return false } @@ -244,7 +244,7 @@ func (e *eventLoop) handleFcntlExit(ep *event.Pair, fcntlEv *types.FcntlEvent) b ep.Comm = e.comm(fcntlEv.GetTid()) fd := int32(fcntlEv.Fd) ep.File = e.fdState().resolve(fd, fcntlEv.Pid) - if !e.filter.MatchPair(ep) { + if !e.Filter().MatchPair(ep) { ep.Recycle() return false } diff --git a/internal/eventloop_filter_test.go b/internal/eventloop_filter_test.go index 4e45060..930312c 100644 --- a/internal/eventloop_filter_test.go +++ b/internal/eventloop_filter_test.go @@ -450,7 +450,6 @@ func TestCommFilterToggle(t *testing.T) { // Create eventloop without comm filter el := &eventLoop{ - filter: globalfilter.Filter{}, pairs: newPairTracker(), fdTracker: newFDTracker(make(map[int32]file.File)), commResolver: newCommResolver(make(map[uint32]string)), @@ -462,6 +461,7 @@ func TestCommFilterToggle(t *testing.T) { }, done: make(chan struct{}), } + el.SetFilter(globalfilter.Filter{}) go el.run(ctx, inCh) go func() { @@ -491,9 +491,6 @@ func TestCommFilterToggle(t *testing.T) { // Create eventloop with comm filter enabled el := &eventLoop{ - filter: globalfilter.Filter{ - Comm: &globalfilter.StringFilter{Pattern: "test"}, - }, pairs: newPairTracker(), fdTracker: newFDTracker(make(map[int32]file.File)), commResolver: newCommResolver(make(map[uint32]string)), @@ -505,6 +502,9 @@ func TestCommFilterToggle(t *testing.T) { }, done: make(chan struct{}), } + el.SetFilter(globalfilter.Filter{ + Comm: &globalfilter.StringFilter{Pattern: "test"}, + }) go el.run(ctx, inCh) go func() { @@ -527,7 +527,6 @@ func TestCommFilterToggle(t *testing.T) { func newEventLoopWithFilter(commFilter, pathFilter string) *eventLoop { el := &eventLoop{ - filter: testFilter(commFilter, pathFilter), pairs: newPairTracker(), fdTracker: newFDTracker(make(map[int32]file.File)), commResolver: newCommResolver(make(map[uint32]string)), @@ -535,6 +534,7 @@ func newEventLoopWithFilter(commFilter, pathFilter string) *eventLoop { printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() }, done: make(chan struct{}), } + el.SetFilter(testFilter(commFilter, pathFilter)) return el } diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go index 7f540ec..85a90a1 100644 --- a/internal/eventloop_runtime.go +++ b/internal/eventloop_runtime.go @@ -123,7 +123,7 @@ func (e *eventLoop) initRawHandlers() { if !ok { return } - if e.filter.MatchOpenEvent(openEv) { + if e.Filter().MatchOpenEvent(openEv) { e.tracepointEntered(openEv) } } @@ -174,7 +174,7 @@ func (e *eventLoop) initRawHandlers() { if !ok { return } - if e.filter.MatchNameEvent(nameEv) { + if e.Filter().MatchNameEvent(nameEv) { e.tracepointEntered(nameEv) } } @@ -183,7 +183,7 @@ func (e *eventLoop) initRawHandlers() { if !ok { return } - if e.filter.MatchPathEvent(pathEv) { + if e.Filter().MatchPathEvent(pathEv) { e.tracepointEntered(pathEv) } } @@ -223,7 +223,7 @@ func (e *eventLoop) tracepointEntered(enterEv event.Event) { tid := enterEv.GetTid() // Schedule comm lookup as early as possible to reduce races for short-lived processes. e.queueCommLookup(tid) - if !e.filter.UsesCommFilter() { + if !e.Filter().UsesCommFilter() { e.pairs.set(enterEv) return } diff --git a/internal/ior.go b/internal/ior.go index 63da223..354267b 100644 --- a/internal/ior.go +++ b/internal/ior.go @@ -228,8 +228,16 @@ func tuiTraceStarterFromRunTrace( go func() { err := startTrace(ctx, cfg, startedCh, func(el *eventLoop) { + // Seed the eventloop's filter from the config so subsequent + // reads via el.Filter() see the same filter the trace was + // started with. The TUI can later replace it in place via + // runtimeBindings.applyLiveFilter, which calls el.SetFilter. + el.SetFilter(cfg.GlobalFilter) + // Read the live filter on each event so the TUI can swap + // filters in place via runtimeBindings.applyLiveFilter + // without restarting the BPF probes. el.printCb = func(ep *event.Pair) { - if !shouldIngestTracePair(cfg.GlobalFilter, ep) { + if !shouldIngestTracePair(el.Filter(), ep) { ep.Recycle() return } @@ -253,7 +261,13 @@ func tuiTraceStarterFromRunTrace( el.warningCb = func(message string) { streamBuf.Push(eventstream.NewWarningEvent(streamSeq.Next(), message)) } + if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok { + bindings.SetLiveFilterSetter(el.SetFilter) + } }) + if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok { + bindings.SetLiveFilterSetter(nil) + } errCh <- err close(errCh) }() diff --git a/internal/ior_mode_test.go b/internal/ior_mode_test.go index ab0466d..876b374 100644 --- a/internal/ior_mode_test.go +++ b/internal/ior_mode_test.go @@ -8,6 +8,7 @@ import ( "io" "os" "path/filepath" + "sync" "testing" "testing/synctest" "time" @@ -832,12 +833,82 @@ func TestTuiTraceStarterFromRunTracePersistsRecorderAcrossRestarts(t *testing.T) } } +// TestTuiTraceStarterAppliesLiveFilterSwapInPlace exercises the in-place +// filter swap path: after the trace is running, calling the registered +// SetLiveFilterSetter callback should change which events the eventloop's +// printCb admits, without any restart of the trace pipeline. +func TestTuiTraceStarterAppliesLiveFilterSwapInPlace(t *testing.T) { + bindings := &traceRuntimeBindingsStub{ + streamBuffer: eventstream.NewRingBuffer(), + streamSeq: eventstream.NewSequencer(0), + } + base := flags.NewFlags() + base.GlobalFilter = globalfilter.Filter{Comm: &globalfilter.StringFilter{Pattern: "keep"}} + + // release lets the test hold the fake starter open while assertions + // run. In production, startTrace blocks on el.run for the trace's + // lifetime, so the runtime bindings keep their live filter setter + // registered the whole time. Returning immediately would race against + // the trace starter's deferred SetLiveFilterSetter(nil) cleanup. + release := make(chan struct{}) + captured := make(chan *eventLoop, 1) + starter := tuiTraceStarterFromRunTrace( + base, + func(_ context.Context, _ flags.Config, started chan<- struct{}, configure func(*eventLoop)) error { + el := &eventLoop{} + configure(el) + captured <- el + close(started) + <-release + return nil + }, + ) + + ctx := tui.ContextWithRuntimeBindings(context.Background(), bindings) + starterErr := make(chan error, 1) + go func() { starterErr <- starter(ctx) }() + + el := <-captured + + // Initial filter from base.GlobalFilter accepts only comm=="keep". + el.printCb(testTracePair(1, "keep")) + el.printCb(testTracePair(2, "drop")) + if got := bindings.streamBuffer.Len(); got != 1 { + t.Fatalf("stream rows after initial filter = %d, want 1", got) + } + + // Trace starter must have registered an in-place filter setter. + setter := bindings.currentLiveFilterSetter() + if setter == nil { + t.Fatalf("expected SetLiveFilterSetter to receive a non-nil callback") + } + + // Swap to a filter that accepts only comm=="drop". No restart should + // happen — the same eventloop now emits the previously-dropped events. + setter(globalfilter.Filter{Comm: &globalfilter.StringFilter{Pattern: "drop"}}) + el.printCb(testTracePair(3, "keep")) + el.printCb(testTracePair(4, "drop")) + if got := bindings.streamBuffer.Len(); got != 2 { + t.Fatalf("stream rows after live swap = %d, want 2", got) + } + + close(release) + if err := <-starterErr; err != nil { + t.Fatalf("starter() error = %v", err) + } +} + type traceRuntimeBindingsStub struct { streamBuffer *eventstream.RingBuffer streamSource eventstream.Source streamSeq *eventstream.Sequencer recorder *parquet.Recorder filterEpoch uint64 + // mu guards liveFilterSetter, which is mutated from the trace-starter + // goroutine (via SetLiveFilterSetter) and read from the test + // goroutine when invoking the in-place swap. + mu sync.Mutex + liveFilterSetter func(globalfilter.Filter) } func (b *traceRuntimeBindingsStub) SetDashboardSnapshotSource(tui.SnapshotSource) {} @@ -850,6 +921,18 @@ func (b *traceRuntimeBindingsStub) SetLiveTrie(flamegraphtui.LiveTrieSource) {} func (b *traceRuntimeBindingsStub) SetProbeManager(tui.ProbeManager) {} +func (b *traceRuntimeBindingsStub) SetLiveFilterSetter(setter func(globalfilter.Filter)) { + b.mu.Lock() + b.liveFilterSetter = setter + b.mu.Unlock() +} + +func (b *traceRuntimeBindingsStub) currentLiveFilterSetter() func(globalfilter.Filter) { + b.mu.Lock() + defer b.mu.Unlock() + return b.liveFilterSetter +} + func (b *traceRuntimeBindingsStub) StreamBuffer() eventstream.Source { return b.streamBuffer } diff --git a/internal/tui/tui.go b/internal/tui/tui.go index 3a866e5..42441a1 100644 --- a/internal/tui/tui.go +++ b/internal/tui/tui.go @@ -65,6 +65,11 @@ type RuntimePublisher interface { SetEventStreamSource(source eventstream.Source) SetLiveTrie(liveTrie flamegraphtui.LiveTrieSource) SetProbeManager(manager ProbeManager) + // SetLiveFilterSetter registers (or, with nil, unregisters) a callback that + // applies a new global filter to the running trace pipeline in place. The + // trace starter passes its eventloop's SetFilter; the TUI calls it on every + // filter change to avoid restarting the BPF probes. + SetLiveFilterSetter(setter func(globalfilter.Filter)) } // RuntimeState is the read side of the TUI runtime contract. @@ -89,14 +94,15 @@ type traceFiltersContextKey struct{} type runtimeBindings struct { mu sync.RWMutex - snapshotSource SnapshotSource - streamSource eventstream.Source - streamBuffer *eventstream.RingBuffer - streamSeq *eventstream.Sequencer - recorder *parquet.Recorder - liveTrieSource flamegraphtui.LiveTrieSource - probeManager ProbeManager - filterEpoch atomic.Uint64 + snapshotSource SnapshotSource + streamSource eventstream.Source + streamBuffer *eventstream.RingBuffer + streamSeq *eventstream.Sequencer + recorder *parquet.Recorder + liveTrieSource flamegraphtui.LiveTrieSource + probeManager ProbeManager + liveFilterSetter func(globalfilter.Filter) + filterEpoch atomic.Uint64 } type traceFilters struct { @@ -159,6 +165,27 @@ func (r *runtimeBindings) SetProbeManager(manager ProbeManager) { r.mu.Unlock() } +func (r *runtimeBindings) SetLiveFilterSetter(setter func(globalfilter.Filter)) { + r.mu.Lock() + r.liveFilterSetter = setter + r.mu.Unlock() +} + +// applyLiveFilter swaps the active global filter in place via the setter +// registered by the trace starter, returning true if a setter was available. +// Returning false tells the caller it must fall back to a full trace restart +// (typically because no trace is currently running). +func (r *runtimeBindings) applyLiveFilter(filter globalfilter.Filter) bool { + r.mu.RLock() + setter := r.liveFilterSetter + r.mu.RUnlock() + if setter == nil { + return false + } + setter(filter) + return true +} + func (r *runtimeBindings) dashboardSnapshotSource() SnapshotSource { r.mu.RLock() defer r.mu.RUnlock() @@ -915,6 +942,20 @@ func (m Model) applyGlobalFilter(filter globalfilter.Filter, action string) (tea } m.runtime.advanceFilterEpoch() + // Try the in-place swap first: hand the new filter to the running + // eventloop via the registered setter and only reset the dashboard + // aggregates so the displayed counts reflect the new filter going + // forward. The BPF probes stay attached, so the user no longer sees + // the multi-second 'Attaching tracepoints' overlay on filter changes. + if m.runtime.applyLiveFilter(nextFilter) { + m.dashboard.PrepareForTraceRestart() + m.lastErr = nil + return m, nil + } + + // Fallback: no trace currently running (e.g. first invocation), so + // restart the pipeline so the new filter takes effect on the next + // trace start. m.stopTrace() m.dashboard.PrepareForTraceRestart() m.attaching = true @@ -937,6 +978,13 @@ func (m Model) undoGlobalFilter() (tea.Model, tea.Cmd) { } m.runtime.advanceFilterEpoch() + // Same in-place swap path as applyGlobalFilter — see comment there. + if m.runtime.applyLiveFilter(prev) { + m.dashboard.PrepareForTraceRestart() + m.lastErr = nil + return m, nil + } + m.stopTrace() m.dashboard.PrepareForTraceRestart() m.attaching = true |
