diff options
Diffstat (limited to 'internal/server')
| -rw-r--r-- | internal/server/handlers/basehandler.go | 14 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 5 |
2 files changed, 17 insertions, 2 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index 427ab6c..3bb824b 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -409,6 +409,20 @@ func (h *baseHandler) shutdown() { h.flushTurboData() } + // Shutdown aggregates BEFORE flush to ensure MapReduce data is available + if h.turboAggregate != nil { + dlog.Server.Info(h.user, "Shutting down turbo aggregate in shutdown()") + h.turboAggregate.Shutdown() + // Give time for serialization to complete + time.Sleep(100 * time.Millisecond) + } + if h.aggregate != nil { + dlog.Server.Info(h.user, "Shutting down regular aggregate in shutdown()") + h.aggregate.Shutdown() + // Give time for serialization to complete + time.Sleep(100 * time.Millisecond) + } + h.flush() go func() { diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index ce44996..0ceb8ee 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -175,8 +175,9 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC if r.server.turboAggregate != nil { dlog.Server.Info(r.server.user, "Triggering final turbo aggregate serialization") r.server.turboAggregate.Serialize(context.Background()) - // Give time for serialization to complete - time.Sleep(100 * time.Millisecond) + // Give more time for serialization to complete + // This is critical when processing many files concurrently + time.Sleep(500 * time.Millisecond) } // Double-check that we really have no pending work |
