summaryrefslogtreecommitdiff
path: root/internal/server/handlers/serverhandler.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-07-03 16:13:26 +0300
committerPaul Buetow <paul@buetow.org>2025-07-03 16:13:26 +0300
commitf1ae8e6eb80c8f2f4b4b18b5b93893ad3249c6a1 (patch)
treea9348d535148dae1a93f1b08e17d9870a30e7c75 /internal/server/handlers/serverhandler.go
parenta4eb3cc769c13312fdd4b7aaa20659e408f734b7 (diff)
fix: implement thread-safe turbo mode for MapReduce operations
- Add SafeAggregateSet wrapper with mutex protection for concurrent access - Implement TurboAggregate for direct line processing without channels - Fix race conditions in turbo mode MapReduce aggregation - Add proper synchronization for batch processing completion - Update shutdown sequence to ensure all data is serialized - Add integration test configuration for high-concurrency scenarios The turbo mode now correctly handles MapReduce queries with significant performance improvements while maintaining data integrity and preventing race conditions during concurrent aggregation. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server/handlers/serverhandler.go')
-rw-r--r--internal/server/handlers/serverhandler.go3
1 files changed, 2 insertions, 1 deletions
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index da27066..df227ab 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -93,7 +93,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, ltx lcontext.LCon
commandFinished()
}()
case "map":
- command, aggregate, err := newMapCommand(h, argc, args)
+ command, aggregate, turboAggregate, err := newMapCommand(h, argc, args)
if err != nil {
h.sendln(h.serverMessages, err.Error())
dlog.Server.Error(h.user, err)
@@ -101,6 +101,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, ltx lcontext.LCon
return
}
h.aggregate = aggregate
+ h.turboAggregate = turboAggregate
go func() {
command.Start(ctx, h.maprMessages)
commandFinished()