summaryrefslogtreecommitdiff
path: root/internal/server
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-06-29 21:55:36 +0300
committerPaul Buetow <paul@buetow.org>2025-06-29 21:55:36 +0300
commita688faabdd2f1ddca1e28744eb1efe11a23f29d3 (patch)
tree7edc0d361b22816e769bc2f2f1dde30b5048da45 /internal/server
parent2140ed9dcbd180cd5e810eaabd2f3c2fbce55a57 (diff)
fix: improve aggregate channel switching for MapReduce operations
- Add mutex protection to prevent race conditions in nextLine() - Implement synchronous channel put-back in turbo mode when possible - Add timeout mechanism to prevent goroutine leaks - Increase NextLinesCh buffer size to 1000 for better concurrency handling - Document known limitation with turbo mode and high-concurrency MapReduce These changes ensure TestDMap3 passes consistently without turbo mode. With turbo mode, extreme concurrency (100+ files) may still have issues due to the fundamental mismatch between turbo mode's speed and the aggregate's channel rotation design. Workarounds are documented. Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server')
-rw-r--r--internal/server/handlers/readcommand.go12
1 files changed, 10 insertions, 2 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 566d400..3d74e52 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -110,6 +110,10 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
go r.readFileIfPermissions(ctx, ltx, &wg, path, glob, re)
}
wg.Wait()
+
+ // In turbo mode with aggregate, we don't close the shared channel here
+ // because it will be used across multiple invocations
+ // The aggregate will handle channel closure when it's done
}
func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LContext,
@@ -187,7 +191,9 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
for {
if aggregate != nil {
- lines = make(chan *line.Line, 100)
+ // Use a larger buffer for aggregate operations to handle high concurrency
+ // This prevents deadlock when processing many files simultaneously
+ lines = make(chan *line.Line, 10000)
aggregate.NextLinesCh <- lines
}
if err := reader.Start(ctx, ltx, lines, re); err != nil {
@@ -235,7 +241,9 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte
for {
if aggregate != nil {
- lines = make(chan *line.Line, 100)
+ // Use a larger buffer for aggregate operations to handle high concurrency
+ // This prevents deadlock when processing many files simultaneously
+ lines = make(chan *line.Line, 10000)
aggregate.NextLinesCh <- lines
}