summaryrefslogtreecommitdiff
path: root/internal/server/handlers/turbo_manager.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-02 10:42:33 +0200
committerPaul Buetow <paul@buetow.org>2026-03-02 10:42:33 +0200
commita426a2f9f33b1125a05d3aac29e7b98afdc36a99 (patch)
tree13d21a5aef7ec1e586e364ce5bebabb65fd77523 /internal/server/handlers/turbo_manager.go
parent3002bdcaa4ec22aa46b6c98eefda2f926dfff618 (diff)
server: use auth strategy registry and stabilize turbo EOF sync
Diffstat (limited to 'internal/server/handlers/turbo_manager.go')
-rw-r--r--internal/server/handlers/turbo_manager.go36
1 files changed, 33 insertions, 3 deletions
diff --git a/internal/server/handlers/turbo_manager.go b/internal/server/handlers/turbo_manager.go
index e5cbf1a..deed383 100644
--- a/internal/server/handlers/turbo_manager.go
+++ b/internal/server/handlers/turbo_manager.go
@@ -12,6 +12,7 @@ const (
defaultTurboFlushTimeout = 2 * time.Second
defaultTurboFlushPollInterval = 10 * time.Millisecond
defaultTurboReadRetryInterval = time.Millisecond
+ defaultTurboEOFAckQuietPeriod = 50 * time.Millisecond
)
type turboManagerConfig struct {
@@ -19,6 +20,7 @@ type turboManagerConfig struct {
flushTimeout time.Duration
flushPollInterval time.Duration
readRetryInterval time.Duration
+ eofAckQuietPeriod time.Duration
}
type turboManager struct {
@@ -32,6 +34,9 @@ type turboManager struct {
flushTimeout time.Duration
flushPollInterval time.Duration
readRetryInterval time.Duration
+ eofAckQuietPeriod time.Duration
+
+ eofEmptySince time.Time
}
func (t *turboManager) configure(cfg turboManagerConfig) {
@@ -47,6 +52,9 @@ func (t *turboManager) configure(cfg turboManagerConfig) {
if cfg.readRetryInterval > 0 {
t.readRetryInterval = cfg.readRetryInterval
}
+ if cfg.eofAckQuietPeriod > 0 {
+ t.eofAckQuietPeriod = cfg.eofAckQuietPeriod
+ }
}
func (t *turboManager) resolvedChannelBufferSize() int {
@@ -77,6 +85,13 @@ func (t *turboManager) resolvedReadRetryInterval() time.Duration {
return defaultTurboReadRetryInterval
}
+func (t *turboManager) resolvedEOFAckQuietPeriod() time.Duration {
+ if t.eofAckQuietPeriod > 0 {
+ return t.eofAckQuietPeriod
+ }
+ return defaultTurboEOFAckQuietPeriod
+}
+
func (t *turboManager) enable() {
t.mode = true
if t.lines == nil {
@@ -85,6 +100,7 @@ func (t *turboManager) enable() {
// Always create a new EOF channel for each batch of files.
t.eof = make(chan struct{})
t.eofAck = make(chan struct{})
+ t.eofEmptySince = time.Time{}
}
func (t *turboManager) enabled() bool {
@@ -202,6 +218,7 @@ func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool)
select {
case turboData := <-t.lines:
dlog.Server.Trace(user, "baseHandler.Read", "got data from turboLines", "dataLen", len(turboData))
+ t.eofEmptySince = time.Time{}
n = copy(p, turboData)
if n < len(turboData) {
t.buffer = turboData[n:]
@@ -215,6 +232,7 @@ func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool)
select {
case turboData := <-t.lines:
dlog.Server.Trace(user, "baseHandler.Read", "got data after wait", "dataLen", len(turboData))
+ t.eofEmptySince = time.Time{}
n = copy(p, turboData)
if n < len(turboData) {
t.buffer = turboData[n:]
@@ -228,9 +246,21 @@ func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool)
if t.eof != nil {
select {
case <-t.eof:
- dlog.Server.Trace(user, "baseHandler.Read", "EOF received and channel empty, disabling turbo mode")
- t.mode = false
- t.signalEOFAck()
+ if len(t.lines) > 0 {
+ t.eofEmptySince = time.Time{}
+ break
+ }
+
+ if t.eofEmptySince.IsZero() {
+ t.eofEmptySince = time.Now()
+ break
+ }
+
+ if time.Since(t.eofEmptySince) >= t.resolvedEOFAckQuietPeriod() {
+ dlog.Server.Trace(user, "baseHandler.Read", "EOF acknowledged and channel stable-empty, disabling turbo mode")
+ t.mode = false
+ t.signalEOFAck()
+ }
default:
}
}