diff options
Diffstat (limited to 'internal/server/handlers/basehandler.go')
| -rw-r--r-- | internal/server/handlers/basehandler.go | 25 |
1 files changed, 14 insertions, 11 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index 66c2cb7..06943b3 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -339,25 +339,28 @@ func (h *baseHandler) flush() { return lineCount + serverCount + maprCount + turboCount } - // Increase iterations for turbo mode to handle large file batches - maxIterations := 100 - if h.turbo.enabled() { - maxIterations = 300 // Give more time for turbo mode to drain + maxWait := time.Second + if h.turbo.enabled() || h.turboAggregate != nil || h.aggregate != nil { + maxWait = 3 * time.Second } - // Also increase iterations if we have MapReduce messages - if h.turboAggregate != nil || h.aggregate != nil { - maxIterations = 300 // Give more time for MapReduce results + if h.serverless && maxWait < 5*time.Second { + maxWait = 5 * time.Second } - for i := 0; i < maxIterations; i++ { - if numUnsentMessages() == 0 { + deadline := time.Now().Add(maxWait) + for i := 0; ; i++ { + unsent := numUnsentMessages() + if unsent == 0 { dlog.Server.Debug(h.user, "ALL lines sent", fmt.Sprintf("%p", h)) return } - dlog.Server.Debug(h.user, "Still lines to be sent", "iteration", i, "unsent", numUnsentMessages()) + if time.Now().After(deadline) { + dlog.Server.Warn(h.user, "Some lines remain unsent", unsent) + return + } + dlog.Server.Debug(h.user, "Still lines to be sent", "iteration", i, "unsent", unsent, "deadline", deadline.Sub(time.Now())) time.Sleep(time.Millisecond * 10) } - dlog.Server.Warn(h.user, "Some lines remain unsent", numUnsentMessages()) } func (h *baseHandler) shutdown() { |
