From 16e413799363871c1efd73527fba299dfdfadfd3 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 13 May 2026 19:42:30 +0300 Subject: refactor: extract outputFormatter collaborator from eventLoop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The printCb and warningCb function fields on eventLoop bundled two distinct concerns (pair emission and warning delivery) directly on the event-processing struct. This commit extracts them into a dedicated outputFormatter type that owns these callbacks plus emit() and notifyWarning() helper methods. outputFormatter is embedded (not pointed-to) in eventLoop so that existing call sites — including tests that write el.printCb = ... and el.warningCb = ... directly — require no changes beyond the three struct-literal sites in eventloop_filter_test.go that used field initialiser syntax. fdTracker and commResolver were already proper collaborator types; only the output concern needed extraction. Co-Authored-By: Claude Sonnet 4.6 --- internal/eventloop.go | 15 +++++++------ internal/eventloop_exit.go | 12 +++++------ internal/eventloop_filter_test.go | 26 ++++++++++++++--------- internal/eventloop_output.go | 44 +++++++++++++++++++++++++++++++++++++++ internal/eventloop_runtime.go | 9 +++++--- 5 files changed, 80 insertions(+), 26 deletions(-) create mode 100644 internal/eventloop_output.go (limited to 'internal') diff --git a/internal/eventloop.go b/internal/eventloop.go index fc82ee4..0fe327c 100644 --- a/internal/eventloop.go +++ b/internal/eventloop.go @@ -48,9 +48,8 @@ type eventLoop struct { pendingHandles *pendingHandleTracker // TID → pathname from name_to_handle_at, for open_by_handle_at correlation fdTracker *fdTracker // fd table and procfs resolution cache commResolver *commResolver + outputFormatter // pair-emission and warning-notification callbacks (embedded collaborator) rawHandlers map[types.EventType]rawEventHandler - printCb func(ep *event.Pair) // Callback to print the event - warningCb func(message string) // Optional callback for non-fatal event processing warnings cfg eventLoopConfig // Statistics @@ -92,10 +91,14 @@ func newEventLoop(cfg eventLoopConfig) (*eventLoop, error) { pendingHandles: newPendingHandleTracker(), fdTracker: fdState, commResolver: commState, - rawHandlers: make(map[types.EventType]rawEventHandler), - printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() }, - cfg: cfg, - done: make(chan struct{}), + // Default printCb prints each pair to stdout then recycles it; callers + // (e.g. TUI, headless-parquet) replace this via configureEventLoopOutput. + outputFormatter: outputFormatter{ + printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() }, + }, + rawHandlers: make(map[types.EventType]rawEventHandler), + cfg: cfg, + done: make(chan struct{}), } el.SetFilter(cfg.filter) el.initRawHandlers() diff --git a/internal/eventloop_exit.go b/internal/eventloop_exit.go index 5f6442c..e4ae6eb 100644 --- a/internal/eventloop_exit.go +++ b/internal/eventloop_exit.go @@ -291,18 +291,16 @@ func (e *eventLoop) registerDup(fdFile *file.FdFile, newFd int32, extraFlags int e.fdState().set(newFd, duppedFdFile) } +// recyclePair notifies about the problem described by warning, then returns ep +// to the pool. It is a convenience helper used throughout the exit handlers to +// keep the error path concise. func (e *eventLoop) recyclePair(ep *event.Pair, warning string) { e.notifyWarning(warning) ep.Recycle() } -func (e *eventLoop) notifyWarning(message string) { - if e.warningCb == nil || message == "" { - return - } - e.warningCb(message) -} - +// dropMalformedRawEvent records a warning when a raw BPF event cannot be +// decoded, keeping the error visible without crashing the event loop. func (e *eventLoop) dropMalformedRawEvent(evType types.EventType, raw []byte) { e.notifyWarning(fmt.Sprintf("Dropped malformed raw event type %d (len=%d)", evType, len(raw))) } diff --git a/internal/eventloop_filter_test.go b/internal/eventloop_filter_test.go index 930312c..9d978e4 100644 --- a/internal/eventloop_filter_test.go +++ b/internal/eventloop_filter_test.go @@ -454,10 +454,12 @@ func TestCommFilterToggle(t *testing.T) { fdTracker: newFDTracker(make(map[int32]file.File)), commResolver: newCommResolver(make(map[uint32]string)), cfg: eventLoopConfig{synchronousRawProcessing: true}, - printCb: func(ep *event.Pair) { - next := synchronizedPair{pair: ep, ack: make(chan struct{})} - outCh <- next - <-next.ack + outputFormatter: outputFormatter{ + printCb: func(ep *event.Pair) { + next := synchronizedPair{pair: ep, ack: make(chan struct{})} + outCh <- next + <-next.ack + }, }, done: make(chan struct{}), } @@ -495,10 +497,12 @@ func TestCommFilterToggle(t *testing.T) { fdTracker: newFDTracker(make(map[int32]file.File)), commResolver: newCommResolver(make(map[uint32]string)), cfg: eventLoopConfig{synchronousRawProcessing: true}, - printCb: func(ep *event.Pair) { - next := synchronizedPair{pair: ep, ack: make(chan struct{})} - outCh <- next - <-next.ack + outputFormatter: outputFormatter{ + printCb: func(ep *event.Pair) { + next := synchronizedPair{pair: ep, ack: make(chan struct{})} + outCh <- next + <-next.ack + }, }, done: make(chan struct{}), } @@ -531,8 +535,10 @@ func newEventLoopWithFilter(commFilter, pathFilter string) *eventLoop { fdTracker: newFDTracker(make(map[int32]file.File)), commResolver: newCommResolver(make(map[uint32]string)), cfg: eventLoopConfig{synchronousRawProcessing: true}, - printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() }, - done: make(chan struct{}), + outputFormatter: outputFormatter{ + printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() }, + }, + done: make(chan struct{}), } el.SetFilter(testFilter(commFilter, pathFilter)) return el diff --git a/internal/eventloop_output.go b/internal/eventloop_output.go new file mode 100644 index 0000000..9a5a0f9 --- /dev/null +++ b/internal/eventloop_output.go @@ -0,0 +1,44 @@ +package internal + +import "ior/internal/event" + +// outputFormatter bundles the pair-emission and warning-notification callbacks +// used by the event loop. Extracting these two concerns into a dedicated type +// separates "what to do with a completed event pair" and "how to report +// non-fatal problems" from the core event-matching and FD-tracking logic. +// +// The struct is embedded (not pointed-to) inside eventLoop so that existing +// call sites — including tests — can still write e.printCb = ... and +// e.warningCb = ... without any changes. +type outputFormatter struct { + // printCb is called for each completed, filter-passing event pair. + // The callback owns the pair after the call: it must either recycle it + // (ep.Recycle) or hand it off to another owner. + printCb func(ep *event.Pair) + + // warningCb is an optional callback for non-fatal event-processing + // warnings (e.g. malformed events, unresolved comms). nil means silent. + warningCb func(message string) +} + +// emit invokes printCb for the given pair, falling back to a safe recycle-only +// callback when printCb has not been set. This prevents a nil-pointer dereference +// during early initialisation or in tests that do not configure printCb. +func (f *outputFormatter) emit(ep *event.Pair) { + if f.printCb != nil { + f.printCb(ep) + return + } + // Fallback: recycle the pair so it is not leaked even when no callback is wired. + ep.Recycle() +} + +// notifyWarning delivers message to warningCb if one is registered and the +// message is non-empty. Silently drops the message otherwise so callers do not +// need to guard every warning site. +func (f *outputFormatter) notifyWarning(message string) { + if f.warningCb == nil || message == "" { + return + } + f.warningCb(message) +} diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go index 01bc798..5addd46 100644 --- a/internal/eventloop_runtime.go +++ b/internal/eventloop_runtime.go @@ -23,6 +23,8 @@ func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) { } e.startTime = time.Now() + // emit() already handles a nil printCb safely, but guard here so that + // hot-path event emission never pays for a nil check inside the loop. if e.printCb == nil { e.printCb = func(ep *event.Pair) { ep.Recycle() } } @@ -32,7 +34,7 @@ func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) { return } for ep := range e.events(ctx, rawCh) { - e.printCb(ep) + e.emit(ep) e.numSyscallsAfterFilter++ } } @@ -58,12 +60,13 @@ func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) { } } -// drainPairs consumes all immediately available pairs from the buffered channel. +// drainPairs consumes all immediately available pairs from the buffered channel, +// routing each completed pair through the outputFormatter. func (e *eventLoop) drainPairs(pairs <-chan *event.Pair) { for { select { case ep := <-pairs: - e.printCb(ep) + e.emit(ep) e.numSyscallsAfterFilter++ default: return -- cgit v1.2.3