diff options
| author | Paul Buetow <paul@buetow.org> | 2025-07-03 16:13:26 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-07-03 16:13:26 +0300 |
| commit | f1ae8e6eb80c8f2f4b4b18b5b93893ad3249c6a1 (patch) | |
| tree | a9348d535148dae1a93f1b08e17d9870a30e7c75 /internal/server/handlers/serverhandler.go | |
| parent | a4eb3cc769c13312fdd4b7aaa20659e408f734b7 (diff) | |
fix: implement thread-safe turbo mode for MapReduce operations
- Add SafeAggregateSet wrapper with mutex protection for concurrent access
- Implement TurboAggregate for direct line processing without channels
- Fix race conditions in turbo mode MapReduce aggregation
- Add proper synchronization for batch processing completion
- Update shutdown sequence to ensure all data is serialized
- Add integration test configuration for high-concurrency scenarios
The turbo mode now correctly handles MapReduce queries with significant
performance improvements while maintaining data integrity and preventing
race conditions during concurrent aggregation.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server/handlers/serverhandler.go')
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 3 |
1 files changed, 2 insertions, 1 deletions
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index da27066..df227ab 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -93,7 +93,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, ltx lcontext.LCon commandFinished() }() case "map": - command, aggregate, err := newMapCommand(h, argc, args) + command, aggregate, turboAggregate, err := newMapCommand(h, argc, args) if err != nil { h.sendln(h.serverMessages, err.Error()) dlog.Server.Error(h.user, err) @@ -101,6 +101,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, ltx lcontext.LCon return } h.aggregate = aggregate + h.turboAggregate = turboAggregate go func() { command.Start(ctx, h.maprMessages) commandFinished() |
