diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-13 09:13:51 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-13 09:13:51 +0200 |
| commit | 7a79d0a8bf58b05dfbae331d00275739530b9584 (patch) | |
| tree | 156a7c91984f11cb334a589649f337e8fa7c434d /internal/server/handlers/turbo_manager.go | |
| parent | 9f6850fc202e048dcdbfa6ffb59589d4a851cd84 (diff) | |
task 682e6ae9: filter stale generation output
Diffstat (limited to 'internal/server/handlers/turbo_manager.go')
| -rw-r--r-- | internal/server/handlers/turbo_manager.go | 102 |
1 files changed, 57 insertions, 45 deletions
diff --git a/internal/server/handlers/turbo_manager.go b/internal/server/handlers/turbo_manager.go index deed383..4b4a883 100644 --- a/internal/server/handlers/turbo_manager.go +++ b/internal/server/handlers/turbo_manager.go @@ -195,7 +195,7 @@ func (t *turboManager) flush(user *user.User) { // tryRead tries to serve data from turbo state and channels. // Returns handled=false when caller should continue with normal path. -func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool) { +func (t *turboManager) tryRead(p []byte, user *user.User, shouldDropGeneration func(uint64) bool) (n int, handled bool) { if !t.mode { return 0, false } @@ -215,57 +215,69 @@ func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool) channelLen := len(t.lines) dlog.Server.Trace(user, "baseHandler.Read", "checking turboLines channel", "channelLen", channelLen) - select { - case turboData := <-t.lines: - dlog.Server.Trace(user, "baseHandler.Read", "got data from turboLines", "dataLen", len(turboData)) - t.eofEmptySince = time.Time{} - n = copy(p, turboData) - if n < len(turboData) { - t.buffer = turboData[n:] - dlog.Server.Trace(user, "baseHandler.Read", "buffering remaining data", "bufferedLen", len(t.buffer)) - } - return n, true - default: - if channelLen > 0 { - dlog.Server.Trace(user, "baseHandler.Read", "channel has data but not available, waiting") - time.Sleep(t.resolvedReadRetryInterval()) - select { - case turboData := <-t.lines: - dlog.Server.Trace(user, "baseHandler.Read", "got data after wait", "dataLen", len(turboData)) + for { + select { + case turboData := <-t.lines: + generation, decodedData := decodeGeneratedBytes(turboData) + if shouldDropGeneration != nil && shouldDropGeneration(generation) { t.eofEmptySince = time.Time{} - n = copy(p, turboData) - if n < len(turboData) { - t.buffer = turboData[n:] - } - return n, true - default: - // Still no data. + continue } - } - - if t.eof != nil { - select { - case <-t.eof: - if len(t.lines) > 0 { + dlog.Server.Trace(user, "baseHandler.Read", "got data from turboLines", "dataLen", len(decodedData)) + t.eofEmptySince = time.Time{} + n = copy(p, decodedData) + if n < len(decodedData) { + t.buffer = decodedData[n:] + dlog.Server.Trace(user, "baseHandler.Read", "buffering remaining data", "bufferedLen", len(t.buffer)) + } + return n, true + default: + if channelLen > 0 { + dlog.Server.Trace(user, "baseHandler.Read", "channel has data but not available, waiting") + time.Sleep(t.resolvedReadRetryInterval()) + select { + case turboData := <-t.lines: + generation, decodedData := decodeGeneratedBytes(turboData) + if shouldDropGeneration != nil && shouldDropGeneration(generation) { + t.eofEmptySince = time.Time{} + continue + } + dlog.Server.Trace(user, "baseHandler.Read", "got data after wait", "dataLen", len(decodedData)) t.eofEmptySince = time.Time{} - break - } - - if t.eofEmptySince.IsZero() { - t.eofEmptySince = time.Now() - break + n = copy(p, decodedData) + if n < len(decodedData) { + t.buffer = decodedData[n:] + } + return n, true + default: + // Still no data. } + } - if time.Since(t.eofEmptySince) >= t.resolvedEOFAckQuietPeriod() { - dlog.Server.Trace(user, "baseHandler.Read", "EOF acknowledged and channel stable-empty, disabling turbo mode") - t.mode = false - t.signalEOFAck() + if t.eof != nil { + select { + case <-t.eof: + if len(t.lines) > 0 { + t.eofEmptySince = time.Time{} + break + } + + if t.eofEmptySince.IsZero() { + t.eofEmptySince = time.Now() + break + } + + if time.Since(t.eofEmptySince) >= t.resolvedEOFAckQuietPeriod() { + dlog.Server.Trace(user, "baseHandler.Read", "EOF acknowledged and channel stable-empty, disabling turbo mode") + t.mode = false + t.signalEOFAck() + } + default: } - default: } - } - dlog.Server.Trace(user, "baseHandler.Read", "no data in turboLines, falling through") - return 0, false + dlog.Server.Trace(user, "baseHandler.Read", "no data in turboLines, falling through") + return 0, false + } } } |
