diff options
| author | Paul Buetow <paul@buetow.org> | 2025-07-03 16:13:26 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-07-03 16:13:26 +0300 |
| commit | f1ae8e6eb80c8f2f4b4b18b5b93893ad3249c6a1 (patch) | |
| tree | a9348d535148dae1a93f1b08e17d9870a30e7c75 /internal/server/handlers/readcommand.go | |
| parent | a4eb3cc769c13312fdd4b7aaa20659e408f734b7 (diff) | |
fix: implement thread-safe turbo mode for MapReduce operations
- Add SafeAggregateSet wrapper with mutex protection for concurrent access
- Implement TurboAggregate for direct line processing without channels
- Fix race conditions in turbo mode MapReduce aggregation
- Add proper synchronization for batch processing completion
- Update shutdown sequence to ensure all data is serialized
- Add integration test configuration for high-concurrency scenarios
The turbo mode now correctly handles MapReduce queries with significant
performance improvements while maintaining data integrity and preventing
race conditions during concurrent aggregation.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server/handlers/readcommand.go')
| -rw-r--r-- | internal/server/handlers/readcommand.go | 36 |
1 files changed, 30 insertions, 6 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index bdb7b8b..ce44996 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -1,6 +1,7 @@ package handlers import ( + "bytes" "context" "os" "path/filepath" @@ -14,6 +15,7 @@ import ( "github.com/mimecast/dtail/internal/io/fs" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/lcontext" + "github.com/mimecast/dtail/internal/mapr/server" "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/regex" ) @@ -169,6 +171,14 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC // Check if we should trigger shutdown now // Only shutdown if no files are pending AND no commands are active if remaining == 0 && atomic.LoadInt32(&r.server.activeCommands) == 0 { + // If we have a turbo aggregate, trigger final serialization + if r.server.turboAggregate != nil { + dlog.Server.Info(r.server.user, "Triggering final turbo aggregate serialization") + r.server.turboAggregate.Serialize(context.Background()) + // Give time for serialization to complete + time.Sleep(100 * time.Millisecond) + } + // Double-check that we really have no pending work // In turbo mode, there might be a race condition time.Sleep(10 * time.Millisecond) @@ -248,10 +258,11 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, } // Check if we should use the turbo boost optimizations - // Enable turbo boost for cat/grep/tail modes, but NOT for aggregate (MapReduce) operations - // MapReduce requires the traditional channel-based approach to work correctly - if config.Server.TurboModeEnable && r.server.aggregate == nil && - (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) { + // Enable turbo boost for cat/grep/tail modes, and now also for MapReduce operations + // MapReduce now has a turbo mode implementation that bypasses channels + if config.Server.TurboModeEnable && + (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient || r.server.turboAggregate != nil) { + dlog.Server.Info(r.server.user, "Using turbo mode for reading", path, "mode", r.mode, "hasTurboAggregate", r.server.turboAggregate != nil) r.readWithTurboProcessor(ctx, ltx, path, globID, re, reader) return } @@ -390,8 +401,21 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L for { dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> starting read loop iteration") - // Create a direct processor that writes without channels - processor := NewDirectLineProcessor(writer, globID) + // Create a processor based on whether we're doing MapReduce or not + var processor interface { + ProcessLine(*bytes.Buffer, uint64, string) error + Flush() error + Close() error + } + + if r.server.turboAggregate != nil { + // Use turbo aggregate processor for MapReduce operations + dlog.Server.Info(r.server.user, "Using turbo aggregate processor for MapReduce", path, globID) + processor = server.NewTurboAggregateProcessor(r.server.turboAggregate, globID) + } else { + // Use direct line processor for cat/grep/tail + processor = NewDirectLineProcessor(writer, globID) + } // Use the optimized reader dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> about to start") |
