summaryrefslogtreecommitdiff
path: root/internal/server/handlers/readcommand.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-07-03 17:58:06 +0300
committerPaul Buetow <paul@buetow.org>2025-07-03 17:58:06 +0300
commit859be4593e4f7ef37ff2c91dc90f42e6930a3996 (patch)
treea73597068c3e5f34017d4e348267f8051f3be614 /internal/server/handlers/readcommand.go
parentf1ae8e6eb80c8f2f4b4b18b5b93893ad3249c6a1 (diff)
fix: improve turbo mode MapReduce batch processing and shutdown sequence
- Fixed batch processor to use synchronous processing during shutdown - Added processBatchAndWait method for guaranteed batch completion - Fixed Flush() to ensure all data is processed before file completion - Improved parser selection logic for table-based queries - Added extensive debug logging for troubleshooting - Increased wait times for serialization during shutdown These changes address data loss issues when processing multiple files concurrently in turbo mode. The batch processor now properly flushes all remaining data when files complete and during shutdown. Note: Integration tests still failing due to SSH authentication issues in test environment, but core turbo mode logic has been fixed. 🤖 Generated with Claude Code Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server/handlers/readcommand.go')
-rw-r--r--internal/server/handlers/readcommand.go5
1 files changed, 3 insertions, 2 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index ce44996..0ceb8ee 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -175,8 +175,9 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
if r.server.turboAggregate != nil {
dlog.Server.Info(r.server.user, "Triggering final turbo aggregate serialization")
r.server.turboAggregate.Serialize(context.Background())
- // Give time for serialization to complete
- time.Sleep(100 * time.Millisecond)
+ // Give more time for serialization to complete
+ // This is critical when processing many files concurrently
+ time.Sleep(500 * time.Millisecond)
}
// Double-check that we really have no pending work