summary refs log tree commit diff
diff options
context:
space:
mode:
author Paul Buetow <paul@buetow.org> 2026-03-08 08:49:53 +0200
committer Paul Buetow <paul@buetow.org> 2026-03-08 08:49:53 +0200
commit dc20240d2eddacba8a690a75547cbd8f1d3df98e (patch)
tree 4c3a56524b86dc77b0dc11b63349cb54bef2bb41
parent c67887f9abbfb726d20d1fa67dca0041a97398bc (diff)
task(ior): add comm resolver shutdown lifecycle (task fcd866dd)
-rw-r--r-- internal/eventloop.go 44
-rw-r--r-- internal/eventloop_commresolver_test.go 52
2 files changed, 90 insertions, 6 deletions
diff --git a/internal/eventloop.go b/internal/eventloop.go
index 2def4c0..479ac59 100644
--- a/internal/eventloop.go
+++ b/internal/eventloop.go
@@ -73,11 +73,14 @@ type commResolver struct {
mu sync.RWMutex
pending map[uint32]struct{}
+ closed bool
lookupQueue chan uint32
lookupWorkers int
resolveFn func(uint32) string
startWorkersOnce sync.Once
+ workersWG sync.WaitGroup
+ shutdownOnce sync.Once
}
func newCommResolver(comms map[uint32]string) *commResolver {
@@ -106,14 +109,22 @@ func (r *commResolver) ensureLookupConfig() {
func (r *commResolver) startLookupWorkers() {
r.ensureLookupConfig()
+ r.mu.RLock()
+ closed := r.closed
+ r.mu.RUnlock()
+ if closed {
+ return
+ }
r.startWorkersOnce.Do(func() {
for i := 0; i < r.lookupWorkers; i++ {
+ r.workersWG.Add(1)
go r.lookupWorker()
}
})
}
func (r *commResolver) lookupWorker() {
+ defer r.workersWG.Done()
for tid := range r.lookupQueue {
comm := r.resolveFn(tid)
r.mu.Lock()
@@ -176,31 +187,51 @@ func (r *commResolver) queueLookup(tid uint32) {
if tid == 0 {
return
}
+ r.startLookupWorkers()
+
r.mu.Lock()
+ defer r.mu.Unlock()
+ if r.closed {
+ return
+ }
if _, ok := r.comms[tid]; ok {
- r.mu.Unlock()
return
}
if r.pending == nil {
r.pending = make(map[uint32]struct{})
}
if _, ok := r.pending[tid]; ok {
- r.mu.Unlock()
return
}
r.pending[tid] = struct{}{}
- r.mu.Unlock()
-
- r.startLookupWorkers()
// Keep event processing non-blocking if resolver workers are saturated.
select {
case r.lookupQueue <- tid:
default:
- r.mu.Lock()
delete(r.pending, tid)
+ }
+}
+
+func (r *commResolver) shutdown() {
+ r.shutdownOnce.Do(func() {
+ r.ensureLookupConfig()
+ r.mu.Lock()
+ r.closed = true
+ for tid := range r.pending {
+ delete(r.pending, tid)
+ }
+ queue := r.lookupQueue
r.mu.Unlock()
+ close(queue)
+ r.workersWG.Wait()
+ })
+}
+func (e *eventLoop) shutdownCommResolver() {
+ if e.commResolver == nil {
+ return
}
+ e.commResolver.shutdown()
}
type rawEventHandler func(raw []byte, ch chan<- *event.Pair)
@@ -341,6 +372,7 @@ func (e *eventLoop) stats() string {
func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) {
defer close(e.done)
+ defer e.shutdownCommResolver()
if e.cfg.pprofEnable {
fmt.Println("Profiling, press Ctrl+C to stop")
diff --git a/internal/eventloop_commresolver_test.go b/internal/eventloop_commresolver_test.go
index 0f10db8..d04a05f 100644
--- a/internal/eventloop_commresolver_test.go
+++ b/internal/eventloop_commresolver_test.go
@@ -21,6 +21,7 @@ func TestCommResolverQueueLookupRespectsWorkerLimit(t *testing.T) {
var maxRunning int32
resolver := newCommResolver(nil)
+ defer resolver.shutdown()
resolver.lookupWorkers = workers
resolver.lookupQueue = make(chan uint32, lookups)
resolver.resolveFn = func(tid uint32) string {
@@ -80,6 +81,7 @@ func TestCommResolverQueueLookupQueueFullClearsPending(t *testing.T) {
release := make(chan struct{})
resolver := newCommResolver(nil)
+ defer resolver.shutdown()
resolver.lookupWorkers = 1
resolver.lookupQueue = make(chan uint32, 1)
resolver.resolveFn = func(tid uint32) string {
@@ -130,6 +132,56 @@ func TestCommResolverQueueLookupQueueFullClearsPending(t *testing.T) {
})
}
+func TestCommResolverShutdownStopsWorkersAndPreventsNewLookups(t *testing.T) {
+ started := make(chan struct{}, 1)
+ release := make(chan struct{})
+
+ resolver := newCommResolver(nil)
+ resolver.lookupWorkers = 1
+ resolver.lookupQueue = make(chan uint32, 1)
+ resolver.resolveFn = func(tid uint32) string {
+ started <- struct{}{}
+ <-release
+ return fmt.Sprintf("comm-%d", tid)
+ }
+
+ const activeTID uint32 = 201
+ const postShutdownTID uint32 = 202
+
+ resolver.queueLookup(activeTID)
+ waitForStarts(t, started, 1, 2*time.Second)
+
+ shutdownDone := make(chan struct{})
+ go func() {
+ resolver.shutdown()
+ close(shutdownDone)
+ }()
+
+ select {
+ case <-shutdownDone:
+ t.Fatal("shutdown returned before in-flight lookup completed")
+ case <-time.After(75 * time.Millisecond):
+ }
+
+ close(release)
+ select {
+ case <-shutdownDone:
+ case <-time.After(2 * time.Second):
+ t.Fatal("timed out waiting for resolver shutdown")
+ }
+
+ resolver.queueLookup(postShutdownTID)
+ if hasPending(resolver, postShutdownTID) {
+ t.Fatalf("expected no pending entry after shutdown for tid %d", postShutdownTID)
+ }
+ if _, ok := resolver.cached(postShutdownTID); ok {
+ t.Fatalf("did not expect tid %d to resolve after shutdown", postShutdownTID)
+ }
+ if pending := pendingCount(resolver); pending != 0 {
+ t.Fatalf("expected no pending lookups after shutdown, got %d", pending)
+ }
+}
+
func hasPending(r *commResolver, tid uint32) bool {
r.mu.RLock()
defer r.mu.RUnlock()