diff options
| author | Paul Buetow <paul@buetow.org> | 2025-07-03 16:13:26 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-07-03 16:13:26 +0300 |
| commit | f1ae8e6eb80c8f2f4b4b18b5b93893ad3249c6a1 (patch) | |
| tree | a9348d535148dae1a93f1b08e17d9870a30e7c75 /internal/server | |
| parent | a4eb3cc769c13312fdd4b7aaa20659e408f734b7 (diff) | |
fix: implement thread-safe turbo mode for MapReduce operations
- Add SafeAggregateSet wrapper with mutex protection for concurrent access
- Implement TurboAggregate for direct line processing without channels
- Fix race conditions in turbo mode MapReduce aggregation
- Add proper synchronization for batch processing completion
- Update shutdown sequence to ensure all data is serialized
- Add integration test configuration for high-concurrency scenarios
Turbo mode now handles MapReduce queries correctly. Lines are aggregated
directly instead of being passed through channels, which improves performance,
and the mutex-protected aggregate set preserves data integrity by preventing
race conditions during concurrent aggregation.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server')
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | internal/server/handlers/basehandler.go | 15 |
| -rw-r--r-- | internal/server/handlers/mapcommand.go | 32 |
| -rw-r--r-- | internal/server/handlers/readcommand.go | 36 |
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 3 |
4 files changed, 73 insertions, 13 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index bfc7ec2..427ab6c 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -31,6 +31,7 @@ type baseHandler struct { handleCommandCb handleCommandCb lines chan *line.Line aggregate *server.Aggregate + turboAggregate *server.TurboAggregate // Turbo mode aggregate maprMessages chan string serverMessages chan string hostname string @@ -56,6 +57,16 @@ type baseHandler struct { // Shutdown the handler. func (h *baseHandler) Shutdown() { + // Shutdown turbo aggregate if present + if h.turboAggregate != nil { + dlog.Server.Info(h.user, "Shutting down turbo aggregate") + h.turboAggregate.Shutdown() + } + // Shutdown regular aggregate if present + if h.aggregate != nil { + dlog.Server.Info(h.user, "Shutting down regular aggregate") + h.aggregate.Shutdown() + } h.done.Shutdown() } @@ -372,6 +383,10 @@ func (h *baseHandler) flush() { if h.turboMode { maxIterations = 300 // Give more time for turbo mode to drain } + // Also increase iterations if we have MapReduce messages + if h.turboAggregate != nil || h.aggregate != nil { + maxIterations = 300 // Give more time for MapReduce results + } for i := 0; i < maxIterations; i++ { if numUnsentMessages() == 0 { diff --git a/internal/server/handlers/mapcommand.go b/internal/server/handlers/mapcommand.go index 65e0ed8..5dc7b8f 100644 --- a/internal/server/handlers/mapcommand.go +++ b/internal/server/handlers/mapcommand.go @@ -4,29 +4,49 @@ import ( "context" "strings" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/mapr/server" ) // Map command implements the mapreduce command server side. 
type mapCommand struct { - aggregate *server.Aggregate - server *ServerHandler + aggregate *server.Aggregate + turboAggregate *server.TurboAggregate + server *ServerHandler } // NewMapCommand returns a new server side mapreduce command. func newMapCommand(serverHandler *ServerHandler, argc int, - args []string) (mapCommand, *server.Aggregate, error) { + args []string) (mapCommand, *server.Aggregate, *server.TurboAggregate, error) { m := mapCommand{server: serverHandler} queryStr := strings.Join(args[1:], " ") + + // If turbo mode is enabled, create a TurboAggregate + if config.Server.TurboModeEnable { + dlog.Server.Info("Creating turbo aggregate for MapReduce", "query", queryStr) + turboAggregate, err := server.NewTurboAggregate(queryStr) + if err != nil { + return m, nil, nil, err + } + m.turboAggregate = turboAggregate + return m, nil, turboAggregate, nil + } + + // Otherwise, create a regular Aggregate aggregate, err := server.NewAggregate(queryStr) if err != nil { - return m, nil, err + return m, nil, nil, err } m.aggregate = aggregate - return m, aggregate, nil + return m, aggregate, nil, nil } func (m mapCommand) Start(ctx context.Context, aggregatedMessages chan<- string) { - m.aggregate.Start(ctx, aggregatedMessages) + if m.turboAggregate != nil { + m.turboAggregate.Start(ctx, aggregatedMessages) + } else { + m.aggregate.Start(ctx, aggregatedMessages) + } } diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index bdb7b8b..ce44996 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -1,6 +1,7 @@ package handlers import ( + "bytes" "context" "os" "path/filepath" @@ -14,6 +15,7 @@ import ( "github.com/mimecast/dtail/internal/io/fs" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/lcontext" + "github.com/mimecast/dtail/internal/mapr/server" "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/regex" ) @@ -169,6 
+171,14 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC // Check if we should trigger shutdown now // Only shutdown if no files are pending AND no commands are active if remaining == 0 && atomic.LoadInt32(&r.server.activeCommands) == 0 { + // If we have a turbo aggregate, trigger final serialization + if r.server.turboAggregate != nil { + dlog.Server.Info(r.server.user, "Triggering final turbo aggregate serialization") + r.server.turboAggregate.Serialize(context.Background()) + // Give time for serialization to complete + time.Sleep(100 * time.Millisecond) + } + // Double-check that we really have no pending work // In turbo mode, there might be a race condition time.Sleep(10 * time.Millisecond) @@ -248,10 +258,11 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, } // Check if we should use the turbo boost optimizations - // Enable turbo boost for cat/grep/tail modes, but NOT for aggregate (MapReduce) operations - // MapReduce requires the traditional channel-based approach to work correctly - if config.Server.TurboModeEnable && r.server.aggregate == nil && - (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) { + // Enable turbo boost for cat/grep/tail modes, and now also for MapReduce operations + // MapReduce now has a turbo mode implementation that bypasses channels + if config.Server.TurboModeEnable && + (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient || r.server.turboAggregate != nil) { + dlog.Server.Info(r.server.user, "Using turbo mode for reading", path, "mode", r.mode, "hasTurboAggregate", r.server.turboAggregate != nil) r.readWithTurboProcessor(ctx, ltx, path, globID, re, reader) return } @@ -390,8 +401,21 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L for { dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> starting read loop iteration") - // Create a direct 
processor that writes without channels - processor := NewDirectLineProcessor(writer, globID) + // Create a processor based on whether we're doing MapReduce or not + var processor interface { + ProcessLine(*bytes.Buffer, uint64, string) error + Flush() error + Close() error + } + + if r.server.turboAggregate != nil { + // Use turbo aggregate processor for MapReduce operations + dlog.Server.Info(r.server.user, "Using turbo aggregate processor for MapReduce", path, globID) + processor = server.NewTurboAggregateProcessor(r.server.turboAggregate, globID) + } else { + // Use direct line processor for cat/grep/tail + processor = NewDirectLineProcessor(writer, globID) + } // Use the optimized reader dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> about to start") diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index da27066..df227ab 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -93,7 +93,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, ltx lcontext.LCon commandFinished() }() case "map": - command, aggregate, err := newMapCommand(h, argc, args) + command, aggregate, turboAggregate, err := newMapCommand(h, argc, args) if err != nil { h.sendln(h.serverMessages, err.Error()) dlog.Server.Error(h.user, err) @@ -101,6 +101,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, ltx lcontext.LCon return } h.aggregate = aggregate + h.turboAggregate = turboAggregate go func() { command.Start(ctx, h.maprMessages) commandFinished() |
