summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-04 22:01:28 +0200
committerPaul Buetow <paul@buetow.org>2026-03-04 22:01:28 +0200
commit8b7116421dc30ddef2d8426f825cc7f50f3f0ccd (patch)
tree34167ef2eabcbfbfc8a5a913639d6cc5f2f9450b
parent9b147c4e49b8a6a378da6728cc74d9453ad2501f (diff)
fix: correct break/select flow and remove unreachable code (task 338)
-rw-r--r--integrationtests/dtail_test.go9
-rw-r--r--internal/mapr/server/turbo_aggregate.go178
2 files changed, 93 insertions, 94 deletions
diff --git a/integrationtests/dtail_test.go b/integrationtests/dtail_test.go
index 2e9da5b..cfa96c8 100644
--- a/integrationtests/dtail_test.go
+++ b/integrationtests/dtail_test.go
@@ -15,7 +15,7 @@ func TestDTailWithServer(t *testing.T) {
testLogger := NewTestLogger("TestDTailWithServer")
defer testLogger.WriteLogFile()
cleanupTmpFiles(t)
-
+
if !config.Env("DTAIL_INTEGRATION_TEST_RUN_MODE") {
t.Log("Skipping")
return
@@ -91,6 +91,7 @@ func TestDTailWithServer(t *testing.T) {
var greetingsRecv []string
+readLoop:
for len(greetingsRecv) < len(greetings) {
select {
case line := <-serverCh:
@@ -105,7 +106,7 @@ func TestDTailWithServer(t *testing.T) {
}
case <-ctx.Done():
t.Log("Done reading client and server pipes")
- break
+ break readLoop
}
}
@@ -157,7 +158,7 @@ func TestDTailColorTable(t *testing.T) {
func testDTailColorTableServerless(t *testing.T, logger *TestLogger) {
ctx := WithTestLogger(context.Background(), logger)
-
+
outFile := "dtailcolortable.stdout.tmp"
expectedOutFile := "dtailcolortable.expected"
@@ -199,7 +200,7 @@ func testDTailColorTableWithServer(t *testing.T, logger *TestLogger) {
// Give server time to start
time.Sleep(500 * time.Millisecond)
- _, err = runCommand(ctx, t, outFile, "../dtail",
+ _, err = runCommand(ctx, t, outFile, "../dtail",
"--colorTable",
"--servers", fmt.Sprintf("%s:%d", bindAddress, port),
"--trustAllHosts")
diff --git a/internal/mapr/server/turbo_aggregate.go b/internal/mapr/server/turbo_aggregate.go
index c3a5dd2..a317578 100644
--- a/internal/mapr/server/turbo_aggregate.go
+++ b/internal/mapr/server/turbo_aggregate.go
@@ -29,12 +29,12 @@ type TurboAggregate struct {
// The mapr log format parser
parser logformat.Parser
// Group sets protected by mutex during serialization
- groupSets sync.Map // map[string]*mapr.SafeAggregateSet
- bufferMu sync.Mutex // Protects serialization
+ groupSets sync.Map // map[string]*mapr.SafeAggregateSet
+ bufferMu sync.Mutex // Protects serialization
// Batch processing
- batchMu sync.Mutex
- batch []rawLine
- batchSize int
+ batchMu sync.Mutex
+ batch []rawLine
+ batchSize int
// Periodic serialization
serializeTicker *time.Ticker
serialize chan struct{}
@@ -80,7 +80,7 @@ func NewTurboAggregate(queryStr string) (*TurboAggregate, error) {
parserName = query.LogFormat
}
- dlog.Server.Info("Creating turbo log format parser",
+ dlog.Server.Info("Creating turbo log format parser",
"parserName", parserName,
"queryTable", query.Table,
"queryLogFormat", query.LogFormat)
@@ -93,14 +93,14 @@ func NewTurboAggregate(queryStr string) (*TurboAggregate, error) {
}
return &TurboAggregate{
- done: internal.NewDone(),
- serialize: make(chan struct{}, 1), // Buffered to avoid blocking
- hostname: s[0],
- query: query,
- parser: logParser,
- groupSets: sync.Map{},
- batchSize: 100, // Process 100 lines at a time
- batch: make([]rawLine, 0, 100),
+ done: internal.NewDone(),
+ serialize: make(chan struct{}, 1), // Buffered to avoid blocking
+ hostname: s[0],
+ query: query,
+ parser: logParser,
+ groupSets: sync.Map{},
+ batchSize: 100, // Process 100 lines at a time
+ batch: make([]rawLine, 0, 100),
fieldPool: sync.Pool{
New: func() interface{} {
return make(map[string]string, 20)
@@ -129,60 +129,60 @@ func min(a, b int) int {
// Shutdown the aggregation engine.
func (a *TurboAggregate) Shutdown() {
- dlog.Server.Info("TurboAggregate: Shutdown called",
+ dlog.Server.Info("TurboAggregate: Shutdown called",
"linesProcessed", a.linesProcessed.Load(),
"filesProcessed", a.filesProcessed.Load(),
"activeProcessors", a.activeProcessors.Load(),
"currentGroups", a.countGroups())
-
+
// Signal shutdown
a.done.Shutdown()
-
+
// Stop the ticker
if a.serializeTicker != nil {
a.serializeTicker.Stop()
}
-
+
// Wait for active processors to finish
for a.activeProcessors.Load() > 0 {
- dlog.Server.Info("TurboAggregate: Waiting for active processors",
+ dlog.Server.Info("TurboAggregate: Waiting for active processors",
"activeProcessors", a.activeProcessors.Load())
time.Sleep(10 * time.Millisecond)
}
-
+
// Process any remaining batch synchronously
dlog.Server.Info("TurboAggregate: Processing final batch")
a.processBatchAndWait()
-
+
// Wait for all processing to complete
dlog.Server.Info("TurboAggregate: Waiting for all processing to complete")
a.processingWg.Wait()
-
- dlog.Server.Info("TurboAggregate: All processing complete, groups before final serialization",
+
+ dlog.Server.Info("TurboAggregate: All processing complete, groups before final serialization",
"groupCount", a.countGroups())
-
+
// Trigger final serialization after all processing is done
// Use a longer timeout to ensure data gets through
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
-
+
dlog.Server.Info("TurboAggregate: Triggering final serialization")
a.doSerialize(ctx)
-
+
// Give more time for messages to be sent and processed
// This is crucial to ensure the baseHandler's Read method picks up the messages
- dlog.Server.Info("TurboAggregate: Waiting for message delivery",
+ dlog.Server.Info("TurboAggregate: Waiting for message delivery",
"channelLen", len(a.maprMessages))
-
+
// Wait for channel to drain or timeout
timeout := time.After(2 * time.Second)
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
-
+
for {
select {
case <-timeout:
- dlog.Server.Warn("TurboAggregate: Timeout waiting for message delivery",
+ dlog.Server.Warn("TurboAggregate: Timeout waiting for message delivery",
"remainingMessages", len(a.maprMessages))
return
case <-ticker.C:
@@ -192,15 +192,13 @@ func (a *TurboAggregate) Shutdown() {
}
}
}
-
- dlog.Server.Info("TurboAggregate: Shutdown complete")
}
// Start the turbo aggregation.
func (a *TurboAggregate) Start(ctx context.Context, maprMessages chan<- string) {
a.maprMessages = maprMessages
-
- dlog.Server.Info("TurboAggregate: Starting",
+
+ dlog.Server.Info("TurboAggregate: Starting",
"interval", a.query.Interval)
// Start periodic serialization
@@ -209,7 +207,7 @@ func (a *TurboAggregate) Start(ctx context.Context, maprMessages chan<- string)
// Start batch processor
go a.batchProcessorLoop(ctx)
-
+
// Debug: Don't trigger immediate serialization - let data accumulate first
dlog.Server.Info("TurboAggregate: Started, waiting for data")
}
@@ -219,17 +217,17 @@ func (a *TurboAggregate) Start(ctx context.Context, maprMessages chan<- string)
func (a *TurboAggregate) ProcessLineDirect(lineContent []byte, sourceID string) error {
// Increment counter first
a.linesProcessed.Add(1)
-
+
// Debug: Track when lines are received
totalLines := a.linesProcessed.Load()
if totalLines == 1 || totalLines%1000 == 0 {
- dlog.Server.Info("TurboAggregate: ProcessLineDirect called",
+ dlog.Server.Info("TurboAggregate: ProcessLineDirect called",
"totalLinesReceived", totalLines,
"sourceID", sourceID,
"lineLength", len(lineContent),
"linePreview", string(lineContent[:min(50, len(lineContent))]))
}
-
+
// Make a copy of the line content as the buffer will be recycled
content := make([]byte, len(lineContent))
copy(content, lineContent)
@@ -242,14 +240,14 @@ func (a *TurboAggregate) ProcessLineDirect(lineContent []byte, sourceID string)
a.batchMu.Unlock()
if batchLen == 1 {
- dlog.Server.Info("TurboAggregate: First line received in batch",
+ dlog.Server.Info("TurboAggregate: First line received in batch",
"sourceID", sourceID,
"batchSize", a.batchSize)
}
// Process batch if full
if shouldProcess {
- dlog.Server.Debug("TurboAggregate: Batch full, processing",
+ dlog.Server.Debug("TurboAggregate: Batch full, processing",
"batchLen", batchLen)
a.processBatch()
}
@@ -261,7 +259,7 @@ func (a *TurboAggregate) ProcessLineDirect(lineContent []byte, sourceID string)
func (a *TurboAggregate) batchProcessorLoop(ctx context.Context) {
dlog.Server.Info("TurboAggregate: Batch processor loop started")
defer dlog.Server.Info("TurboAggregate: Batch processor loop ended")
-
+
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
@@ -275,18 +273,18 @@ func (a *TurboAggregate) batchProcessorLoop(ctx context.Context) {
case <-ticker.C:
// Periodically process any accumulated batch
a.processBatch()
-
+
// Check if context is done but only exit if no pending work
select {
case <-ctx.Done():
a.batchMu.Lock()
batchLen := len(a.batch)
a.batchMu.Unlock()
-
+
activeProcs := a.activeProcessors.Load()
-
+
if batchLen > 0 || activeProcs > 0 {
- dlog.Server.Info("TurboAggregate: Context cancelled but work pending",
+ dlog.Server.Info("TurboAggregate: Context cancelled but work pending",
"batchLen", batchLen,
"activeProcessors", activeProcs)
// Continue processing
@@ -313,7 +311,7 @@ func (a *TurboAggregate) processBatch() {
a.batch = make([]rawLine, 0, a.batchSize)
a.batchMu.Unlock()
- dlog.Server.Info("TurboAggregate: Processing batch",
+ dlog.Server.Info("TurboAggregate: Processing batch",
"batchSize", batchSize,
"totalLinesProcessed", a.linesProcessed.Load())
@@ -334,8 +332,8 @@ func (a *TurboAggregate) processBatch() {
}
// Note: line count is already incremented in ProcessLineDirect
}
-
- dlog.Server.Info("TurboAggregate: Batch processed",
+
+ dlog.Server.Info("TurboAggregate: Batch processed",
"successCount", successCount,
"errorCount", errorCount,
"totalLinesProcessed", a.linesProcessed.Load())
@@ -354,7 +352,7 @@ func (a *TurboAggregate) processBatchAndWait() {
a.batch = make([]rawLine, 0, a.batchSize)
a.batchMu.Unlock()
- dlog.Server.Info("TurboAggregate: Processing batch synchronously",
+ dlog.Server.Info("TurboAggregate: Processing batch synchronously",
"batchSize", batchSize,
"totalLinesProcessed", a.linesProcessed.Load())
@@ -371,8 +369,8 @@ func (a *TurboAggregate) processBatchAndWait() {
}
// Note: line count is already incremented in ProcessLineDirect
}
-
- dlog.Server.Info("TurboAggregate: Batch processed synchronously",
+
+ dlog.Server.Info("TurboAggregate: Batch processed synchronously",
"successCount", successCount,
"errorCount", errorCount,
"totalLinesProcessed", a.linesProcessed.Load())
@@ -385,7 +383,7 @@ func (a *TurboAggregate) processLine(lineContent []byte, sourceID string) error
// Debug: Log sample lines
if a.linesProcessed.Load()%1000 == 0 {
- dlog.Server.Debug("TurboAggregate: Processing line",
+ dlog.Server.Debug("TurboAggregate: Processing line",
"lineNumber", a.linesProcessed.Load(),
"linePreview", maprLine[:min(100, len(maprLine))])
}
@@ -404,7 +402,7 @@ func (a *TurboAggregate) processLine(lineContent []byte, sourceID string) error
parsedFields, err := a.parser.MakeFields(maprLine)
if err != nil {
if err != logformat.ErrIgnoreFields {
- dlog.Server.Debug("TurboAggregate: Parser error",
+ dlog.Server.Debug("TurboAggregate: Parser error",
"error", err,
"line", maprLine[:min(100, len(maprLine))])
return err
@@ -416,10 +414,10 @@ func (a *TurboAggregate) processLine(lineContent []byte, sourceID string) error
for k, v := range parsedFields {
fields[k] = v
}
-
+
// Debug: Log parsed fields for first few lines
if a.linesProcessed.Load() < 5 {
- dlog.Server.Info("TurboAggregate: Parsed fields",
+ dlog.Server.Info("TurboAggregate: Parsed fields",
"lineNumber", a.linesProcessed.Load(),
"fieldCount", len(fields),
"fields", fields)
@@ -461,9 +459,9 @@ func (a *TurboAggregate) aggregate(fields map[string]string) {
// Get or create the aggregate set
setInterface, loaded := a.groupSets.LoadOrStore(groupKey, mapr.NewSafeAggregateSet())
set := setInterface.(*mapr.SafeAggregateSet)
-
+
if !loaded {
- dlog.Server.Info("TurboAggregate: New group created",
+ dlog.Server.Info("TurboAggregate: New group created",
"groupKey", groupKey,
"totalGroups", a.countGroups())
}
@@ -474,7 +472,7 @@ func (a *TurboAggregate) aggregate(fields map[string]string) {
for _, sc := range a.query.Select {
if val, ok := fields[sc.Field]; ok {
if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil {
- dlog.Server.Error("TurboAggregate: Aggregation error",
+ dlog.Server.Error("TurboAggregate: Aggregation error",
"field", sc.Field,
"operation", sc.Operation,
"error", err)
@@ -489,7 +487,7 @@ func (a *TurboAggregate) aggregate(fields map[string]string) {
set.IncrementSamples()
// Debug: Log aggregation details for first few samples
if a.linesProcessed.Load() < 10 {
- dlog.Server.Info("TurboAggregate: Aggregated sample",
+ dlog.Server.Info("TurboAggregate: Aggregated sample",
"groupKey", groupKey,
"aggregatedFields", aggregatedFields,
"sampleCount", set.GetSamples())
@@ -501,7 +499,7 @@ func (a *TurboAggregate) aggregate(fields map[string]string) {
func (a *TurboAggregate) serializationLoop(ctx context.Context) {
dlog.Server.Info("TurboAggregate: Serialization loop started")
defer dlog.Server.Info("TurboAggregate: Serialization loop ended")
-
+
for {
select {
case <-ctx.Done():
@@ -532,10 +530,10 @@ func (a *TurboAggregate) Serialize(ctx context.Context) {
// doSerialize performs the actual serialization.
func (a *TurboAggregate) doSerialize(ctx context.Context) {
- dlog.Server.Info("TurboAggregate: Starting serialization",
+ dlog.Server.Info("TurboAggregate: Starting serialization",
"linesProcessed", a.linesProcessed.Load(),
"currentGroups", a.countGroups())
-
+
// Process any remaining batch synchronously before serialization
dlog.Server.Info("TurboAggregate: Processing remaining batch before serialization")
a.processBatchAndWait()
@@ -550,9 +548,9 @@ func (a *TurboAggregate) doSerialize(ctx context.Context) {
// Count groups before serialization
groupsBeforeSerialization := a.countGroups()
- dlog.Server.Info("TurboAggregate: Groups before serialization",
+ dlog.Server.Info("TurboAggregate: Groups before serialization",
"count", groupsBeforeSerialization)
-
+
if groupsBeforeSerialization == 0 {
dlog.Server.Warn("TurboAggregate: No groups to serialize!")
return
@@ -567,35 +565,35 @@ func (a *TurboAggregate) doSerialize(ctx context.Context) {
a.groupSets.Range(func(key, value interface{}) bool {
groupKey := key.(string)
safeSet := value.(*mapr.SafeAggregateSet)
-
+
// Clone the safe set to get a regular AggregateSet
clonedSet := safeSet.Clone()
-
+
// Debug: Log details of first few groups
if groupCount < 5 {
- sampleDetails = append(sampleDetails,
+ sampleDetails = append(sampleDetails,
fmt.Sprintf("group=%s, samples=%d", groupKey, clonedSet.Samples))
}
-
+
// Add to the group set
groupSet := group.GetSet(groupKey)
*groupSet = *clonedSet
groupCount++
-
+
return true
})
- dlog.Server.Info("TurboAggregate: Serialization details",
+ dlog.Server.Info("TurboAggregate: Serialization details",
"groupCount", groupCount,
"sampleGroups", sampleDetails,
"maprMessagesChannel", a.maprMessages != nil)
-
+
// Check if we have a valid channel
if a.maprMessages == nil {
dlog.Server.Error("TurboAggregate: maprMessages channel is nil!")
return
}
-
+
// Serialize the group - use a longer timeout context for final serialization
// to ensure data is sent even during shutdown
serializeCtx := ctx
@@ -605,14 +603,14 @@ func (a *TurboAggregate) doSerialize(ctx context.Context) {
serializeCtx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
}
-
- dlog.Server.Info("TurboAggregate: Calling group.Serialize",
+
+ dlog.Server.Info("TurboAggregate: Calling group.Serialize",
"channelCap", cap(a.maprMessages),
"channelLen", len(a.maprMessages))
-
+
group.Serialize(serializeCtx, a.maprMessages)
-
- dlog.Server.Info("TurboAggregate: group.Serialize completed",
+
+ dlog.Server.Info("TurboAggregate: group.Serialize completed",
"sentGroups", groupCount,
"channelLen", len(a.maprMessages))
@@ -626,10 +624,10 @@ func (a *TurboAggregate) doSerialize(ctx context.Context) {
dlog.Server.Info("TurboAggregate: Clearing groupSets for next interval")
a.groupSets = sync.Map{}
}
-
+
// Log the state after serialization
groupsAfterSerialization := a.countGroups()
- dlog.Server.Info("TurboAggregate: After serialization",
+ dlog.Server.Info("TurboAggregate: After serialization",
"groupsRemaining", groupsAfterSerialization)
}
@@ -642,7 +640,7 @@ type TurboAggregateProcessor struct {
// NewTurboAggregateProcessor creates a new turbo aggregate processor.
func NewTurboAggregateProcessor(aggregate *TurboAggregate, globID string) *TurboAggregateProcessor {
aggregate.activeProcessors.Add(1)
- dlog.Server.Debug("TurboAggregate: New processor created",
+ dlog.Server.Debug("TurboAggregate: New processor created",
"globID", globID,
"activeProcessors", aggregate.activeProcessors.Load())
return &TurboAggregateProcessor{
@@ -660,30 +658,30 @@ func (p *TurboAggregateProcessor) ProcessLine(lineContent *bytes.Buffer, lineNum
"sourceID", sourceID,
"contentLen", lineContent.Len())
}
-
+
// Process the line directly
err := p.aggregate.ProcessLineDirect(lineContent.Bytes(), sourceID)
-
+
// Recycle the buffer
pool.RecycleBytesBuffer(lineContent)
-
+
return err
}
// Flush ensures all buffered data is processed.
func (p *TurboAggregateProcessor) Flush() error {
// Log flush call for debugging
- dlog.Server.Info("TurboAggregateProcessor: Flush called",
+ dlog.Server.Info("TurboAggregateProcessor: Flush called",
"globID", p.globID,
"linesProcessed", p.aggregate.linesProcessed.Load())
-
+
// Process any remaining batch synchronously
p.aggregate.processBatchAndWait()
-
+
// Increment files processed counter
p.aggregate.filesProcessed.Add(1)
-
- dlog.Server.Info("TurboAggregateProcessor: Flush completed",
+
+ dlog.Server.Info("TurboAggregateProcessor: Flush completed",
"globID", p.globID,
"linesProcessed", p.aggregate.linesProcessed.Load(),
"filesProcessed", p.aggregate.filesProcessed.Load())
@@ -694,8 +692,8 @@ func (p *TurboAggregateProcessor) Flush() error {
func (p *TurboAggregateProcessor) Close() error {
err := p.Flush()
p.aggregate.activeProcessors.Add(-1)
- dlog.Server.Debug("TurboAggregate: Processor closed",
+ dlog.Server.Debug("TurboAggregate: Processor closed",
"globID", p.globID,
"activeProcessors", p.aggregate.activeProcessors.Load())
return err
-} \ No newline at end of file
+}