summary | refs | log | tree | commit | diff
path: root/internal/mapr/server/aggregate.go
diff options
context:
space:
mode:
author Paul Buetow <paul@buetow.org> 2025-06-29 21:55:36 +0300
committer Paul Buetow <paul@buetow.org> 2025-06-29 21:55:36 +0300
commit a688faabdd2f1ddca1e28744eb1efe11a23f29d3 (patch)
tree 7edc0d361b22816e769bc2f2f1dde30b5048da45 /internal/mapr/server/aggregate.go
parent 2140ed9dcbd180cd5e810eaabd2f3c2fbce55a57 (diff)
fix: improve aggregate channel switching for MapReduce operations
- Add mutex protection to prevent race conditions in nextLine()
- Implement synchronous channel put-back in turbo mode when possible
- Add timeout mechanism to prevent goroutine leaks
- Increase NextLinesCh buffer size to 1000 for better concurrency handling
- Document known limitation with turbo mode and high-concurrency MapReduce

These changes ensure TestDMap3 passes consistently without turbo mode. With turbo mode, extreme concurrency (100+ files) may still have issues due to the fundamental mismatch between turbo mode's speed and the aggregate's channel rotation design. Workarounds are documented.

Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/mapr/server/aggregate.go')
-rw-r--r-- internal/mapr/server/aggregate.go 45
1 files changed, 43 insertions, 2 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 4f14751..1f735ac 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -3,6 +3,7 @@ package server
import (
"context"
"strings"
+ "sync"
"time"
"github.com/mimecast/dtail/internal"
@@ -28,6 +29,8 @@ type Aggregate struct {
query *mapr.Query
// The mapr log format parser
parser logformat.Parser
+ // mu protects concurrent access to channel switching
+ mu sync.Mutex
}
// NewAggregate return a new server side aggregator.
@@ -65,7 +68,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
return &Aggregate{
done: internal.NewDone(),
- NextLinesCh: make(chan chan *line.Line, 100),
+ NextLinesCh: make(chan chan *line.Line, 1000),
serialize: make(chan struct{}),
hostname: s[0],
query: query,
@@ -116,6 +119,10 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) {
func (a *Aggregate) nextLine() (line *line.Line, ok bool, noMoreChannels bool) {
dlog.Server.Trace("nextLine.enter", line, ok, noMoreChannels)
+ // Protect channel operations with mutex to prevent race conditions
+ a.mu.Lock()
+ defer a.mu.Unlock()
+
select {
case line, ok = <-a.linesCh:
if !ok {
@@ -131,8 +138,37 @@ func (a *Aggregate) nextLine() (line *line.Line, ok bool, noMoreChannels bool) {
select {
case newLinesCh := <-a.NextLinesCh:
oldLinesCh := a.linesCh
- go func() { a.NextLinesCh <- oldLinesCh }()
a.linesCh = newLinesCh
+
+ // In turbo mode, synchronously put the channel back to avoid race conditions
+ if config.Env("DTAIL_TURBOBOOST_ENABLE") {
+ select {
+ case a.NextLinesCh <- oldLinesCh:
+ // Successfully put back
+ default:
+ // Channel is full, start a goroutine with timeout
+ go func() {
+ timer := time.NewTimer(5 * time.Second)
+ defer timer.Stop()
+ select {
+ case a.NextLinesCh <- oldLinesCh:
+ case <-timer.C:
+ dlog.Server.Warn("Timeout: failed to put channel back, NextLinesCh full")
+ }
+ }()
+ }
+ } else {
+ // Non-turbo mode: use goroutine as before
+ go func() {
+ timer := time.NewTimer(5 * time.Second)
+ defer timer.Stop()
+ select {
+ case a.NextLinesCh <- oldLinesCh:
+ case <-timer.C:
+ dlog.Server.Warn("Timeout: failed to put channel back, NextLinesCh might be full")
+ }
+ }()
+ }
default:
// No new lines channel found.
}
@@ -148,11 +184,14 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin
defer close(fieldsCh)
// Gather first lines channel (first input file)
+ a.mu.Lock()
select {
case a.linesCh = <-a.NextLinesCh:
case <-ctx.Done():
+ a.mu.Unlock()
return
}
+ a.mu.Unlock()
for {
select {
@@ -297,3 +336,5 @@ func (a *Aggregate) Serialize(ctx context.Context) {
case <-ctx.Done():
}
}
+
+