summaryrefslogtreecommitdiff
path: root/internal/server/handlers/basehandler.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-13 20:52:54 +0200
committerPaul Buetow <paul@buetow.org>2026-03-13 20:52:54 +0200
commit1b34e1f2501b8def0a0fb4eae28bf6c19a8adde2 (patch)
tree4898ab4ff4a7dd4ea102726a845e3935c39ee320 /internal/server/handlers/basehandler.go
parent07d654f76e1002b6ac18a43aab3c64797dcd2a32 (diff)
Fix serverless output draining regressions
Diffstat (limited to 'internal/server/handlers/basehandler.go')
-rw-r--r--internal/server/handlers/basehandler.go25
1 files changed, 14 insertions, 11 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index 66c2cb7..06943b3 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -339,25 +339,28 @@ func (h *baseHandler) flush() {
return lineCount + serverCount + maprCount + turboCount
}
- // Increase iterations for turbo mode to handle large file batches
- maxIterations := 100
- if h.turbo.enabled() {
- maxIterations = 300 // Give more time for turbo mode to drain
+ maxWait := time.Second
+ if h.turbo.enabled() || h.turboAggregate != nil || h.aggregate != nil {
+ maxWait = 3 * time.Second
}
- // Also increase iterations if we have MapReduce messages
- if h.turboAggregate != nil || h.aggregate != nil {
- maxIterations = 300 // Give more time for MapReduce results
+ if h.serverless && maxWait < 5*time.Second {
+ maxWait = 5 * time.Second
}
- for i := 0; i < maxIterations; i++ {
- if numUnsentMessages() == 0 {
+ deadline := time.Now().Add(maxWait)
+ for i := 0; ; i++ {
+ unsent := numUnsentMessages()
+ if unsent == 0 {
dlog.Server.Debug(h.user, "ALL lines sent", fmt.Sprintf("%p", h))
return
}
- dlog.Server.Debug(h.user, "Still lines to be sent", "iteration", i, "unsent", numUnsentMessages())
+ if time.Now().After(deadline) {
+ dlog.Server.Warn(h.user, "Some lines remain unsent", unsent)
+ return
+ }
+ dlog.Server.Debug(h.user, "Still lines to be sent", "iteration", i, "unsent", unsent, "deadline", deadline.Sub(time.Now()))
time.Sleep(time.Millisecond * 10)
}
- dlog.Server.Warn(h.user, "Some lines remain unsent", numUnsentMessages())
}
func (h *baseHandler) shutdown() {