diff options
| author | Paul Buetow <paul@buetow.org> | 2025-07-03 16:13:26 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-07-03 16:13:26 +0300 |
| commit | f1ae8e6eb80c8f2f4b4b18b5b93893ad3249c6a1 (patch) | |
| tree | a9348d535148dae1a93f1b08e17d9870a30e7c75 /internal/server/handlers/mapcommand.go | |
| parent | a4eb3cc769c13312fdd4b7aaa20659e408f734b7 (diff) | |
fix: implement thread-safe turbo mode for MapReduce operations
- Add SafeAggregateSet wrapper with mutex protection for concurrent access
- Implement TurboAggregate for direct line processing without channels
- Fix race conditions in turbo mode MapReduce aggregation
- Add proper synchronization for batch processing completion
- Update shutdown sequence to ensure all data is serialized
- Add integration test configuration for high-concurrency scenarios
The turbo mode now correctly handles MapReduce queries with significant
performance improvements while maintaining data integrity and preventing
race conditions during concurrent aggregation.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server/handlers/mapcommand.go')
| -rw-r--r-- | internal/server/handlers/mapcommand.go | 32 |
1 files changed, 26 insertions, 6 deletions
diff --git a/internal/server/handlers/mapcommand.go b/internal/server/handlers/mapcommand.go index 65e0ed8..5dc7b8f 100644 --- a/internal/server/handlers/mapcommand.go +++ b/internal/server/handlers/mapcommand.go @@ -4,29 +4,49 @@ import ( "context" "strings" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/mapr/server" ) // Map command implements the mapreduce command server side. type mapCommand struct { - aggregate *server.Aggregate - server *ServerHandler + aggregate *server.Aggregate + turboAggregate *server.TurboAggregate + server *ServerHandler } // NewMapCommand returns a new server side mapreduce command. func newMapCommand(serverHandler *ServerHandler, argc int, - args []string) (mapCommand, *server.Aggregate, error) { + args []string) (mapCommand, *server.Aggregate, *server.TurboAggregate, error) { m := mapCommand{server: serverHandler} queryStr := strings.Join(args[1:], " ") + + // If turbo mode is enabled, create a TurboAggregate + if config.Server.TurboModeEnable { + dlog.Server.Info("Creating turbo aggregate for MapReduce", "query", queryStr) + turboAggregate, err := server.NewTurboAggregate(queryStr) + if err != nil { + return m, nil, nil, err + } + m.turboAggregate = turboAggregate + return m, nil, turboAggregate, nil + } + + // Otherwise, create a regular Aggregate aggregate, err := server.NewAggregate(queryStr) if err != nil { - return m, nil, err + return m, nil, nil, err } m.aggregate = aggregate - return m, aggregate, nil + return m, aggregate, nil, nil } func (m mapCommand) Start(ctx context.Context, aggregatedMessages chan<- string) { - m.aggregate.Start(ctx, aggregatedMessages) + if m.turboAggregate != nil { + m.turboAggregate.Start(ctx, aggregatedMessages) + } else { + m.aggregate.Start(ctx, aggregatedMessages) + } } |
