diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-08 08:49:53 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-08 08:49:53 +0200 |
| commit | dc20240d2eddacba8a690a75547cbd8f1d3df98e (patch) | |
| tree | 4c3a56524b86dc77b0dc11b63349cb54bef2bb41 | |
| parent | c67887f9abbfb726d20d1fa67dca0041a97398bc (diff) | |
task(ior): add comm resolver shutdown lifecycle (task fcd866dd)
| -rw-r--r-- | internal/eventloop.go | 44 | ||||
| -rw-r--r-- | internal/eventloop_commresolver_test.go | 52 |
2 files changed, 90 insertions, 6 deletions
diff --git a/internal/eventloop.go b/internal/eventloop.go index 2def4c0..479ac59 100644 --- a/internal/eventloop.go +++ b/internal/eventloop.go @@ -73,11 +73,14 @@ type commResolver struct { mu sync.RWMutex pending map[uint32]struct{} + closed bool lookupQueue chan uint32 lookupWorkers int resolveFn func(uint32) string startWorkersOnce sync.Once + workersWG sync.WaitGroup + shutdownOnce sync.Once } func newCommResolver(comms map[uint32]string) *commResolver { @@ -106,14 +109,22 @@ func (r *commResolver) ensureLookupConfig() { func (r *commResolver) startLookupWorkers() { r.ensureLookupConfig() + r.mu.RLock() + closed := r.closed + r.mu.RUnlock() + if closed { + return + } r.startWorkersOnce.Do(func() { for i := 0; i < r.lookupWorkers; i++ { + r.workersWG.Add(1) go r.lookupWorker() } }) } func (r *commResolver) lookupWorker() { + defer r.workersWG.Done() for tid := range r.lookupQueue { comm := r.resolveFn(tid) r.mu.Lock() @@ -176,31 +187,51 @@ func (r *commResolver) queueLookup(tid uint32) { if tid == 0 { return } + r.startLookupWorkers() + r.mu.Lock() + defer r.mu.Unlock() + if r.closed { + return + } if _, ok := r.comms[tid]; ok { - r.mu.Unlock() return } if r.pending == nil { r.pending = make(map[uint32]struct{}) } if _, ok := r.pending[tid]; ok { - r.mu.Unlock() return } r.pending[tid] = struct{}{} - r.mu.Unlock() - - r.startLookupWorkers() // Keep event processing non-blocking if resolver workers are saturated. select { case r.lookupQueue <- tid: default: - r.mu.Lock() delete(r.pending, tid) + } +} + +func (r *commResolver) shutdown() { + r.shutdownOnce.Do(func() { + r.ensureLookupConfig() + r.mu.Lock() + r.closed = true + for tid := range r.pending { + delete(r.pending, tid) + } + queue := r.lookupQueue r.mu.Unlock() + close(queue) + r.workersWG.Wait() + }) +} +func (e *eventLoop) shutdownCommResolver() { + if e.commResolver == nil { + return } + e.commResolver.shutdown() } type rawEventHandler func(raw []byte, ch chan<- *event.Pair) @@ -341,6 +372,7 @@ func (e *eventLoop) stats() string { func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) { defer close(e.done) + defer e.shutdownCommResolver() if e.cfg.pprofEnable { fmt.Println("Profiling, press Ctrl+C to stop") diff --git a/internal/eventloop_commresolver_test.go b/internal/eventloop_commresolver_test.go index 0f10db8..d04a05f 100644 --- a/internal/eventloop_commresolver_test.go +++ b/internal/eventloop_commresolver_test.go @@ -21,6 +21,7 @@ func TestCommResolverQueueLookupRespectsWorkerLimit(t *testing.T) { var maxRunning int32 resolver := newCommResolver(nil) + defer resolver.shutdown() resolver.lookupWorkers = workers resolver.lookupQueue = make(chan uint32, lookups) resolver.resolveFn = func(tid uint32) string { @@ -80,6 +81,7 @@ func TestCommResolverQueueLookupQueueFullClearsPending(t *testing.T) { release := make(chan struct{}) resolver := newCommResolver(nil) + defer resolver.shutdown() resolver.lookupWorkers = 1 resolver.lookupQueue = make(chan uint32, 1) resolver.resolveFn = func(tid uint32) string { @@ -130,6 +132,56 @@ func TestCommResolverQueueLookupQueueFullClearsPending(t *testing.T) { }) } +func TestCommResolverShutdownStopsWorkersAndPreventsNewLookups(t *testing.T) { + started := make(chan struct{}, 1) + release := make(chan struct{}) + + resolver := newCommResolver(nil) + resolver.lookupWorkers = 1 + resolver.lookupQueue = make(chan uint32, 1) + resolver.resolveFn = func(tid uint32) string { + started <- struct{}{} + <-release + return fmt.Sprintf("comm-%d", tid) + } + + const activeTID uint32 = 201 + const postShutdownTID uint32 = 202 + + resolver.queueLookup(activeTID) + waitForStarts(t, started, 1, 2*time.Second) + + shutdownDone := make(chan struct{}) + go func() { + resolver.shutdown() + close(shutdownDone) + }() + + select { + case <-shutdownDone: + t.Fatal("shutdown returned before in-flight lookup completed") + case <-time.After(75 * time.Millisecond): + } + + close(release) + select { + case <-shutdownDone: + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for resolver shutdown") + } + + resolver.queueLookup(postShutdownTID) + if hasPending(resolver, postShutdownTID) { + t.Fatalf("expected no pending entry after shutdown for tid %d", postShutdownTID) + } + if _, ok := resolver.cached(postShutdownTID); ok { + t.Fatalf("did not expect tid %d to resolve after shutdown", postShutdownTID) + } + if pending := pendingCount(resolver); pending != 0 { + t.Fatalf("expected no pending lookups after shutdown, got %d", pending) + } +} + func hasPending(r *commResolver, tid uint32) bool { r.mu.RLock() defer r.mu.RUnlock() |
