summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-06 16:29:49 +0200
committerPaul Buetow <paul@buetow.org>2026-03-06 16:29:49 +0200
commit96f30bc109818547e456251ad0c25e1e7308e22b (patch)
tree242dbd0d01b093061ce830085282ea868a9ba337 /internal
parentc1814dafe443c9042ebfee99b7e14c4723ce9db3 (diff)
refactor: bound comm resolver lookups with worker pool (task 384)
Diffstat (limited to 'internal')
-rw-r--r--internal/eventloop.go62
-rw-r--r--internal/eventloop_commresolver_test.go202
2 files changed, 257 insertions, 7 deletions
diff --git a/internal/eventloop.go b/internal/eventloop.go
index 86954d4..5fa6f33 100644
--- a/internal/eventloop.go
+++ b/internal/eventloop.go
@@ -20,6 +20,11 @@ import (
const sysEnterNameToHandleAtName = "name_to_handle_at"
+const (
+ defaultCommLookupWorkers = 4
+ defaultCommLookupQueueSize = 512
+)
+
type eventLoopConfig struct {
pidFilter int
commFilter string
@@ -69,16 +74,56 @@ type commResolver struct {
mu sync.RWMutex
pending map[uint32]struct{}
+
+ lookupQueue chan uint32
+ lookupWorkers int
+ resolveFn func(uint32) string
+ startWorkersOnce sync.Once
}
func newCommResolver(comms map[uint32]string) *commResolver {
if comms == nil {
comms = make(map[uint32]string)
}
- return &commResolver{
+ r := &commResolver{
comms: comms,
pending: make(map[uint32]struct{}),
}
+ r.ensureLookupConfig()
+ return r
+}
+
+func (r *commResolver) ensureLookupConfig() {
+ if r.lookupWorkers <= 0 {
+ r.lookupWorkers = defaultCommLookupWorkers
+ }
+ if r.lookupQueue == nil {
+ r.lookupQueue = make(chan uint32, defaultCommLookupQueueSize)
+ }
+ if r.resolveFn == nil {
+ r.resolveFn = resolveCommFromProc
+ }
+}
+
+func (r *commResolver) startLookupWorkers() {
+ r.ensureLookupConfig()
+ r.startWorkersOnce.Do(func() {
+ for i := 0; i < r.lookupWorkers; i++ {
+ go r.lookupWorker()
+ }
+ })
+}
+
+func (r *commResolver) lookupWorker() {
+ for tid := range r.lookupQueue {
+ comm := r.resolveFn(tid)
+ r.mu.Lock()
+ delete(r.pending, tid)
+ if comm != "" {
+ r.comms[tid] = comm
+ }
+ r.mu.Unlock()
+ }
}
func (r *commResolver) seedTrackedPidComm(pidFilter int) {
@@ -147,15 +192,16 @@ func (r *commResolver) queueLookup(tid uint32) {
r.pending[tid] = struct{}{}
r.mu.Unlock()
- go func() {
- comm := resolveCommFromProc(tid)
+ r.startLookupWorkers()
+
+ // Keep event processing non-blocking if resolver workers are saturated.
+ select {
+ case r.lookupQueue <- tid:
+ default:
r.mu.Lock()
delete(r.pending, tid)
- if comm != "" {
- r.comms[tid] = comm
- }
r.mu.Unlock()
- }()
+ }
}
type rawEventHandler func(raw []byte, ch chan<- *event.Pair)
@@ -237,6 +283,7 @@ func configuredCommResolver(injected *commResolver) *commResolver {
if injected.pending == nil {
injected.pending = make(map[uint32]struct{})
}
+ injected.ensureLookupConfig()
return injected
}
@@ -265,6 +312,7 @@ func (e *eventLoop) commState() *commResolver {
if e.commResolver.pending == nil {
e.commResolver.pending = make(map[uint32]struct{})
}
+ e.commResolver.ensureLookupConfig()
e.comms = e.commResolver.comms
return e.commResolver
}
diff --git a/internal/eventloop_commresolver_test.go b/internal/eventloop_commresolver_test.go
new file mode 100644
index 0000000..0f10db8
--- /dev/null
+++ b/internal/eventloop_commresolver_test.go
@@ -0,0 +1,202 @@
+package internal
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+)
+
+func TestCommResolverQueueLookupRespectsWorkerLimit(t *testing.T) {
+ const workers = 2
+ const lookups = 6
+
+ started := make(chan struct{}, lookups)
+ release := make(chan struct{})
+ var wg sync.WaitGroup
+ wg.Add(lookups)
+
+ var running int32
+ var maxRunning int32
+
+ resolver := newCommResolver(nil)
+ resolver.lookupWorkers = workers
+ resolver.lookupQueue = make(chan uint32, lookups)
+ resolver.resolveFn = func(tid uint32) string {
+ current := atomic.AddInt32(&running, 1)
+ setMaxInt32(&maxRunning, current)
+ started <- struct{}{}
+ <-release
+ atomic.AddInt32(&running, -1)
+ wg.Done()
+ return fmt.Sprintf("comm-%d", tid)
+ }
+
+ for i := 1; i <= lookups; i++ {
+ resolver.queueLookup(uint32(i))
+ }
+
+ waitForStarts(t, started, workers, 2*time.Second)
+ select {
+ case <-started:
+ t.Fatalf("expected at most %d concurrent lookups", workers)
+ case <-time.After(75 * time.Millisecond):
+ }
+
+ close(release)
+ waitForWaitGroup(t, &wg, 2*time.Second)
+ waitForCondition(t, 2*time.Second, "expected all queued tids to be cached", func() bool {
+ for i := 1; i <= lookups; i++ {
+ if _, ok := resolver.cached(uint32(i)); !ok {
+ return false
+ }
+ }
+ return pendingCount(resolver) == 0
+ })
+
+ if got := atomic.LoadInt32(&maxRunning); got > workers {
+ t.Fatalf("expected max concurrent lookups <= %d, got %d", workers, got)
+ }
+
+ for i := 1; i <= lookups; i++ {
+ want := fmt.Sprintf("comm-%d", i)
+ got, ok := resolver.cached(uint32(i))
+ if !ok {
+ t.Fatalf("expected cached comm for tid %d", i)
+ }
+ if got != want {
+ t.Fatalf("expected tid %d comm %q, got %q", i, want, got)
+ }
+ }
+
+ if pending := pendingCount(resolver); pending != 0 {
+ t.Fatalf("expected no pending lookups after completion, got %d", pending)
+ }
+}
+
+func TestCommResolverQueueLookupQueueFullClearsPending(t *testing.T) {
+ started := make(chan struct{}, 1)
+ release := make(chan struct{})
+
+ resolver := newCommResolver(nil)
+ resolver.lookupWorkers = 1
+ resolver.lookupQueue = make(chan uint32, 1)
+ resolver.resolveFn = func(tid uint32) string {
+ select {
+ case started <- struct{}{}:
+ default:
+ }
+ <-release
+ return fmt.Sprintf("comm-%d", tid)
+ }
+
+ const tid1 uint32 = 101
+ const tid2 uint32 = 102
+ const tid3 uint32 = 103
+
+ resolver.queueLookup(tid1)
+ waitForStarts(t, started, 1, 2*time.Second)
+
+ resolver.queueLookup(tid2)
+ resolver.queueLookup(tid3)
+
+ if !hasPending(resolver, tid1) {
+ t.Fatalf("expected tid %d to remain pending while worker is blocked", tid1)
+ }
+ if !hasPending(resolver, tid2) {
+ t.Fatalf("expected tid %d to remain pending while queued", tid2)
+ }
+ if hasPending(resolver, tid3) {
+ t.Fatalf("expected tid %d pending flag to be cleared when queue is full", tid3)
+ }
+
+ close(release)
+
+ waitForCondition(t, 2*time.Second, "expected first two tids to resolve", func() bool {
+ _, ok1 := resolver.cached(tid1)
+ _, ok2 := resolver.cached(tid2)
+ return ok1 && ok2
+ })
+
+ if _, ok := resolver.cached(tid3); ok {
+ t.Fatalf("did not expect tid %d to resolve from the dropped queue request", tid3)
+ }
+
+ resolver.queueLookup(tid3)
+ waitForCondition(t, 2*time.Second, "expected dropped tid to be retried successfully", func() bool {
+ _, ok := resolver.cached(tid3)
+ return ok
+ })
+}
+
+func hasPending(r *commResolver, tid uint32) bool {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+ _, ok := r.pending[tid]
+ return ok
+}
+
+func pendingCount(r *commResolver) int {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+ return len(r.pending)
+}
+
+func setMaxInt32(target *int32, candidate int32) {
+ for {
+ current := atomic.LoadInt32(target)
+ if candidate <= current {
+ return
+ }
+ if atomic.CompareAndSwapInt32(target, current, candidate) {
+ return
+ }
+ }
+}
+
+func waitForStarts(t *testing.T, ch <-chan struct{}, count int, timeout time.Duration) {
+ t.Helper()
+
+ timer := time.NewTimer(timeout)
+ defer timer.Stop()
+
+ for i := 0; i < count; i++ {
+ select {
+ case <-ch:
+ case <-timer.C:
+ t.Fatalf("timed out waiting for %d resolver lookups to start", count)
+ }
+ }
+}
+
+func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) {
+ t.Helper()
+
+ done := make(chan struct{})
+ go func() {
+ wg.Wait()
+ close(done)
+ }()
+
+ select {
+ case <-done:
+ case <-time.After(timeout):
+ t.Fatal("timed out waiting for resolver lookups to complete")
+ }
+}
+
+func waitForCondition(t *testing.T, timeout time.Duration, message string, fn func() bool) {
+ t.Helper()
+
+ deadline := time.Now().Add(timeout)
+ for {
+ if fn() {
+ return
+ }
+ if time.Now().After(deadline) {
+ t.Fatal(message)
+ }
+ time.Sleep(10 * time.Millisecond)
+ }
+}