summaryrefslogtreecommitdiff
path: root/internal/server
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-07-04 08:55:10 +0300
committerPaul Buetow <paul@buetow.org>2025-07-04 08:55:10 +0300
commit73b885b7b1e74de010fd8aafc0b89dc60b7ac870 (patch)
tree6330c1a2084aa6ea98192ebaa130911bed0cf90b /internal/server
parent859be4593e4f7ef37ff2c91dc90f42e6930a3996 (diff)
fix: resolve MapReduce turbo mode issues and serverless processing
- Fix serverless MapReduce to pass options with map command for proper mode detection - Prevent raw lines from being sent to client during MapReduce operations - Only use turbo mode for cat/grep/tail when no aggregate is present - Fix race conditions in TurboAggregate with proper synchronization - Add SafeAggregateSet wrapper for thread-safe operations - Fix parser selection to use correct parser names - Add comprehensive unit tests for turbo aggregate functionality This ensures MapReduce operations in both turbo and non-turbo modes produce identical results and fixes serverless mode processing. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server')
-rw-r--r--internal/server/handlers/mapcommand.go6
-rw-r--r--internal/server/handlers/readcommand.go26
2 files changed, 23 insertions, 9 deletions
diff --git a/internal/server/handlers/mapcommand.go b/internal/server/handlers/mapcommand.go
index 5dc7b8f..c804189 100644
--- a/internal/server/handlers/mapcommand.go
+++ b/internal/server/handlers/mapcommand.go
@@ -23,8 +23,10 @@ func newMapCommand(serverHandler *ServerHandler, argc int,
m := mapCommand{server: serverHandler}
queryStr := strings.Join(args[1:], " ")
- // If turbo mode is enabled, create a TurboAggregate
- if config.Server.TurboModeEnable {
+ // If turbo mode is enabled AND we're in server mode (not serverless), create a TurboAggregate
+ // Turbo mode is a server-side optimization and should not be used in serverless mode
+ dlog.Server.Debug("MapReduce mode check", "turboModeEnable", config.Server.TurboModeEnable, "serverless", serverHandler.serverless)
+ if config.Server.TurboModeEnable && !serverHandler.serverless {
dlog.Server.Info("Creating turbo aggregate for MapReduce", "query", queryStr)
turboAggregate, err := server.NewTurboAggregate(queryStr)
if err != nil {
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 0ceb8ee..2ce1fc7 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -261,24 +261,33 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
// Check if we should use the turbo boost optimizations
// Enable turbo boost for cat/grep/tail modes, and now also for MapReduce operations
// MapReduce now has a turbo mode implementation that bypasses channels
+ dlog.Server.Debug(r.server.user, "Checking turbo mode", "turboModeEnable", config.Server.TurboModeEnable,
+ "mode", r.mode, "hasTurboAggregate", r.server.turboAggregate != nil, "hasAggregate", r.server.aggregate != nil)
+ // Only use turbo mode if:
+ // 1. Turbo mode is enabled AND
+ // 2. We have a turbo aggregate OR (we're in cat/grep/tail mode AND we don't have a regular aggregate)
if config.Server.TurboModeEnable &&
- (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient || r.server.turboAggregate != nil) {
+ (r.server.turboAggregate != nil || ((r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) && r.server.aggregate == nil)) {
dlog.Server.Info(r.server.user, "Using turbo mode for reading", path, "mode", r.mode, "hasTurboAggregate", r.server.turboAggregate != nil)
r.readWithTurboProcessor(ctx, ltx, path, globID, re, reader)
return
}
// Original channel-based implementation
- lines := r.server.lines
aggregate := r.server.aggregate
+ var lines chan *line.Line
for {
if aggregate != nil {
- // Use a larger buffer for aggregate operations to handle high concurrency
- // This prevents deadlock when processing many files simultaneously
+ // For MapReduce operations, create a new channel that goes only to the aggregate
+ // This prevents lines from being sent to the client
lines = make(chan *line.Line, 10000)
aggregate.NextLinesCh <- lines
+ } else {
+ // For non-MapReduce operations, use the server's lines channel
+ lines = r.server.lines
}
+
if err := reader.Start(ctx, ltx, lines, re); err != nil {
dlog.Server.Error(r.server.user, path, globID, err)
}
@@ -315,18 +324,21 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte
}
// Use the existing lines channel but with the processor-based reader
- lines := r.server.lines
aggregate := r.server.aggregate
+ var lines chan *line.Line
// Use the optimized version if turbo boost is enabled
turboBoostEnabled := config.Server.TurboModeEnable
for {
if aggregate != nil {
- // Use a larger buffer for aggregate operations to handle high concurrency
- // This prevents deadlock when processing many files simultaneously
+ // For MapReduce operations, create a new channel that goes only to the aggregate
+ // This prevents lines from being sent to the client
lines = make(chan *line.Line, 10000)
aggregate.NextLinesCh <- lines
+ } else {
+ // For non-MapReduce operations, use the server's lines channel
+ lines = r.server.lines
}
// Create a processor that sends to the lines channel