diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-06 16:29:49 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-06 16:29:49 +0200 |
| commit | 96f30bc109818547e456251ad0c25e1e7308e22b (patch) | |
| tree | 242dbd0d01b093061ce830085282ea868a9ba337 /internal/eventloop.go | |
| parent | c1814dafe443c9042ebfee99b7e14c4723ce9db3 (diff) | |
refactor: bound comm resolver lookups with worker pool (task 384)
Diffstat (limited to 'internal/eventloop.go')
| -rw-r--r-- | internal/eventloop.go | 62 |
1 files changed, 55 insertions, 7 deletions
diff --git a/internal/eventloop.go b/internal/eventloop.go index 86954d4..5fa6f33 100644 --- a/internal/eventloop.go +++ b/internal/eventloop.go @@ -20,6 +20,11 @@ import ( const sysEnterNameToHandleAtName = "name_to_handle_at" +const ( + defaultCommLookupWorkers = 4 + defaultCommLookupQueueSize = 512 +) + type eventLoopConfig struct { pidFilter int commFilter string @@ -69,16 +74,56 @@ type commResolver struct { mu sync.RWMutex pending map[uint32]struct{} + + lookupQueue chan uint32 + lookupWorkers int + resolveFn func(uint32) string + startWorkersOnce sync.Once } func newCommResolver(comms map[uint32]string) *commResolver { if comms == nil { comms = make(map[uint32]string) } - return &commResolver{ + r := &commResolver{ comms: comms, pending: make(map[uint32]struct{}), } + r.ensureLookupConfig() + return r +} + +func (r *commResolver) ensureLookupConfig() { + if r.lookupWorkers <= 0 { + r.lookupWorkers = defaultCommLookupWorkers + } + if r.lookupQueue == nil { + r.lookupQueue = make(chan uint32, defaultCommLookupQueueSize) + } + if r.resolveFn == nil { + r.resolveFn = resolveCommFromProc + } +} + +func (r *commResolver) startLookupWorkers() { + r.ensureLookupConfig() + r.startWorkersOnce.Do(func() { + for i := 0; i < r.lookupWorkers; i++ { + go r.lookupWorker() + } + }) +} + +func (r *commResolver) lookupWorker() { + for tid := range r.lookupQueue { + comm := r.resolveFn(tid) + r.mu.Lock() + delete(r.pending, tid) + if comm != "" { + r.comms[tid] = comm + } + r.mu.Unlock() + } } func (r *commResolver) seedTrackedPidComm(pidFilter int) { @@ -147,15 +192,16 @@ func (r *commResolver) queueLookup(tid uint32) { r.pending[tid] = struct{}{} r.mu.Unlock() - go func() { - comm := resolveCommFromProc(tid) + r.startLookupWorkers() + + // Keep event processing non-blocking if resolver workers are saturated. + select { + case r.lookupQueue <- tid: + default: r.mu.Lock() delete(r.pending, tid) - if comm != "" { - r.comms[tid] = comm - } r.mu.Unlock() - }() + } } type rawEventHandler func(raw []byte, ch chan<- *event.Pair) @@ -237,6 +283,7 @@ func configuredCommResolver(injected *commResolver) *commResolver { if injected.pending == nil { injected.pending = make(map[uint32]struct{}) } + injected.ensureLookupConfig() return injected } @@ -265,6 +312,7 @@ func (e *eventLoop) commState() *commResolver { if e.commResolver.pending == nil { e.commResolver.pending = make(map[uint32]struct{}) } + e.commResolver.ensureLookupConfig() e.comms = e.commResolver.comms return e.commResolver } |
