diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-02 10:42:33 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-02 10:42:33 +0200 |
| commit | a426a2f9f33b1125a05d3aac29e7b98afdc36a99 (patch) | |
| tree | 13d21a5aef7ec1e586e364ce5bebabb65fd77523 /internal/server/handlers/turbo_manager.go | |
| parent | 3002bdcaa4ec22aa46b6c98eefda2f926dfff618 (diff) | |
server: use auth strategy registry and stabilize turbo EOF sync
Diffstat (limited to 'internal/server/handlers/turbo_manager.go')
| -rw-r--r-- | internal/server/handlers/turbo_manager.go | 36 |
1 files changed, 33 insertions, 3 deletions
diff --git a/internal/server/handlers/turbo_manager.go b/internal/server/handlers/turbo_manager.go index e5cbf1a..deed383 100644 --- a/internal/server/handlers/turbo_manager.go +++ b/internal/server/handlers/turbo_manager.go @@ -12,6 +12,7 @@ const ( defaultTurboFlushTimeout = 2 * time.Second defaultTurboFlushPollInterval = 10 * time.Millisecond defaultTurboReadRetryInterval = time.Millisecond + defaultTurboEOFAckQuietPeriod = 50 * time.Millisecond ) type turboManagerConfig struct { @@ -19,6 +20,7 @@ type turboManagerConfig struct { flushTimeout time.Duration flushPollInterval time.Duration readRetryInterval time.Duration + eofAckQuietPeriod time.Duration } type turboManager struct { @@ -32,6 +34,9 @@ type turboManager struct { flushTimeout time.Duration flushPollInterval time.Duration readRetryInterval time.Duration + eofAckQuietPeriod time.Duration + + eofEmptySince time.Time } func (t *turboManager) configure(cfg turboManagerConfig) { @@ -47,6 +52,9 @@ func (t *turboManager) configure(cfg turboManagerConfig) { if cfg.readRetryInterval > 0 { t.readRetryInterval = cfg.readRetryInterval } + if cfg.eofAckQuietPeriod > 0 { + t.eofAckQuietPeriod = cfg.eofAckQuietPeriod + } } func (t *turboManager) resolvedChannelBufferSize() int { @@ -77,6 +85,13 @@ func (t *turboManager) resolvedReadRetryInterval() time.Duration { return defaultTurboReadRetryInterval } +func (t *turboManager) resolvedEOFAckQuietPeriod() time.Duration { + if t.eofAckQuietPeriod > 0 { + return t.eofAckQuietPeriod + } + return defaultTurboEOFAckQuietPeriod +} + func (t *turboManager) enable() { t.mode = true if t.lines == nil { @@ -85,6 +100,7 @@ func (t *turboManager) enable() { // Always create a new EOF channel for each batch of files. t.eof = make(chan struct{}) t.eofAck = make(chan struct{}) + t.eofEmptySince = time.Time{} } func (t *turboManager) enabled() bool { @@ -202,6 +218,7 @@ func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool) select { case turboData := <-t.lines: dlog.Server.Trace(user, "baseHandler.Read", "got data from turboLines", "dataLen", len(turboData)) + t.eofEmptySince = time.Time{} n = copy(p, turboData) if n < len(turboData) { t.buffer = turboData[n:] @@ -215,6 +232,7 @@ func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool) select { case turboData := <-t.lines: dlog.Server.Trace(user, "baseHandler.Read", "got data after wait", "dataLen", len(turboData)) + t.eofEmptySince = time.Time{} n = copy(p, turboData) if n < len(turboData) { t.buffer = turboData[n:] @@ -228,9 +246,21 @@ func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool) if t.eof != nil { select { case <-t.eof: - dlog.Server.Trace(user, "baseHandler.Read", "EOF received and channel empty, disabling turbo mode") - t.mode = false - t.signalEOFAck() + if len(t.lines) > 0 { + t.eofEmptySince = time.Time{} + break + } + + if t.eofEmptySince.IsZero() { + t.eofEmptySince = time.Now() + break + } + + if time.Since(t.eofEmptySince) >= t.resolvedEOFAckQuietPeriod() { + dlog.Server.Trace(user, "baseHandler.Read", "EOF acknowledged and channel stable-empty, disabling turbo mode") + t.mode = false + t.signalEOFAck() + } default: } } |
