summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r--internal/eventloop.go15
-rw-r--r--internal/eventloop_exit.go12
-rw-r--r--internal/eventloop_filter_test.go26
-rw-r--r--internal/eventloop_output.go44
-rw-r--r--internal/eventloop_runtime.go9
5 files changed, 80 insertions, 26 deletions
diff --git a/internal/eventloop.go b/internal/eventloop.go
index fc82ee4..0fe327c 100644
--- a/internal/eventloop.go
+++ b/internal/eventloop.go
@@ -48,9 +48,8 @@ type eventLoop struct {
pendingHandles *pendingHandleTracker // TID → pathname from name_to_handle_at, for open_by_handle_at correlation
fdTracker *fdTracker // fd table and procfs resolution cache
commResolver *commResolver
+ outputFormatter // pair-emission and warning-notification callbacks (embedded collaborator)
rawHandlers map[types.EventType]rawEventHandler
- printCb func(ep *event.Pair) // Callback to print the event
- warningCb func(message string) // Optional callback for non-fatal event processing warnings
cfg eventLoopConfig
// Statistics
@@ -92,10 +91,14 @@ func newEventLoop(cfg eventLoopConfig) (*eventLoop, error) {
pendingHandles: newPendingHandleTracker(),
fdTracker: fdState,
commResolver: commState,
- rawHandlers: make(map[types.EventType]rawEventHandler),
- printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() },
- cfg: cfg,
- done: make(chan struct{}),
+ // Default printCb prints each pair to stdout then recycles it; callers
+ // (e.g. TUI, headless-parquet) replace this via configureEventLoopOutput.
+ outputFormatter: outputFormatter{
+ printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() },
+ },
+ rawHandlers: make(map[types.EventType]rawEventHandler),
+ cfg: cfg,
+ done: make(chan struct{}),
}
el.SetFilter(cfg.filter)
el.initRawHandlers()
diff --git a/internal/eventloop_exit.go b/internal/eventloop_exit.go
index 5f6442c..e4ae6eb 100644
--- a/internal/eventloop_exit.go
+++ b/internal/eventloop_exit.go
@@ -291,18 +291,16 @@ func (e *eventLoop) registerDup(fdFile *file.FdFile, newFd int32, extraFlags int
e.fdState().set(newFd, duppedFdFile)
}
+// recyclePair notifies about the problem described by warning, then returns ep
+// to the pool. It is a convenience helper used throughout the exit handlers to
+// keep the error path concise.
func (e *eventLoop) recyclePair(ep *event.Pair, warning string) {
e.notifyWarning(warning)
ep.Recycle()
}
-func (e *eventLoop) notifyWarning(message string) {
- if e.warningCb == nil || message == "" {
- return
- }
- e.warningCb(message)
-}
-
+// dropMalformedRawEvent records a warning when a raw BPF event cannot be
+// decoded, keeping the error visible without crashing the event loop.
func (e *eventLoop) dropMalformedRawEvent(evType types.EventType, raw []byte) {
e.notifyWarning(fmt.Sprintf("Dropped malformed raw event type %d (len=%d)", evType, len(raw)))
}
diff --git a/internal/eventloop_filter_test.go b/internal/eventloop_filter_test.go
index 930312c..9d978e4 100644
--- a/internal/eventloop_filter_test.go
+++ b/internal/eventloop_filter_test.go
@@ -454,10 +454,12 @@ func TestCommFilterToggle(t *testing.T) {
fdTracker: newFDTracker(make(map[int32]file.File)),
commResolver: newCommResolver(make(map[uint32]string)),
cfg: eventLoopConfig{synchronousRawProcessing: true},
- printCb: func(ep *event.Pair) {
- next := synchronizedPair{pair: ep, ack: make(chan struct{})}
- outCh <- next
- <-next.ack
+ outputFormatter: outputFormatter{
+ printCb: func(ep *event.Pair) {
+ next := synchronizedPair{pair: ep, ack: make(chan struct{})}
+ outCh <- next
+ <-next.ack
+ },
},
done: make(chan struct{}),
}
@@ -495,10 +497,12 @@ func TestCommFilterToggle(t *testing.T) {
fdTracker: newFDTracker(make(map[int32]file.File)),
commResolver: newCommResolver(make(map[uint32]string)),
cfg: eventLoopConfig{synchronousRawProcessing: true},
- printCb: func(ep *event.Pair) {
- next := synchronizedPair{pair: ep, ack: make(chan struct{})}
- outCh <- next
- <-next.ack
+ outputFormatter: outputFormatter{
+ printCb: func(ep *event.Pair) {
+ next := synchronizedPair{pair: ep, ack: make(chan struct{})}
+ outCh <- next
+ <-next.ack
+ },
},
done: make(chan struct{}),
}
@@ -531,8 +535,10 @@ func newEventLoopWithFilter(commFilter, pathFilter string) *eventLoop {
fdTracker: newFDTracker(make(map[int32]file.File)),
commResolver: newCommResolver(make(map[uint32]string)),
cfg: eventLoopConfig{synchronousRawProcessing: true},
- printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() },
- done: make(chan struct{}),
+ outputFormatter: outputFormatter{
+ printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() },
+ },
+ done: make(chan struct{}),
}
el.SetFilter(testFilter(commFilter, pathFilter))
return el
diff --git a/internal/eventloop_output.go b/internal/eventloop_output.go
new file mode 100644
index 0000000..9a5a0f9
--- /dev/null
+++ b/internal/eventloop_output.go
@@ -0,0 +1,44 @@
+package internal
+
+import "ior/internal/event"
+
+// outputFormatter bundles the pair-emission and warning-notification callbacks
+// used by the event loop. Extracting these two concerns into a dedicated type
+// separates "what to do with a completed event pair" and "how to report
+// non-fatal problems" from the core event-matching and FD-tracking logic.
+//
+// The struct is embedded (not pointed-to) inside eventLoop so that existing
+// call sites — including tests — can still write e.printCb = ... and
+// e.warningCb = ... without any changes.
+type outputFormatter struct {
+ // printCb is called for each completed, filter-passing event pair.
+ // The callback owns the pair after the call: it must either recycle it
+ // (ep.Recycle) or hand it off to another owner.
+ printCb func(ep *event.Pair)
+
+ // warningCb is an optional callback for non-fatal event-processing
+ // warnings (e.g. malformed events, unresolved comms). nil means silent.
+ warningCb func(message string)
+}
+
+// emit invokes printCb for the given pair, falling back to a safe recycle-only
+// callback when printCb has not been set. This prevents a nil-pointer dereference
+// during early initialisation or in tests that do not configure printCb.
+func (f *outputFormatter) emit(ep *event.Pair) {
+ if f.printCb != nil {
+ f.printCb(ep)
+ return
+ }
+ // Fallback: recycle the pair so it is not leaked even when no callback is wired.
+ ep.Recycle()
+}
+
+// notifyWarning delivers message to warningCb if one is registered and the
+// message is non-empty. Silently drops the message otherwise so callers do not
+// need to guard every warning site.
+func (f *outputFormatter) notifyWarning(message string) {
+ if f.warningCb == nil || message == "" {
+ return
+ }
+ f.warningCb(message)
+}
diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go
index 01bc798..5addd46 100644
--- a/internal/eventloop_runtime.go
+++ b/internal/eventloop_runtime.go
@@ -23,6 +23,8 @@ func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) {
}
e.startTime = time.Now()
+ // emit() already handles a nil printCb safely, but guard here so that
+ // hot-path event emission never pays for a nil check inside the loop.
if e.printCb == nil {
e.printCb = func(ep *event.Pair) { ep.Recycle() }
}
@@ -32,7 +34,7 @@ func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) {
return
}
for ep := range e.events(ctx, rawCh) {
- e.printCb(ep)
+ e.emit(ep)
e.numSyscallsAfterFilter++
}
}
@@ -58,12 +60,13 @@ func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) {
}
}
-// drainPairs consumes all immediately available pairs from the buffered channel.
+// drainPairs consumes all immediately available pairs from the buffered channel,
+// routing each completed pair through the outputFormatter.
func (e *eventLoop) drainPairs(pairs <-chan *event.Pair) {
for {
select {
case ep := <-pairs:
- e.printCb(ep)
+ e.emit(ep)
e.numSyscallsAfterFilter++
default:
return