summaryrefslogtreecommitdiff
path: root/internal/server/handlers/readcommand.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-07-03 16:13:26 +0300
committerPaul Buetow <paul@buetow.org>2025-07-03 16:13:26 +0300
commitf1ae8e6eb80c8f2f4b4b18b5b93893ad3249c6a1 (patch)
treea9348d535148dae1a93f1b08e17d9870a30e7c75 /internal/server/handlers/readcommand.go
parenta4eb3cc769c13312fdd4b7aaa20659e408f734b7 (diff)
fix: implement thread-safe turbo mode for MapReduce operations
- Add SafeAggregateSet wrapper with mutex protection for concurrent access - Implement TurboAggregate for direct line processing without channels - Fix race conditions in turbo mode MapReduce aggregation - Add proper synchronization for batch processing completion - Update shutdown sequence to ensure all data is serialized - Add integration test configuration for high-concurrency scenarios The turbo mode now correctly handles MapReduce queries with significant performance improvements while maintaining data integrity and preventing race conditions during concurrent aggregation. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server/handlers/readcommand.go')
-rw-r--r--internal/server/handlers/readcommand.go36
1 files changed, 30 insertions, 6 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index bdb7b8b..ce44996 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -1,6 +1,7 @@
package handlers
import (
+ "bytes"
"context"
"os"
"path/filepath"
@@ -14,6 +15,7 @@ import (
"github.com/mimecast/dtail/internal/io/fs"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/lcontext"
+ "github.com/mimecast/dtail/internal/mapr/server"
"github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/regex"
)
@@ -169,6 +171,14 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
// Check if we should trigger shutdown now
// Only shutdown if no files are pending AND no commands are active
if remaining == 0 && atomic.LoadInt32(&r.server.activeCommands) == 0 {
+ // If we have a turbo aggregate, trigger final serialization
+ if r.server.turboAggregate != nil {
+ dlog.Server.Info(r.server.user, "Triggering final turbo aggregate serialization")
+ r.server.turboAggregate.Serialize(context.Background())
+ // Give time for serialization to complete
+ time.Sleep(100 * time.Millisecond)
+ }
+
// Double-check that we really have no pending work
// In turbo mode, there might be a race condition
time.Sleep(10 * time.Millisecond)
@@ -248,10 +258,11 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
}
// Check if we should use the turbo boost optimizations
- // Enable turbo boost for cat/grep/tail modes, but NOT for aggregate (MapReduce) operations
- // MapReduce requires the traditional channel-based approach to work correctly
- if config.Server.TurboModeEnable && r.server.aggregate == nil &&
- (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) {
+ // Enable turbo boost for cat/grep/tail modes, and now also for MapReduce operations
+ // MapReduce now has a turbo mode implementation that bypasses channels
+ if config.Server.TurboModeEnable &&
+ (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient || r.server.turboAggregate != nil) {
+ dlog.Server.Info(r.server.user, "Using turbo mode for reading", path, "mode", r.mode, "hasTurboAggregate", r.server.turboAggregate != nil)
r.readWithTurboProcessor(ctx, ltx, path, globID, re, reader)
return
}
@@ -390,8 +401,21 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L
for {
dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> starting read loop iteration")
- // Create a direct processor that writes without channels
- processor := NewDirectLineProcessor(writer, globID)
+ // Create a processor based on whether we're doing MapReduce or not
+ var processor interface {
+ ProcessLine(*bytes.Buffer, uint64, string) error
+ Flush() error
+ Close() error
+ }
+
+ if r.server.turboAggregate != nil {
+ // Use turbo aggregate processor for MapReduce operations
+ dlog.Server.Info(r.server.user, "Using turbo aggregate processor for MapReduce", path, globID)
+ processor = server.NewTurboAggregateProcessor(r.server.turboAggregate, globID)
+ } else {
+ // Use direct line processor for cat/grep/tail
+ processor = NewDirectLineProcessor(writer, globID)
+ }
// Use the optimized reader
dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> about to start")