summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/eventloop.go27
-rw-r--r--internal/eventloop_exit.go8
-rw-r--r--internal/eventloop_filter_test.go10
-rw-r--r--internal/eventloop_runtime.go8
-rw-r--r--internal/ior.go16
-rw-r--r--internal/ior_mode_test.go83
-rw-r--r--internal/tui/tui.go64
7 files changed, 192 insertions, 24 deletions
diff --git a/internal/eventloop.go b/internal/eventloop.go
index 87f99ed..fc82ee4 100644
--- a/internal/eventloop.go
+++ b/internal/eventloop.go
@@ -2,6 +2,7 @@ package internal
import (
"fmt"
+ "sync/atomic"
"time"
"ior/internal/event"
@@ -38,7 +39,11 @@ type eventLoopConfig struct {
type rawEventHandler func(raw []byte, ch chan<- *event.Pair)
type eventLoop struct {
- filter globalfilter.Filter
+ // filterPtr holds the active global filter. Stored as atomic.Pointer so
+ // the TUI can swap filters in place via SetFilter without tearing down
+ // and reattaching the BPF probes (the previous behavior caused a multi-
+ // second 'Attaching tracepoints' overlay every time the filter changed).
+ filterPtr atomic.Pointer[globalfilter.Filter]
pairs pairTracker // enter/exit pairing state and inter-syscall duration tracking
pendingHandles *pendingHandleTracker // TID → pathname from name_to_handle_at, for open_by_handle_at correlation
fdTracker *fdTracker // fd table and procfs resolution cache
@@ -57,6 +62,24 @@ type eventLoop struct {
done chan struct{}
}
+// Filter returns a snapshot of the currently active global filter. Each call
+// loads a single atomic pointer and returns the underlying value, so the
+// caller observes a consistent filter even if SetFilter races concurrently.
+func (e *eventLoop) Filter() globalfilter.Filter {
+ if p := e.filterPtr.Load(); p != nil {
+ return *p
+ }
+ return globalfilter.Filter{}
+}
+
+// SetFilter atomically replaces the active global filter. The replacement is
+// cloned so the caller can keep mutating its own filter without affecting
+// what the eventloop sees.
+func (e *eventLoop) SetFilter(filter globalfilter.Filter) {
+ cloned := filter.Clone()
+ e.filterPtr.Store(&cloned)
+}
+
func newEventLoop(cfg eventLoopConfig) (*eventLoop, error) {
fdState := configuredFDTracker(cfg.fdTracker)
commState := configuredCommResolver(cfg.commResolver)
@@ -65,7 +88,6 @@ func newEventLoop(cfg eventLoopConfig) (*eventLoop, error) {
}
el := &eventLoop{
- filter: cfg.filter.Clone(),
pairs: newPairTracker(),
pendingHandles: newPendingHandleTracker(),
fdTracker: fdState,
@@ -75,6 +97,7 @@ func newEventLoop(cfg eventLoopConfig) (*eventLoop, error) {
cfg: cfg,
done: make(chan struct{}),
}
+ el.SetFilter(cfg.filter)
el.initRawHandlers()
el.configureOutputCallback()
el.seedTrackedPidComm()
diff --git a/internal/eventloop_exit.go b/internal/eventloop_exit.go
index e97688a..5f6442c 100644
--- a/internal/eventloop_exit.go
+++ b/internal/eventloop_exit.go
@@ -111,7 +111,7 @@ func (e *eventLoop) handleFdExit(ep *event.Pair, fdEv *types.FdEvent) bool {
}
}
ep.Comm = e.comm(fdEv.GetTid())
- if !e.filter.MatchPair(ep) {
+ if !e.Filter().MatchPair(ep) {
ep.Recycle()
return false
}
@@ -152,7 +152,7 @@ func (e *eventLoop) handleDup3Exit(ep *event.Pair, dup3Ev *types.Dup3Event) bool
fd := int32(dup3Ev.Fd)
ep.File = e.fdState().resolve(fd, dup3Ev.Pid)
ep.Comm = e.comm(dup3Ev.GetTid())
- if !e.filter.MatchPair(ep) {
+ if !e.Filter().MatchPair(ep) {
ep.Recycle()
return false
}
@@ -233,7 +233,7 @@ func (e *eventLoop) handleNullExit(ep *event.Pair, nullEv *types.NullEvent) bool
}
}
ep.Comm = e.comm(nullEv.GetTid())
- if !e.filter.MatchPair(ep) {
+ if !e.Filter().MatchPair(ep) {
ep.Recycle()
return false
}
@@ -244,7 +244,7 @@ func (e *eventLoop) handleFcntlExit(ep *event.Pair, fcntlEv *types.FcntlEvent) b
ep.Comm = e.comm(fcntlEv.GetTid())
fd := int32(fcntlEv.Fd)
ep.File = e.fdState().resolve(fd, fcntlEv.Pid)
- if !e.filter.MatchPair(ep) {
+ if !e.Filter().MatchPair(ep) {
ep.Recycle()
return false
}
diff --git a/internal/eventloop_filter_test.go b/internal/eventloop_filter_test.go
index 4e45060..930312c 100644
--- a/internal/eventloop_filter_test.go
+++ b/internal/eventloop_filter_test.go
@@ -450,7 +450,6 @@ func TestCommFilterToggle(t *testing.T) {
// Create eventloop without comm filter
el := &eventLoop{
- filter: globalfilter.Filter{},
pairs: newPairTracker(),
fdTracker: newFDTracker(make(map[int32]file.File)),
commResolver: newCommResolver(make(map[uint32]string)),
@@ -462,6 +461,7 @@ func TestCommFilterToggle(t *testing.T) {
},
done: make(chan struct{}),
}
+ el.SetFilter(globalfilter.Filter{})
go el.run(ctx, inCh)
go func() {
@@ -491,9 +491,6 @@ func TestCommFilterToggle(t *testing.T) {
// Create eventloop with comm filter enabled
el := &eventLoop{
- filter: globalfilter.Filter{
- Comm: &globalfilter.StringFilter{Pattern: "test"},
- },
pairs: newPairTracker(),
fdTracker: newFDTracker(make(map[int32]file.File)),
commResolver: newCommResolver(make(map[uint32]string)),
@@ -505,6 +502,9 @@ func TestCommFilterToggle(t *testing.T) {
},
done: make(chan struct{}),
}
+ el.SetFilter(globalfilter.Filter{
+ Comm: &globalfilter.StringFilter{Pattern: "test"},
+ })
go el.run(ctx, inCh)
go func() {
@@ -527,7 +527,6 @@ func TestCommFilterToggle(t *testing.T) {
func newEventLoopWithFilter(commFilter, pathFilter string) *eventLoop {
el := &eventLoop{
- filter: testFilter(commFilter, pathFilter),
pairs: newPairTracker(),
fdTracker: newFDTracker(make(map[int32]file.File)),
commResolver: newCommResolver(make(map[uint32]string)),
@@ -535,6 +534,7 @@ func newEventLoopWithFilter(commFilter, pathFilter string) *eventLoop {
printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() },
done: make(chan struct{}),
}
+ el.SetFilter(testFilter(commFilter, pathFilter))
return el
}
diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go
index 7f540ec..85a90a1 100644
--- a/internal/eventloop_runtime.go
+++ b/internal/eventloop_runtime.go
@@ -123,7 +123,7 @@ func (e *eventLoop) initRawHandlers() {
if !ok {
return
}
- if e.filter.MatchOpenEvent(openEv) {
+ if e.Filter().MatchOpenEvent(openEv) {
e.tracepointEntered(openEv)
}
}
@@ -174,7 +174,7 @@ func (e *eventLoop) initRawHandlers() {
if !ok {
return
}
- if e.filter.MatchNameEvent(nameEv) {
+ if e.Filter().MatchNameEvent(nameEv) {
e.tracepointEntered(nameEv)
}
}
@@ -183,7 +183,7 @@ func (e *eventLoop) initRawHandlers() {
if !ok {
return
}
- if e.filter.MatchPathEvent(pathEv) {
+ if e.Filter().MatchPathEvent(pathEv) {
e.tracepointEntered(pathEv)
}
}
@@ -223,7 +223,7 @@ func (e *eventLoop) tracepointEntered(enterEv event.Event) {
tid := enterEv.GetTid()
// Schedule comm lookup as early as possible to reduce races for short-lived processes.
e.queueCommLookup(tid)
- if !e.filter.UsesCommFilter() {
+ if !e.Filter().UsesCommFilter() {
e.pairs.set(enterEv)
return
}
diff --git a/internal/ior.go b/internal/ior.go
index 63da223..354267b 100644
--- a/internal/ior.go
+++ b/internal/ior.go
@@ -228,8 +228,16 @@ func tuiTraceStarterFromRunTrace(
go func() {
err := startTrace(ctx, cfg, startedCh, func(el *eventLoop) {
+ // Seed the eventloop's filter from the config so subsequent
+ // reads via el.Filter() see the same filter the trace was
+ // started with. The TUI can later replace it in place via
+ // runtimeBindings.applyLiveFilter, which calls el.SetFilter.
+ el.SetFilter(cfg.GlobalFilter)
+ // Read the live filter on each event so the TUI can swap
+ // filters in place via runtimeBindings.applyLiveFilter
+ // without restarting the BPF probes.
el.printCb = func(ep *event.Pair) {
- if !shouldIngestTracePair(cfg.GlobalFilter, ep) {
+ if !shouldIngestTracePair(el.Filter(), ep) {
ep.Recycle()
return
}
@@ -253,7 +261,13 @@ func tuiTraceStarterFromRunTrace(
el.warningCb = func(message string) {
streamBuf.Push(eventstream.NewWarningEvent(streamSeq.Next(), message))
}
+ if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok {
+ bindings.SetLiveFilterSetter(el.SetFilter)
+ }
})
+ if bindings, ok := tui.RuntimeBindingsFromContext(ctx); ok {
+ bindings.SetLiveFilterSetter(nil)
+ }
errCh <- err
close(errCh)
}()
diff --git a/internal/ior_mode_test.go b/internal/ior_mode_test.go
index ab0466d..876b374 100644
--- a/internal/ior_mode_test.go
+++ b/internal/ior_mode_test.go
@@ -8,6 +8,7 @@ import (
"io"
"os"
"path/filepath"
+ "sync"
"testing"
"testing/synctest"
"time"
@@ -832,12 +833,82 @@ func TestTuiTraceStarterFromRunTracePersistsRecorderAcrossRestarts(t *testing.T)
}
}
+// TestTuiTraceStarterAppliesLiveFilterSwapInPlace exercises the in-place
+// filter swap path: after the trace is running, calling the registered
+// SetLiveFilterSetter callback should change which events the eventloop's
+// printCb admits, without any restart of the trace pipeline.
+func TestTuiTraceStarterAppliesLiveFilterSwapInPlace(t *testing.T) {
+ bindings := &traceRuntimeBindingsStub{
+ streamBuffer: eventstream.NewRingBuffer(),
+ streamSeq: eventstream.NewSequencer(0),
+ }
+ base := flags.NewFlags()
+ base.GlobalFilter = globalfilter.Filter{Comm: &globalfilter.StringFilter{Pattern: "keep"}}
+
+ // release lets the test hold the fake starter open while assertions
+ // run. In production, startTrace blocks on el.run for the trace's
+ // lifetime, so the runtime bindings keep their live filter setter
+ // registered the whole time. Returning immediately would race against
+ // the trace starter's deferred SetLiveFilterSetter(nil) cleanup.
+ release := make(chan struct{})
+ captured := make(chan *eventLoop, 1)
+ starter := tuiTraceStarterFromRunTrace(
+ base,
+ func(_ context.Context, _ flags.Config, started chan<- struct{}, configure func(*eventLoop)) error {
+ el := &eventLoop{}
+ configure(el)
+ captured <- el
+ close(started)
+ <-release
+ return nil
+ },
+ )
+
+ ctx := tui.ContextWithRuntimeBindings(context.Background(), bindings)
+ starterErr := make(chan error, 1)
+ go func() { starterErr <- starter(ctx) }()
+
+ el := <-captured
+
+ // Initial filter from base.GlobalFilter accepts only comm=="keep".
+ el.printCb(testTracePair(1, "keep"))
+ el.printCb(testTracePair(2, "drop"))
+ if got := bindings.streamBuffer.Len(); got != 1 {
+ t.Fatalf("stream rows after initial filter = %d, want 1", got)
+ }
+
+ // Trace starter must have registered an in-place filter setter.
+ setter := bindings.currentLiveFilterSetter()
+ if setter == nil {
+ t.Fatalf("expected SetLiveFilterSetter to receive a non-nil callback")
+ }
+
+ // Swap to a filter that accepts only comm=="drop". No restart should
+ // happen — the same eventloop now emits the previously-dropped events.
+ setter(globalfilter.Filter{Comm: &globalfilter.StringFilter{Pattern: "drop"}})
+ el.printCb(testTracePair(3, "keep"))
+ el.printCb(testTracePair(4, "drop"))
+ if got := bindings.streamBuffer.Len(); got != 2 {
+ t.Fatalf("stream rows after live swap = %d, want 2", got)
+ }
+
+ close(release)
+ if err := <-starterErr; err != nil {
+ t.Fatalf("starter() error = %v", err)
+ }
+}
+
type traceRuntimeBindingsStub struct {
streamBuffer *eventstream.RingBuffer
streamSource eventstream.Source
streamSeq *eventstream.Sequencer
recorder *parquet.Recorder
filterEpoch uint64
+ // mu guards liveFilterSetter, which is mutated from the trace-starter
+ // goroutine (via SetLiveFilterSetter) and read from the test
+ // goroutine when invoking the in-place swap.
+ mu sync.Mutex
+ liveFilterSetter func(globalfilter.Filter)
}
func (b *traceRuntimeBindingsStub) SetDashboardSnapshotSource(tui.SnapshotSource) {}
@@ -850,6 +921,18 @@ func (b *traceRuntimeBindingsStub) SetLiveTrie(flamegraphtui.LiveTrieSource) {}
func (b *traceRuntimeBindingsStub) SetProbeManager(tui.ProbeManager) {}
+func (b *traceRuntimeBindingsStub) SetLiveFilterSetter(setter func(globalfilter.Filter)) {
+ b.mu.Lock()
+ b.liveFilterSetter = setter
+ b.mu.Unlock()
+}
+
+func (b *traceRuntimeBindingsStub) currentLiveFilterSetter() func(globalfilter.Filter) {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+ return b.liveFilterSetter
+}
+
func (b *traceRuntimeBindingsStub) StreamBuffer() eventstream.Source {
return b.streamBuffer
}
diff --git a/internal/tui/tui.go b/internal/tui/tui.go
index 3a866e5..42441a1 100644
--- a/internal/tui/tui.go
+++ b/internal/tui/tui.go
@@ -65,6 +65,11 @@ type RuntimePublisher interface {
SetEventStreamSource(source eventstream.Source)
SetLiveTrie(liveTrie flamegraphtui.LiveTrieSource)
SetProbeManager(manager ProbeManager)
+ // SetLiveFilterSetter registers (or, with nil, unregisters) a callback that
+ // applies a new global filter to the running trace pipeline in place. The
+ // trace starter passes its eventloop's SetFilter; the TUI calls it on every
+ // filter change to avoid restarting the BPF probes.
+ SetLiveFilterSetter(setter func(globalfilter.Filter))
}
// RuntimeState is the read side of the TUI runtime contract.
@@ -89,14 +94,15 @@ type traceFiltersContextKey struct{}
type runtimeBindings struct {
mu sync.RWMutex
- snapshotSource SnapshotSource
- streamSource eventstream.Source
- streamBuffer *eventstream.RingBuffer
- streamSeq *eventstream.Sequencer
- recorder *parquet.Recorder
- liveTrieSource flamegraphtui.LiveTrieSource
- probeManager ProbeManager
- filterEpoch atomic.Uint64
+ snapshotSource SnapshotSource
+ streamSource eventstream.Source
+ streamBuffer *eventstream.RingBuffer
+ streamSeq *eventstream.Sequencer
+ recorder *parquet.Recorder
+ liveTrieSource flamegraphtui.LiveTrieSource
+ probeManager ProbeManager
+ liveFilterSetter func(globalfilter.Filter)
+ filterEpoch atomic.Uint64
}
type traceFilters struct {
@@ -159,6 +165,27 @@ func (r *runtimeBindings) SetProbeManager(manager ProbeManager) {
r.mu.Unlock()
}
+func (r *runtimeBindings) SetLiveFilterSetter(setter func(globalfilter.Filter)) {
+ r.mu.Lock()
+ r.liveFilterSetter = setter
+ r.mu.Unlock()
+}
+
+// applyLiveFilter swaps the active global filter in place via the setter
+// registered by the trace starter, returning true if a setter was available.
+// Returning false tells the caller it must fall back to a full trace restart
+// (typically because no trace is currently running).
+func (r *runtimeBindings) applyLiveFilter(filter globalfilter.Filter) bool {
+ r.mu.RLock()
+ setter := r.liveFilterSetter
+ r.mu.RUnlock()
+ if setter == nil {
+ return false
+ }
+ setter(filter)
+ return true
+}
+
func (r *runtimeBindings) dashboardSnapshotSource() SnapshotSource {
r.mu.RLock()
defer r.mu.RUnlock()
@@ -915,6 +942,20 @@ func (m Model) applyGlobalFilter(filter globalfilter.Filter, action string) (tea
}
m.runtime.advanceFilterEpoch()
+ // Try the in-place swap first: hand the new filter to the running
+ // eventloop via the registered setter and only reset the dashboard
+ // aggregates so the displayed counts reflect the new filter going
+ // forward. The BPF probes stay attached, so the user no longer sees
+ // the multi-second 'Attaching tracepoints' overlay on filter changes.
+ if m.runtime.applyLiveFilter(nextFilter) {
+ m.dashboard.PrepareForTraceRestart()
+ m.lastErr = nil
+ return m, nil
+ }
+
+ // Fallback: no trace currently running (e.g. first invocation), so
+ // restart the pipeline so the new filter takes effect on the next
+ // trace start.
m.stopTrace()
m.dashboard.PrepareForTraceRestart()
m.attaching = true
@@ -937,6 +978,13 @@ func (m Model) undoGlobalFilter() (tea.Model, tea.Cmd) {
}
m.runtime.advanceFilterEpoch()
+ // Same in-place swap path as applyGlobalFilter — see comment there.
+ if m.runtime.applyLiveFilter(prev) {
+ m.dashboard.PrepareForTraceRestart()
+ m.lastErr = nil
+ return m, nil
+ }
+
m.stopTrace()
m.dashboard.PrepareForTraceRestart()
m.attaching = true