summaryrefslogtreecommitdiff
path: root/internal/mapr/server/aggregate.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-07-03 16:13:26 +0300
committerPaul Buetow <paul@buetow.org>2025-07-03 16:13:26 +0300
commitf1ae8e6eb80c8f2f4b4b18b5b93893ad3249c6a1 (patch)
treea9348d535148dae1a93f1b08e17d9870a30e7c75 /internal/mapr/server/aggregate.go
parenta4eb3cc769c13312fdd4b7aaa20659e408f734b7 (diff)
fix: implement thread-safe turbo mode for MapReduce operations
- Add SafeAggregateSet wrapper with mutex protection for concurrent access - Implement TurboAggregate for direct line processing without channels - Fix race conditions in turbo mode MapReduce aggregation - Add proper synchronization for batch processing completion - Update shutdown sequence to ensure all data is serialized - Add integration test configuration for high-concurrency scenarios The turbo mode now correctly handles MapReduce queries with significant performance improvements while maintaining data integrity and preventing race conditions during concurrent aggregation. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/mapr/server/aggregate.go')
-rw-r--r--internal/mapr/server/aggregate.go70
1 files changed, 38 insertions, 32 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index f055b9d..353cda5 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -68,7 +68,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
return &Aggregate{
done: internal.NewDone(),
- NextLinesCh: make(chan chan *line.Line, 1000),
+ NextLinesCh: make(chan chan *line.Line, 10000), // Increased buffer for high concurrency
serialize: make(chan struct{}),
hostname: s[0],
query: query,
@@ -116,15 +116,15 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) {
}
}
-func (a *Aggregate) nextLine() (line *line.Line, ok bool, noMoreChannels bool) {
- dlog.Server.Trace("nextLine.enter", line, ok, noMoreChannels)
+func (a *Aggregate) nextLine() (l *line.Line, ok bool, noMoreChannels bool) {
+ dlog.Server.Trace("nextLine.enter", l, ok, noMoreChannels)
// Protect channel operations with mutex to prevent race conditions
a.mu.Lock()
defer a.mu.Unlock()
select {
- case line, ok = <-a.linesCh:
+ case l, ok = <-a.linesCh:
if !ok {
// Channel is closed, go to next channel.
select {
@@ -140,40 +140,46 @@ func (a *Aggregate) nextLine() (line *line.Line, ok bool, noMoreChannels bool) {
oldLinesCh := a.linesCh
a.linesCh = newLinesCh
- // In turbo mode, synchronously put the channel back to avoid race conditions
- if config.Server.TurboModeEnable {
- select {
- case a.NextLinesCh <- oldLinesCh:
- // Successfully put back
- default:
- // Channel is full, start a goroutine with timeout
- go func() {
- timer := time.NewTimer(5 * time.Second)
- defer timer.Stop()
- select {
- case a.NextLinesCh <- oldLinesCh:
- case <-timer.C:
- dlog.Server.Warn("Timeout: failed to put channel back, NextLinesCh full")
- }
- }()
- }
- } else {
- // Non-turbo mode: use goroutine as before
- go func() {
- timer := time.NewTimer(5 * time.Second)
- defer timer.Stop()
+ // Ensure the old channel is fully drained before recycling to prevent data mixing
+ go func(oldCh chan *line.Line) {
+ // First, drain any remaining lines from the old channel
+ drained := 0
+ drainLoop:
+ for {
select {
- case a.NextLinesCh <- oldLinesCh:
- case <-timer.C:
- dlog.Server.Warn("Timeout: failed to put channel back, NextLinesCh might be full")
+ case l, ok := <-oldCh:
+ if !ok {
+ // Channel is closed, safe to recycle
+ break drainLoop
+ }
+ if l != nil {
+ l.Recycle()
+ drained++
+ }
+ default:
+ // No more lines to drain immediately
+ break drainLoop
}
- }()
- }
+ }
+
+ if drained > 0 {
+ dlog.Server.Debug("Drained", drained, "lines from recycled channel")
+ }
+
+ // Now safely recycle the drained channel
+ timer := time.NewTimer(5 * time.Second)
+ defer timer.Stop()
+ select {
+ case a.NextLinesCh <- oldCh:
+ case <-timer.C:
+ dlog.Server.Warn("Timeout: failed to put channel back, NextLinesCh might be full")
+ }
+ }(oldLinesCh)
default:
// No new lines channel found.
}
}
- dlog.Server.Trace("nextLine.exit", line, ok, noMoreChannels)
+ dlog.Server.Trace("nextLine.exit", l, ok, noMoreChannels)
return
}