summaryrefslogtreecommitdiff
path: root/internal/server/handlers/basehandler.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-07-03 16:13:26 +0300
committerPaul Buetow <paul@buetow.org>2025-07-03 16:13:26 +0300
commitf1ae8e6eb80c8f2f4b4b18b5b93893ad3249c6a1 (patch)
treea9348d535148dae1a93f1b08e17d9870a30e7c75 /internal/server/handlers/basehandler.go
parenta4eb3cc769c13312fdd4b7aaa20659e408f734b7 (diff)
fix: implement thread-safe turbo mode for MapReduce operations
- Add SafeAggregateSet wrapper with mutex protection for concurrent access - Implement TurboAggregate for direct line processing without channels - Fix race conditions in turbo mode MapReduce aggregation - Add proper synchronization for batch processing completion - Update shutdown sequence to ensure all data is serialized - Add integration test configuration for high-concurrency scenarios The turbo mode now correctly handles MapReduce queries with significant performance improvements while maintaining data integrity and preventing race conditions during concurrent aggregation. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server/handlers/basehandler.go')
-rw-r--r--internal/server/handlers/basehandler.go15
1 files changed, 15 insertions, 0 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index bfc7ec2..427ab6c 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -31,6 +31,7 @@ type baseHandler struct {
handleCommandCb handleCommandCb
lines chan *line.Line
aggregate *server.Aggregate
+ turboAggregate *server.TurboAggregate // Turbo mode aggregate
maprMessages chan string
serverMessages chan string
hostname string
@@ -56,6 +57,16 @@ type baseHandler struct {
// Shutdown the handler.
func (h *baseHandler) Shutdown() {
+ // Shutdown turbo aggregate if present
+ if h.turboAggregate != nil {
+ dlog.Server.Info(h.user, "Shutting down turbo aggregate")
+ h.turboAggregate.Shutdown()
+ }
+ // Shutdown regular aggregate if present
+ if h.aggregate != nil {
+ dlog.Server.Info(h.user, "Shutting down regular aggregate")
+ h.aggregate.Shutdown()
+ }
h.done.Shutdown()
}
@@ -372,6 +383,10 @@ func (h *baseHandler) flush() {
if h.turboMode {
maxIterations = 300 // Give more time for turbo mode to drain
}
+ // Also increase iterations if we have MapReduce messages
+ if h.turboAggregate != nil || h.aggregate != nil {
+ maxIterations = 300 // Give more time for MapReduce results
+ }
for i := 0; i < maxIterations; i++ {
if numUnsentMessages() == 0 {