summaryrefslogtreecommitdiff
path: root/internal/server
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-07-03 16:13:26 +0300
committerPaul Buetow <paul@buetow.org>2025-07-03 16:13:26 +0300
commitf1ae8e6eb80c8f2f4b4b18b5b93893ad3249c6a1 (patch)
treea9348d535148dae1a93f1b08e17d9870a30e7c75 /internal/server
parenta4eb3cc769c13312fdd4b7aaa20659e408f734b7 (diff)
fix: implement thread-safe turbo mode for MapReduce operations
- Add SafeAggregateSet wrapper with mutex protection for concurrent access - Implement TurboAggregate for direct line processing without channels - Fix race conditions in turbo mode MapReduce aggregation - Add proper synchronization for batch processing completion - Update shutdown sequence to ensure all data is serialized - Add integration test configuration for high-concurrency scenarios The turbo mode now correctly handles MapReduce queries with significant performance improvements while maintaining data integrity and preventing race conditions during concurrent aggregation. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server')
-rw-r--r--internal/server/handlers/basehandler.go15
-rw-r--r--internal/server/handlers/mapcommand.go32
-rw-r--r--internal/server/handlers/readcommand.go36
-rw-r--r--internal/server/handlers/serverhandler.go3
4 files changed, 73 insertions, 13 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index bfc7ec2..427ab6c 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -31,6 +31,7 @@ type baseHandler struct {
handleCommandCb handleCommandCb
lines chan *line.Line
aggregate *server.Aggregate
+ turboAggregate *server.TurboAggregate // Turbo mode aggregate
maprMessages chan string
serverMessages chan string
hostname string
@@ -56,6 +57,16 @@ type baseHandler struct {
// Shutdown the handler.
func (h *baseHandler) Shutdown() {
+ // Shutdown turbo aggregate if present
+ if h.turboAggregate != nil {
+ dlog.Server.Info(h.user, "Shutting down turbo aggregate")
+ h.turboAggregate.Shutdown()
+ }
+ // Shutdown regular aggregate if present
+ if h.aggregate != nil {
+ dlog.Server.Info(h.user, "Shutting down regular aggregate")
+ h.aggregate.Shutdown()
+ }
h.done.Shutdown()
}
@@ -372,6 +383,10 @@ func (h *baseHandler) flush() {
if h.turboMode {
maxIterations = 300 // Give more time for turbo mode to drain
}
+ // Also increase iterations if we have MapReduce messages
+ if h.turboAggregate != nil || h.aggregate != nil {
+ maxIterations = 300 // Give more time for MapReduce results
+ }
for i := 0; i < maxIterations; i++ {
if numUnsentMessages() == 0 {
diff --git a/internal/server/handlers/mapcommand.go b/internal/server/handlers/mapcommand.go
index 65e0ed8..5dc7b8f 100644
--- a/internal/server/handlers/mapcommand.go
+++ b/internal/server/handlers/mapcommand.go
@@ -4,29 +4,49 @@ import (
"context"
"strings"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr/server"
)
// Map command implements the mapreduce command server side.
type mapCommand struct {
- aggregate *server.Aggregate
- server *ServerHandler
+ aggregate *server.Aggregate
+ turboAggregate *server.TurboAggregate
+ server *ServerHandler
}
// NewMapCommand returns a new server side mapreduce command.
func newMapCommand(serverHandler *ServerHandler, argc int,
- args []string) (mapCommand, *server.Aggregate, error) {
+ args []string) (mapCommand, *server.Aggregate, *server.TurboAggregate, error) {
m := mapCommand{server: serverHandler}
queryStr := strings.Join(args[1:], " ")
+
+ // If turbo mode is enabled, create a TurboAggregate
+ if config.Server.TurboModeEnable {
+ dlog.Server.Info("Creating turbo aggregate for MapReduce", "query", queryStr)
+ turboAggregate, err := server.NewTurboAggregate(queryStr)
+ if err != nil {
+ return m, nil, nil, err
+ }
+ m.turboAggregate = turboAggregate
+ return m, nil, turboAggregate, nil
+ }
+
+ // Otherwise, create a regular Aggregate
aggregate, err := server.NewAggregate(queryStr)
if err != nil {
- return m, nil, err
+ return m, nil, nil, err
}
m.aggregate = aggregate
- return m, aggregate, nil
+ return m, aggregate, nil, nil
}
func (m mapCommand) Start(ctx context.Context, aggregatedMessages chan<- string) {
- m.aggregate.Start(ctx, aggregatedMessages)
+ if m.turboAggregate != nil {
+ m.turboAggregate.Start(ctx, aggregatedMessages)
+ } else {
+ m.aggregate.Start(ctx, aggregatedMessages)
+ }
}
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index bdb7b8b..ce44996 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -1,6 +1,7 @@
package handlers
import (
+ "bytes"
"context"
"os"
"path/filepath"
@@ -14,6 +15,7 @@ import (
"github.com/mimecast/dtail/internal/io/fs"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/lcontext"
+ "github.com/mimecast/dtail/internal/mapr/server"
"github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/regex"
)
@@ -169,6 +171,14 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
// Check if we should trigger shutdown now
// Only shutdown if no files are pending AND no commands are active
if remaining == 0 && atomic.LoadInt32(&r.server.activeCommands) == 0 {
+ // If we have a turbo aggregate, trigger final serialization
+ if r.server.turboAggregate != nil {
+ dlog.Server.Info(r.server.user, "Triggering final turbo aggregate serialization")
+ r.server.turboAggregate.Serialize(context.Background())
+ // Give time for serialization to complete
+ time.Sleep(100 * time.Millisecond)
+ }
+
// Double-check that we really have no pending work
// In turbo mode, there might be a race condition
time.Sleep(10 * time.Millisecond)
@@ -248,10 +258,11 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
}
// Check if we should use the turbo boost optimizations
- // Enable turbo boost for cat/grep/tail modes, but NOT for aggregate (MapReduce) operations
- // MapReduce requires the traditional channel-based approach to work correctly
- if config.Server.TurboModeEnable && r.server.aggregate == nil &&
- (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) {
+ // Enable turbo boost for cat/grep/tail modes, and now also for MapReduce operations
+ // MapReduce now has a turbo mode implementation that bypasses channels
+ if config.Server.TurboModeEnable &&
+ (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient || r.server.turboAggregate != nil) {
+ dlog.Server.Info(r.server.user, "Using turbo mode for reading", path, "mode", r.mode, "hasTurboAggregate", r.server.turboAggregate != nil)
r.readWithTurboProcessor(ctx, ltx, path, globID, re, reader)
return
}
@@ -390,8 +401,21 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L
for {
dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> starting read loop iteration")
- // Create a direct processor that writes without channels
- processor := NewDirectLineProcessor(writer, globID)
+ // Create a processor based on whether we're doing MapReduce or not
+ var processor interface {
+ ProcessLine(*bytes.Buffer, uint64, string) error
+ Flush() error
+ Close() error
+ }
+
+ if r.server.turboAggregate != nil {
+ // Use turbo aggregate processor for MapReduce operations
+ dlog.Server.Info(r.server.user, "Using turbo aggregate processor for MapReduce", path, globID)
+ processor = server.NewTurboAggregateProcessor(r.server.turboAggregate, globID)
+ } else {
+ // Use direct line processor for cat/grep/tail
+ processor = NewDirectLineProcessor(writer, globID)
+ }
// Use the optimized reader
dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> about to start")
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index da27066..df227ab 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -93,7 +93,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, ltx lcontext.LCon
commandFinished()
}()
case "map":
- command, aggregate, err := newMapCommand(h, argc, args)
+ command, aggregate, turboAggregate, err := newMapCommand(h, argc, args)
if err != nil {
h.sendln(h.serverMessages, err.Error())
dlog.Server.Error(h.user, err)
@@ -101,6 +101,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, ltx lcontext.LCon
return
}
h.aggregate = aggregate
+ h.turboAggregate = turboAggregate
go func() {
command.Start(ctx, h.maprMessages)
commandFinished()