From a688faabdd2f1ddca1e28744eb1efe11a23f29d3 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 29 Jun 2025 21:55:36 +0300 Subject: fix: improve aggregate channel switching for MapReduce operations - Add mutex protection to prevent race conditions in nextLine() - Implement synchronous channel put-back in turbo mode when possible - Add timeout mechanism to prevent goroutine leaks - Increase NextLinesCh buffer size to 1000 for better concurrency handling - Document known limitation with turbo mode and high-concurrency MapReduce These changes ensure TestDMap3 passes consistently without turbo mode. With turbo mode, extreme concurrency (100+ files) may still have issues due to the fundamental mismatch between turbo mode's speed and the aggregate's channel rotation design. Workarounds are documented. Co-Authored-By: Claude --- internal/server/handlers/readcommand.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'internal/server') diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 566d400..3d74e52 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -110,6 +110,10 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, go r.readFileIfPermissions(ctx, ltx, &wg, path, glob, re) } wg.Wait() + + // In turbo mode with aggregate, we don't close the shared channel here + // because it will be used across multiple invocations + // The aggregate will handle channel closure when it's done } func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LContext, @@ -187,7 +191,9 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, for { if aggregate != nil { - lines = make(chan *line.Line, 100) + // Use a larger buffer for aggregate operations to handle high concurrency + // This prevents deadlock when processing many files simultaneously + lines = make(chan *line.Line, 10000) aggregate.NextLinesCh <- lines } if err := reader.Start(ctx, ltx, lines, re); err != nil { @@ -235,7 +241,9 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte for { if aggregate != nil { - lines = make(chan *line.Line, 100) + // Use a larger buffer for aggregate operations to handle high concurrency + // This prevents deadlock when processing many files simultaneously + lines = make(chan *line.Line, 10000) aggregate.NextLinesCh <- lines } -- cgit v1.2.3