diff options
| author | Paul Buetow <paul@buetow.org> | 2025-07-03 16:13:26 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-07-03 16:13:26 +0300 |
| commit | f1ae8e6eb80c8f2f4b4b18b5b93893ad3249c6a1 (patch) | |
| tree | a9348d535148dae1a93f1b08e17d9870a30e7c75 /internal | |
| parent | a4eb3cc769c13312fdd4b7aaa20659e408f734b7 (diff) | |
fix: implement thread-safe turbo mode for MapReduce operations
- Add SafeAggregateSet wrapper with mutex protection for concurrent access
- Implement TurboAggregate for direct line processing without channels
- Fix race conditions in turbo mode MapReduce aggregation
- Add proper synchronization for batch processing completion
- Update shutdown sequence to ensure all data is serialized
- Add integration test configuration for high-concurrency scenarios
The turbo mode now correctly handles MapReduce queries with significant
performance improvements while maintaining data integrity and preventing
race conditions during concurrent aggregation.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/mapr/safe_aggregateset.go | 71 | ||||
| -rw-r--r-- | internal/mapr/safe_aggregateset_test.go | 147 | ||||
| -rw-r--r-- | internal/mapr/server/aggregate.go | 70 | ||||
| -rw-r--r-- | internal/mapr/server/turbo_aggregate.go | 415 | ||||
| -rw-r--r-- | internal/server/handlers/basehandler.go | 15 | ||||
| -rw-r--r-- | internal/server/handlers/mapcommand.go | 32 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 36 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 3 |
8 files changed, 744 insertions, 45 deletions
diff --git a/internal/mapr/safe_aggregateset.go b/internal/mapr/safe_aggregateset.go new file mode 100644 index 0000000..b42814d --- /dev/null +++ b/internal/mapr/safe_aggregateset.go @@ -0,0 +1,71 @@ +package mapr + +import ( + "context" + "sync" +) + +// SafeAggregateSet is a thread-safe wrapper around AggregateSet for use in turbo mode. +// It uses a read-write mutex to allow concurrent reads but exclusive writes. +type SafeAggregateSet struct { + mu sync.RWMutex + set *AggregateSet +} + +// NewSafeAggregateSet creates a new thread-safe aggregate set. +func NewSafeAggregateSet() *SafeAggregateSet { + return &SafeAggregateSet{ + set: NewAggregateSet(), + } +} + +// Aggregate data to the aggregate set with thread safety. +func (s *SafeAggregateSet) Aggregate(key string, agg AggregateOperation, value string, clientAggregation bool) error { + s.mu.Lock() + defer s.mu.Unlock() + return s.set.Aggregate(key, agg, value, clientAggregation) +} + +// IncrementSamples increments the sample count safely. +func (s *SafeAggregateSet) IncrementSamples() { + s.mu.Lock() + defer s.mu.Unlock() + s.set.Samples++ +} + +// Clone creates a deep copy of the aggregate set. +// This is useful for serialization to avoid holding locks for too long. +func (s *SafeAggregateSet) Clone() *AggregateSet { + s.mu.RLock() + defer s.mu.RUnlock() + + clone := &AggregateSet{ + Samples: s.set.Samples, + FValues: make(map[string]float64, len(s.set.FValues)), + SValues: make(map[string]string, len(s.set.SValues)), + } + + // Deep copy the maps + for k, v := range s.set.FValues { + clone.FValues[k] = v + } + for k, v := range s.set.SValues { + clone.SValues[k] = v + } + + return clone +} + +// Serialize the aggregate set safely. +func (s *SafeAggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<- string) { + // Clone the set to avoid holding the lock during serialization + clone := s.Clone() + clone.Serialize(ctx, groupKey, ch) +} + +// GetSamples returns the current sample count safely. +func (s *SafeAggregateSet) GetSamples() int { + s.mu.RLock() + defer s.mu.RUnlock() + return s.set.Samples +}
\ No newline at end of file diff --git a/internal/mapr/safe_aggregateset_test.go b/internal/mapr/safe_aggregateset_test.go new file mode 100644 index 0000000..327987a --- /dev/null +++ b/internal/mapr/safe_aggregateset_test.go @@ -0,0 +1,147 @@ +package mapr + +import ( + "sync" + "testing" +) + +// TestSafeAggregateSetConcurrency tests that SafeAggregateSet handles concurrent operations correctly. +func TestSafeAggregateSetConcurrency(t *testing.T) { + safeSet := NewSafeAggregateSet() + + // Number of concurrent goroutines + numGoroutines := 100 + // Number of operations per goroutine + opsPerGoroutine := 1000 + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + // Launch concurrent goroutines + for i := 0; i < numGoroutines; i++ { + go func(id int) { + defer wg.Done() + + for j := 0; j < opsPerGoroutine; j++ { + // Test Count operation + err := safeSet.Aggregate("count", Count, "1", false) + if err != nil { + t.Errorf("Error in Count aggregation: %v", err) + } + + // Test Sum operation + err = safeSet.Aggregate("sum", Sum, "10.5", false) + if err != nil { + t.Errorf("Error in Sum aggregation: %v", err) + } + + // Test Last operation + err = safeSet.Aggregate("last", Last, "value", false) + if err != nil { + t.Errorf("Error in Last aggregation: %v", err) + } + + // Test Min operation + err = safeSet.Aggregate("min", Min, "5.0", false) + if err != nil { + t.Errorf("Error in Min aggregation: %v", err) + } + + // Test Max operation + err = safeSet.Aggregate("max", Max, "15.0", false) + if err != nil { + t.Errorf("Error in Max aggregation: %v", err) + } + + // Increment samples + safeSet.IncrementSamples() + } + }(i) + } + + // Wait for all goroutines to complete + wg.Wait() + + // Verify results + clone := safeSet.Clone() + + // Check Count + expectedCount := float64(numGoroutines * opsPerGoroutine) + if clone.FValues["count"] != expectedCount { + t.Errorf("Expected count %f, got %f", expectedCount, clone.FValues["count"]) + } + + // Check Sum + expectedSum := float64(numGoroutines * opsPerGoroutine) * 10.5 + if clone.FValues["sum"] != expectedSum { + t.Errorf("Expected sum %f, got %f", expectedSum, clone.FValues["sum"]) + } + + // Check Min + if clone.FValues["min"] != 5.0 { + t.Errorf("Expected min 5.0, got %f", clone.FValues["min"]) + } + + // Check Max + if clone.FValues["max"] != 15.0 { + t.Errorf("Expected max 15.0, got %f", clone.FValues["max"]) + } + + // Check Samples + expectedSamples := numGoroutines * opsPerGoroutine + if clone.Samples != expectedSamples { + t.Errorf("Expected samples %d, got %d", expectedSamples, clone.Samples) + } + + // Check Last (should be "value") + if clone.SValues["last"] != "value" { + t.Errorf("Expected last 'value', got '%s'", clone.SValues["last"]) + } +} + +// TestSafeAggregateSetClone tests that cloning creates an independent copy. +func TestSafeAggregateSetClone(t *testing.T) { + original := NewSafeAggregateSet() + + // Add some data + original.Aggregate("count", Count, "1", false) + original.Aggregate("sum", Sum, "100", false) + original.Aggregate("last", Last, "original", false) + original.IncrementSamples() + + // Clone the set + clone := original.Clone() + + // Modify the original + original.Aggregate("count", Count, "1", false) + original.Aggregate("sum", Sum, "50", false) + original.Aggregate("last", Last, "modified", false) + original.IncrementSamples() + + // Verify clone is unchanged + if clone.FValues["count"] != 1 { + t.Errorf("Clone count should be 1, got %f", clone.FValues["count"]) + } + + if clone.FValues["sum"] != 100 { + t.Errorf("Clone sum should be 100, got %f", clone.FValues["sum"]) + } + + if clone.SValues["last"] != "original" { + t.Errorf("Clone last should be 'original', got '%s'", clone.SValues["last"]) + } + + if clone.Samples != 1 { + t.Errorf("Clone samples should be 1, got %d", clone.Samples) + } + + // Verify original has changed + originalClone := original.Clone() + if originalClone.FValues["count"] != 2 { + t.Errorf("Original count should be 2, got %f", originalClone.FValues["count"]) + } + + if originalClone.FValues["sum"] != 150 { + t.Errorf("Original sum should be 150, got %f", originalClone.FValues["sum"]) + } +}
\ No newline at end of file diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index f055b9d..353cda5 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -68,7 +68,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) { return &Aggregate{ done: internal.NewDone(), - NextLinesCh: make(chan chan *line.Line, 1000), + NextLinesCh: make(chan chan *line.Line, 10000), // Increased buffer for high concurrency serialize: make(chan struct{}), hostname: s[0], query: query, @@ -116,15 +116,15 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) { } } -func (a *Aggregate) nextLine() (line *line.Line, ok bool, noMoreChannels bool) { - dlog.Server.Trace("nextLine.enter", line, ok, noMoreChannels) +func (a *Aggregate) nextLine() (l *line.Line, ok bool, noMoreChannels bool) { + dlog.Server.Trace("nextLine.enter", l, ok, noMoreChannels) // Protect channel operations with mutex to prevent race conditions a.mu.Lock() defer a.mu.Unlock() select { - case line, ok = <-a.linesCh: + case l, ok = <-a.linesCh: if !ok { // Channel is closed, go to next channel. select { @@ -140,40 +140,46 @@ func (a *Aggregate) nextLine() (line *line.Line, ok bool, noMoreChannels bool) { oldLinesCh := a.linesCh a.linesCh = newLinesCh - // In turbo mode, synchronously put the channel back to avoid race conditions - if config.Server.TurboModeEnable { - select { - case a.NextLinesCh <- oldLinesCh: - // Successfully put back - default: - // Channel is full, start a goroutine with timeout - go func() { - timer := time.NewTimer(5 * time.Second) - defer timer.Stop() - select { - case a.NextLinesCh <- oldLinesCh: - case <-timer.C: - dlog.Server.Warn("Timeout: failed to put channel back, NextLinesCh full") - } - }() - } - } else { - // Non-turbo mode: use goroutine as before - go func() { - timer := time.NewTimer(5 * time.Second) - defer timer.Stop() + // Ensure the old channel is fully drained before recycling to prevent data mixing + go func(oldCh chan *line.Line) { + // First, drain any remaining lines from the old channel + drained := 0 + drainLoop: + for { select { - case a.NextLinesCh <- oldLinesCh: - case <-timer.C: - dlog.Server.Warn("Timeout: failed to put channel back, NextLinesCh might be full") + case l, ok := <-oldCh: + if !ok { + // Channel is closed, safe to recycle + break drainLoop + } + if l != nil { + l.Recycle() + drained++ + } + default: + // No more lines to drain immediately + break drainLoop } - }() - } + } + + if drained > 0 { + dlog.Server.Debug("Drained", drained, "lines from recycled channel") + } + + // Now safely recycle the drained channel + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + select { + case a.NextLinesCh <- oldCh: + case <-timer.C: + dlog.Server.Warn("Timeout: failed to put channel back, NextLinesCh might be full") + } + }(oldLinesCh) default: // No new lines channel found. } } - dlog.Server.Trace("nextLine.exit", line, ok, noMoreChannels) + dlog.Server.Trace("nextLine.exit", l, ok, noMoreChannels) return } diff --git a/internal/mapr/server/turbo_aggregate.go b/internal/mapr/server/turbo_aggregate.go new file mode 100644 index 0000000..9a748f5 --- /dev/null +++ b/internal/mapr/server/turbo_aggregate.go @@ -0,0 +1,415 @@ +package server + +import ( + "bytes" + "context" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/mimecast/dtail/internal" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/io/pool" + "github.com/mimecast/dtail/internal/mapr" + "github.com/mimecast/dtail/internal/mapr/logformat" + "github.com/mimecast/dtail/internal/protocol" +) + +// TurboAggregate is a high-performance aggregator for MapReduce operations in turbo mode. +// It processes lines directly without channels for maximum throughput. +type TurboAggregate struct { + done *internal.Done + // Hostname of the current server (used to populate $hostname field). + hostname string + // The mapr query + query *mapr.Query + // The mapr log format parser + parser logformat.Parser + // Group sets protected by mutex during serialization + groupSets sync.Map // map[string]*mapr.SafeAggregateSet + bufferMu sync.Mutex // Protects serialization + // Batch processing + batchMu sync.Mutex + batch []rawLine + batchSize int + // Periodic serialization + serializeTicker *time.Ticker + serialize chan struct{} + maprMessages chan<- string + // Stats + linesProcessed atomic.Uint64 + errors atomic.Uint64 + // Field map pool to reduce allocations + fieldPool sync.Pool + // Synchronization for clean shutdown + processingWg sync.WaitGroup +} + +type rawLine struct { + content []byte + sourceID string +} + +// NewTurboAggregate returns a new turbo mode aggregator. +func NewTurboAggregate(queryStr string) (*TurboAggregate, error) { + query, err := mapr.NewQuery(queryStr) + if err != nil { + return nil, err + } + + fqdn, err := config.Hostname() + if err != nil { + dlog.Server.Error(err) + } + s := strings.Split(fqdn, ".") + + var parserName string + switch query.LogFormat { + case "": + parserName = config.Server.MapreduceLogFormat + if query.Table == "" { + parserName = "generic" + } + default: + parserName = query.LogFormat + } + + dlog.Server.Info("Creating turbo log format parser", parserName) + logParser, err := logformat.NewParser(parserName, query) + if err != nil { + dlog.Server.Error("Could not create log format parser. Falling back to 'generic'", err) + if logParser, err = logformat.NewParser("generic", query); err != nil { + dlog.Server.FatalPanic("Could not create log format parser", err) + } + } + + return &TurboAggregate{ + done: internal.NewDone(), + serialize: make(chan struct{}), + hostname: s[0], + query: query, + parser: logParser, + groupSets: sync.Map{}, + batchSize: 100, // Process 100 lines at a time + batch: make([]rawLine, 0, 100), + fieldPool: sync.Pool{ + New: func() interface{} { + return make(map[string]string, 20) + }, + }, + }, nil +} + +// Shutdown the aggregation engine. +func (a *TurboAggregate) Shutdown() { + dlog.Server.Info("Shutting down turbo aggregate", "linesProcessed", a.linesProcessed.Load()) + + // Signal shutdown + a.done.Shutdown() + + // Stop the ticker + if a.serializeTicker != nil { + a.serializeTicker.Stop() + } + + // Process any remaining batch + a.processBatch() + + // Wait for all processing to complete + dlog.Server.Info("Waiting for all processing to complete") + a.processingWg.Wait() + + // Trigger final serialization after all processing is done + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + a.doSerialize(ctx) + + // Give time for messages to be sent + time.Sleep(100 * time.Millisecond) +} + +// Start the turbo aggregation. +func (a *TurboAggregate) Start(ctx context.Context, maprMessages chan<- string) { + a.maprMessages = maprMessages + + dlog.Server.Info("Starting turbo aggregate", "interval", a.query.Interval) + + // Start periodic serialization + a.serializeTicker = time.NewTicker(a.query.Interval) + go a.serializationLoop(ctx) + + // Start batch processor + go a.batchProcessorLoop(ctx) + + // Also trigger an immediate serialization to ensure we capture data even if interval hasn't passed + go func() { + time.Sleep(50 * time.Millisecond) // Give time for some data to accumulate + a.Serialize(ctx) + }() +} + +// ProcessLineDirect processes a line directly without channels. +// This is called from the TurboAggregateProcessor. +func (a *TurboAggregate) ProcessLineDirect(lineContent []byte, sourceID string) error { + // Make a copy of the line content as the buffer will be recycled + content := make([]byte, len(lineContent)) + copy(content, lineContent) + + // Add to batch + a.batchMu.Lock() + a.batch = append(a.batch, rawLine{content: content, sourceID: sourceID}) + shouldProcess := len(a.batch) >= a.batchSize + batchLen := len(a.batch) + a.batchMu.Unlock() + + if batchLen == 1 { + dlog.Server.Debug("TurboAggregate: First line received", "sourceID", sourceID) + } + + // Process batch if full + if shouldProcess { + a.processBatch() + } + + return nil +} + +// batchProcessorLoop continuously processes batches. +func (a *TurboAggregate) batchProcessorLoop(ctx context.Context) { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + // Process any remaining batch before exiting + a.processBatch() + return + case <-a.done.Done(): + // Process any remaining batch before exiting + a.processBatch() + return + case <-ticker.C: + // Periodically process any accumulated batch + a.processBatch() + } + } +} + +// processBatch processes a batch of lines. +func (a *TurboAggregate) processBatch() { + a.batchMu.Lock() + if len(a.batch) == 0 { + a.batchMu.Unlock() + return + } + batch := a.batch + a.batch = make([]rawLine, 0, a.batchSize) + a.batchMu.Unlock() + + // Track this batch processing + a.processingWg.Add(1) + defer a.processingWg.Done() + + // Process each line in the batch + for _, line := range batch { + if err := a.processLine(line.content, line.sourceID); err != nil { + a.errors.Add(1) + dlog.Server.Error("Error processing line:", err) + } + a.linesProcessed.Add(1) + } +} + +// processLine processes a single line and aggregates it. +func (a *TurboAggregate) processLine(lineContent []byte, sourceID string) error { + // Trim whitespace + maprLine := strings.TrimSpace(string(lineContent)) + + // Get a field map from the pool + fields := a.fieldPool.Get().(map[string]string) + defer func() { + // Clear the map before returning to pool + for k := range fields { + delete(fields, k) + } + a.fieldPool.Put(fields) + }() + + // Parse the line + parsedFields, err := a.parser.MakeFields(maprLine) + if err != nil { + if err != logformat.ErrIgnoreFields { + return err + } + return nil + } + + // Copy parsed fields to our pooled map + for k, v := range parsedFields { + fields[k] = v + } + + // Apply where clause + if !a.query.WhereClause(fields) { + return nil + } + + // Apply set clause if needed + if len(a.query.Set) > 0 { + if err := a.query.SetClause(fields); err != nil { + return err + } + } + + // Aggregate the fields + a.aggregate(fields) + return nil +} + +// aggregate adds fields to the appropriate group. +func (a *TurboAggregate) aggregate(fields map[string]string) { + // Build group key + var sb strings.Builder + for i, field := range a.query.GroupBy { + if i > 0 { + sb.WriteString(protocol.AggregateGroupKeyCombinator) + } + if val, ok := fields[field]; ok { + sb.WriteString(val) + } + } + groupKey := sb.String() + + // Get or create the aggregate set + setInterface, loaded := a.groupSets.LoadOrStore(groupKey, mapr.NewSafeAggregateSet()) + set := setInterface.(*mapr.SafeAggregateSet) + + if !loaded { + dlog.Server.Debug("TurboAggregate: New group created", "groupKey", groupKey) + } + + // Aggregate the values + var addedSample bool + for _, sc := range a.query.Select { + if val, ok := fields[sc.Field]; ok { + if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil { + dlog.Server.Error(err) + continue + } + addedSample = true + } + } + + if addedSample { + set.IncrementSamples() + } +} + +// serializationLoop handles periodic serialization. +func (a *TurboAggregate) serializationLoop(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-a.done.Done(): + return + case <-a.serializeTicker.C: + a.Serialize(ctx) + case <-a.serialize: + a.doSerialize(ctx) + } + } +} + +// Serialize triggers serialization of all aggregated data. +func (a *TurboAggregate) Serialize(ctx context.Context) { + select { + case a.serialize <- struct{}{}: + case <-time.After(time.Minute): + dlog.Server.Warn("Starting to serialize mapreduce data takes over a minute") + case <-ctx.Done(): + } +} + +// doSerialize performs the actual serialization. +func (a *TurboAggregate) doSerialize(ctx context.Context) { + // Process any remaining batch + a.processBatch() + + // Wait a moment for any in-progress batch processing + time.Sleep(10 * time.Millisecond) + + dlog.Server.Info("Serializing turbo mapreduce result", "linesProcessed", a.linesProcessed.Load()) + + // Lock to prevent concurrent modifications during serialization + a.bufferMu.Lock() + defer a.bufferMu.Unlock() + + // Create a new group set for serialization + group := mapr.NewGroupSet() + + // Copy all aggregate sets from the groupSets + groupCount := 0 + a.groupSets.Range(func(key, value interface{}) bool { + groupKey := key.(string) + safeSet := value.(*mapr.SafeAggregateSet) + + // Clone the safe set to get a regular AggregateSet + clonedSet := safeSet.Clone() + + // Add to the group set + groupSet := group.GetSet(groupKey) + *groupSet = *clonedSet + groupCount++ + + return true + }) + + dlog.Server.Info("Serializing groups", "groupCount", groupCount) + + // Serialize the group + group.Serialize(ctx, a.maprMessages) + + // Clear the groupSets after serialization + a.groupSets = sync.Map{} +} + +// TurboAggregateProcessor implements the line processor interface for turbo mode aggregation. +type TurboAggregateProcessor struct { + aggregate *TurboAggregate + globID string +} + +// NewTurboAggregateProcessor creates a new turbo aggregate processor. +func NewTurboAggregateProcessor(aggregate *TurboAggregate, globID string) *TurboAggregateProcessor { + return &TurboAggregateProcessor{ + aggregate: aggregate, + globID: globID, + } +} + +// ProcessLine processes a line directly to the turbo aggregate. +func (p *TurboAggregateProcessor) ProcessLine(lineContent *bytes.Buffer, lineNum uint64, sourceID string) error { + // Process the line directly + err := p.aggregate.ProcessLineDirect(lineContent.Bytes(), sourceID) + + // Recycle the buffer + pool.RecycleBytesBuffer(lineContent) + + return err +} + +// Flush ensures all buffered data is processed. +func (p *TurboAggregateProcessor) Flush() error { + // Process any remaining batch + p.aggregate.processBatch() + return nil +} + +// Close flushes any remaining data. +func (p *TurboAggregateProcessor) Close() error { + return p.Flush() +}
\ No newline at end of file diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index bfc7ec2..427ab6c 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -31,6 +31,7 @@ type baseHandler struct { handleCommandCb handleCommandCb lines chan *line.Line aggregate *server.Aggregate + turboAggregate *server.TurboAggregate // Turbo mode aggregate maprMessages chan string serverMessages chan string hostname string @@ -56,6 +57,16 @@ type baseHandler struct { // Shutdown the handler. func (h *baseHandler) Shutdown() { + // Shutdown turbo aggregate if present + if h.turboAggregate != nil { + dlog.Server.Info(h.user, "Shutting down turbo aggregate") + h.turboAggregate.Shutdown() + } + // Shutdown regular aggregate if present + if h.aggregate != nil { + dlog.Server.Info(h.user, "Shutting down regular aggregate") + h.aggregate.Shutdown() + } h.done.Shutdown() } @@ -372,6 +383,10 @@ func (h *baseHandler) flush() { if h.turboMode { maxIterations = 300 // Give more time for turbo mode to drain } + // Also increase iterations if we have MapReduce messages + if h.turboAggregate != nil || h.aggregate != nil { + maxIterations = 300 // Give more time for MapReduce results + } for i := 0; i < maxIterations; i++ { if numUnsentMessages() == 0 { diff --git a/internal/server/handlers/mapcommand.go b/internal/server/handlers/mapcommand.go index 65e0ed8..5dc7b8f 100644 --- a/internal/server/handlers/mapcommand.go +++ b/internal/server/handlers/mapcommand.go @@ -4,29 +4,49 @@ import ( "context" "strings" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/mapr/server" ) // Map command implements the mapreduce command server side. type mapCommand struct { - aggregate *server.Aggregate - server *ServerHandler + aggregate *server.Aggregate + turboAggregate *server.TurboAggregate + server *ServerHandler } // NewMapCommand returns a new server side mapreduce command. func newMapCommand(serverHandler *ServerHandler, argc int, - args []string) (mapCommand, *server.Aggregate, error) { + args []string) (mapCommand, *server.Aggregate, *server.TurboAggregate, error) { m := mapCommand{server: serverHandler} queryStr := strings.Join(args[1:], " ") + + // If turbo mode is enabled, create a TurboAggregate + if config.Server.TurboModeEnable { + dlog.Server.Info("Creating turbo aggregate for MapReduce", "query", queryStr) + turboAggregate, err := server.NewTurboAggregate(queryStr) + if err != nil { + return m, nil, nil, err + } + m.turboAggregate = turboAggregate + return m, nil, turboAggregate, nil + } + + // Otherwise, create a regular Aggregate aggregate, err := server.NewAggregate(queryStr) if err != nil { - return m, nil, err + return m, nil, nil, err } m.aggregate = aggregate - return m, aggregate, nil + return m, aggregate, nil, nil } func (m mapCommand) Start(ctx context.Context, aggregatedMessages chan<- string) { - m.aggregate.Start(ctx, aggregatedMessages) + if m.turboAggregate != nil { + m.turboAggregate.Start(ctx, aggregatedMessages) + } else { + m.aggregate.Start(ctx, aggregatedMessages) + } } diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index bdb7b8b..ce44996 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -1,6 +1,7 @@ package handlers import ( + "bytes" "context" "os" "path/filepath" @@ -14,6 +15,7 @@ import ( "github.com/mimecast/dtail/internal/io/fs" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/lcontext" + "github.com/mimecast/dtail/internal/mapr/server" "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/regex" ) @@ -169,6 +171,14 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC // Check if we should trigger shutdown now // Only shutdown if no files are pending AND no commands are active if remaining == 0 && atomic.LoadInt32(&r.server.activeCommands) == 0 { + // If we have a turbo aggregate, trigger final serialization + if r.server.turboAggregate != nil { + dlog.Server.Info(r.server.user, "Triggering final turbo aggregate serialization") + r.server.turboAggregate.Serialize(context.Background()) + // Give time for serialization to complete + time.Sleep(100 * time.Millisecond) + } + // Double-check that we really have no pending work // In turbo mode, there might be a race condition time.Sleep(10 * time.Millisecond) @@ -248,10 +258,11 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, } // Check if we should use the turbo boost optimizations - // Enable turbo boost for cat/grep/tail modes, but NOT for aggregate (MapReduce) operations - // MapReduce requires the traditional channel-based approach to work correctly - if config.Server.TurboModeEnable && r.server.aggregate == nil && - (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) { + // Enable turbo boost for cat/grep/tail modes, and now also for MapReduce operations + // MapReduce now has a turbo mode implementation that bypasses channels + if config.Server.TurboModeEnable && + (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient || r.server.turboAggregate != nil) { + dlog.Server.Info(r.server.user, "Using turbo mode for reading", path, "mode", r.mode, "hasTurboAggregate", r.server.turboAggregate != nil) r.readWithTurboProcessor(ctx, ltx, path, globID, re, reader) return } @@ -390,8 +401,21 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L for { dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> starting read loop iteration") - // Create a direct processor that writes without channels - processor := NewDirectLineProcessor(writer, globID) + // Create a processor based on whether we're doing MapReduce or not + var processor interface { + ProcessLine(*bytes.Buffer, uint64, string) error + Flush() error + Close() error + } + + if r.server.turboAggregate != nil { + // Use turbo aggregate processor for MapReduce operations + dlog.Server.Info(r.server.user, "Using turbo aggregate processor for MapReduce", path, globID) + processor = server.NewTurboAggregateProcessor(r.server.turboAggregate, globID) + } else { + // Use direct line processor for cat/grep/tail + processor = NewDirectLineProcessor(writer, globID) + } // Use the optimized reader dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> about to start") diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index da27066..df227ab 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -93,7 +93,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, ltx lcontext.LCon commandFinished() }() case "map": - command, aggregate, err := newMapCommand(h, argc, args) + command, aggregate, turboAggregate, err := newMapCommand(h, argc, args) if err != nil { h.sendln(h.serverMessages, err.Error()) dlog.Server.Error(h.user, err) @@ -101,6 +101,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, ltx lcontext.LCon return } h.aggregate = aggregate + h.turboAggregate = turboAggregate go func() { command.Start(ctx, h.maprMessages) commandFinished() |
