diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-04 22:01:28 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-04 22:01:28 +0200 |
| commit | 8b7116421dc30ddef2d8426f825cc7f50f3f0ccd (patch) | |
| tree | 34167ef2eabcbfbfc8a5a913639d6cc5f2f9450b | |
| parent | 9b147c4e49b8a6a378da6728cc74d9453ad2501f (diff) | |
fix: correct break/select flow and remove unreachable code (task 338)
| -rw-r--r-- | integrationtests/dtail_test.go | 9 | ||||
| -rw-r--r-- | internal/mapr/server/turbo_aggregate.go | 178 |
2 files changed, 93 insertions, 94 deletions
diff --git a/integrationtests/dtail_test.go b/integrationtests/dtail_test.go index 2e9da5b..cfa96c8 100644 --- a/integrationtests/dtail_test.go +++ b/integrationtests/dtail_test.go @@ -15,7 +15,7 @@ func TestDTailWithServer(t *testing.T) { testLogger := NewTestLogger("TestDTailWithServer") defer testLogger.WriteLogFile() cleanupTmpFiles(t) - + if !config.Env("DTAIL_INTEGRATION_TEST_RUN_MODE") { t.Log("Skipping") return @@ -91,6 +91,7 @@ func TestDTailWithServer(t *testing.T) { var greetingsRecv []string +readLoop: for len(greetingsRecv) < len(greetings) { select { case line := <-serverCh: @@ -105,7 +106,7 @@ func TestDTailWithServer(t *testing.T) { } case <-ctx.Done(): t.Log("Done reading client and server pipes") - break + break readLoop } } @@ -157,7 +158,7 @@ func TestDTailColorTable(t *testing.T) { func testDTailColorTableServerless(t *testing.T, logger *TestLogger) { ctx := WithTestLogger(context.Background(), logger) - + outFile := "dtailcolortable.stdout.tmp" expectedOutFile := "dtailcolortable.expected" @@ -199,7 +200,7 @@ func testDTailColorTableWithServer(t *testing.T, logger *TestLogger) { // Give server time to start time.Sleep(500 * time.Millisecond) - _, err = runCommand(ctx, t, outFile, "../dtail", + _, err = runCommand(ctx, t, outFile, "../dtail", "--colorTable", "--servers", fmt.Sprintf("%s:%d", bindAddress, port), "--trustAllHosts") diff --git a/internal/mapr/server/turbo_aggregate.go b/internal/mapr/server/turbo_aggregate.go index c3a5dd2..a317578 100644 --- a/internal/mapr/server/turbo_aggregate.go +++ b/internal/mapr/server/turbo_aggregate.go @@ -29,12 +29,12 @@ type TurboAggregate struct { // The mapr log format parser parser logformat.Parser // Group sets protected by mutex during serialization - groupSets sync.Map // map[string]*mapr.SafeAggregateSet - bufferMu sync.Mutex // Protects serialization + groupSets sync.Map // map[string]*mapr.SafeAggregateSet + bufferMu sync.Mutex // Protects serialization // Batch processing - batchMu sync.Mutex - batch []rawLine - batchSize int + batchMu sync.Mutex + batch []rawLine + batchSize int // Periodic serialization serializeTicker *time.Ticker serialize chan struct{} @@ -80,7 +80,7 @@ func NewTurboAggregate(queryStr string) (*TurboAggregate, error) { parserName = query.LogFormat } - dlog.Server.Info("Creating turbo log format parser", + dlog.Server.Info("Creating turbo log format parser", "parserName", parserName, "queryTable", query.Table, "queryLogFormat", query.LogFormat) @@ -93,14 +93,14 @@ func NewTurboAggregate(queryStr string) (*TurboAggregate, error) { } return &TurboAggregate{ - done: internal.NewDone(), - serialize: make(chan struct{}, 1), // Buffered to avoid blocking - hostname: s[0], - query: query, - parser: logParser, - groupSets: sync.Map{}, - batchSize: 100, // Process 100 lines at a time - batch: make([]rawLine, 0, 100), + done: internal.NewDone(), + serialize: make(chan struct{}, 1), // Buffered to avoid blocking + hostname: s[0], + query: query, + parser: logParser, + groupSets: sync.Map{}, + batchSize: 100, // Process 100 lines at a time + batch: make([]rawLine, 0, 100), fieldPool: sync.Pool{ New: func() interface{} { return make(map[string]string, 20) @@ -129,60 +129,60 @@ func min(a, b int) int { // Shutdown the aggregation engine. func (a *TurboAggregate) Shutdown() { - dlog.Server.Info("TurboAggregate: Shutdown called", + dlog.Server.Info("TurboAggregate: Shutdown called", "linesProcessed", a.linesProcessed.Load(), "filesProcessed", a.filesProcessed.Load(), "activeProcessors", a.activeProcessors.Load(), "currentGroups", a.countGroups()) - + // Signal shutdown a.done.Shutdown() - + // Stop the ticker if a.serializeTicker != nil { a.serializeTicker.Stop() } - + // Wait for active processors to finish for a.activeProcessors.Load() > 0 { - dlog.Server.Info("TurboAggregate: Waiting for active processors", + dlog.Server.Info("TurboAggregate: Waiting for active processors", "activeProcessors", a.activeProcessors.Load()) time.Sleep(10 * time.Millisecond) } - + // Process any remaining batch synchronously dlog.Server.Info("TurboAggregate: Processing final batch") a.processBatchAndWait() - + // Wait for all processing to complete dlog.Server.Info("TurboAggregate: Waiting for all processing to complete") a.processingWg.Wait() - - dlog.Server.Info("TurboAggregate: All processing complete, groups before final serialization", + + dlog.Server.Info("TurboAggregate: All processing complete, groups before final serialization", "groupCount", a.countGroups()) - + // Trigger final serialization after all processing is done // Use a longer timeout to ensure data gets through ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - + dlog.Server.Info("TurboAggregate: Triggering final serialization") a.doSerialize(ctx) - + // Give more time for messages to be sent and processed // This is crucial to ensure the baseHandler's Read method picks up the messages - dlog.Server.Info("TurboAggregate: Waiting for message delivery", + dlog.Server.Info("TurboAggregate: Waiting for message delivery", "channelLen", len(a.maprMessages)) - + // Wait for channel to drain or timeout timeout := time.After(2 * time.Second) ticker := time.NewTicker(50 * time.Millisecond) defer ticker.Stop() - + for { select { case <-timeout: - dlog.Server.Warn("TurboAggregate: Timeout waiting for message delivery", + dlog.Server.Warn("TurboAggregate: Timeout waiting for message delivery", "remainingMessages", len(a.maprMessages)) return case <-ticker.C: @@ -192,15 +192,13 @@ func (a *TurboAggregate) Shutdown() { } } } - - dlog.Server.Info("TurboAggregate: Shutdown complete") } // Start the turbo aggregation. func (a *TurboAggregate) Start(ctx context.Context, maprMessages chan<- string) { a.maprMessages = maprMessages - - dlog.Server.Info("TurboAggregate: Starting", + + dlog.Server.Info("TurboAggregate: Starting", "interval", a.query.Interval) // Start periodic serialization @@ -209,7 +207,7 @@ func (a *TurboAggregate) Start(ctx context.Context, maprMessages chan<- string) // Start batch processor go a.batchProcessorLoop(ctx) - + // Debug: Don't trigger immediate serialization - let data accumulate first dlog.Server.Info("TurboAggregate: Started, waiting for data") } @@ -219,17 +217,17 @@ func (a *TurboAggregate) Start(ctx context.Context, maprMessages chan<- string) func (a *TurboAggregate) ProcessLineDirect(lineContent []byte, sourceID string) error { // Increment counter first a.linesProcessed.Add(1) - + // Debug: Track when lines are received totalLines := a.linesProcessed.Load() if totalLines == 1 || totalLines%1000 == 0 { - dlog.Server.Info("TurboAggregate: ProcessLineDirect called", + dlog.Server.Info("TurboAggregate: ProcessLineDirect called", "totalLinesReceived", totalLines, "sourceID", sourceID, "lineLength", len(lineContent), "linePreview", string(lineContent[:min(50, len(lineContent))])) } - + // Make a copy of the line content as the buffer will be recycled content := make([]byte, len(lineContent)) copy(content, lineContent) @@ -242,14 +240,14 @@ func (a *TurboAggregate) ProcessLineDirect(lineContent []byte, sourceID string) a.batchMu.Unlock() if batchLen == 1 { - dlog.Server.Info("TurboAggregate: First line received in batch", + dlog.Server.Info("TurboAggregate: First line received in batch", "sourceID", sourceID, "batchSize", a.batchSize) } // Process batch if full if shouldProcess { - dlog.Server.Debug("TurboAggregate: Batch full, processing", + dlog.Server.Debug("TurboAggregate: Batch full, processing", "batchLen", batchLen) a.processBatch() } @@ -261,7 +259,7 @@ func (a *TurboAggregate) ProcessLineDirect(lineContent []byte, sourceID string) func (a *TurboAggregate) batchProcessorLoop(ctx context.Context) { dlog.Server.Info("TurboAggregate: Batch processor loop started") defer dlog.Server.Info("TurboAggregate: Batch processor loop ended") - + ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() @@ -275,18 +273,18 @@ func (a *TurboAggregate) batchProcessorLoop(ctx context.Context) { case <-ticker.C: // Periodically process any accumulated batch a.processBatch() - + // Check if context is done but only exit if no pending work select { case <-ctx.Done(): a.batchMu.Lock() batchLen := len(a.batch) a.batchMu.Unlock() - + activeProcs := a.activeProcessors.Load() - + if batchLen > 0 || activeProcs > 0 { - dlog.Server.Info("TurboAggregate: Context cancelled but work pending", + dlog.Server.Info("TurboAggregate: Context cancelled but work pending", "batchLen", batchLen, "activeProcessors", activeProcs) // Continue processing @@ -313,7 +311,7 @@ func (a *TurboAggregate) processBatch() { a.batch = make([]rawLine, 0, a.batchSize) a.batchMu.Unlock() - dlog.Server.Info("TurboAggregate: Processing batch", + dlog.Server.Info("TurboAggregate: Processing batch", "batchSize", batchSize, "totalLinesProcessed", a.linesProcessed.Load()) @@ -334,8 +332,8 @@ func (a *TurboAggregate) processBatch() { } // Note: line count is already incremented in ProcessLineDirect } - - dlog.Server.Info("TurboAggregate: Batch processed", + + dlog.Server.Info("TurboAggregate: Batch processed", "successCount", successCount, "errorCount", errorCount, "totalLinesProcessed", a.linesProcessed.Load()) @@ -354,7 +352,7 @@ func (a *TurboAggregate) processBatchAndWait() { a.batch = make([]rawLine, 0, a.batchSize) a.batchMu.Unlock() - dlog.Server.Info("TurboAggregate: Processing batch synchronously", + dlog.Server.Info("TurboAggregate: Processing batch synchronously", "batchSize", batchSize, "totalLinesProcessed", a.linesProcessed.Load()) @@ -371,8 +369,8 @@ func (a *TurboAggregate) processBatchAndWait() { } // Note: line count is already incremented in ProcessLineDirect } - - dlog.Server.Info("TurboAggregate: Batch processed synchronously", + + dlog.Server.Info("TurboAggregate: Batch processed synchronously", "successCount", successCount, "errorCount", errorCount, "totalLinesProcessed", a.linesProcessed.Load()) @@ -385,7 +383,7 @@ func (a *TurboAggregate) processLine(lineContent []byte, sourceID string) error // Debug: Log sample lines if a.linesProcessed.Load()%1000 == 0 { - dlog.Server.Debug("TurboAggregate: Processing line", + dlog.Server.Debug("TurboAggregate: Processing line", "lineNumber", a.linesProcessed.Load(), "linePreview", maprLine[:min(100, len(maprLine))]) } @@ -404,7 +402,7 @@ func (a *TurboAggregate) processLine(lineContent []byte, sourceID string) error parsedFields, err := a.parser.MakeFields(maprLine) if err != nil { if err != logformat.ErrIgnoreFields { - dlog.Server.Debug("TurboAggregate: Parser error", + dlog.Server.Debug("TurboAggregate: Parser error", "error", err, "line", maprLine[:min(100, len(maprLine))]) return err @@ -416,10 +414,10 @@ func (a *TurboAggregate) processLine(lineContent []byte, sourceID string) error for k, v := range parsedFields { fields[k] = v } - + // Debug: Log parsed fields for first few lines if a.linesProcessed.Load() < 5 { - dlog.Server.Info("TurboAggregate: Parsed fields", + dlog.Server.Info("TurboAggregate: Parsed fields", "lineNumber", a.linesProcessed.Load(), "fieldCount", len(fields), "fields", fields) @@ -461,9 +459,9 @@ func (a *TurboAggregate) aggregate(fields map[string]string) { // Get or create the aggregate set setInterface, loaded := a.groupSets.LoadOrStore(groupKey, mapr.NewSafeAggregateSet()) set := setInterface.(*mapr.SafeAggregateSet) - + if !loaded { - dlog.Server.Info("TurboAggregate: New group created", + dlog.Server.Info("TurboAggregate: New group created", "groupKey", groupKey, "totalGroups", a.countGroups()) } @@ -474,7 +472,7 @@ func (a *TurboAggregate) aggregate(fields map[string]string) { for _, sc := range a.query.Select { if val, ok := fields[sc.Field]; ok { if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil { - dlog.Server.Error("TurboAggregate: Aggregation error", + dlog.Server.Error("TurboAggregate: Aggregation error", "field", sc.Field, "operation", sc.Operation, "error", err) @@ -489,7 +487,7 @@ func (a *TurboAggregate) aggregate(fields map[string]string) { set.IncrementSamples() // Debug: Log aggregation details for first few samples if a.linesProcessed.Load() < 10 { - dlog.Server.Info("TurboAggregate: Aggregated sample", + dlog.Server.Info("TurboAggregate: Aggregated sample", "groupKey", groupKey, "aggregatedFields", aggregatedFields, "sampleCount", set.GetSamples()) @@ -501,7 +499,7 @@ func (a *TurboAggregate) aggregate(fields map[string]string) { func (a *TurboAggregate) serializationLoop(ctx context.Context) { dlog.Server.Info("TurboAggregate: Serialization loop started") defer dlog.Server.Info("TurboAggregate: Serialization loop ended") - + for { select { case <-ctx.Done(): @@ -532,10 +530,10 @@ func (a *TurboAggregate) Serialize(ctx context.Context) { // doSerialize performs the actual serialization. func (a *TurboAggregate) doSerialize(ctx context.Context) { - dlog.Server.Info("TurboAggregate: Starting serialization", + dlog.Server.Info("TurboAggregate: Starting serialization", "linesProcessed", a.linesProcessed.Load(), "currentGroups", a.countGroups()) - + // Process any remaining batch synchronously before serialization dlog.Server.Info("TurboAggregate: Processing remaining batch before serialization") a.processBatchAndWait() @@ -550,9 +548,9 @@ func (a *TurboAggregate) doSerialize(ctx context.Context) { // Count groups before serialization groupsBeforeSerialization := a.countGroups() - dlog.Server.Info("TurboAggregate: Groups before serialization", + dlog.Server.Info("TurboAggregate: Groups before serialization", "count", groupsBeforeSerialization) - + if groupsBeforeSerialization == 0 { dlog.Server.Warn("TurboAggregate: No groups to serialize!") return @@ -567,35 +565,35 @@ func (a *TurboAggregate) doSerialize(ctx context.Context) { a.groupSets.Range(func(key, value interface{}) bool { groupKey := key.(string) safeSet := value.(*mapr.SafeAggregateSet) - + // Clone the safe set to get a regular AggregateSet clonedSet := safeSet.Clone() - + // Debug: Log details of first few groups if groupCount < 5 { - sampleDetails = append(sampleDetails, + sampleDetails = append(sampleDetails, fmt.Sprintf("group=%s, samples=%d", groupKey, clonedSet.Samples)) } - + // Add to the group set groupSet := group.GetSet(groupKey) *groupSet = *clonedSet groupCount++ - + return true }) - dlog.Server.Info("TurboAggregate: Serialization details", + dlog.Server.Info("TurboAggregate: Serialization details", "groupCount", groupCount, "sampleGroups", sampleDetails, "maprMessagesChannel", a.maprMessages != nil) - + // Check if we have a valid channel if a.maprMessages == nil { dlog.Server.Error("TurboAggregate: maprMessages channel is nil!") return } - + // Serialize the group - use a longer timeout context for final serialization // to ensure data is sent even during shutdown serializeCtx := ctx @@ -605,14 +603,14 @@ func (a *TurboAggregate) doSerialize(ctx context.Context) { serializeCtx, cancel = context.WithTimeout(context.Background(), 10*time.Second) defer cancel() } - - dlog.Server.Info("TurboAggregate: Calling group.Serialize", + + dlog.Server.Info("TurboAggregate: Calling group.Serialize", "channelCap", cap(a.maprMessages), "channelLen", len(a.maprMessages)) - + group.Serialize(serializeCtx, a.maprMessages) - - dlog.Server.Info("TurboAggregate: group.Serialize completed", + + dlog.Server.Info("TurboAggregate: group.Serialize completed", "sentGroups", groupCount, "channelLen", len(a.maprMessages)) @@ -626,10 +624,10 @@ func (a *TurboAggregate) doSerialize(ctx context.Context) { dlog.Server.Info("TurboAggregate: Clearing groupSets for next interval") a.groupSets = sync.Map{} } - + // Log the state after serialization groupsAfterSerialization := a.countGroups() - dlog.Server.Info("TurboAggregate: After serialization", + dlog.Server.Info("TurboAggregate: After serialization", "groupsRemaining", groupsAfterSerialization) } @@ -642,7 +640,7 @@ type TurboAggregateProcessor struct { // NewTurboAggregateProcessor creates a new turbo aggregate processor. func NewTurboAggregateProcessor(aggregate *TurboAggregate, globID string) *TurboAggregateProcessor { aggregate.activeProcessors.Add(1) - dlog.Server.Debug("TurboAggregate: New processor created", + dlog.Server.Debug("TurboAggregate: New processor created", "globID", globID, "activeProcessors", aggregate.activeProcessors.Load()) return &TurboAggregateProcessor{ @@ -660,30 +658,30 @@ func (p *TurboAggregateProcessor) ProcessLine(lineContent *bytes.Buffer, lineNum "sourceID", sourceID, "contentLen", lineContent.Len()) } - + // Process the line directly err := p.aggregate.ProcessLineDirect(lineContent.Bytes(), sourceID) - + // Recycle the buffer pool.RecycleBytesBuffer(lineContent) - + return err } // Flush ensures all buffered data is processed. func (p *TurboAggregateProcessor) Flush() error { // Log flush call for debugging - dlog.Server.Info("TurboAggregateProcessor: Flush called", + dlog.Server.Info("TurboAggregateProcessor: Flush called", "globID", p.globID, "linesProcessed", p.aggregate.linesProcessed.Load()) - + // Process any remaining batch synchronously p.aggregate.processBatchAndWait() - + // Increment files processed counter p.aggregate.filesProcessed.Add(1) - - dlog.Server.Info("TurboAggregateProcessor: Flush completed", + + dlog.Server.Info("TurboAggregateProcessor: Flush completed", "globID", p.globID, "linesProcessed", p.aggregate.linesProcessed.Load(), "filesProcessed", p.aggregate.filesProcessed.Load()) @@ -694,8 +692,8 @@ func (p *TurboAggregateProcessor) Flush() error { func (p *TurboAggregateProcessor) Close() error { err := p.Flush() p.aggregate.activeProcessors.Add(-1) - dlog.Server.Debug("TurboAggregate: Processor closed", + dlog.Server.Debug("TurboAggregate: Processor closed", "globID", p.globID, "activeProcessors", p.aggregate.activeProcessors.Load()) return err -}
\ No newline at end of file +} |
