From 534d55c47fc29d1089cb5c2c9c4832461e762603 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 8 Mar 2026 11:38:29 +0200 Subject: tests: replace eventloop sleep synchronization with deterministic signaling --- internal/eventloop.go | 41 +++++++++++- internal/eventloop_commresolver_test.go | 11 +++- internal/eventloop_constructor_test.go | 3 + internal/eventloop_filter_test.go | 86 +++++++++++++++++--------- internal/eventloop_test.go | 106 +++++++++++++++++--------------- 5 files changed, 164 insertions(+), 83 deletions(-) (limited to 'internal') diff --git a/internal/eventloop.go b/internal/eventloop.go index 2c3cf49..6500508 100644 --- a/internal/eventloop.go +++ b/internal/eventloop.go @@ -32,8 +32,11 @@ type eventLoopConfig struct { countField string pprofEnable bool plainMode bool - fdTracker *fdTracker - commResolver *commResolver + // synchronousRawProcessing keeps raw decode and callback emission in a + // single goroutine for deterministic test execution. + synchronousRawProcessing bool + fdTracker *fdTracker + commResolver *commResolver } type fdTracker struct { @@ -385,12 +388,46 @@ func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) { if e.printCb == nil { e.printCb = func(ep *event.Pair) { ep.Recycle() } } + if e.cfg.synchronousRawProcessing { + e.runSynchronously(ctx, rawCh) + return + } for ep := range e.events(ctx, rawCh) { e.printCb(ep) e.numSyscallsAfterFilter++ } } +func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) { + pairs := make(chan *event.Pair, 1) + + for { + select { + case raw, ok := <-rawCh: + if !ok { + return + } + if len(raw) == 0 { + continue + } + e.processRawEvent(raw, pairs) + for { + select { + case ep := <-pairs: + e.printCb(ep) + e.numSyscallsAfterFilter++ + default: + goto nextRaw + } + } + case <-ctx.Done(): + fmt.Println("Stopping event loop") + return + } + nextRaw: + } +} + func (e *eventLoop) events(ctx context.Context, rawCh <-chan []byte) <-chan *event.Pair { ch := make(chan *event.Pair) diff --git a/internal/eventloop_commresolver_test.go b/internal/eventloop_commresolver_test.go index d04a05f..4d3b193 100644 --- a/internal/eventloop_commresolver_test.go +++ b/internal/eventloop_commresolver_test.go @@ -241,14 +241,19 @@ func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) { func waitForCondition(t *testing.T, timeout time.Duration, message string, fn func() bool) { t.Helper() - deadline := time.Now().Add(timeout) + timer := time.NewTimer(timeout) + ticker := time.NewTicker(10 * time.Millisecond) + defer timer.Stop() + defer ticker.Stop() + for { if fn() { return } - if time.Now().After(deadline) { + select { + case <-timer.C: t.Fatal(message) + case <-ticker.C: } - time.Sleep(10 * time.Millisecond) } } diff --git a/internal/eventloop_constructor_test.go b/internal/eventloop_constructor_test.go index 52ca570..6636838 100644 --- a/internal/eventloop_constructor_test.go +++ b/internal/eventloop_constructor_test.go @@ -9,6 +9,9 @@ import ( func mustNewEventLoop(tb testing.TB, cfg eventLoopConfig) *eventLoop { tb.Helper() + if _, isBenchmark := tb.(*testing.B); !isBenchmark { + cfg.synchronousRawProcessing = true + } el, err := newEventLoop(cfg) if err != nil { tb.Fatalf("newEventLoop() error = %v", err) diff --git a/internal/eventloop_filter_test.go b/internal/eventloop_filter_test.go index d0b7933..b31f39c 100644 --- a/internal/eventloop_filter_test.go +++ b/internal/eventloop_filter_test.go @@ -19,32 +19,39 @@ func TestCommPropagation(t *testing.T) { defer cancel() inCh := make(chan []byte) - outCh := make(chan *event.Pair) + outCh := make(chan synchronizedPair) el := mustNewEventLoop(t, eventLoopConfig{}) - el.printCb = func(ev *event.Pair) { outCh <- ev } + el.printCb = func(ev *event.Pair) { + next := synchronizedPair{pair: ev, ack: make(chan struct{})} + outCh <- next + <-next.ack + } go el.run(ctx, inCh) go func() { + defer close(inCh) for _, raw := range td.rawTracepoints { t.Log("Sending raw tracepoint", raw, "simulating BPF sending this") inCh <- raw - // Small delay to simulate real BPF event timing - time.Sleep(time.Microsecond) } }() for _, validate := range td.validates { - ep := <-outCh - t.Log("Received", ep) - validate(t, el, ep) + next := <-outCh + func() { + defer close(next.ack) + ep := next.pair + t.Log("Received", ep) + validate(t, el, ep) + }() } - // Give a small delay to ensure any unexpected events would have arrived - time.Sleep(10 * time.Millisecond) + waitForEventLoopDone(t, el, 250*time.Millisecond) select { - case x := <-outCh: - t.Errorf("Expected no more events but got '%v'", x) + case next := <-outCh: + close(next.ack) + t.Errorf("Expected no more events but got '%v'", next.pair) default: } } @@ -174,24 +181,32 @@ func TestEventTypeFiltering(t *testing.T) { defer cancel() inCh := make(chan []byte) - outCh := make(chan *event.Pair) + outCh := make(chan synchronizedPair) el := newEventLoopWithFilter(tt.commFilter, tt.pathFilter) - el.printCb = func(ev *event.Pair) { outCh <- ev } + el.printCb = func(ev *event.Pair) { + next := synchronizedPair{pair: ev, ack: make(chan struct{})} + outCh <- next + <-next.ack + } go el.run(ctx, inCh) go func() { + defer close(inCh) for _, raw := range td.rawTracepoints { inCh <- raw - time.Sleep(time.Microsecond) } }() for _, validate := range td.validates { select { - case ep := <-outCh: - t.Log("Received", ep) - validate(t, el, ep) + case next := <-outCh: + func() { + defer close(next.ack) + ep := next.pair + t.Log("Received", ep) + validate(t, el, ep) + }() case <-time.After(100 * time.Millisecond): // No event expected (filtered out) validate(t, el, nil) @@ -430,7 +445,7 @@ func TestCommFilterToggle(t *testing.T) { defer cancel() inCh := make(chan []byte) - outCh := make(chan *event.Pair) + outCh := make(chan synchronizedPair) // Create eventloop without comm filter el := &eventLoop{ @@ -441,21 +456,27 @@ func TestCommFilterToggle(t *testing.T) { fdTracker: newFDTracker(make(map[int32]file.File)), commResolver: newCommResolver(make(map[uint32]string)), prevPairTimes: make(map[uint32]uint64), - printCb: func(ep *event.Pair) { outCh <- ep }, - done: make(chan struct{}), + cfg: eventLoopConfig{synchronousRawProcessing: true}, + printCb: func(ep *event.Pair) { + next := synchronizedPair{pair: ep, ack: make(chan struct{})} + outCh <- next + <-next.ack + }, + done: make(chan struct{}), } go el.run(ctx, inCh) go func() { + defer close(inCh) for _, raw := range rawTracepoints { inCh <- raw - time.Sleep(time.Microsecond) } }() select { - case ep := <-outCh: - t.Log("Received event with comm filter disabled:", ep) + case next := <-outCh: + close(next.ack) + t.Log("Received event with comm filter disabled:", next.pair) // Good, we received the event case <-time.After(100 * time.Millisecond): t.Error("Expected to receive event with comm filter disabled but got nothing") @@ -468,7 +489,7 @@ func TestCommFilterToggle(t *testing.T) { defer cancel() inCh := make(chan []byte) - outCh := make(chan *event.Pair) + outCh := make(chan synchronizedPair) // Create eventloop with comm filter enabled el := &eventLoop{ @@ -481,21 +502,27 @@ func TestCommFilterToggle(t *testing.T) { fdTracker: newFDTracker(make(map[int32]file.File)), commResolver: newCommResolver(make(map[uint32]string)), prevPairTimes: make(map[uint32]uint64), - printCb: func(ep *event.Pair) { outCh <- ep }, - done: make(chan struct{}), + cfg: eventLoopConfig{synchronousRawProcessing: true}, + printCb: func(ep *event.Pair) { + next := synchronizedPair{pair: ep, ack: make(chan struct{})} + outCh <- next + <-next.ack + }, + done: make(chan struct{}), } go el.run(ctx, inCh) go func() { + defer close(inCh) for _, raw := range rawTracepoints { inCh <- raw - time.Sleep(time.Microsecond) } }() select { - case ep := <-outCh: - t.Error("Expected no event with comm filter enabled but got:", ep) + case next := <-outCh: + close(next.ack) + t.Error("Expected no event with comm filter enabled but got:", next.pair) case <-time.After(100 * time.Millisecond): t.Log("Good, no event received with comm filter enabled") // Expected behavior @@ -517,6 +544,7 @@ func newEventLoopWithFilter(commFilter, pathFilter string) *eventLoop { fdTracker: newFDTracker(make(map[int32]file.File)), commResolver: newCommResolver(make(map[uint32]string)), prevPairTimes: make(map[uint32]uint64), + cfg: eventLoopConfig{synchronousRawProcessing: true}, printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() }, done: make(chan struct{}), } diff --git a/internal/eventloop_test.go b/internal/eventloop_test.go index 3a4ad38..b9e1c89 100644 --- a/internal/eventloop_test.go +++ b/internal/eventloop_test.go @@ -26,6 +26,11 @@ type testData struct { validates []func(t *testing.T, el *eventLoop, ev *event.Pair) } +type synchronizedPair struct { + pair *event.Pair + ack chan struct{} +} + func TestEventloop(t *testing.T) { testTable := map[string]testData{ "OpenEventTest1": makeOpenEventTestData1(t), @@ -94,45 +99,47 @@ func TestEventloop(t *testing.T) { defer cancel() inCh := make(chan []byte) - outCh := make(chan *event.Pair) + outCh := make(chan synchronizedPair) el := mustNewEventLoop(t, eventLoopConfig{}) - el.printCb = func(ev *event.Pair) { outCh <- ev } + el.printCb = func(ev *event.Pair) { + next := synchronizedPair{pair: ev, ack: make(chan struct{})} + outCh <- next + <-next.ack + } go el.run(ctx, inCh) go func() { + defer close(inCh) for _, raw := range td.rawTracepoints { t.Log("Sending raw tracepoint", raw, "simulating BPF sending this") inCh <- raw - // Keep synthetic feed pace close to real arrival and avoid - // stateful assertion races between adjacent events. - time.Sleep(100 * time.Microsecond) } }() for _, validate := range td.validates { - ep := <-outCh - t.Log("Received", ep) - validate(t, el, ep) + next := <-outCh + func() { + defer close(next.ack) + ep := next.pair + t.Log("Received", ep) + validate(t, el, ep) + }() } - // Give a small delay to ensure any unexpected events would have arrived - time.Sleep(10 * time.Millisecond) + waitForEventLoopDone(t, el, 250*time.Millisecond) select { - case x := <-outCh: - t.Errorf("Expected no more events but got '%v'", x) + case next := <-outCh: + close(next.ack) + t.Errorf("Expected no more events but got '%v'", next.pair) default: } // Special checks for edge case tests switch testName { case "EnterOnlyTest": - // Give time for events to be processed - time.Sleep(20 * time.Millisecond) // Verify enter events are still pending // Only the OpenEvent is guaranteed to be stored (FdEvent requires comm name) verifyEnterEventPending(t, el, defaultTid) case "MismatchedPairTest": - // Give time for all events to be processed - time.Sleep(50 * time.Millisecond) // Verify mismatch counter was incremented if el.numTracepointMismatches < 2 { t.Errorf("Expected at least 2 tracepoint mismatches but got %d", el.numTracepointMismatches) @@ -142,6 +149,15 @@ func TestEventloop(t *testing.T) { } } +func waitForEventLoopDone(t *testing.T, el *eventLoop, timeout time.Duration) { + t.Helper() + select { + case <-el.done: + case <-time.After(timeout): + t.Fatal("timed out waiting for event loop to finish") + } +} + func TestHandleFdExitCloseClearsProcFdCache(t *testing.T) { el := mustNewEventLoop(t, eventLoopConfig{}) pid := uint32(1001) @@ -1789,22 +1805,19 @@ func makeFcntlSetFlagsTestData(t *testing.T) (td testData) { t.Errorf("Expected '%v' but got '%v'", fcntlExitEv, ep.ExitEv) } - // Verify flags were updated on the file descriptor - if f, ok := el.fdState().files[int32(fd)]; ok { - fdFile, ok := f.(*file.FdFile) - if !ok { - t.Errorf("Expected file to be FdFile type") - } else { - // Check that O_NONBLOCK and O_APPEND were set - if !fdFile.Flags().Is(syscall.O_NONBLOCK) { - t.Errorf("Expected fd %d to have O_NONBLOCK flag set", fd) - } - if !fdFile.Flags().Is(syscall.O_APPEND) { - t.Errorf("Expected fd %d to have O_APPEND flag set", fd) - } - } + // Validate against the emitted pair snapshot; tracker state can advance + // as later raw events are processed concurrently by the decoder goroutine. + fdFile, ok := ep.File.(*file.FdFile) + if !ok { + t.Errorf("Expected file to be FdFile type") } else { - t.Errorf("Expected fd %d to be tracked", fd) + // Check that O_NONBLOCK and O_APPEND were set. + if !fdFile.Flags().Is(syscall.O_NONBLOCK) { + t.Errorf("Expected fd %d to have O_NONBLOCK flag set", fd) + } + if !fdFile.Flags().Is(syscall.O_APPEND) { + t.Errorf("Expected fd %d to have O_APPEND flag set", fd) + } } }) @@ -1825,25 +1838,20 @@ func makeFcntlSetFlagsTestData(t *testing.T) (td testData) { t.Errorf("Expected '%v' but got '%v'", fcntlExitEv2, ep.ExitEv) } - // Verify flags were updated correctly - if f, ok := el.fdState().files[int32(fd)]; ok { - fdFile, ok := f.(*file.FdFile) - if !ok { - t.Errorf("Expected file to be FdFile type") - } else { - // O_NONBLOCK should be removed, O_APPEND should remain, O_DIRECT should be added - if fdFile.Flags().Is(syscall.O_NONBLOCK) { - t.Errorf("Expected fd %d to NOT have O_NONBLOCK flag", fd) - } - if !fdFile.Flags().Is(syscall.O_APPEND) { - t.Errorf("Expected fd %d to have O_APPEND flag set", fd) - } - if !fdFile.Flags().Is(syscall.O_DIRECT) { - t.Errorf("Expected fd %d to have O_DIRECT flag set", fd) - } - } + fdFile, ok := ep.File.(*file.FdFile) + if !ok { + t.Errorf("Expected file to be FdFile type") } else { - t.Errorf("Expected fd %d to be tracked", fd) + // O_NONBLOCK should be removed, O_APPEND should remain, O_DIRECT should be added. + if fdFile.Flags().Is(syscall.O_NONBLOCK) { + t.Errorf("Expected fd %d to NOT have O_NONBLOCK flag", fd) + } + if !fdFile.Flags().Is(syscall.O_APPEND) { + t.Errorf("Expected fd %d to have O_APPEND flag set", fd) + } + if !fdFile.Flags().Is(syscall.O_DIRECT) { + t.Errorf("Expected fd %d to have O_DIRECT flag set", fd) + } } }) -- cgit v1.2.3