diff options
| author | Paul Buetow <paul@buetow.org> | 2026-05-08 23:18:52 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-05-08 23:18:52 +0300 |
| commit | 75c483ec6443f731cc6f2149c4738547eb602c6f (patch) | |
| tree | 4875b619864a5eeeb8faff84f475c21382acb499 /internal | |
| parent | f86699a94bdde7d973ba5d6fa3e7ca4ab2f234fb (diff) | |
swap global filter in place to skip BPF reattach
Changing the global filter used to go through stopTrace + beginTraceCmd,
which detached and re-attached every tracepoint and reloaded the BPF object.
On systems under heavy I/O load that took several seconds and showed an
'Attaching tracepoints...' overlay each time. The probe set never depends
on the global filter (ShouldIAttachTracepoint only reads CLI regex flags),
so the restart was gratuitous.
Now the eventloop holds its filter behind an atomic.Pointer with SetFilter /
Filter accessors, and the trace starter registers el.SetFilter with the
runtime bindings as a SetLiveFilterSetter callback. applyGlobalFilter
and undoGlobalFilter call runtime.applyLiveFilter first; only if no setter
is registered (i.e. no trace is running) do they fall back to the full
restart path.
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/eventloop.go | 27 | ||||
| -rw-r--r-- | internal/eventloop_exit.go | 8 | ||||
| -rw-r--r-- | internal/eventloop_filter_test.go | 10 | ||||
| -rw-r--r-- | internal/eventloop_runtime.go | 8 | ||||
| -rw-r--r-- | internal/ior.go | 16 | ||||
| -rw-r--r-- | internal/ior_mode_test.go | 83 | ||||
| -rw-r--r-- | internal/tui/tui.go | 64 |
7 files changed, 192 insertions, 24 deletions
diff --git a/internal/eventloop.go b/internal/eventloop.go index 87f99ed..fc82ee4 100644 --- a/internal/eventloop.go +++ b/internal/eventloop.go @@ -2,6 +2,7 @@ package internal import ( "fmt" + "sync/atomic" "time" "ior/internal/event" @@ -38,7 +39,11 @@ type eventLoopConfig struct { type rawEventHandler func(raw []byte, ch chan<- *event.Pair) type eventLoop struct { - filter globalfilter.Filter + // filterPtr holds the active global filter. Stored as atomic.Pointer so + // the TUI can swap filters in place via SetFilter without tearing down + // and reattaching the BPF probes (the previous behavior caused a multi- + // second 'Attaching tracepoints' overlay every time the filter changed). + filterPtr atomic.Pointer[globalfilter.Filter] pairs pairTracker // enter/exit pairing state and inter-syscall duration tracking pendingHandles *pendingHandleTracker // TID → pathname from name_to_handle_at, for open_by_handle_at correlation fdTracker *fdTracker // fd table and procfs resolution cache @@ -57,6 +62,24 @@ type eventLoop struct { done chan struct{} } +// Filter returns a snapshot of the currently active global filter. Each call +// loads a single atomic pointer and returns the underlying value, so the +// caller observes a consistent filter even if SetFilter races concurrently. +func (e *eventLoop) Filter() globalfilter.Filter { + if p := e.filterPtr.Load(); p != nil { + return *p + } + return globalfilter.Filter{} +} + +// SetFilter atomically replaces the active global filter. The replacement is +// cloned so the caller can keep mutating its own filter without affecting +// what the eventloop sees. 
+func (e *eventLoop) SetFilter(filter globalfilter.Filter) { + cloned := filter.Clone() + e.filterPtr.Store(&cloned) +} + func newEventLoop(cfg eventLoopConfig) (*eventLoop, error) { fdState := configuredFDTracker(cfg.fdTracker) commState := configuredCommResolver(cfg.commResolver) @@ -65,7 +88,6 @@ func newEventLoop(cfg eventLoopConfig) (*eventLoop, error) { } el := &eventLoop{ - filter: cfg.filter.Clone(), pairs: newPairTracker(), pendingHandles: newPendingHandleTracker(), fdTracker: fdState, @@ -75,6 +97,7 @@ func newEventLoop(cfg eventLoopConfig) (*eventLoop, error) { cfg: cfg, done: make(chan struct{}), } + el.SetFilter(cfg.filter) el.initRawHandlers() el.configureOutputCallback() el.seedTrackedPidComm() diff --git a/internal/eventloop_exit.go b/internal/eventloop_exit.go index e97688a..5f6442c 100644 --- a/internal/eventloop_exit.go +++ b/internal/eventloop_exit.go @@ -111,7 +111,7 @@ func (e *eventLoop) handleFdExit(ep *event.Pair, fdEv *types.FdEvent) bool { } } ep.Comm = e.comm(fdEv.GetTid()) - if !e.filter.MatchPair(ep) { + if !e.Filter().MatchPair(ep) { ep.Recycle() return false } @@ -152,7 +152,7 @@ func (e *eventLoop) handleDup3Exit(ep *event.Pair, dup3Ev *types.Dup3Event) bool fd := int32(dup3Ev.Fd) ep.File = e.fdState().resolve(fd, dup3Ev.Pid) ep.Comm = e.comm(dup3Ev.GetTid()) - if !e.filter.MatchPair(ep) { + if !e.Filter().MatchPair(ep) { ep.Recycle() return false } @@ -233,7 +233,7 @@ func (e *eventLoop) handleNullExit(ep *event.Pair, nullEv *types.NullEvent) bool } } ep.Comm = e.comm(nullEv.GetTid()) - if !e.filter.MatchPair(ep) { + if !e.Filter().MatchPair(ep) { ep.Recycle() return false } @@ -244,7 +244,7 @@ func (e *eventLoop) handleFcntlExit(ep *event.Pair, fcntlEv *types.FcntlEvent) b ep.Comm = e.comm(fcntlEv.GetTid()) fd := int32(fcntlEv.Fd) ep.File = e.fdState().resolve(fd, fcntlEv.Pid) - if !e.filter.MatchPair(ep) { + if !e.Filter().MatchPair(ep) { ep.Recycle() return false } diff --git a/internal/eventloop_filter_test.go 
b/internal/eventloop_filter_test.go index 4e45060..930312c 100644 --- a/internal/eventloop_filter_test.go +++ b/internal/eventloop_filter_test.go @@ -450,7 +450,6 @@ func TestCommFilterToggle(t *testing.T) { // Create eventloop without comm filter el := &eventLoop{ - filter: globalfilter.Filter{}, pairs: newPairTracker(), fdTracker: newFDTracker(make(map[int32]file.File)), commResolver: newCommResolver(make(map[uint32]string)), @@ -462,6 +461,7 @@ func TestCommFilterToggle(t *testing.T) { }, done: make(chan struct{}), } + el.SetFilter(globalfilter.Filter{}) go el.run(ctx, inCh) go func() { @@ -491,9 +491,6 @@ func TestCommFilterToggle(t *testing.T) { // Create eventloop with comm filter enabled el := &eventLoop{ - filter: globalfilter.Filter{ - Comm: &globalfilter.StringFilter{Pattern: "test"}, - }, pairs: newPairTracker(), fdTracker: newFDTracker(make(map[int32]file.File)), commResolver: newCommResolver(make(map[uint32]string)), @@ -505,6 +502,9 @@ func TestCommFilterToggle(t *testing.T) { }, done: make(chan struct{}), } + el.SetFilter(globalfilter.Filter{ + Comm: &globalfilter.StringFilter{Pattern: "test"}, + }) go el.run(ctx, inCh) go func() { @@ -527,7 +527,6 @@ func TestCommFilterToggle(t *testing.T) { func newEventLoopWithFilter(commFilter, pathFilter string) *eventLoop { el := &eventLoop{ - filter: testFilter(commFilter, pathFilter), pairs: newPairTracker(), fdTracker: newFDTracker(make(map[int32]file.File)), commResolver: newCommResolver(make(map[uint32]string)), @@ -535,6 +534,7 @@ func newEventLoopWithFilter(commFilter, pathFilter string) *eventLoop { printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() }, done: make(chan struct{}), } + el.SetFilter(testFilter(commFilter, pathFilter)) return el } diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go index 7f540ec..85a90a1 100644 --- a/internal/eventloop_runtime.go +++ b/internal/eventloop_runtime.go @@ -123,7 +123,7 @@ func (e *eventLoop) initRawHandlers() { if !ok { return } 
- if e.filter.MatchOpenEvent(openEv) { + if e.Filter().MatchOpenEvent(openEv) { e.tracepointEntered(openEv) } } @@ -174,7 +174,7 @@ func (e *eventLoop) initRawHandlers() { if !ok { return } - if e.filter.MatchNameEvent(nameEv) { + if e.Filter().MatchNameEvent(nameEv) { e.tracepointEntered(nameEv) } } @@ -183,7 +183,7 @@ func (e *eventLoop) initRawHandlers() { if !ok { return } - if e.filter.MatchPathEvent(pathEv) { + if e.Filter().MatchPathEvent(pathEv) { e.tracepointEntered(pathEv) } } @@ -223,7 +223,7 @@ func (e *eventLoop) tracepointEntered(enterEv event.Event) { tid := enterEv.GetTid() // Schedule comm lookup as early as possible to reduce races for short-lived processes. e.queueCommLookup(tid) - if !e.filter.UsesCommFilter() { + if !e.Filter().UsesCommFilter() { e.pairs.set(enterEv) return } diff --git a/internal/ior.go b/internal/ior.go index 63da223..354267b 100644 --- a/internal/ior.go +++ b/internal/ior.go @@ -228,8 +228,16 @@ func tuiTraceStarterFromRunTrace( go func() { err := startTrace(ctx, cfg, startedCh, func(el *eventLoop) { + // Seed the eventloop's filter from the config so subsequent + // reads via el.Filter() see the same filter the trace was + // started with. The TUI can later replace it in place via + // runtimeBindings.applyLiveFilter, which calls el.SetFilter. + el.SetFilter(cfg.GlobalFilter) + // Read the live filter on each event so the TUI can swap + // filters in place via runtimeBindings.applyLiveFilter + // without restarting the BPF probes. 
el.printCb = func(ep *event.Pair) { - if !shouldIngestTracePair(cfg.GlobalFilter, ep) { + if !shouldIngestTracePair(el.Filter(), ep) { ep.Recycle() return } @@ -253,7 +261,13 @@ func tuiTraceStarterFromRunTrace( el.warningCb = func(message string) { streamBuf.Push(eventstream.NewWarningEvent(streamSeq.Next(), message)) } + if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok { + bindings.SetLiveFilterSetter(el.SetFilter) + } }) + if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok { + bindings.SetLiveFilterSetter(nil) + } errCh <- err close(errCh) }() diff --git a/internal/ior_mode_test.go b/internal/ior_mode_test.go index ab0466d..876b374 100644 --- a/internal/ior_mode_test.go +++ b/internal/ior_mode_test.go @@ -8,6 +8,7 @@ import ( "io" "os" "path/filepath" + "sync" "testing" "testing/synctest" "time" @@ -832,12 +833,82 @@ func TestTuiTraceStarterFromRunTracePersistsRecorderAcrossRestarts(t *testing.T) } } +// TestTuiTraceStarterAppliesLiveFilterSwapInPlace exercises the in-place +// filter swap path: after the trace is running, calling the registered +// SetLiveFilterSetter callback should change which events the eventloop's +// printCb admits, without any restart of the trace pipeline. +func TestTuiTraceStarterAppliesLiveFilterSwapInPlace(t *testing.T) { + bindings := &traceRuntimeBindingsStub{ + streamBuffer: eventstream.NewRingBuffer(), + streamSeq: eventstream.NewSequencer(0), + } + base := flags.NewFlags() + base.GlobalFilter = globalfilter.Filter{Comm: &globalfilter.StringFilter{Pattern: "keep"}} + + // release lets the test hold the fake starter open while assertions + // run. In production, startTrace blocks on el.run for the trace's + // lifetime, so the runtime bindings keep their live filter setter + // registered the whole time. Returning immediately would race against + // the trace starter's deferred SetLiveFilterSetter(nil) cleanup. 
+ release := make(chan struct{}) + captured := make(chan *eventLoop, 1) + starter := tuiTraceStarterFromRunTrace( + base, + func(_ context.Context, _ flags.Config, started chan<- struct{}, configure func(*eventLoop)) error { + el := &eventLoop{} + configure(el) + captured <- el + close(started) + <-release + return nil + }, + ) + + ctx := tui.ContextWithRuntimeBindings(context.Background(), bindings) + starterErr := make(chan error, 1) + go func() { starterErr <- starter(ctx) }() + + el := <-captured + + // Initial filter from base.GlobalFilter accepts only comm=="keep". + el.printCb(testTracePair(1, "keep")) + el.printCb(testTracePair(2, "drop")) + if got := bindings.streamBuffer.Len(); got != 1 { + t.Fatalf("stream rows after initial filter = %d, want 1", got) + } + + // Trace starter must have registered an in-place filter setter. + setter := bindings.currentLiveFilterSetter() + if setter == nil { + t.Fatalf("expected SetLiveFilterSetter to receive a non-nil callback") + } + + // Swap to a filter that accepts only comm=="drop". No restart should + // happen — the same eventloop now emits the previously-dropped events. + setter(globalfilter.Filter{Comm: &globalfilter.StringFilter{Pattern: "drop"}}) + el.printCb(testTracePair(3, "keep")) + el.printCb(testTracePair(4, "drop")) + if got := bindings.streamBuffer.Len(); got != 2 { + t.Fatalf("stream rows after live swap = %d, want 2", got) + } + + close(release) + if err := <-starterErr; err != nil { + t.Fatalf("starter() error = %v", err) + } +} + type traceRuntimeBindingsStub struct { streamBuffer *eventstream.RingBuffer streamSource eventstream.Source streamSeq *eventstream.Sequencer recorder *parquet.Recorder filterEpoch uint64 + // mu guards liveFilterSetter, which is mutated from the trace-starter + // goroutine (via SetLiveFilterSetter) and read from the test + // goroutine when invoking the in-place swap. 
+ mu sync.Mutex + liveFilterSetter func(globalfilter.Filter) } func (b *traceRuntimeBindingsStub) SetDashboardSnapshotSource(tui.SnapshotSource) {} @@ -850,6 +921,18 @@ func (b *traceRuntimeBindingsStub) SetLiveTrie(flamegraphtui.LiveTrieSource) {} func (b *traceRuntimeBindingsStub) SetProbeManager(tui.ProbeManager) {} +func (b *traceRuntimeBindingsStub) SetLiveFilterSetter(setter func(globalfilter.Filter)) { + b.mu.Lock() + b.liveFilterSetter = setter + b.mu.Unlock() +} + +func (b *traceRuntimeBindingsStub) currentLiveFilterSetter() func(globalfilter.Filter) { + b.mu.Lock() + defer b.mu.Unlock() + return b.liveFilterSetter +} + func (b *traceRuntimeBindingsStub) StreamBuffer() eventstream.Source { return b.streamBuffer } diff --git a/internal/tui/tui.go b/internal/tui/tui.go index 3a866e5..42441a1 100644 --- a/internal/tui/tui.go +++ b/internal/tui/tui.go @@ -65,6 +65,11 @@ type RuntimePublisher interface { SetEventStreamSource(source eventstream.Source) SetLiveTrie(liveTrie flamegraphtui.LiveTrieSource) SetProbeManager(manager ProbeManager) + // SetLiveFilterSetter registers (or, with nil, unregisters) a callback that + // applies a new global filter to the running trace pipeline in place. The + // trace starter passes its eventloop's SetFilter; the TUI calls it on every + // filter change to avoid restarting the BPF probes. + SetLiveFilterSetter(setter func(globalfilter.Filter)) } // RuntimeState is the read side of the TUI runtime contract. 
@@ -89,14 +94,15 @@ type traceFiltersContextKey struct{} type runtimeBindings struct { mu sync.RWMutex - snapshotSource SnapshotSource - streamSource eventstream.Source - streamBuffer *eventstream.RingBuffer - streamSeq *eventstream.Sequencer - recorder *parquet.Recorder - liveTrieSource flamegraphtui.LiveTrieSource - probeManager ProbeManager - filterEpoch atomic.Uint64 + snapshotSource SnapshotSource + streamSource eventstream.Source + streamBuffer *eventstream.RingBuffer + streamSeq *eventstream.Sequencer + recorder *parquet.Recorder + liveTrieSource flamegraphtui.LiveTrieSource + probeManager ProbeManager + liveFilterSetter func(globalfilter.Filter) + filterEpoch atomic.Uint64 } type traceFilters struct { @@ -159,6 +165,27 @@ func (r *runtimeBindings) SetProbeManager(manager ProbeManager) { r.mu.Unlock() } +func (r *runtimeBindings) SetLiveFilterSetter(setter func(globalfilter.Filter)) { + r.mu.Lock() + r.liveFilterSetter = setter + r.mu.Unlock() +} + +// applyLiveFilter swaps the active global filter in place via the setter +// registered by the trace starter, returning true if a setter was available. +// Returning false tells the caller it must fall back to a full trace restart +// (typically because no trace is currently running). +func (r *runtimeBindings) applyLiveFilter(filter globalfilter.Filter) bool { + r.mu.RLock() + setter := r.liveFilterSetter + r.mu.RUnlock() + if setter == nil { + return false + } + setter(filter) + return true +} + func (r *runtimeBindings) dashboardSnapshotSource() SnapshotSource { r.mu.RLock() defer r.mu.RUnlock() @@ -915,6 +942,20 @@ func (m Model) applyGlobalFilter(filter globalfilter.Filter, action string) (tea } m.runtime.advanceFilterEpoch() + // Try the in-place swap first: hand the new filter to the running + // eventloop via the registered setter and only reset the dashboard + // aggregates so the displayed counts reflect the new filter going + // forward. 
The BPF probes stay attached, so the user no longer sees + // the multi-second 'Attaching tracepoints' overlay on filter changes. + if m.runtime.applyLiveFilter(nextFilter) { + m.dashboard.PrepareForTraceRestart() + m.lastErr = nil + return m, nil + } + + // Fallback: no trace currently running (e.g. first invocation), so + // restart the pipeline so the new filter takes effect on the next + // trace start. m.stopTrace() m.dashboard.PrepareForTraceRestart() m.attaching = true @@ -937,6 +978,13 @@ func (m Model) undoGlobalFilter() (tea.Model, tea.Cmd) { } m.runtime.advanceFilterEpoch() + // Same in-place swap path as applyGlobalFilter — see comment there. + if m.runtime.applyLiveFilter(prev) { + m.dashboard.PrepareForTraceRestart() + m.lastErr = nil + return m, nil + } + m.stopTrace() m.dashboard.PrepareForTraceRestart() m.attaching = true |
