diff options
| author | Paul Buetow <paul@buetow.org> | 2025-07-04 08:55:10 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-07-04 08:55:10 +0300 |
| commit | 73b885b7b1e74de010fd8aafc0b89dc60b7ac870 (patch) | |
| tree | 6330c1a2084aa6ea98192ebaa130911bed0cf90b /internal/server | |
| parent | 859be4593e4f7ef37ff2c91dc90f42e6930a3996 (diff) | |
fix: resolve MapReduce turbo mode issues and serverless processing
- Fix serverless MapReduce to pass options with map command for proper mode detection
- Prevent raw lines from being sent to client during MapReduce operations
- Only use turbo mode for cat/grep/tail when no aggregate is present
- Fix race conditions in TurboAggregate with proper synchronization
- Add SafeAggregateSet wrapper for thread-safe operations
- Fix parser selection to use correct parser names
- Add comprehensive unit tests for turbo aggregate functionality
This ensures MapReduce operations in both turbo and non-turbo modes
produce identical results and fixes serverless mode processing.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server')
| -rw-r--r-- | internal/server/handlers/mapcommand.go | 6 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 26 |
2 files changed, 23 insertions, 9 deletions
diff --git a/internal/server/handlers/mapcommand.go b/internal/server/handlers/mapcommand.go index 5dc7b8f..c804189 100644 --- a/internal/server/handlers/mapcommand.go +++ b/internal/server/handlers/mapcommand.go @@ -23,8 +23,10 @@ func newMapCommand(serverHandler *ServerHandler, argc int, m := mapCommand{server: serverHandler} queryStr := strings.Join(args[1:], " ") - // If turbo mode is enabled, create a TurboAggregate - if config.Server.TurboModeEnable { + // If turbo mode is enabled AND we're in server mode (not serverless), create a TurboAggregate + // Turbo mode is a server-side optimization and should not be used in serverless mode + dlog.Server.Debug("MapReduce mode check", "turboModeEnable", config.Server.TurboModeEnable, "serverless", serverHandler.serverless) + if config.Server.TurboModeEnable && !serverHandler.serverless { dlog.Server.Info("Creating turbo aggregate for MapReduce", "query", queryStr) turboAggregate, err := server.NewTurboAggregate(queryStr) if err != nil { diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 0ceb8ee..2ce1fc7 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -261,24 +261,33 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, // Check if we should use the turbo boost optimizations // Enable turbo boost for cat/grep/tail modes, and now also for MapReduce operations // MapReduce now has a turbo mode implementation that bypasses channels + dlog.Server.Debug(r.server.user, "Checking turbo mode", "turboModeEnable", config.Server.TurboModeEnable, + "mode", r.mode, "hasTurboAggregate", r.server.turboAggregate != nil, "hasAggregate", r.server.aggregate != nil) + // Only use turbo mode if: + // 1. Turbo mode is enabled AND + // 2. We have a turbo aggregate OR (we're in cat/grep/tail mode AND we don't have a regular aggregate) if config.Server.TurboModeEnable && - (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient || r.server.turboAggregate != nil) { + (r.server.turboAggregate != nil || ((r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) && r.server.aggregate == nil)) { dlog.Server.Info(r.server.user, "Using turbo mode for reading", path, "mode", r.mode, "hasTurboAggregate", r.server.turboAggregate != nil) r.readWithTurboProcessor(ctx, ltx, path, globID, re, reader) return } // Original channel-based implementation - lines := r.server.lines aggregate := r.server.aggregate + var lines chan *line.Line for { if aggregate != nil { - // Use a larger buffer for aggregate operations to handle high concurrency - // This prevents deadlock when processing many files simultaneously + // For MapReduce operations, create a new channel that goes only to the aggregate + // This prevents lines from being sent to the client lines = make(chan *line.Line, 10000) aggregate.NextLinesCh <- lines + } else { + // For non-MapReduce operations, use the server's lines channel + lines = r.server.lines } + if err := reader.Start(ctx, ltx, lines, re); err != nil { dlog.Server.Error(r.server.user, path, globID, err) } @@ -315,18 +324,21 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte } // Use the existing lines channel but with the processor-based reader - lines := r.server.lines aggregate := r.server.aggregate + var lines chan *line.Line // Use the optimized version if turbo boost is enabled turboBoostEnabled := config.Server.TurboModeEnable for { if aggregate != nil { - // Use a larger buffer for aggregate operations to handle high concurrency - // This prevents deadlock when processing many files simultaneously + // For MapReduce operations, create a new channel that goes only to the aggregate + // This prevents lines from being sent to the client lines = make(chan *line.Line, 10000) aggregate.NextLinesCh <- lines + } else { + // For non-MapReduce operations, use the server's lines channel + lines = r.server.lines } // Create a processor that sends to the lines channel |
