summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-08 11:38:29 +0200
committerPaul Buetow <paul@buetow.org>2026-03-08 11:38:29 +0200
commit534d55c47fc29d1089cb5c2c9c4832461e762603 (patch)
tree5f9d0cabee00f10fc7200d880c160a67712c5cac /internal
parentcc3b3cdc764edc5e5261f920384a098ea6968b77 (diff)
tests: replace eventloop sleep synchronization with deterministic signaling
Diffstat (limited to 'internal')
-rw-r--r--internal/eventloop.go41
-rw-r--r--internal/eventloop_commresolver_test.go11
-rw-r--r--internal/eventloop_constructor_test.go3
-rw-r--r--internal/eventloop_filter_test.go86
-rw-r--r--internal/eventloop_test.go106
5 files changed, 164 insertions, 83 deletions
diff --git a/internal/eventloop.go b/internal/eventloop.go
index 2c3cf49..6500508 100644
--- a/internal/eventloop.go
+++ b/internal/eventloop.go
@@ -32,8 +32,11 @@ type eventLoopConfig struct {
countField string
pprofEnable bool
plainMode bool
- fdTracker *fdTracker
- commResolver *commResolver
+ // synchronousRawProcessing keeps raw decode and callback emission in a
+ // single goroutine for deterministic test execution.
+ synchronousRawProcessing bool
+ fdTracker *fdTracker
+ commResolver *commResolver
}
type fdTracker struct {
@@ -385,12 +388,46 @@ func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) {
if e.printCb == nil {
e.printCb = func(ep *event.Pair) { ep.Recycle() }
}
+ if e.cfg.synchronousRawProcessing {
+ e.runSynchronously(ctx, rawCh)
+ return
+ }
for ep := range e.events(ctx, rawCh) {
e.printCb(ep)
e.numSyscallsAfterFilter++
}
}
+func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) {
+ pairs := make(chan *event.Pair, 1)
+
+ for {
+ select {
+ case raw, ok := <-rawCh:
+ if !ok {
+ return
+ }
+ if len(raw) == 0 {
+ continue
+ }
+ e.processRawEvent(raw, pairs)
+ for {
+ select {
+ case ep := <-pairs:
+ e.printCb(ep)
+ e.numSyscallsAfterFilter++
+ default:
+ goto nextRaw
+ }
+ }
+ case <-ctx.Done():
+ fmt.Println("Stopping event loop")
+ return
+ }
+ nextRaw:
+ }
+}
+
func (e *eventLoop) events(ctx context.Context, rawCh <-chan []byte) <-chan *event.Pair {
ch := make(chan *event.Pair)
diff --git a/internal/eventloop_commresolver_test.go b/internal/eventloop_commresolver_test.go
index d04a05f..4d3b193 100644
--- a/internal/eventloop_commresolver_test.go
+++ b/internal/eventloop_commresolver_test.go
@@ -241,14 +241,19 @@ func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) {
func waitForCondition(t *testing.T, timeout time.Duration, message string, fn func() bool) {
t.Helper()
- deadline := time.Now().Add(timeout)
+ timer := time.NewTimer(timeout)
+ ticker := time.NewTicker(10 * time.Millisecond)
+ defer timer.Stop()
+ defer ticker.Stop()
+
for {
if fn() {
return
}
- if time.Now().After(deadline) {
+ select {
+ case <-timer.C:
t.Fatal(message)
+ case <-ticker.C:
}
- time.Sleep(10 * time.Millisecond)
}
}
diff --git a/internal/eventloop_constructor_test.go b/internal/eventloop_constructor_test.go
index 52ca570..6636838 100644
--- a/internal/eventloop_constructor_test.go
+++ b/internal/eventloop_constructor_test.go
@@ -9,6 +9,9 @@ import (
func mustNewEventLoop(tb testing.TB, cfg eventLoopConfig) *eventLoop {
tb.Helper()
+ if _, isBenchmark := tb.(*testing.B); !isBenchmark {
+ cfg.synchronousRawProcessing = true
+ }
el, err := newEventLoop(cfg)
if err != nil {
tb.Fatalf("newEventLoop() error = %v", err)
diff --git a/internal/eventloop_filter_test.go b/internal/eventloop_filter_test.go
index d0b7933..b31f39c 100644
--- a/internal/eventloop_filter_test.go
+++ b/internal/eventloop_filter_test.go
@@ -19,32 +19,39 @@ func TestCommPropagation(t *testing.T) {
defer cancel()
inCh := make(chan []byte)
- outCh := make(chan *event.Pair)
+ outCh := make(chan synchronizedPair)
el := mustNewEventLoop(t, eventLoopConfig{})
- el.printCb = func(ev *event.Pair) { outCh <- ev }
+ el.printCb = func(ev *event.Pair) {
+ next := synchronizedPair{pair: ev, ack: make(chan struct{})}
+ outCh <- next
+ <-next.ack
+ }
go el.run(ctx, inCh)
go func() {
+ defer close(inCh)
for _, raw := range td.rawTracepoints {
t.Log("Sending raw tracepoint", raw, "simulating BPF sending this")
inCh <- raw
- // Small delay to simulate real BPF event timing
- time.Sleep(time.Microsecond)
}
}()
for _, validate := range td.validates {
- ep := <-outCh
- t.Log("Received", ep)
- validate(t, el, ep)
+ next := <-outCh
+ func() {
+ defer close(next.ack)
+ ep := next.pair
+ t.Log("Received", ep)
+ validate(t, el, ep)
+ }()
}
- // Give a small delay to ensure any unexpected events would have arrived
- time.Sleep(10 * time.Millisecond)
+ waitForEventLoopDone(t, el, 250*time.Millisecond)
select {
- case x := <-outCh:
- t.Errorf("Expected no more events but got '%v'", x)
+ case next := <-outCh:
+ close(next.ack)
+ t.Errorf("Expected no more events but got '%v'", next.pair)
default:
}
}
@@ -174,24 +181,32 @@ func TestEventTypeFiltering(t *testing.T) {
defer cancel()
inCh := make(chan []byte)
- outCh := make(chan *event.Pair)
+ outCh := make(chan synchronizedPair)
el := newEventLoopWithFilter(tt.commFilter, tt.pathFilter)
- el.printCb = func(ev *event.Pair) { outCh <- ev }
+ el.printCb = func(ev *event.Pair) {
+ next := synchronizedPair{pair: ev, ack: make(chan struct{})}
+ outCh <- next
+ <-next.ack
+ }
go el.run(ctx, inCh)
go func() {
+ defer close(inCh)
for _, raw := range td.rawTracepoints {
inCh <- raw
- time.Sleep(time.Microsecond)
}
}()
for _, validate := range td.validates {
select {
- case ep := <-outCh:
- t.Log("Received", ep)
- validate(t, el, ep)
+ case next := <-outCh:
+ func() {
+ defer close(next.ack)
+ ep := next.pair
+ t.Log("Received", ep)
+ validate(t, el, ep)
+ }()
case <-time.After(100 * time.Millisecond):
// No event expected (filtered out)
validate(t, el, nil)
@@ -430,7 +445,7 @@ func TestCommFilterToggle(t *testing.T) {
defer cancel()
inCh := make(chan []byte)
- outCh := make(chan *event.Pair)
+ outCh := make(chan synchronizedPair)
// Create eventloop without comm filter
el := &eventLoop{
@@ -441,21 +456,27 @@ func TestCommFilterToggle(t *testing.T) {
fdTracker: newFDTracker(make(map[int32]file.File)),
commResolver: newCommResolver(make(map[uint32]string)),
prevPairTimes: make(map[uint32]uint64),
- printCb: func(ep *event.Pair) { outCh <- ep },
- done: make(chan struct{}),
+ cfg: eventLoopConfig{synchronousRawProcessing: true},
+ printCb: func(ep *event.Pair) {
+ next := synchronizedPair{pair: ep, ack: make(chan struct{})}
+ outCh <- next
+ <-next.ack
+ },
+ done: make(chan struct{}),
}
go el.run(ctx, inCh)
go func() {
+ defer close(inCh)
for _, raw := range rawTracepoints {
inCh <- raw
- time.Sleep(time.Microsecond)
}
}()
select {
- case ep := <-outCh:
- t.Log("Received event with comm filter disabled:", ep)
+ case next := <-outCh:
+ close(next.ack)
+ t.Log("Received event with comm filter disabled:", next.pair)
// Good, we received the event
case <-time.After(100 * time.Millisecond):
t.Error("Expected to receive event with comm filter disabled but got nothing")
@@ -468,7 +489,7 @@ func TestCommFilterToggle(t *testing.T) {
defer cancel()
inCh := make(chan []byte)
- outCh := make(chan *event.Pair)
+ outCh := make(chan synchronizedPair)
// Create eventloop with comm filter enabled
el := &eventLoop{
@@ -481,21 +502,27 @@ func TestCommFilterToggle(t *testing.T) {
fdTracker: newFDTracker(make(map[int32]file.File)),
commResolver: newCommResolver(make(map[uint32]string)),
prevPairTimes: make(map[uint32]uint64),
- printCb: func(ep *event.Pair) { outCh <- ep },
- done: make(chan struct{}),
+ cfg: eventLoopConfig{synchronousRawProcessing: true},
+ printCb: func(ep *event.Pair) {
+ next := synchronizedPair{pair: ep, ack: make(chan struct{})}
+ outCh <- next
+ <-next.ack
+ },
+ done: make(chan struct{}),
}
go el.run(ctx, inCh)
go func() {
+ defer close(inCh)
for _, raw := range rawTracepoints {
inCh <- raw
- time.Sleep(time.Microsecond)
}
}()
select {
- case ep := <-outCh:
- t.Error("Expected no event with comm filter enabled but got:", ep)
+ case next := <-outCh:
+ close(next.ack)
+ t.Error("Expected no event with comm filter enabled but got:", next.pair)
case <-time.After(100 * time.Millisecond):
t.Log("Good, no event received with comm filter enabled")
// Expected behavior
@@ -517,6 +544,7 @@ func newEventLoopWithFilter(commFilter, pathFilter string) *eventLoop {
fdTracker: newFDTracker(make(map[int32]file.File)),
commResolver: newCommResolver(make(map[uint32]string)),
prevPairTimes: make(map[uint32]uint64),
+ cfg: eventLoopConfig{synchronousRawProcessing: true},
printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() },
done: make(chan struct{}),
}
diff --git a/internal/eventloop_test.go b/internal/eventloop_test.go
index 3a4ad38..b9e1c89 100644
--- a/internal/eventloop_test.go
+++ b/internal/eventloop_test.go
@@ -26,6 +26,11 @@ type testData struct {
validates []func(t *testing.T, el *eventLoop, ev *event.Pair)
}
+type synchronizedPair struct {
+ pair *event.Pair
+ ack chan struct{}
+}
+
func TestEventloop(t *testing.T) {
testTable := map[string]testData{
"OpenEventTest1": makeOpenEventTestData1(t),
@@ -94,45 +99,47 @@ func TestEventloop(t *testing.T) {
defer cancel()
inCh := make(chan []byte)
- outCh := make(chan *event.Pair)
+ outCh := make(chan synchronizedPair)
el := mustNewEventLoop(t, eventLoopConfig{})
- el.printCb = func(ev *event.Pair) { outCh <- ev }
+ el.printCb = func(ev *event.Pair) {
+ next := synchronizedPair{pair: ev, ack: make(chan struct{})}
+ outCh <- next
+ <-next.ack
+ }
go el.run(ctx, inCh)
go func() {
+ defer close(inCh)
for _, raw := range td.rawTracepoints {
t.Log("Sending raw tracepoint", raw, "simulating BPF sending this")
inCh <- raw
- // Keep synthetic feed pace close to real arrival and avoid
- // stateful assertion races between adjacent events.
- time.Sleep(100 * time.Microsecond)
}
}()
for _, validate := range td.validates {
- ep := <-outCh
- t.Log("Received", ep)
- validate(t, el, ep)
+ next := <-outCh
+ func() {
+ defer close(next.ack)
+ ep := next.pair
+ t.Log("Received", ep)
+ validate(t, el, ep)
+ }()
}
- // Give a small delay to ensure any unexpected events would have arrived
- time.Sleep(10 * time.Millisecond)
+ waitForEventLoopDone(t, el, 250*time.Millisecond)
select {
- case x := <-outCh:
- t.Errorf("Expected no more events but got '%v'", x)
+ case next := <-outCh:
+ close(next.ack)
+ t.Errorf("Expected no more events but got '%v'", next.pair)
default:
}
// Special checks for edge case tests
switch testName {
case "EnterOnlyTest":
- // Give time for events to be processed
- time.Sleep(20 * time.Millisecond)
// Verify enter events are still pending
// Only the OpenEvent is guaranteed to be stored (FdEvent requires comm name)
verifyEnterEventPending(t, el, defaultTid)
case "MismatchedPairTest":
- // Give time for all events to be processed
- time.Sleep(50 * time.Millisecond)
// Verify mismatch counter was incremented
if el.numTracepointMismatches < 2 {
t.Errorf("Expected at least 2 tracepoint mismatches but got %d", el.numTracepointMismatches)
@@ -142,6 +149,15 @@ func TestEventloop(t *testing.T) {
}
}
+func waitForEventLoopDone(t *testing.T, el *eventLoop, timeout time.Duration) {
+ t.Helper()
+ select {
+ case <-el.done:
+ case <-time.After(timeout):
+ t.Fatal("timed out waiting for event loop to finish")
+ }
+}
+
func TestHandleFdExitCloseClearsProcFdCache(t *testing.T) {
el := mustNewEventLoop(t, eventLoopConfig{})
pid := uint32(1001)
@@ -1789,22 +1805,19 @@ func makeFcntlSetFlagsTestData(t *testing.T) (td testData) {
t.Errorf("Expected '%v' but got '%v'", fcntlExitEv, ep.ExitEv)
}
- // Verify flags were updated on the file descriptor
- if f, ok := el.fdState().files[int32(fd)]; ok {
- fdFile, ok := f.(*file.FdFile)
- if !ok {
- t.Errorf("Expected file to be FdFile type")
- } else {
- // Check that O_NONBLOCK and O_APPEND were set
- if !fdFile.Flags().Is(syscall.O_NONBLOCK) {
- t.Errorf("Expected fd %d to have O_NONBLOCK flag set", fd)
- }
- if !fdFile.Flags().Is(syscall.O_APPEND) {
- t.Errorf("Expected fd %d to have O_APPEND flag set", fd)
- }
- }
+ // Validate against the emitted pair snapshot; tracker state can advance
+ // as later raw events are processed concurrently by the decoder goroutine.
+ fdFile, ok := ep.File.(*file.FdFile)
+ if !ok {
+ t.Errorf("Expected file to be FdFile type")
} else {
- t.Errorf("Expected fd %d to be tracked", fd)
+ // Check that O_NONBLOCK and O_APPEND were set.
+ if !fdFile.Flags().Is(syscall.O_NONBLOCK) {
+ t.Errorf("Expected fd %d to have O_NONBLOCK flag set", fd)
+ }
+ if !fdFile.Flags().Is(syscall.O_APPEND) {
+ t.Errorf("Expected fd %d to have O_APPEND flag set", fd)
+ }
}
})
@@ -1825,25 +1838,20 @@ func makeFcntlSetFlagsTestData(t *testing.T) (td testData) {
t.Errorf("Expected '%v' but got '%v'", fcntlExitEv2, ep.ExitEv)
}
- // Verify flags were updated correctly
- if f, ok := el.fdState().files[int32(fd)]; ok {
- fdFile, ok := f.(*file.FdFile)
- if !ok {
- t.Errorf("Expected file to be FdFile type")
- } else {
- // O_NONBLOCK should be removed, O_APPEND should remain, O_DIRECT should be added
- if fdFile.Flags().Is(syscall.O_NONBLOCK) {
- t.Errorf("Expected fd %d to NOT have O_NONBLOCK flag", fd)
- }
- if !fdFile.Flags().Is(syscall.O_APPEND) {
- t.Errorf("Expected fd %d to have O_APPEND flag set", fd)
- }
- if !fdFile.Flags().Is(syscall.O_DIRECT) {
- t.Errorf("Expected fd %d to have O_DIRECT flag set", fd)
- }
- }
+ fdFile, ok := ep.File.(*file.FdFile)
+ if !ok {
+ t.Errorf("Expected file to be FdFile type")
} else {
- t.Errorf("Expected fd %d to be tracked", fd)
+ // O_NONBLOCK should be removed, O_APPEND should remain, O_DIRECT should be added.
+ if fdFile.Flags().Is(syscall.O_NONBLOCK) {
+ t.Errorf("Expected fd %d to NOT have O_NONBLOCK flag", fd)
+ }
+ if !fdFile.Flags().Is(syscall.O_APPEND) {
+ t.Errorf("Expected fd %d to have O_APPEND flag set", fd)
+ }
+ if !fdFile.Flags().Is(syscall.O_DIRECT) {
+ t.Errorf("Expected fd %d to have O_DIRECT flag set", fd)
+ }
}
})