diff options
| author | Paul Buetow <paul@buetow.org> | 2025-07-03 16:13:26 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-07-03 16:13:26 +0300 |
| commit | f1ae8e6eb80c8f2f4b4b18b5b93893ad3249c6a1 (patch) | |
| tree | a9348d535148dae1a93f1b08e17d9870a30e7c75 /internal/mapr/server/aggregate.go | |
| parent | a4eb3cc769c13312fdd4b7aaa20659e408f734b7 (diff) | |
fix: implement thread-safe turbo mode for MapReduce operations
- Add SafeAggregateSet wrapper with mutex protection for concurrent access
- Implement TurboAggregate for direct line processing without channels
- Fix race conditions in turbo mode MapReduce aggregation
- Add proper synchronization for batch processing completion
- Update shutdown sequence to ensure all data is serialized
- Add integration test configuration for high-concurrency scenarios
The turbo mode now correctly handles MapReduce queries with significant
performance improvements while maintaining data integrity and preventing
race conditions during concurrent aggregation.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/mapr/server/aggregate.go')
| -rw-r--r-- | internal/mapr/server/aggregate.go | 70 |
1 files changed, 38 insertions, 32 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index f055b9d..353cda5 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -68,7 +68,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) { return &Aggregate{ done: internal.NewDone(), - NextLinesCh: make(chan chan *line.Line, 1000), + NextLinesCh: make(chan chan *line.Line, 10000), // Increased buffer for high concurrency serialize: make(chan struct{}), hostname: s[0], query: query, @@ -116,15 +116,15 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) { } } -func (a *Aggregate) nextLine() (line *line.Line, ok bool, noMoreChannels bool) { - dlog.Server.Trace("nextLine.enter", line, ok, noMoreChannels) +func (a *Aggregate) nextLine() (l *line.Line, ok bool, noMoreChannels bool) { + dlog.Server.Trace("nextLine.enter", l, ok, noMoreChannels) // Protect channel operations with mutex to prevent race conditions a.mu.Lock() defer a.mu.Unlock() select { - case line, ok = <-a.linesCh: + case l, ok = <-a.linesCh: if !ok { // Channel is closed, go to next channel. select { @@ -140,40 +140,46 @@ func (a *Aggregate) nextLine() (line *line.Line, ok bool, noMoreChannels bool) { oldLinesCh := a.linesCh a.linesCh = newLinesCh - // In turbo mode, synchronously put the channel back to avoid race conditions - if config.Server.TurboModeEnable { - select { - case a.NextLinesCh <- oldLinesCh: - // Successfully put back - default: - // Channel is full, start a goroutine with timeout - go func() { - timer := time.NewTimer(5 * time.Second) - defer timer.Stop() - select { - case a.NextLinesCh <- oldLinesCh: - case <-timer.C: - dlog.Server.Warn("Timeout: failed to put channel back, NextLinesCh full") - } - }() - } - } else { - // Non-turbo mode: use goroutine as before - go func() { - timer := time.NewTimer(5 * time.Second) - defer timer.Stop() + // Ensure the old channel is fully drained before recycling to prevent data mixing + go func(oldCh chan *line.Line) { + // First, drain any remaining lines from the old channel + drained := 0 + drainLoop: + for { select { - case a.NextLinesCh <- oldLinesCh: - case <-timer.C: - dlog.Server.Warn("Timeout: failed to put channel back, NextLinesCh might be full") + case l, ok := <-oldCh: + if !ok { + // Channel is closed, safe to recycle + break drainLoop + } + if l != nil { + l.Recycle() + drained++ + } + default: + // No more lines to drain immediately + break drainLoop } - }() - } + } + + if drained > 0 { + dlog.Server.Debug("Drained", drained, "lines from recycled channel") + } + + // Now safely recycle the drained channel + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + select { + case a.NextLinesCh <- oldCh: + case <-timer.C: + dlog.Server.Warn("Timeout: failed to put channel back, NextLinesCh might be full") + } + }(oldLinesCh) default: // No new lines channel found. } } - dlog.Server.Trace("nextLine.exit", line, ok, noMoreChannels) + dlog.Server.Trace("nextLine.exit", l, ok, noMoreChannels) return } |
