summaryrefslogtreecommitdiff
path: root/internal/eventloop.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-06 16:29:49 +0200
committerPaul Buetow <paul@buetow.org>2026-03-06 16:29:49 +0200
commit96f30bc109818547e456251ad0c25e1e7308e22b (patch)
tree242dbd0d01b093061ce830085282ea868a9ba337 /internal/eventloop.go
parentc1814dafe443c9042ebfee99b7e14c4723ce9db3 (diff)
refactor: bound comm resolver lookups with worker pool (task 384)
Diffstat (limited to 'internal/eventloop.go')
-rw-r--r--internal/eventloop.go62
1 files changed, 55 insertions, 7 deletions
diff --git a/internal/eventloop.go b/internal/eventloop.go
index 86954d4..5fa6f33 100644
--- a/internal/eventloop.go
+++ b/internal/eventloop.go
@@ -20,6 +20,11 @@ import (
const sysEnterNameToHandleAtName = "name_to_handle_at"
+const (
+ defaultCommLookupWorkers = 4
+ defaultCommLookupQueueSize = 512
+)
+
type eventLoopConfig struct {
pidFilter int
commFilter string
@@ -69,16 +74,56 @@ type commResolver struct {
mu sync.RWMutex
pending map[uint32]struct{}
+
+ lookupQueue chan uint32
+ lookupWorkers int
+ resolveFn func(uint32) string
+ startWorkersOnce sync.Once
}
func newCommResolver(comms map[uint32]string) *commResolver {
if comms == nil {
comms = make(map[uint32]string)
}
- return &commResolver{
+ r := &commResolver{
comms: comms,
pending: make(map[uint32]struct{}),
}
+ r.ensureLookupConfig()
+ return r
+}
+
+func (r *commResolver) ensureLookupConfig() {
+ if r.lookupWorkers <= 0 {
+ r.lookupWorkers = defaultCommLookupWorkers
+ }
+ if r.lookupQueue == nil {
+ r.lookupQueue = make(chan uint32, defaultCommLookupQueueSize)
+ }
+ if r.resolveFn == nil {
+ r.resolveFn = resolveCommFromProc
+ }
+}
+
+func (r *commResolver) startLookupWorkers() {
+ r.ensureLookupConfig()
+ r.startWorkersOnce.Do(func() {
+ for i := 0; i < r.lookupWorkers; i++ {
+ go r.lookupWorker()
+ }
+ })
+}
+
+func (r *commResolver) lookupWorker() {
+ for tid := range r.lookupQueue {
+ comm := r.resolveFn(tid)
+ r.mu.Lock()
+ delete(r.pending, tid)
+ if comm != "" {
+ r.comms[tid] = comm
+ }
+ r.mu.Unlock()
+ }
}
func (r *commResolver) seedTrackedPidComm(pidFilter int) {
@@ -147,15 +192,16 @@ func (r *commResolver) queueLookup(tid uint32) {
r.pending[tid] = struct{}{}
r.mu.Unlock()
- go func() {
- comm := resolveCommFromProc(tid)
+ r.startLookupWorkers()
+
+ // Keep event processing non-blocking if resolver workers are saturated.
+ select {
+ case r.lookupQueue <- tid:
+ default:
r.mu.Lock()
delete(r.pending, tid)
- if comm != "" {
- r.comms[tid] = comm
- }
r.mu.Unlock()
- }()
+ }
}
type rawEventHandler func(raw []byte, ch chan<- *event.Pair)
@@ -237,6 +283,7 @@ func configuredCommResolver(injected *commResolver) *commResolver {
if injected.pending == nil {
injected.pending = make(map[uint32]struct{})
}
+ injected.ensureLookupConfig()
return injected
}
@@ -265,6 +312,7 @@ func (e *eventLoop) commState() *commResolver {
if e.commResolver.pending == nil {
e.commResolver.pending = make(map[uint32]struct{})
}
+ e.commResolver.ensureLookupConfig()
e.comms = e.commResolver.comms
return e.commResolver
}