diff options
| author | Paul Buetow <paul@buetow.org> | 2025-07-03 16:13:26 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-07-03 16:13:26 +0300 |
| commit | f1ae8e6eb80c8f2f4b4b18b5b93893ad3249c6a1 (patch) | |
| tree | a9348d535148dae1a93f1b08e17d9870a30e7c75 /internal/server/handlers/basehandler.go | |
| parent | a4eb3cc769c13312fdd4b7aaa20659e408f734b7 (diff) | |
fix: implement thread-safe turbo mode for MapReduce operations
- Add SafeAggregateSet wrapper with mutex protection for concurrent access
- Implement TurboAggregate for direct line processing without channels
- Fix race conditions in turbo mode MapReduce aggregation
- Add proper synchronization for batch processing completion
- Update shutdown sequence to ensure all data is serialized
- Add integration test configuration for high-concurrency scenarios
The turbo mode now correctly handles MapReduce queries with significant
performance improvements while maintaining data integrity and preventing
race conditions during concurrent aggregation.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server/handlers/basehandler.go')
| -rw-r--r-- | internal/server/handlers/basehandler.go | 15 |
1 files changed, 15 insertions, 0 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index bfc7ec2..427ab6c 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -31,6 +31,7 @@ type baseHandler struct { handleCommandCb handleCommandCb lines chan *line.Line aggregate *server.Aggregate + turboAggregate *server.TurboAggregate // Turbo mode aggregate maprMessages chan string serverMessages chan string hostname string @@ -56,6 +57,16 @@ type baseHandler struct { // Shutdown the handler. func (h *baseHandler) Shutdown() { + // Shutdown turbo aggregate if present + if h.turboAggregate != nil { + dlog.Server.Info(h.user, "Shutting down turbo aggregate") + h.turboAggregate.Shutdown() + } + // Shutdown regular aggregate if present + if h.aggregate != nil { + dlog.Server.Info(h.user, "Shutting down regular aggregate") + h.aggregate.Shutdown() + } h.done.Shutdown() } @@ -372,6 +383,10 @@ func (h *baseHandler) flush() { if h.turboMode { maxIterations = 300 // Give more time for turbo mode to drain } + // Also increase iterations if we have MapReduce messages + if h.turboAggregate != nil || h.aggregate != nil { + maxIterations = 300 // Give more time for MapReduce results + } for i := 0; i < maxIterations; i++ { if numUnsentMessages() == 0 { |
