summaryrefslogtreecommitdiff
path: root/internal/server/handlers/mapcommand.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-07-03 16:13:26 +0300
committerPaul Buetow <paul@buetow.org>2025-07-03 16:13:26 +0300
commitf1ae8e6eb80c8f2f4b4b18b5b93893ad3249c6a1 (patch)
treea9348d535148dae1a93f1b08e17d9870a30e7c75 /internal/server/handlers/mapcommand.go
parenta4eb3cc769c13312fdd4b7aaa20659e408f734b7 (diff)
fix: implement thread-safe turbo mode for MapReduce operations
- Add SafeAggregateSet wrapper with mutex protection for concurrent access - Implement TurboAggregate for direct line processing without channels - Fix race conditions in turbo mode MapReduce aggregation - Add proper synchronization for batch processing completion - Update shutdown sequence to ensure all data is serialized - Add integration test configuration for high-concurrency scenarios The turbo mode now correctly handles MapReduce queries with significant performance improvements while maintaining data integrity and preventing race conditions during concurrent aggregation. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server/handlers/mapcommand.go')
-rw-r--r--internal/server/handlers/mapcommand.go32
1 files changed, 26 insertions, 6 deletions
diff --git a/internal/server/handlers/mapcommand.go b/internal/server/handlers/mapcommand.go
index 65e0ed8..5dc7b8f 100644
--- a/internal/server/handlers/mapcommand.go
+++ b/internal/server/handlers/mapcommand.go
@@ -4,29 +4,49 @@ import (
"context"
"strings"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr/server"
)
// Map command implements the mapreduce command server side.
type mapCommand struct {
- aggregate *server.Aggregate
- server *ServerHandler
+ aggregate *server.Aggregate
+ turboAggregate *server.TurboAggregate
+ server *ServerHandler
}
// NewMapCommand returns a new server side mapreduce command.
func newMapCommand(serverHandler *ServerHandler, argc int,
- args []string) (mapCommand, *server.Aggregate, error) {
+ args []string) (mapCommand, *server.Aggregate, *server.TurboAggregate, error) {
m := mapCommand{server: serverHandler}
queryStr := strings.Join(args[1:], " ")
+
+ // If turbo mode is enabled, create a TurboAggregate
+ if config.Server.TurboModeEnable {
+ dlog.Server.Info("Creating turbo aggregate for MapReduce", "query", queryStr)
+ turboAggregate, err := server.NewTurboAggregate(queryStr)
+ if err != nil {
+ return m, nil, nil, err
+ }
+ m.turboAggregate = turboAggregate
+ return m, nil, turboAggregate, nil
+ }
+
+ // Otherwise, create a regular Aggregate
aggregate, err := server.NewAggregate(queryStr)
if err != nil {
- return m, nil, err
+ return m, nil, nil, err
}
m.aggregate = aggregate
- return m, aggregate, nil
+ return m, aggregate, nil, nil
}
func (m mapCommand) Start(ctx context.Context, aggregatedMessages chan<- string) {
- m.aggregate.Start(ctx, aggregatedMessages)
+ if m.turboAggregate != nil {
+ m.turboAggregate.Start(ctx, aggregatedMessages)
+ } else {
+ m.aggregate.Start(ctx, aggregatedMessages)
+ }
}