diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-20 11:06:50 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-20 11:06:50 +0200 |
| commit | 13b21feb07c86f65760f7338f284f3b492364cd9 (patch) | |
| tree | c9fa6fc4fb0c7fe8b927297d26e5f3b1448a3518 /internal/mapr/server | |
| parent | da8e581617a0240626d2bc922916416440e65bae (diff) | |
Optimize mapr parsing and stabilize aggregate shutdown
Diffstat (limited to 'internal/mapr/server')
| -rw-r--r-- | internal/mapr/server/aggregate.go | 14 | ||||
| -rw-r--r-- | internal/mapr/server/groupkey.go | 31 | ||||
| -rw-r--r-- | internal/mapr/server/turbo_aggregate.go | 528 | ||||
| -rw-r--r-- | internal/mapr/server/turbo_aggregate_test.go | 83 |
4 files changed, 195 insertions, 461 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 9a736a5..c9d4641 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -12,7 +12,6 @@ import ( "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/logformat" - "github.com/mimecast/dtail/internal/protocol" ) // Aggregate is for aggregating mapreduce data on the DTail server side. @@ -282,7 +281,7 @@ func (a *Aggregate) aggregateAndSerialize(ctx context.Context, serialize := func() { dlog.Server.Info("Serializing mapreduce result") group.Serialize(ctx, maprMessages) - group = mapr.NewGroupSet() + group.InitSet() } for { select { @@ -301,16 +300,7 @@ func (a *Aggregate) aggregateAndSerialize(ctx context.Context, } func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { - var sb strings.Builder - for i, field := range a.query.GroupBy { - if i > 0 { - sb.WriteString(protocol.AggregateGroupKeyCombinator) - } - if val, ok := fields[field]; ok { - sb.WriteString(val) - } - } - groupKey := sb.String() + groupKey := buildGroupKey(a.query.GroupBy, fields) set := group.GetSet(groupKey) var addedSample bool diff --git a/internal/mapr/server/groupkey.go b/internal/mapr/server/groupkey.go new file mode 100644 index 0000000..0963e4f --- /dev/null +++ b/internal/mapr/server/groupkey.go @@ -0,0 +1,31 @@ +package server + +import ( + "strings" + + "github.com/mimecast/dtail/internal/protocol" +) + +func buildGroupKey(groupBy []string, fields map[string]string) string { + if len(groupBy) == 0 { + return "" + } + + total := 0 + for _, field := range groupBy { + total += len(fields[field]) + } + total += (len(groupBy) - 1) * len(protocol.AggregateGroupKeyCombinator) + + var sb strings.Builder + sb.Grow(total) + + for i, field := range groupBy { + if i > 0 { + sb.WriteString(protocol.AggregateGroupKeyCombinator) + } + sb.WriteString(fields[field]) + } + + return sb.String() +} diff --git a/internal/mapr/server/turbo_aggregate.go b/internal/mapr/server/turbo_aggregate.go index 188be1c..c3aaf32 100644 --- a/internal/mapr/server/turbo_aggregate.go +++ b/internal/mapr/server/turbo_aggregate.go @@ -3,7 +3,6 @@ package server import ( "bytes" "context" - "fmt" "strings" "sync" "sync/atomic" @@ -15,7 +14,6 @@ import ( "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/logformat" - "github.com/mimecast/dtail/internal/protocol" ) // TurboAggregate is a high-performance aggregator for MapReduce operations in turbo mode. @@ -28,9 +26,11 @@ type TurboAggregate struct { query *mapr.Query // The mapr log format parser parser logformat.Parser - // Group sets protected by mutex during serialization - groupSets sync.Map // map[string]*mapr.SafeAggregateSet - bufferMu sync.Mutex // Protects serialization + // Group sets are swapped out during serialization to avoid clone-heavy flushes. + groupMu sync.Mutex + groupSets map[string]*mapr.AggregateSet + // serializeMu ensures only one serialization runs at a time. + serializeMu sync.Mutex // Batch processing batchMu sync.Mutex batch []rawLine @@ -43,16 +43,16 @@ type TurboAggregate struct { linesProcessed atomic.Uint64 errors atomic.Uint64 filesProcessed atomic.Uint64 - // Field map pool to reduce allocations - fieldPool sync.Pool - // Synchronization for clean shutdown - processingWg sync.WaitGroup + // Synchronization for clean shutdown. + processorsWg sync.WaitGroup // Track active file processors activeProcessors atomic.Int32 + startOnce sync.Once + started chan struct{} } type rawLine struct { - content []byte + content *bytes.Buffer sourceID string } @@ -104,109 +104,34 @@ func NewTurboAggregate(queryStr string, defaultLogFormat string) (*TurboAggregat hostname: s[0], query: query, parser: logParser, - groupSets: sync.Map{}, + groupSets: make(map[string]*mapr.AggregateSet), batchSize: 100, // Process 100 lines at a time batch: make([]rawLine, 0, 100), - fieldPool: sync.Pool{ - New: func() interface{} { - return make(map[string]string, 20) - }, - }, + started: make(chan struct{}), }, nil } // countGroups returns the current number of groups in the aggregation. func (a *TurboAggregate) countGroups() int { - count := 0 - a.groupSets.Range(func(_, _ interface{}) bool { - count++ - return true - }) - return count -} - -// min returns the minimum of two integers. -func min(a, b int) int { - if a < b { - return a - } - return b + a.groupMu.Lock() + defer a.groupMu.Unlock() + return len(a.groupSets) } // Shutdown the aggregation engine. func (a *TurboAggregate) Shutdown() { - dlog.Server.Info("TurboAggregate: Shutdown called", - "linesProcessed", a.linesProcessed.Load(), - "filesProcessed", a.filesProcessed.Load(), - "activeProcessors", a.activeProcessors.Load(), - "currentGroups", a.countGroups()) - - // Signal shutdown a.done.Shutdown() - - // Stop the ticker a.stopSerializeTicker() - - // Wait for active processors to finish - for a.activeProcessors.Load() > 0 { - dlog.Server.Info("TurboAggregate: Waiting for active processors", - "activeProcessors", a.activeProcessors.Load()) - time.Sleep(10 * time.Millisecond) - } - - // Process any remaining batch synchronously - dlog.Server.Info("TurboAggregate: Processing final batch") + a.processorsWg.Wait() a.processBatchAndWait() - - // Wait for all processing to complete - dlog.Server.Info("TurboAggregate: Waiting for all processing to complete") - a.processingWg.Wait() - - dlog.Server.Info("TurboAggregate: All processing complete, groups before final serialization", - "groupCount", a.countGroups()) - - // Trigger final serialization after all processing is done - // Use a longer timeout to ensure data gets through ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - - dlog.Server.Info("TurboAggregate: Triggering final serialization") a.doSerialize(ctx) - - // Give more time for messages to be sent and processed - // This is crucial to ensure the baseHandler's Read method picks up the messages - dlog.Server.Info("TurboAggregate: Waiting for message delivery", - "channelLen", len(a.maprMessages)) - - // Wait for channel to drain or timeout - timeout := time.After(2 * time.Second) - ticker := time.NewTicker(50 * time.Millisecond) - defer ticker.Stop() - - for { - select { - case <-timeout: - dlog.Server.Warn("TurboAggregate: Timeout waiting for message delivery", - "remainingMessages", len(a.maprMessages)) - return - case <-ticker.C: - if len(a.maprMessages) == 0 { - dlog.Server.Info("TurboAggregate: All messages delivered") - return - } - } - } } // Abort stops background processing without waiting for final serialization. // Session generation replacement uses this to preempt old query work immediately. func (a *TurboAggregate) Abort() { - dlog.Server.Info("TurboAggregate: Abort called", - "linesProcessed", a.linesProcessed.Load(), - "filesProcessed", a.filesProcessed.Load(), - "activeProcessors", a.activeProcessors.Load(), - "currentGroups", a.countGroups()) - a.done.Shutdown() a.stopSerializeTicker() } @@ -214,326 +139,148 @@ func (a *TurboAggregate) Abort() { // Start the turbo aggregation. func (a *TurboAggregate) Start(ctx context.Context, maprMessages chan<- string) { a.maprMessages = maprMessages + interval := a.query.Interval + if interval <= 0 { + interval = time.Second + } + a.serializeTicker = time.NewTicker(interval) + a.startOnce.Do(func() { + if a.started != nil { + close(a.started) + } + }) + defer a.stopSerializeTicker() - dlog.Server.Info("TurboAggregate: Starting", - "interval", a.query.Interval) - - // Start periodic serialization - a.serializeTicker = time.NewTicker(a.query.Interval) go a.serializationLoop(ctx) - // Start batch processor - go a.batchProcessorLoop(ctx) - - // Debug: Don't trigger immediate serialization - let data accumulate first - dlog.Server.Info("TurboAggregate: Started, waiting for data") + select { + case <-ctx.Done(): + case <-a.done.Done(): + } } // ProcessLineDirect processes a line directly without channels. // This is called from the TurboAggregateProcessor. -func (a *TurboAggregate) ProcessLineDirect(lineContent []byte, sourceID string) error { +func (a *TurboAggregate) ProcessLineDirect(lineContent *bytes.Buffer, sourceID string) error { if a.stopping() { + pool.RecycleBytesBuffer(lineContent) return nil } - // Increment counter first a.linesProcessed.Add(1) - // Debug: Track when lines are received - totalLines := a.linesProcessed.Load() - if totalLines == 1 || totalLines%1000 == 0 { - dlog.Server.Info("TurboAggregate: ProcessLineDirect called", - "totalLinesReceived", totalLines, - "sourceID", sourceID, - "lineLength", len(lineContent), - "linePreview", string(lineContent[:min(50, len(lineContent))])) - } - - // Make a copy of the line content as the buffer will be recycled - content := make([]byte, len(lineContent)) - copy(content, lineContent) - // Add to batch a.batchMu.Lock() - a.batch = append(a.batch, rawLine{content: content, sourceID: sourceID}) + a.batch = append(a.batch, rawLine{content: lineContent, sourceID: sourceID}) shouldProcess := len(a.batch) >= a.batchSize - batchLen := len(a.batch) a.batchMu.Unlock() - if batchLen == 1 { - dlog.Server.Info("TurboAggregate: First line received in batch", - "sourceID", sourceID, - "batchSize", a.batchSize) - } - - // Process batch if full if shouldProcess { - dlog.Server.Debug("TurboAggregate: Batch full, processing", - "batchLen", batchLen) a.processBatch() } return nil } -// batchProcessorLoop continuously processes batches. -func (a *TurboAggregate) batchProcessorLoop(ctx context.Context) { - dlog.Server.Info("TurboAggregate: Batch processor loop started") - defer dlog.Server.Info("TurboAggregate: Batch processor loop ended") - - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - - for { - select { - case <-a.done.Done(): - dlog.Server.Info("TurboAggregate: Batch processor stopped by shutdown") - // Process any remaining batch synchronously before exiting - a.processBatchAndWait() - return - case <-ticker.C: - // Periodically process any accumulated batch - a.processBatch() - - // Check if context is done but only exit if no pending work - select { - case <-ctx.Done(): - a.batchMu.Lock() - batchLen := len(a.batch) - a.batchMu.Unlock() - - activeProcs := a.activeProcessors.Load() - - if batchLen > 0 || activeProcs > 0 { - dlog.Server.Info("TurboAggregate: Context cancelled but work pending", - "batchLen", batchLen, - "activeProcessors", activeProcs) - // Continue processing - } else { - dlog.Server.Info("TurboAggregate: Context cancelled, no pending work") - return - } - default: - // Context not done, continue - } - } - } -} - -// processBatch processes a batch of lines asynchronously. +// processBatch processes a full batch immediately. func (a *TurboAggregate) processBatch() { - a.batchMu.Lock() - if len(a.batch) == 0 { - a.batchMu.Unlock() - return - } - batch := a.batch - batchSize := len(batch) - a.batch = make([]rawLine, 0, a.batchSize) - a.batchMu.Unlock() - - dlog.Server.Info("TurboAggregate: Processing batch", - "batchSize", batchSize, - "totalLinesProcessed", a.linesProcessed.Load()) - - // Track this batch processing - a.processingWg.Add(1) - defer a.processingWg.Done() - - // Process each line in the batch - successCount := 0 - errorCount := 0 - for i, line := range batch { - if err := a.processLine(line.content, line.sourceID); err != nil { - a.errors.Add(1) - errorCount++ - dlog.Server.Error("Error processing line:", err, "lineIndex", i) - } else { - successCount++ - } - // Note: line count is already incremented in ProcessLineDirect - } - - dlog.Server.Info("TurboAggregate: Batch processed", - "successCount", successCount, - "errorCount", errorCount, - "totalLinesProcessed", a.linesProcessed.Load()) + a.processRawBatch(a.takeBatch()) } // processBatchAndWait processes a batch of lines synchronously and waits for completion. // This is used when flushing to ensure all data is processed before continuing. func (a *TurboAggregate) processBatchAndWait() { + a.processRawBatch(a.takeBatch()) +} + +func (a *TurboAggregate) takeBatch() []rawLine { a.batchMu.Lock() if len(a.batch) == 0 { a.batchMu.Unlock() - return + return nil } batch := a.batch - batchSize := len(batch) a.batch = make([]rawLine, 0, a.batchSize) a.batchMu.Unlock() + return batch +} - dlog.Server.Info("TurboAggregate: Processing batch synchronously", - "batchSize", batchSize, - "totalLinesProcessed", a.linesProcessed.Load()) - - // Process each line in the batch (no goroutine, synchronous) - successCount := 0 - errorCount := 0 - for i, line := range batch { - if err := a.processLine(line.content, line.sourceID); err != nil { +func (a *TurboAggregate) processRawBatch(batch []rawLine) { + for i := range batch { + if err := a.processLine(batch[i].content, batch[i].sourceID); err != nil { a.errors.Add(1) - errorCount++ dlog.Server.Error("Error processing line:", err, "lineIndex", i) - } else { - successCount++ } - // Note: line count is already incremented in ProcessLineDirect + if batch[i].content != nil { + pool.RecycleBytesBuffer(batch[i].content) + } } - - dlog.Server.Info("TurboAggregate: Batch processed synchronously", - "successCount", successCount, - "errorCount", errorCount, - "totalLinesProcessed", a.linesProcessed.Load()) } // processLine processes a single line and aggregates it. -func (a *TurboAggregate) processLine(lineContent []byte, sourceID string) error { - // Trim whitespace - maprLine := strings.TrimSpace(string(lineContent)) - - // Debug: Log sample lines - if a.linesProcessed.Load()%1000 == 0 { - dlog.Server.Debug("TurboAggregate: Processing line", - "lineNumber", a.linesProcessed.Load(), - "linePreview", maprLine[:min(100, len(maprLine))]) - } - - // Get a field map from the pool - fields := a.fieldPool.Get().(map[string]string) - defer func() { - // Clear the map before returning to pool - for k := range fields { - delete(fields, k) - } - a.fieldPool.Put(fields) - }() - - // Parse the line +func (a *TurboAggregate) processLine(lineContent *bytes.Buffer, _ string) error { + maprLine := strings.TrimSpace(lineContent.String()) parsedFields, err := a.parser.MakeFields(maprLine) if err != nil { if err != logformat.ErrIgnoreFields { - dlog.Server.Debug("TurboAggregate: Parser error", - "error", err, - "line", maprLine[:min(100, len(maprLine))]) return err } return nil } - // Copy parsed fields to our pooled map - for k, v := range parsedFields { - fields[k] = v - } - - // Debug: Log parsed fields for first few lines - if a.linesProcessed.Load() < 5 { - dlog.Server.Info("TurboAggregate: Parsed fields", - "lineNumber", a.linesProcessed.Load(), - "fieldCount", len(fields), - "fields", fields) - } - // Apply where clause - if !a.query.WhereClause(fields) { - dlog.Server.Debug("TurboAggregate: Line filtered by WHERE clause") + if !a.query.WhereClause(parsedFields) { return nil } // Apply set clause if needed if len(a.query.Set) > 0 { - if err := a.query.SetClause(fields); err != nil { - dlog.Server.Error("TurboAggregate: SET clause error", err) + if err := a.query.SetClause(parsedFields); err != nil { return err } } // Aggregate the fields - a.aggregate(fields) + a.aggregate(parsedFields) return nil } // aggregate adds fields to the appropriate group. func (a *TurboAggregate) aggregate(fields map[string]string) { - // Build group key - var sb strings.Builder - for i, field := range a.query.GroupBy { - if i > 0 { - sb.WriteString(protocol.AggregateGroupKeyCombinator) - } - if val, ok := fields[field]; ok { - sb.WriteString(val) - } + groupKey := buildGroupKey(a.query.GroupBy, fields) + a.groupMu.Lock() + set, ok := a.groupSets[groupKey] + if !ok { + set = mapr.NewAggregateSet() + a.groupSets[groupKey] = set } - groupKey := sb.String() - - // Get or create the aggregate set - setInterface, loaded := a.groupSets.LoadOrStore(groupKey, mapr.NewSafeAggregateSet()) - set := setInterface.(*mapr.SafeAggregateSet) - - if !loaded { - dlog.Server.Info("TurboAggregate: New group created", - "groupKey", groupKey, - "totalGroups", a.countGroups()) - } - - // Aggregate the values var addedSample bool - aggregatedFields := []string{} for _, sc := range a.query.Select { if val, ok := fields[sc.Field]; ok { if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil { - dlog.Server.Error("TurboAggregate: Aggregation error", - "field", sc.Field, - "operation", sc.Operation, - "error", err) + dlog.Server.Error("TurboAggregate aggregation error", err, "field", sc.Field, "operation", sc.Operation) continue } addedSample = true - aggregatedFields = append(aggregatedFields, sc.Field) } } - if addedSample { - set.IncrementSamples() - // Debug: Log aggregation details for first few samples - if a.linesProcessed.Load() < 10 { - dlog.Server.Info("TurboAggregate: Aggregated sample", - "groupKey", groupKey, - "aggregatedFields", aggregatedFields, - "sampleCount", set.GetSamples()) - } + set.Samples++ } + a.groupMu.Unlock() } // serializationLoop handles periodic serialization. func (a *TurboAggregate) serializationLoop(ctx context.Context) { - dlog.Server.Info("TurboAggregate: Serialization loop started") - defer dlog.Server.Info("TurboAggregate: Serialization loop ended") - for { select { case <-ctx.Done(): - dlog.Server.Info("TurboAggregate: Serialization loop stopped by context") return case <-a.done.Done(): - dlog.Server.Info("TurboAggregate: Serialization loop stopped by shutdown") return case <-a.serializeTicker.C: - dlog.Server.Info("TurboAggregate: Periodic serialization triggered") a.Serialize(ctx) case <-a.serialize: - dlog.Server.Info("TurboAggregate: Manual serialization triggered") a.doSerialize(ctx) } } @@ -551,119 +298,60 @@ func (a *TurboAggregate) Serialize(ctx context.Context) { // doSerialize performs the actual serialization. func (a *TurboAggregate) doSerialize(ctx context.Context) { - dlog.Server.Info("TurboAggregate: Starting serialization", - "linesProcessed", a.linesProcessed.Load(), - "currentGroups", a.countGroups()) + a.serializeMu.Lock() + defer a.serializeMu.Unlock() - // Process any remaining batch synchronously before serialization - dlog.Server.Info("TurboAggregate: Processing remaining batch before serialization") a.processBatchAndWait() + if a.maprMessages == nil { + dlog.Server.Error("TurboAggregate maprMessages channel is nil") + return + } - // Wait a moment for any in-progress batch processing - dlog.Server.Info("TurboAggregate: Waiting for batch processing to complete") - time.Sleep(50 * time.Millisecond) // Increased wait time - - // Lock to prevent concurrent modifications during serialization - a.bufferMu.Lock() - defer a.bufferMu.Unlock() - - // Count groups before serialization - groupsBeforeSerialization := a.countGroups() - dlog.Server.Info("TurboAggregate: Groups before serialization", - "count", groupsBeforeSerialization) - - if groupsBeforeSerialization == 0 { - dlog.Server.Warn("TurboAggregate: No groups to serialize!") + snapshot := a.swapGroupSets() + if len(snapshot) == 0 { return } - // Create a new group set for serialization group := mapr.NewGroupSet() - - // Copy all aggregate sets from the groupSets - groupCount := 0 - sampleDetails := make([]string, 0) - a.groupSets.Range(func(key, value interface{}) bool { - groupKey := key.(string) - safeSet := value.(*mapr.SafeAggregateSet) - - // Clone the safe set to get a regular AggregateSet - clonedSet := safeSet.Clone() - - // Debug: Log details of first few groups - if groupCount < 5 { - sampleDetails = append(sampleDetails, - fmt.Sprintf("group=%s, samples=%d", groupKey, clonedSet.Samples)) - } - - // Add to the group set + for groupKey, aggregateSet := range snapshot { groupSet := group.GetSet(groupKey) - *groupSet = *clonedSet - groupCount++ - - return true - }) - - dlog.Server.Info("TurboAggregate: Serialization details", - "groupCount", groupCount, - "sampleGroups", sampleDetails, - "maprMessagesChannel", a.maprMessages != nil) - - // Check if we have a valid channel - if a.maprMessages == nil { - dlog.Server.Error("TurboAggregate: maprMessages channel is nil!") - return + *groupSet = *aggregateSet } - // Serialize the group - use a longer timeout context for final serialization - // to ensure data is sent even during shutdown serializeCtx := ctx if _, ok := ctx.Deadline(); ok { - // If context has a deadline, extend it for serialization var cancel context.CancelFunc serializeCtx, cancel = context.WithTimeout(context.Background(), 10*time.Second) defer cancel() } - - dlog.Server.Info("TurboAggregate: Calling group.Serialize", - "channelCap", cap(a.maprMessages), - "channelLen", len(a.maprMessages)) - group.Serialize(serializeCtx, a.maprMessages) +} - dlog.Server.Info("TurboAggregate: group.Serialize completed", - "sentGroups", groupCount, - "channelLen", len(a.maprMessages)) +func (a *TurboAggregate) swapGroupSets() map[string]*mapr.AggregateSet { + a.groupMu.Lock() + defer a.groupMu.Unlock() - // Clear the groupSets after serialization only if not shutting down - select { - case <-a.done.Done(): - // During shutdown, keep the data for potential final serialization - dlog.Server.Info("TurboAggregate: Keeping groupSets during shutdown") - default: - // Normal operation - clear for next interval - dlog.Server.Info("TurboAggregate: Clearing groupSets for next interval") - a.groupSets = sync.Map{} + if len(a.groupSets) == 0 { + return nil } - // Log the state after serialization - groupsAfterSerialization := a.countGroups() - dlog.Server.Info("TurboAggregate: After serialization", - "groupsRemaining", groupsAfterSerialization) + snapshot := a.groupSets + a.groupSets = make(map[string]*mapr.AggregateSet, len(snapshot)) + return snapshot } // TurboAggregateProcessor implements the line processor interface for turbo mode aggregation. type TurboAggregateProcessor struct { aggregate *TurboAggregate globID string + flushOnce sync.Once + closeOnce sync.Once } // NewTurboAggregateProcessor creates a new turbo aggregate processor. func NewTurboAggregateProcessor(aggregate *TurboAggregate, globID string) *TurboAggregateProcessor { + aggregate.processorsWg.Add(1) aggregate.activeProcessors.Add(1) - dlog.Server.Debug("TurboAggregate: New processor created", - "globID", globID, - "activeProcessors", aggregate.activeProcessors.Load()) return &TurboAggregateProcessor{ aggregate: aggregate, globID: globID, @@ -671,27 +359,12 @@ func NewTurboAggregateProcessor(aggregate *TurboAggregate, globID string) *Turbo } // ProcessLine processes a line directly to the turbo aggregate. -func (p *TurboAggregateProcessor) ProcessLine(lineContent *bytes.Buffer, lineNum uint64, sourceID string) error { +func (p *TurboAggregateProcessor) ProcessLine(lineContent *bytes.Buffer, _ uint64, sourceID string) error { if p.aggregate.stopping() { pool.RecycleBytesBuffer(lineContent) return nil } - - // Debug: Log when ProcessLine is called - if lineNum == 1 || lineNum%1000 == 0 { - dlog.Server.Info("TurboAggregateProcessor: ProcessLine called", - "lineNum", lineNum, - "sourceID", sourceID, - "contentLen", lineContent.Len()) - } - - // Process the line directly - err := p.aggregate.ProcessLineDirect(lineContent.Bytes(), sourceID) - - // Recycle the buffer - pool.RecycleBytesBuffer(lineContent) - - return err + return p.aggregate.ProcessLineDirect(lineContent, sourceID) } // Flush ensures all buffered data is processed. @@ -700,30 +373,19 @@ func (p *TurboAggregateProcessor) Flush() error { return nil } - // Log flush call for debugging - dlog.Server.Info("TurboAggregateProcessor: Flush called", - "globID", p.globID, - "linesProcessed", p.aggregate.linesProcessed.Load()) - - // Process any remaining batch synchronously - p.aggregate.processBatchAndWait() - - // Increment files processed counter - p.aggregate.filesProcessed.Add(1) - - dlog.Server.Info("TurboAggregateProcessor: Flush completed", - "globID", p.globID, - "linesProcessed", p.aggregate.linesProcessed.Load(), - "filesProcessed", p.aggregate.filesProcessed.Load()) + p.flushOnce.Do(func() { + p.aggregate.processBatchAndWait() + p.aggregate.filesProcessed.Add(1) + }) return nil } // Close flushes any remaining data. func (p *TurboAggregateProcessor) Close() error { err := p.Flush() - p.aggregate.activeProcessors.Add(-1) - dlog.Server.Debug("TurboAggregate: Processor closed", - "globID", p.globID, - "activeProcessors", p.aggregate.activeProcessors.Load()) + p.closeOnce.Do(func() { + p.aggregate.activeProcessors.Add(-1) + p.aggregate.processorsWg.Done() + }) return err } diff --git a/internal/mapr/server/turbo_aggregate_test.go b/internal/mapr/server/turbo_aggregate_test.go index 7ae4b5a..e674252 100644 --- a/internal/mapr/server/turbo_aggregate_test.go +++ b/internal/mapr/server/turbo_aggregate_test.go @@ -62,8 +62,12 @@ func TestTurboAggregateVsRegular(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Start the turbo aggregate - turboAgg.Start(ctx, messages) + startDone := make(chan struct{}) + go func() { + defer close(startDone) + turboAgg.Start(ctx, messages) + }() + waitForTurboAggregateStart(t, turboAgg) // Process lines processor := NewTurboAggregateProcessor(turboAgg, "test") @@ -92,6 +96,7 @@ func TestTurboAggregateVsRegular(t *testing.T) { // Cancel context to stop background goroutines cancel() + <-startDone // Collect results with timeout done := make(chan struct{}) @@ -169,16 +174,21 @@ func TestTurboAggregateVsRegular(t *testing.T) { } close(lines) - // Wait for processing - time.Sleep(100 * time.Millisecond) - - // Shutdown - regularAgg.Shutdown() + // Wait for the aggregate to drain the closed line channel and serialize naturally. + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + select { + case <-done: + case <-time.After(2 * time.Second): + regularAgg.Shutdown() + cancel() + t.Fatal("Timeout waiting for regular aggregate to finish") + } cancel() - // Wait for the Start goroutine to finish - wg.Wait() - // Collect results close(messages) @@ -232,10 +242,15 @@ func TestTurboAggregateConcurrency(t *testing.T) { // Channel to collect messages messages := make(chan string, 1000) - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - // Start the turbo aggregate - turboAgg.Start(ctx, messages) + startDone := make(chan struct{}) + go func() { + defer close(startDone) + turboAgg.Start(ctx, messages) + }() + waitForTurboAggregateStart(t, turboAgg) // Process multiple "files" concurrently var wg sync.WaitGroup @@ -269,6 +284,8 @@ func TestTurboAggregateConcurrency(t *testing.T) { // Shutdown and get results turboAgg.Shutdown() + cancel() + <-startDone // Collect results time.Sleep(200 * time.Millisecond) @@ -291,9 +308,8 @@ func TestTurboAggregateConcurrency(t *testing.T) { t.Errorf("Expected %d lines processed, got %d", expectedLines, turboAgg.linesProcessed.Load()) } - // Verify file count (may be higher if test was run multiple times) - if turboAgg.filesProcessed.Load() < uint64(numFiles) { - t.Errorf("Expected at least %d files processed, got %d", numFiles, turboAgg.filesProcessed.Load()) + if turboAgg.filesProcessed.Load() != uint64(numFiles) { + t.Errorf("Expected %d files processed, got %d", numFiles, turboAgg.filesProcessed.Load()) } // Parse result to check count @@ -330,3 +346,38 @@ func TestTurboAggregateAbortReturnsPromptlyWithActiveProcessors(t *testing.T) { t.Fatal("Abort did not return promptly while processors were still active") } } + +func TestTurboAggregateProcessorCountsFlushOnce(t *testing.T) { + aggregate := &TurboAggregate{ + done: internal.NewDone(), + batchSize: 16, + } + + processor := NewTurboAggregateProcessor(aggregate, "test") + if err := processor.Flush(); err != nil { + t.Fatalf("Flush failed: %v", err) + } + if err := processor.Close(); err != nil { + t.Fatalf("Close failed: %v", err) + } + + if got := aggregate.filesProcessed.Load(); got != 1 { + t.Fatalf("expected filesProcessed to be 1, got %d", got) + } + if got := aggregate.activeProcessors.Load(); got != 0 { + t.Fatalf("expected activeProcessors to be 0, got %d", got) + } +} + +func waitForTurboAggregateStart(t *testing.T, aggregate *TurboAggregate) { + t.Helper() + + if aggregate.started == nil { + t.Fatal("turbo aggregate missing start signal") + } + select { + case <-aggregate.started: + case <-time.After(500 * time.Millisecond): + t.Fatal("turbo aggregate did not finish Start initialization") + } +} |
