diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-06 16:29:49 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-06 16:29:49 +0200 |
| commit | 96f30bc109818547e456251ad0c25e1e7308e22b (patch) | |
| tree | 242dbd0d01b093061ce830085282ea868a9ba337 /internal | |
| parent | c1814dafe443c9042ebfee99b7e14c4723ce9db3 (diff) | |
refactor: bound comm resolver lookups with worker pool (task 384)
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/eventloop.go | 62 | ||||
| -rw-r--r-- | internal/eventloop_commresolver_test.go | 202 |
2 files changed, 257 insertions, 7 deletions
diff --git a/internal/eventloop.go b/internal/eventloop.go index 86954d4..5fa6f33 100644 --- a/internal/eventloop.go +++ b/internal/eventloop.go @@ -20,6 +20,11 @@ import ( const sysEnterNameToHandleAtName = "name_to_handle_at" +const ( + defaultCommLookupWorkers = 4 + defaultCommLookupQueueSize = 512 +) + type eventLoopConfig struct { pidFilter int commFilter string @@ -69,16 +74,56 @@ type commResolver struct { mu sync.RWMutex pending map[uint32]struct{} + + lookupQueue chan uint32 + lookupWorkers int + resolveFn func(uint32) string + startWorkersOnce sync.Once } func newCommResolver(comms map[uint32]string) *commResolver { if comms == nil { comms = make(map[uint32]string) } - return &commResolver{ + r := &commResolver{ comms: comms, pending: make(map[uint32]struct{}), } + r.ensureLookupConfig() + return r +} + +func (r *commResolver) ensureLookupConfig() { + if r.lookupWorkers <= 0 { + r.lookupWorkers = defaultCommLookupWorkers + } + if r.lookupQueue == nil { + r.lookupQueue = make(chan uint32, defaultCommLookupQueueSize) + } + if r.resolveFn == nil { + r.resolveFn = resolveCommFromProc + } +} + +func (r *commResolver) startLookupWorkers() { + r.ensureLookupConfig() + r.startWorkersOnce.Do(func() { + for i := 0; i < r.lookupWorkers; i++ { + go r.lookupWorker() + } + }) +} + +func (r *commResolver) lookupWorker() { + for tid := range r.lookupQueue { + comm := r.resolveFn(tid) + r.mu.Lock() + delete(r.pending, tid) + if comm != "" { + r.comms[tid] = comm + } + r.mu.Unlock() + } } func (r *commResolver) seedTrackedPidComm(pidFilter int) { @@ -147,15 +192,16 @@ func (r *commResolver) queueLookup(tid uint32) { r.pending[tid] = struct{}{} r.mu.Unlock() - go func() { - comm := resolveCommFromProc(tid) + r.startLookupWorkers() + + // Keep event processing non-blocking if resolver workers are saturated. + select { + case r.lookupQueue <- tid: + default: r.mu.Lock() delete(r.pending, tid) - if comm != "" { - r.comms[tid] = comm - } r.mu.Unlock() - }() + } } type rawEventHandler func(raw []byte, ch chan<- *event.Pair) @@ -237,6 +283,7 @@ func configuredCommResolver(injected *commResolver) *commResolver { if injected.pending == nil { injected.pending = make(map[uint32]struct{}) } + injected.ensureLookupConfig() return injected } @@ -265,6 +312,7 @@ func (e *eventLoop) commState() *commResolver { if e.commResolver.pending == nil { e.commResolver.pending = make(map[uint32]struct{}) } + e.commResolver.ensureLookupConfig() e.comms = e.commResolver.comms return e.commResolver } diff --git a/internal/eventloop_commresolver_test.go b/internal/eventloop_commresolver_test.go new file mode 100644 index 0000000..0f10db8 --- /dev/null +++ b/internal/eventloop_commresolver_test.go @@ -0,0 +1,202 @@ +package internal + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestCommResolverQueueLookupRespectsWorkerLimit(t *testing.T) { + const workers = 2 + const lookups = 6 + + started := make(chan struct{}, lookups) + release := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(lookups) + + var running int32 + var maxRunning int32 + + resolver := newCommResolver(nil) + resolver.lookupWorkers = workers + resolver.lookupQueue = make(chan uint32, lookups) + resolver.resolveFn = func(tid uint32) string { + current := atomic.AddInt32(&running, 1) + setMaxInt32(&maxRunning, current) + started <- struct{}{} + <-release + atomic.AddInt32(&running, -1) + wg.Done() + return fmt.Sprintf("comm-%d", tid) + } + + for i := 1; i <= lookups; i++ { + resolver.queueLookup(uint32(i)) + } + + waitForStarts(t, started, workers, 2*time.Second) + select { + case <-started: + t.Fatalf("expected at most %d concurrent lookups", workers) + case <-time.After(75 * time.Millisecond): + } + + close(release) + waitForWaitGroup(t, &wg, 2*time.Second) + waitForCondition(t, 2*time.Second, "expected all queued tids to be cached", func() bool { + for i := 1; i <= lookups; i++ { + if _, ok := resolver.cached(uint32(i)); !ok { + return false + } + } + return pendingCount(resolver) == 0 + }) + + if got := atomic.LoadInt32(&maxRunning); got > workers { + t.Fatalf("expected max concurrent lookups <= %d, got %d", workers, got) + } + + for i := 1; i <= lookups; i++ { + want := fmt.Sprintf("comm-%d", i) + got, ok := resolver.cached(uint32(i)) + if !ok { + t.Fatalf("expected cached comm for tid %d", i) + } + if got != want { + t.Fatalf("expected tid %d comm %q, got %q", i, want, got) + } + } + + if pending := pendingCount(resolver); pending != 0 { + t.Fatalf("expected no pending lookups after completion, got %d", pending) + } +} + +func TestCommResolverQueueLookupQueueFullClearsPending(t *testing.T) { + started := make(chan struct{}, 1) + release := make(chan struct{}) + + resolver := newCommResolver(nil) + resolver.lookupWorkers = 1 + resolver.lookupQueue = make(chan uint32, 1) + resolver.resolveFn = func(tid uint32) string { + select { + case started <- struct{}{}: + default: + } + <-release + return fmt.Sprintf("comm-%d", tid) + } + + const tid1 uint32 = 101 + const tid2 uint32 = 102 + const tid3 uint32 = 103 + + resolver.queueLookup(tid1) + waitForStarts(t, started, 1, 2*time.Second) + + resolver.queueLookup(tid2) + resolver.queueLookup(tid3) + + if !hasPending(resolver, tid1) { + t.Fatalf("expected tid %d to remain pending while worker is blocked", tid1) + } + if !hasPending(resolver, tid2) { + t.Fatalf("expected tid %d to remain pending while queued", tid2) + } + if hasPending(resolver, tid3) { + t.Fatalf("expected tid %d pending flag to be cleared when queue is full", tid3) + } + + close(release) + + waitForCondition(t, 2*time.Second, "expected first two tids to resolve", func() bool { + _, ok1 := resolver.cached(tid1) + _, ok2 := resolver.cached(tid2) + return ok1 && ok2 + }) + + if _, ok := resolver.cached(tid3); ok { + t.Fatalf("did not expect tid %d to resolve from the dropped queue request", tid3) + } + + resolver.queueLookup(tid3) + waitForCondition(t, 2*time.Second, "expected dropped tid to be retried successfully", func() bool { + _, ok := resolver.cached(tid3) + return ok + }) +} + +func hasPending(r *commResolver, tid uint32) bool { + r.mu.RLock() + defer r.mu.RUnlock() + _, ok := r.pending[tid] + return ok +} + +func pendingCount(r *commResolver) int { + r.mu.RLock() + defer r.mu.RUnlock() + return len(r.pending) +} + +func setMaxInt32(target *int32, candidate int32) { + for { + current := atomic.LoadInt32(target) + if candidate <= current { + return + } + if atomic.CompareAndSwapInt32(target, current, candidate) { + return + } + } +} + +func waitForStarts(t *testing.T, ch <-chan struct{}, count int, timeout time.Duration) { + t.Helper() + + timer := time.NewTimer(timeout) + defer timer.Stop() + + for i := 0; i < count; i++ { + select { + case <-ch: + case <-timer.C: + t.Fatalf("timed out waiting for %d resolver lookups to start", count) + } + } +} + +func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) { + t.Helper() + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(timeout): + t.Fatal("timed out waiting for resolver lookups to complete") + } +} + +func waitForCondition(t *testing.T, timeout time.Duration, message string, fn func() bool) { + t.Helper() + + deadline := time.Now().Add(timeout) + for { + if fn() { + return + } + if time.Now().After(deadline) { + t.Fatal(message) + } + time.Sleep(10 * time.Millisecond) + } +} |
