summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-07-03 16:13:26 +0300
committerPaul Buetow <paul@buetow.org>2025-07-03 16:13:26 +0300
commitf1ae8e6eb80c8f2f4b4b18b5b93893ad3249c6a1 (patch)
treea9348d535148dae1a93f1b08e17d9870a30e7c75 /internal
parenta4eb3cc769c13312fdd4b7aaa20659e408f734b7 (diff)
fix: implement thread-safe turbo mode for MapReduce operations
- Add SafeAggregateSet wrapper with mutex protection for concurrent access - Implement TurboAggregate for direct line processing without channels - Fix race conditions in turbo mode MapReduce aggregation - Add proper synchronization for batch processing completion - Update shutdown sequence to ensure all data is serialized - Add integration test configuration for high-concurrency scenarios The turbo mode now correctly handles MapReduce queries with significant performance improvements while maintaining data integrity and preventing race conditions during concurrent aggregation. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal')
-rw-r--r--internal/mapr/safe_aggregateset.go71
-rw-r--r--internal/mapr/safe_aggregateset_test.go147
-rw-r--r--internal/mapr/server/aggregate.go70
-rw-r--r--internal/mapr/server/turbo_aggregate.go415
-rw-r--r--internal/server/handlers/basehandler.go15
-rw-r--r--internal/server/handlers/mapcommand.go32
-rw-r--r--internal/server/handlers/readcommand.go36
-rw-r--r--internal/server/handlers/serverhandler.go3
8 files changed, 744 insertions, 45 deletions
diff --git a/internal/mapr/safe_aggregateset.go b/internal/mapr/safe_aggregateset.go
new file mode 100644
index 0000000..b42814d
--- /dev/null
+++ b/internal/mapr/safe_aggregateset.go
@@ -0,0 +1,71 @@
+package mapr
+
+import (
+ "context"
+ "sync"
+)
+
+// SafeAggregateSet is a thread-safe wrapper around AggregateSet for use in turbo mode.
+// It uses a read-write mutex to allow concurrent reads but exclusive writes.
+type SafeAggregateSet struct {
+ mu sync.RWMutex
+ set *AggregateSet
+}
+
+// NewSafeAggregateSet creates a new thread-safe aggregate set.
+func NewSafeAggregateSet() *SafeAggregateSet {
+ return &SafeAggregateSet{
+ set: NewAggregateSet(),
+ }
+}
+
+// Aggregate data to the aggregate set with thread safety.
+func (s *SafeAggregateSet) Aggregate(key string, agg AggregateOperation, value string, clientAggregation bool) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ return s.set.Aggregate(key, agg, value, clientAggregation)
+}
+
+// IncrementSamples increments the sample count safely.
+func (s *SafeAggregateSet) IncrementSamples() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.set.Samples++
+}
+
+// Clone creates a deep copy of the aggregate set.
+// This is useful for serialization to avoid holding locks for too long.
+func (s *SafeAggregateSet) Clone() *AggregateSet {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ clone := &AggregateSet{
+ Samples: s.set.Samples,
+ FValues: make(map[string]float64, len(s.set.FValues)),
+ SValues: make(map[string]string, len(s.set.SValues)),
+ }
+
+ // Deep copy the maps
+ for k, v := range s.set.FValues {
+ clone.FValues[k] = v
+ }
+ for k, v := range s.set.SValues {
+ clone.SValues[k] = v
+ }
+
+ return clone
+}
+
+// Serialize the aggregate set safely.
+func (s *SafeAggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<- string) {
+ // Clone the set to avoid holding the lock during serialization
+ clone := s.Clone()
+ clone.Serialize(ctx, groupKey, ch)
+}
+
+// GetSamples returns the current sample count safely.
+func (s *SafeAggregateSet) GetSamples() int {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ return s.set.Samples
+} \ No newline at end of file
diff --git a/internal/mapr/safe_aggregateset_test.go b/internal/mapr/safe_aggregateset_test.go
new file mode 100644
index 0000000..327987a
--- /dev/null
+++ b/internal/mapr/safe_aggregateset_test.go
@@ -0,0 +1,147 @@
+package mapr
+
+import (
+ "sync"
+ "testing"
+)
+
+// TestSafeAggregateSetConcurrency tests that SafeAggregateSet handles concurrent operations correctly.
+func TestSafeAggregateSetConcurrency(t *testing.T) {
+ safeSet := NewSafeAggregateSet()
+
+ // Number of concurrent goroutines
+ numGoroutines := 100
+ // Number of operations per goroutine
+ opsPerGoroutine := 1000
+
+ var wg sync.WaitGroup
+ wg.Add(numGoroutines)
+
+ // Launch concurrent goroutines
+ for i := 0; i < numGoroutines; i++ {
+ go func(id int) {
+ defer wg.Done()
+
+ for j := 0; j < opsPerGoroutine; j++ {
+ // Test Count operation
+ err := safeSet.Aggregate("count", Count, "1", false)
+ if err != nil {
+ t.Errorf("Error in Count aggregation: %v", err)
+ }
+
+ // Test Sum operation
+ err = safeSet.Aggregate("sum", Sum, "10.5", false)
+ if err != nil {
+ t.Errorf("Error in Sum aggregation: %v", err)
+ }
+
+ // Test Last operation
+ err = safeSet.Aggregate("last", Last, "value", false)
+ if err != nil {
+ t.Errorf("Error in Last aggregation: %v", err)
+ }
+
+ // Test Min operation
+ err = safeSet.Aggregate("min", Min, "5.0", false)
+ if err != nil {
+ t.Errorf("Error in Min aggregation: %v", err)
+ }
+
+ // Test Max operation
+ err = safeSet.Aggregate("max", Max, "15.0", false)
+ if err != nil {
+ t.Errorf("Error in Max aggregation: %v", err)
+ }
+
+ // Increment samples
+ safeSet.IncrementSamples()
+ }
+ }(i)
+ }
+
+ // Wait for all goroutines to complete
+ wg.Wait()
+
+ // Verify results
+ clone := safeSet.Clone()
+
+ // Check Count
+ expectedCount := float64(numGoroutines * opsPerGoroutine)
+ if clone.FValues["count"] != expectedCount {
+ t.Errorf("Expected count %f, got %f", expectedCount, clone.FValues["count"])
+ }
+
+ // Check Sum
+ expectedSum := float64(numGoroutines * opsPerGoroutine) * 10.5
+ if clone.FValues["sum"] != expectedSum {
+ t.Errorf("Expected sum %f, got %f", expectedSum, clone.FValues["sum"])
+ }
+
+ // Check Min
+ if clone.FValues["min"] != 5.0 {
+ t.Errorf("Expected min 5.0, got %f", clone.FValues["min"])
+ }
+
+ // Check Max
+ if clone.FValues["max"] != 15.0 {
+ t.Errorf("Expected max 15.0, got %f", clone.FValues["max"])
+ }
+
+ // Check Samples
+ expectedSamples := numGoroutines * opsPerGoroutine
+ if clone.Samples != expectedSamples {
+ t.Errorf("Expected samples %d, got %d", expectedSamples, clone.Samples)
+ }
+
+ // Check Last (should be "value")
+ if clone.SValues["last"] != "value" {
+ t.Errorf("Expected last 'value', got '%s'", clone.SValues["last"])
+ }
+}
+
+// TestSafeAggregateSetClone tests that cloning creates an independent copy.
+func TestSafeAggregateSetClone(t *testing.T) {
+ original := NewSafeAggregateSet()
+
+ // Add some data
+ original.Aggregate("count", Count, "1", false)
+ original.Aggregate("sum", Sum, "100", false)
+ original.Aggregate("last", Last, "original", false)
+ original.IncrementSamples()
+
+ // Clone the set
+ clone := original.Clone()
+
+ // Modify the original
+ original.Aggregate("count", Count, "1", false)
+ original.Aggregate("sum", Sum, "50", false)
+ original.Aggregate("last", Last, "modified", false)
+ original.IncrementSamples()
+
+ // Verify clone is unchanged
+ if clone.FValues["count"] != 1 {
+ t.Errorf("Clone count should be 1, got %f", clone.FValues["count"])
+ }
+
+ if clone.FValues["sum"] != 100 {
+ t.Errorf("Clone sum should be 100, got %f", clone.FValues["sum"])
+ }
+
+ if clone.SValues["last"] != "original" {
+ t.Errorf("Clone last should be 'original', got '%s'", clone.SValues["last"])
+ }
+
+ if clone.Samples != 1 {
+ t.Errorf("Clone samples should be 1, got %d", clone.Samples)
+ }
+
+ // Verify original has changed
+ originalClone := original.Clone()
+ if originalClone.FValues["count"] != 2 {
+ t.Errorf("Original count should be 2, got %f", originalClone.FValues["count"])
+ }
+
+ if originalClone.FValues["sum"] != 150 {
+ t.Errorf("Original sum should be 150, got %f", originalClone.FValues["sum"])
+ }
+} \ No newline at end of file
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index f055b9d..353cda5 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -68,7 +68,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
return &Aggregate{
done: internal.NewDone(),
- NextLinesCh: make(chan chan *line.Line, 1000),
+ NextLinesCh: make(chan chan *line.Line, 10000), // Increased buffer for high concurrency
serialize: make(chan struct{}),
hostname: s[0],
query: query,
@@ -116,15 +116,15 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) {
}
}
-func (a *Aggregate) nextLine() (line *line.Line, ok bool, noMoreChannels bool) {
- dlog.Server.Trace("nextLine.enter", line, ok, noMoreChannels)
+func (a *Aggregate) nextLine() (l *line.Line, ok bool, noMoreChannels bool) {
+ dlog.Server.Trace("nextLine.enter", l, ok, noMoreChannels)
// Protect channel operations with mutex to prevent race conditions
a.mu.Lock()
defer a.mu.Unlock()
select {
- case line, ok = <-a.linesCh:
+ case l, ok = <-a.linesCh:
if !ok {
// Channel is closed, go to next channel.
select {
@@ -140,40 +140,46 @@ func (a *Aggregate) nextLine() (line *line.Line, ok bool, noMoreChannels bool) {
oldLinesCh := a.linesCh
a.linesCh = newLinesCh
- // In turbo mode, synchronously put the channel back to avoid race conditions
- if config.Server.TurboModeEnable {
- select {
- case a.NextLinesCh <- oldLinesCh:
- // Successfully put back
- default:
- // Channel is full, start a goroutine with timeout
- go func() {
- timer := time.NewTimer(5 * time.Second)
- defer timer.Stop()
- select {
- case a.NextLinesCh <- oldLinesCh:
- case <-timer.C:
- dlog.Server.Warn("Timeout: failed to put channel back, NextLinesCh full")
- }
- }()
- }
- } else {
- // Non-turbo mode: use goroutine as before
- go func() {
- timer := time.NewTimer(5 * time.Second)
- defer timer.Stop()
+ // Ensure the old channel is fully drained before recycling to prevent data mixing
+ go func(oldCh chan *line.Line) {
+ // First, drain any remaining lines from the old channel
+ drained := 0
+ drainLoop:
+ for {
select {
- case a.NextLinesCh <- oldLinesCh:
- case <-timer.C:
- dlog.Server.Warn("Timeout: failed to put channel back, NextLinesCh might be full")
+ case l, ok := <-oldCh:
+ if !ok {
+ // Channel is closed, safe to recycle
+ break drainLoop
+ }
+ if l != nil {
+ l.Recycle()
+ drained++
+ }
+ default:
+ // No more lines to drain immediately
+ break drainLoop
}
- }()
- }
+ }
+
+ if drained > 0 {
+ dlog.Server.Debug("Drained", drained, "lines from recycled channel")
+ }
+
+ // Now safely recycle the drained channel
+ timer := time.NewTimer(5 * time.Second)
+ defer timer.Stop()
+ select {
+ case a.NextLinesCh <- oldCh:
+ case <-timer.C:
+ dlog.Server.Warn("Timeout: failed to put channel back, NextLinesCh might be full")
+ }
+ }(oldLinesCh)
default:
// No new lines channel found.
}
}
- dlog.Server.Trace("nextLine.exit", line, ok, noMoreChannels)
+ dlog.Server.Trace("nextLine.exit", l, ok, noMoreChannels)
return
}
diff --git a/internal/mapr/server/turbo_aggregate.go b/internal/mapr/server/turbo_aggregate.go
new file mode 100644
index 0000000..9a748f5
--- /dev/null
+++ b/internal/mapr/server/turbo_aggregate.go
@@ -0,0 +1,415 @@
+package server
+
+import (
+ "bytes"
+ "context"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/mimecast/dtail/internal"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/io/pool"
+ "github.com/mimecast/dtail/internal/mapr"
+ "github.com/mimecast/dtail/internal/mapr/logformat"
+ "github.com/mimecast/dtail/internal/protocol"
+)
+
+// TurboAggregate is a high-performance aggregator for MapReduce operations in turbo mode.
+// It processes lines directly without channels for maximum throughput.
+type TurboAggregate struct {
+ done *internal.Done
+ // Hostname of the current server (used to populate $hostname field).
+ hostname string
+ // The mapr query
+ query *mapr.Query
+ // The mapr log format parser
+ parser logformat.Parser
+ // Group sets protected by mutex during serialization
+ groupSets sync.Map // map[string]*mapr.SafeAggregateSet
+ bufferMu sync.Mutex // Protects serialization
+ // Batch processing
+ batchMu sync.Mutex
+ batch []rawLine
+ batchSize int
+ // Periodic serialization
+ serializeTicker *time.Ticker
+ serialize chan struct{}
+ maprMessages chan<- string
+ // Stats
+ linesProcessed atomic.Uint64
+ errors atomic.Uint64
+ // Field map pool to reduce allocations
+ fieldPool sync.Pool
+ // Synchronization for clean shutdown
+ processingWg sync.WaitGroup
+}
+
+type rawLine struct {
+ content []byte
+ sourceID string
+}
+
+// NewTurboAggregate returns a new turbo mode aggregator.
+func NewTurboAggregate(queryStr string) (*TurboAggregate, error) {
+ query, err := mapr.NewQuery(queryStr)
+ if err != nil {
+ return nil, err
+ }
+
+ fqdn, err := config.Hostname()
+ if err != nil {
+ dlog.Server.Error(err)
+ }
+ s := strings.Split(fqdn, ".")
+
+ var parserName string
+ switch query.LogFormat {
+ case "":
+ parserName = config.Server.MapreduceLogFormat
+ if query.Table == "" {
+ parserName = "generic"
+ }
+ default:
+ parserName = query.LogFormat
+ }
+
+ dlog.Server.Info("Creating turbo log format parser", parserName)
+ logParser, err := logformat.NewParser(parserName, query)
+ if err != nil {
+ dlog.Server.Error("Could not create log format parser. Falling back to 'generic'", err)
+ if logParser, err = logformat.NewParser("generic", query); err != nil {
+ dlog.Server.FatalPanic("Could not create log format parser", err)
+ }
+ }
+
+ return &TurboAggregate{
+ done: internal.NewDone(),
+ serialize: make(chan struct{}),
+ hostname: s[0],
+ query: query,
+ parser: logParser,
+ groupSets: sync.Map{},
+ batchSize: 100, // Process 100 lines at a time
+ batch: make([]rawLine, 0, 100),
+ fieldPool: sync.Pool{
+ New: func() interface{} {
+ return make(map[string]string, 20)
+ },
+ },
+ }, nil
+}
+
+// Shutdown the aggregation engine.
+func (a *TurboAggregate) Shutdown() {
+ dlog.Server.Info("Shutting down turbo aggregate", "linesProcessed", a.linesProcessed.Load())
+
+ // Signal shutdown
+ a.done.Shutdown()
+
+ // Stop the ticker
+ if a.serializeTicker != nil {
+ a.serializeTicker.Stop()
+ }
+
+ // Process any remaining batch
+ a.processBatch()
+
+ // Wait for all processing to complete
+ dlog.Server.Info("Waiting for all processing to complete")
+ a.processingWg.Wait()
+
+ // Trigger final serialization after all processing is done
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ a.doSerialize(ctx)
+
+ // Give time for messages to be sent
+ time.Sleep(100 * time.Millisecond)
+}
+
+// Start the turbo aggregation.
+func (a *TurboAggregate) Start(ctx context.Context, maprMessages chan<- string) {
+ a.maprMessages = maprMessages
+
+ dlog.Server.Info("Starting turbo aggregate", "interval", a.query.Interval)
+
+ // Start periodic serialization
+ a.serializeTicker = time.NewTicker(a.query.Interval)
+ go a.serializationLoop(ctx)
+
+ // Start batch processor
+ go a.batchProcessorLoop(ctx)
+
+ // Also trigger an immediate serialization to ensure we capture data even if interval hasn't passed
+ go func() {
+ time.Sleep(50 * time.Millisecond) // Give time for some data to accumulate
+ a.Serialize(ctx)
+ }()
+}
+
+// ProcessLineDirect processes a line directly without channels.
+// This is called from the TurboAggregateProcessor.
+func (a *TurboAggregate) ProcessLineDirect(lineContent []byte, sourceID string) error {
+ // Make a copy of the line content as the buffer will be recycled
+ content := make([]byte, len(lineContent))
+ copy(content, lineContent)
+
+ // Add to batch
+ a.batchMu.Lock()
+ a.batch = append(a.batch, rawLine{content: content, sourceID: sourceID})
+ shouldProcess := len(a.batch) >= a.batchSize
+ batchLen := len(a.batch)
+ a.batchMu.Unlock()
+
+ if batchLen == 1 {
+ dlog.Server.Debug("TurboAggregate: First line received", "sourceID", sourceID)
+ }
+
+ // Process batch if full
+ if shouldProcess {
+ a.processBatch()
+ }
+
+ return nil
+}
+
+// batchProcessorLoop continuously processes batches.
+func (a *TurboAggregate) batchProcessorLoop(ctx context.Context) {
+ ticker := time.NewTicker(100 * time.Millisecond)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ // Process any remaining batch before exiting
+ a.processBatch()
+ return
+ case <-a.done.Done():
+ // Process any remaining batch before exiting
+ a.processBatch()
+ return
+ case <-ticker.C:
+ // Periodically process any accumulated batch
+ a.processBatch()
+ }
+ }
+}
+
+// processBatch processes a batch of lines.
+func (a *TurboAggregate) processBatch() {
+ a.batchMu.Lock()
+ if len(a.batch) == 0 {
+ a.batchMu.Unlock()
+ return
+ }
+ batch := a.batch
+ a.batch = make([]rawLine, 0, a.batchSize)
+ a.batchMu.Unlock()
+
+ // Track this batch processing
+ a.processingWg.Add(1)
+ defer a.processingWg.Done()
+
+ // Process each line in the batch
+ for _, line := range batch {
+ if err := a.processLine(line.content, line.sourceID); err != nil {
+ a.errors.Add(1)
+ dlog.Server.Error("Error processing line:", err)
+ }
+ a.linesProcessed.Add(1)
+ }
+}
+
+// processLine processes a single line and aggregates it.
+func (a *TurboAggregate) processLine(lineContent []byte, sourceID string) error {
+ // Trim whitespace
+ maprLine := strings.TrimSpace(string(lineContent))
+
+ // Get a field map from the pool
+ fields := a.fieldPool.Get().(map[string]string)
+ defer func() {
+ // Clear the map before returning to pool
+ for k := range fields {
+ delete(fields, k)
+ }
+ a.fieldPool.Put(fields)
+ }()
+
+ // Parse the line
+ parsedFields, err := a.parser.MakeFields(maprLine)
+ if err != nil {
+ if err != logformat.ErrIgnoreFields {
+ return err
+ }
+ return nil
+ }
+
+ // Copy parsed fields to our pooled map
+ for k, v := range parsedFields {
+ fields[k] = v
+ }
+
+ // Apply where clause
+ if !a.query.WhereClause(fields) {
+ return nil
+ }
+
+ // Apply set clause if needed
+ if len(a.query.Set) > 0 {
+ if err := a.query.SetClause(fields); err != nil {
+ return err
+ }
+ }
+
+ // Aggregate the fields
+ a.aggregate(fields)
+ return nil
+}
+
+// aggregate adds fields to the appropriate group.
+func (a *TurboAggregate) aggregate(fields map[string]string) {
+ // Build group key
+ var sb strings.Builder
+ for i, field := range a.query.GroupBy {
+ if i > 0 {
+ sb.WriteString(protocol.AggregateGroupKeyCombinator)
+ }
+ if val, ok := fields[field]; ok {
+ sb.WriteString(val)
+ }
+ }
+ groupKey := sb.String()
+
+ // Get or create the aggregate set
+ setInterface, loaded := a.groupSets.LoadOrStore(groupKey, mapr.NewSafeAggregateSet())
+ set := setInterface.(*mapr.SafeAggregateSet)
+
+ if !loaded {
+ dlog.Server.Debug("TurboAggregate: New group created", "groupKey", groupKey)
+ }
+
+ // Aggregate the values
+ var addedSample bool
+ for _, sc := range a.query.Select {
+ if val, ok := fields[sc.Field]; ok {
+ if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil {
+ dlog.Server.Error(err)
+ continue
+ }
+ addedSample = true
+ }
+ }
+
+ if addedSample {
+ set.IncrementSamples()
+ }
+}
+
+// serializationLoop handles periodic serialization.
+func (a *TurboAggregate) serializationLoop(ctx context.Context) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-a.done.Done():
+ return
+ case <-a.serializeTicker.C:
+ a.Serialize(ctx)
+ case <-a.serialize:
+ a.doSerialize(ctx)
+ }
+ }
+}
+
+// Serialize triggers serialization of all aggregated data.
+func (a *TurboAggregate) Serialize(ctx context.Context) {
+ select {
+ case a.serialize <- struct{}{}:
+ case <-time.After(time.Minute):
+ dlog.Server.Warn("Starting to serialize mapreduce data takes over a minute")
+ case <-ctx.Done():
+ }
+}
+
+// doSerialize performs the actual serialization.
+func (a *TurboAggregate) doSerialize(ctx context.Context) {
+ // Process any remaining batch
+ a.processBatch()
+
+ // Wait a moment for any in-progress batch processing
+ time.Sleep(10 * time.Millisecond)
+
+ dlog.Server.Info("Serializing turbo mapreduce result", "linesProcessed", a.linesProcessed.Load())
+
+ // Lock to prevent concurrent modifications during serialization
+ a.bufferMu.Lock()
+ defer a.bufferMu.Unlock()
+
+ // Create a new group set for serialization
+ group := mapr.NewGroupSet()
+
+ // Copy all aggregate sets from the groupSets
+ groupCount := 0
+ a.groupSets.Range(func(key, value interface{}) bool {
+ groupKey := key.(string)
+ safeSet := value.(*mapr.SafeAggregateSet)
+
+ // Clone the safe set to get a regular AggregateSet
+ clonedSet := safeSet.Clone()
+
+ // Add to the group set
+ groupSet := group.GetSet(groupKey)
+ *groupSet = *clonedSet
+ groupCount++
+
+ return true
+ })
+
+ dlog.Server.Info("Serializing groups", "groupCount", groupCount)
+
+ // Serialize the group
+ group.Serialize(ctx, a.maprMessages)
+
+ // Clear the groupSets after serialization
+ a.groupSets = sync.Map{}
+}
+
+// TurboAggregateProcessor implements the line processor interface for turbo mode aggregation.
+type TurboAggregateProcessor struct {
+ aggregate *TurboAggregate
+ globID string
+}
+
+// NewTurboAggregateProcessor creates a new turbo aggregate processor.
+func NewTurboAggregateProcessor(aggregate *TurboAggregate, globID string) *TurboAggregateProcessor {
+ return &TurboAggregateProcessor{
+ aggregate: aggregate,
+ globID: globID,
+ }
+}
+
+// ProcessLine processes a line directly to the turbo aggregate.
+func (p *TurboAggregateProcessor) ProcessLine(lineContent *bytes.Buffer, lineNum uint64, sourceID string) error {
+ // Process the line directly
+ err := p.aggregate.ProcessLineDirect(lineContent.Bytes(), sourceID)
+
+ // Recycle the buffer
+ pool.RecycleBytesBuffer(lineContent)
+
+ return err
+}
+
+// Flush ensures all buffered data is processed.
+func (p *TurboAggregateProcessor) Flush() error {
+ // Process any remaining batch
+ p.aggregate.processBatch()
+ return nil
+}
+
+// Close flushes any remaining data.
+func (p *TurboAggregateProcessor) Close() error {
+ return p.Flush()
+} \ No newline at end of file
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index bfc7ec2..427ab6c 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -31,6 +31,7 @@ type baseHandler struct {
handleCommandCb handleCommandCb
lines chan *line.Line
aggregate *server.Aggregate
+ turboAggregate *server.TurboAggregate // Turbo mode aggregate
maprMessages chan string
serverMessages chan string
hostname string
@@ -56,6 +57,16 @@ type baseHandler struct {
// Shutdown the handler.
func (h *baseHandler) Shutdown() {
+ // Shutdown turbo aggregate if present
+ if h.turboAggregate != nil {
+ dlog.Server.Info(h.user, "Shutting down turbo aggregate")
+ h.turboAggregate.Shutdown()
+ }
+ // Shutdown regular aggregate if present
+ if h.aggregate != nil {
+ dlog.Server.Info(h.user, "Shutting down regular aggregate")
+ h.aggregate.Shutdown()
+ }
h.done.Shutdown()
}
@@ -372,6 +383,10 @@ func (h *baseHandler) flush() {
if h.turboMode {
maxIterations = 300 // Give more time for turbo mode to drain
}
+ // Also increase iterations if we have MapReduce messages
+ if h.turboAggregate != nil || h.aggregate != nil {
+ maxIterations = 300 // Give more time for MapReduce results
+ }
for i := 0; i < maxIterations; i++ {
if numUnsentMessages() == 0 {
diff --git a/internal/server/handlers/mapcommand.go b/internal/server/handlers/mapcommand.go
index 65e0ed8..5dc7b8f 100644
--- a/internal/server/handlers/mapcommand.go
+++ b/internal/server/handlers/mapcommand.go
@@ -4,29 +4,49 @@ import (
"context"
"strings"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr/server"
)
// Map command implements the mapreduce command server side.
type mapCommand struct {
- aggregate *server.Aggregate
- server *ServerHandler
+ aggregate *server.Aggregate
+ turboAggregate *server.TurboAggregate
+ server *ServerHandler
}
// NewMapCommand returns a new server side mapreduce command.
func newMapCommand(serverHandler *ServerHandler, argc int,
- args []string) (mapCommand, *server.Aggregate, error) {
+ args []string) (mapCommand, *server.Aggregate, *server.TurboAggregate, error) {
m := mapCommand{server: serverHandler}
queryStr := strings.Join(args[1:], " ")
+
+ // If turbo mode is enabled, create a TurboAggregate
+ if config.Server.TurboModeEnable {
+ dlog.Server.Info("Creating turbo aggregate for MapReduce", "query", queryStr)
+ turboAggregate, err := server.NewTurboAggregate(queryStr)
+ if err != nil {
+ return m, nil, nil, err
+ }
+ m.turboAggregate = turboAggregate
+ return m, nil, turboAggregate, nil
+ }
+
+ // Otherwise, create a regular Aggregate
aggregate, err := server.NewAggregate(queryStr)
if err != nil {
- return m, nil, err
+ return m, nil, nil, err
}
m.aggregate = aggregate
- return m, aggregate, nil
+ return m, aggregate, nil, nil
}
func (m mapCommand) Start(ctx context.Context, aggregatedMessages chan<- string) {
- m.aggregate.Start(ctx, aggregatedMessages)
+ if m.turboAggregate != nil {
+ m.turboAggregate.Start(ctx, aggregatedMessages)
+ } else {
+ m.aggregate.Start(ctx, aggregatedMessages)
+ }
}
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index bdb7b8b..ce44996 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -1,6 +1,7 @@
package handlers
import (
+ "bytes"
"context"
"os"
"path/filepath"
@@ -14,6 +15,7 @@ import (
"github.com/mimecast/dtail/internal/io/fs"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/lcontext"
+ "github.com/mimecast/dtail/internal/mapr/server"
"github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/regex"
)
@@ -169,6 +171,14 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
// Check if we should trigger shutdown now
// Only shutdown if no files are pending AND no commands are active
if remaining == 0 && atomic.LoadInt32(&r.server.activeCommands) == 0 {
+ // If we have a turbo aggregate, trigger final serialization
+ if r.server.turboAggregate != nil {
+ dlog.Server.Info(r.server.user, "Triggering final turbo aggregate serialization")
+ r.server.turboAggregate.Serialize(context.Background())
+ // Give time for serialization to complete
+ time.Sleep(100 * time.Millisecond)
+ }
+
// Double-check that we really have no pending work
// In turbo mode, there might be a race condition
time.Sleep(10 * time.Millisecond)
@@ -248,10 +258,11 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
}
// Check if we should use the turbo boost optimizations
- // Enable turbo boost for cat/grep/tail modes, but NOT for aggregate (MapReduce) operations
- // MapReduce requires the traditional channel-based approach to work correctly
- if config.Server.TurboModeEnable && r.server.aggregate == nil &&
- (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) {
+ // Enable turbo boost for cat/grep/tail modes, and now also for MapReduce operations
+ // MapReduce now has a turbo mode implementation that bypasses channels
+ if config.Server.TurboModeEnable &&
+ (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient || r.server.turboAggregate != nil) {
+ dlog.Server.Info(r.server.user, "Using turbo mode for reading", path, "mode", r.mode, "hasTurboAggregate", r.server.turboAggregate != nil)
r.readWithTurboProcessor(ctx, ltx, path, globID, re, reader)
return
}
@@ -390,8 +401,21 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L
for {
dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> starting read loop iteration")
- // Create a direct processor that writes without channels
- processor := NewDirectLineProcessor(writer, globID)
+ // Create a processor based on whether we're doing MapReduce or not
+ var processor interface {
+ ProcessLine(*bytes.Buffer, uint64, string) error
+ Flush() error
+ Close() error
+ }
+
+ if r.server.turboAggregate != nil {
+ // Use turbo aggregate processor for MapReduce operations
+ dlog.Server.Info(r.server.user, "Using turbo aggregate processor for MapReduce", path, globID)
+ processor = server.NewTurboAggregateProcessor(r.server.turboAggregate, globID)
+ } else {
+ // Use direct line processor for cat/grep/tail
+ processor = NewDirectLineProcessor(writer, globID)
+ }
// Use the optimized reader
dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> about to start")
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index da27066..df227ab 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -93,7 +93,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, ltx lcontext.LCon
commandFinished()
}()
case "map":
- command, aggregate, err := newMapCommand(h, argc, args)
+ command, aggregate, turboAggregate, err := newMapCommand(h, argc, args)
if err != nil {
h.sendln(h.serverMessages, err.Error())
dlog.Server.Error(h.user, err)
@@ -101,6 +101,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, ltx lcontext.LCon
return
}
h.aggregate = aggregate
+ h.turboAggregate = turboAggregate
go func() {
command.Start(ctx, h.maprMessages)
commandFinished()