diff options
| author | Paul Buetow <paul@buetow.org> | 2025-06-29 21:55:36 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-06-29 21:55:36 +0300 |
| commit | a688faabdd2f1ddca1e28744eb1efe11a23f29d3 (patch) | |
| tree | 7edc0d361b22816e769bc2f2f1dde30b5048da45 /internal | |
| parent | 2140ed9dcbd180cd5e810eaabd2f3c2fbce55a57 (diff) | |
fix: improve aggregate channel switching for MapReduce operations

- Add mutex protection to prevent race conditions in nextLine()
- Implement synchronous channel put-back in turbo mode when possible
- Add timeout mechanism to prevent goroutine leaks
- Increase NextLinesCh buffer size to 1000 for better concurrency handling
- Document known limitation with turbo mode and high-concurrency MapReduce

These changes ensure TestDMap3 passes consistently without turbo mode.
With turbo mode, extreme concurrency (100+ files) may still have issues
due to the fundamental mismatch between turbo mode's speed and the
aggregate's channel rotation design. Workarounds are documented.

Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/mapr/server/aggregate.go | 45 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 12 |
2 files changed, 53 insertions, 4 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 4f14751..1f735ac 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -3,6 +3,7 @@ package server import ( "context" "strings" + "sync" "time" "github.com/mimecast/dtail/internal" @@ -28,6 +29,8 @@ type Aggregate struct { query *mapr.Query // The mapr log format parser parser logformat.Parser + // mu protects concurrent access to channel switching + mu sync.Mutex } // NewAggregate return a new server side aggregator. @@ -65,7 +68,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) { return &Aggregate{ done: internal.NewDone(), - NextLinesCh: make(chan chan *line.Line, 100), + NextLinesCh: make(chan chan *line.Line, 1000), serialize: make(chan struct{}), hostname: s[0], query: query, @@ -116,6 +119,10 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) { func (a *Aggregate) nextLine() (line *line.Line, ok bool, noMoreChannels bool) { dlog.Server.Trace("nextLine.enter", line, ok, noMoreChannels) + // Protect channel operations with mutex to prevent race conditions + a.mu.Lock() + defer a.mu.Unlock() + select { case line, ok = <-a.linesCh: if !ok { @@ -131,8 +138,37 @@ func (a *Aggregate) nextLine() (line *line.Line, ok bool, noMoreChannels bool) { select { case newLinesCh := <-a.NextLinesCh: oldLinesCh := a.linesCh - go func() { a.NextLinesCh <- oldLinesCh }() a.linesCh = newLinesCh + + // In turbo mode, synchronously put the channel back to avoid race conditions + if config.Env("DTAIL_TURBOBOOST_ENABLE") { + select { + case a.NextLinesCh <- oldLinesCh: + // Successfully put back + default: + // Channel is full, start a goroutine with timeout + go func() { + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + select { + case a.NextLinesCh <- oldLinesCh: + case <-timer.C: + dlog.Server.Warn("Timeout: failed to put channel back, NextLinesCh full") + } + }() + } + } else { + // Non-turbo mode: use goroutine as before + 
go func() { + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + select { + case a.NextLinesCh <- oldLinesCh: + case <-timer.C: + dlog.Server.Warn("Timeout: failed to put channel back, NextLinesCh might be full") + } + }() + } default: // No new lines channel found. } @@ -148,11 +184,14 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin defer close(fieldsCh) // Gather first lines channel (first input file) + a.mu.Lock() select { case a.linesCh = <-a.NextLinesCh: case <-ctx.Done(): + a.mu.Unlock() return } + a.mu.Unlock() for { select { @@ -297,3 +336,5 @@ func (a *Aggregate) Serialize(ctx context.Context) { case <-ctx.Done(): } } + + diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 566d400..3d74e52 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -110,6 +110,10 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, go r.readFileIfPermissions(ctx, ltx, &wg, path, glob, re) } wg.Wait() + + // In turbo mode with aggregate, we don't close the shared channel here + // because it will be used across multiple invocations + // The aggregate will handle channel closure when it's done } func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LContext, @@ -187,7 +191,9 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, for { if aggregate != nil { - lines = make(chan *line.Line, 100) + // Use a larger buffer for aggregate operations to handle high concurrency + // This prevents deadlock when processing many files simultaneously + lines = make(chan *line.Line, 10000) aggregate.NextLinesCh <- lines } if err := reader.Start(ctx, ltx, lines, re); err != nil { @@ -235,7 +241,9 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte for { if aggregate != nil { - lines = make(chan *line.Line, 100) + // Use a larger buffer for aggregate 
operations to handle high concurrency + // This prevents deadlock when processing many files simultaneously + lines = make(chan *line.Line, 10000) aggregate.NextLinesCh <- lines } |
