summaryrefslogtreecommitdiff
path: root/internal/server
diff options
context:
space:
mode:
Diffstat (limited to 'internal/server')
-rw-r--r--internal/server/handlers/basehandler.go14
-rw-r--r--internal/server/handlers/readcommand.go5
2 files changed, 17 insertions, 2 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index 427ab6c..3bb824b 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -409,6 +409,20 @@ func (h *baseHandler) shutdown() {
h.flushTurboData()
}
+ // Shutdown aggregates BEFORE flush to ensure MapReduce data is available
+ if h.turboAggregate != nil {
+ dlog.Server.Info(h.user, "Shutting down turbo aggregate in shutdown()")
+ h.turboAggregate.Shutdown()
+ // Give time for serialization to complete
+ time.Sleep(100 * time.Millisecond)
+ }
+ if h.aggregate != nil {
+ dlog.Server.Info(h.user, "Shutting down regular aggregate in shutdown()")
+ h.aggregate.Shutdown()
+ // Give time for serialization to complete
+ time.Sleep(100 * time.Millisecond)
+ }
+
h.flush()
go func() {
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index ce44996..0ceb8ee 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -175,8 +175,9 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
if r.server.turboAggregate != nil {
dlog.Server.Info(r.server.user, "Triggering final turbo aggregate serialization")
r.server.turboAggregate.Serialize(context.Background())
- // Give time for serialization to complete
- time.Sleep(100 * time.Millisecond)
+ // Give more time for serialization to complete
+ // This is critical when processing many files concurrently
+ time.Sleep(500 * time.Millisecond)
}
// Double-check that we really have no pending work