summaryrefslogtreecommitdiff
path: root/internal/eventloop_commresolver_test.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-06 16:29:49 +0200
committerPaul Buetow <paul@buetow.org>2026-03-06 16:29:49 +0200
commit96f30bc109818547e456251ad0c25e1e7308e22b (patch)
tree242dbd0d01b093061ce830085282ea868a9ba337 /internal/eventloop_commresolver_test.go
parentc1814dafe443c9042ebfee99b7e14c4723ce9db3 (diff)
refactor: bound comm resolver lookups with worker pool (task 384)
Diffstat (limited to 'internal/eventloop_commresolver_test.go')
-rw-r--r--internal/eventloop_commresolver_test.go202
1 files changed, 202 insertions, 0 deletions
diff --git a/internal/eventloop_commresolver_test.go b/internal/eventloop_commresolver_test.go
new file mode 100644
index 0000000..0f10db8
--- /dev/null
+++ b/internal/eventloop_commresolver_test.go
@@ -0,0 +1,202 @@
+package internal
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+)
+
+func TestCommResolverQueueLookupRespectsWorkerLimit(t *testing.T) {
+ const workers = 2
+ const lookups = 6
+
+ started := make(chan struct{}, lookups)
+ release := make(chan struct{})
+ var wg sync.WaitGroup
+ wg.Add(lookups)
+
+ var running int32
+ var maxRunning int32
+
+ resolver := newCommResolver(nil)
+ resolver.lookupWorkers = workers
+ resolver.lookupQueue = make(chan uint32, lookups)
+ resolver.resolveFn = func(tid uint32) string {
+ current := atomic.AddInt32(&running, 1)
+ setMaxInt32(&maxRunning, current)
+ started <- struct{}{}
+ <-release
+ atomic.AddInt32(&running, -1)
+ wg.Done()
+ return fmt.Sprintf("comm-%d", tid)
+ }
+
+ for i := 1; i <= lookups; i++ {
+ resolver.queueLookup(uint32(i))
+ }
+
+ waitForStarts(t, started, workers, 2*time.Second)
+ select {
+ case <-started:
+ t.Fatalf("expected at most %d concurrent lookups", workers)
+ case <-time.After(75 * time.Millisecond):
+ }
+
+ close(release)
+ waitForWaitGroup(t, &wg, 2*time.Second)
+ waitForCondition(t, 2*time.Second, "expected all queued tids to be cached", func() bool {
+ for i := 1; i <= lookups; i++ {
+ if _, ok := resolver.cached(uint32(i)); !ok {
+ return false
+ }
+ }
+ return pendingCount(resolver) == 0
+ })
+
+ if got := atomic.LoadInt32(&maxRunning); got > workers {
+ t.Fatalf("expected max concurrent lookups <= %d, got %d", workers, got)
+ }
+
+ for i := 1; i <= lookups; i++ {
+ want := fmt.Sprintf("comm-%d", i)
+ got, ok := resolver.cached(uint32(i))
+ if !ok {
+ t.Fatalf("expected cached comm for tid %d", i)
+ }
+ if got != want {
+ t.Fatalf("expected tid %d comm %q, got %q", i, want, got)
+ }
+ }
+
+ if pending := pendingCount(resolver); pending != 0 {
+ t.Fatalf("expected no pending lookups after completion, got %d", pending)
+ }
+}
+
+func TestCommResolverQueueLookupQueueFullClearsPending(t *testing.T) {
+ started := make(chan struct{}, 1)
+ release := make(chan struct{})
+
+ resolver := newCommResolver(nil)
+ resolver.lookupWorkers = 1
+ resolver.lookupQueue = make(chan uint32, 1)
+ resolver.resolveFn = func(tid uint32) string {
+ select {
+ case started <- struct{}{}:
+ default:
+ }
+ <-release
+ return fmt.Sprintf("comm-%d", tid)
+ }
+
+ const tid1 uint32 = 101
+ const tid2 uint32 = 102
+ const tid3 uint32 = 103
+
+ resolver.queueLookup(tid1)
+ waitForStarts(t, started, 1, 2*time.Second)
+
+ resolver.queueLookup(tid2)
+ resolver.queueLookup(tid3)
+
+ if !hasPending(resolver, tid1) {
+ t.Fatalf("expected tid %d to remain pending while worker is blocked", tid1)
+ }
+ if !hasPending(resolver, tid2) {
+ t.Fatalf("expected tid %d to remain pending while queued", tid2)
+ }
+ if hasPending(resolver, tid3) {
+ t.Fatalf("expected tid %d pending flag to be cleared when queue is full", tid3)
+ }
+
+ close(release)
+
+ waitForCondition(t, 2*time.Second, "expected first two tids to resolve", func() bool {
+ _, ok1 := resolver.cached(tid1)
+ _, ok2 := resolver.cached(tid2)
+ return ok1 && ok2
+ })
+
+ if _, ok := resolver.cached(tid3); ok {
+ t.Fatalf("did not expect tid %d to resolve from the dropped queue request", tid3)
+ }
+
+ resolver.queueLookup(tid3)
+ waitForCondition(t, 2*time.Second, "expected dropped tid to be retried successfully", func() bool {
+ _, ok := resolver.cached(tid3)
+ return ok
+ })
+}
+
+func hasPending(r *commResolver, tid uint32) bool {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+ _, ok := r.pending[tid]
+ return ok
+}
+
+func pendingCount(r *commResolver) int {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+ return len(r.pending)
+}
+
+func setMaxInt32(target *int32, candidate int32) {
+ for {
+ current := atomic.LoadInt32(target)
+ if candidate <= current {
+ return
+ }
+ if atomic.CompareAndSwapInt32(target, current, candidate) {
+ return
+ }
+ }
+}
+
+func waitForStarts(t *testing.T, ch <-chan struct{}, count int, timeout time.Duration) {
+ t.Helper()
+
+ timer := time.NewTimer(timeout)
+ defer timer.Stop()
+
+ for i := 0; i < count; i++ {
+ select {
+ case <-ch:
+ case <-timer.C:
+ t.Fatalf("timed out waiting for %d resolver lookups to start", count)
+ }
+ }
+}
+
+func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) {
+ t.Helper()
+
+ done := make(chan struct{})
+ go func() {
+ wg.Wait()
+ close(done)
+ }()
+
+ select {
+ case <-done:
+ case <-time.After(timeout):
+ t.Fatal("timed out waiting for resolver lookups to complete")
+ }
+}
+
+func waitForCondition(t *testing.T, timeout time.Duration, message string, fn func() bool) {
+ t.Helper()
+
+ deadline := time.Now().Add(timeout)
+ for {
+ if fn() {
+ return
+ }
+ if time.Now().After(deadline) {
+ t.Fatal(message)
+ }
+ time.Sleep(10 * time.Millisecond)
+ }
+}