summaryrefslogtreecommitdiff
path: root/internal/server/handlers/turbo_manager.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-13 09:13:51 +0200
committerPaul Buetow <paul@buetow.org>2026-03-13 09:13:51 +0200
commit7a79d0a8bf58b05dfbae331d00275739530b9584 (patch)
tree156a7c91984f11cb334a589649f337e8fa7c434d /internal/server/handlers/turbo_manager.go
parent9f6850fc202e048dcdbfa6ffb59589d4a851cd84 (diff)
task 682e6ae9: filter stale generation output
Diffstat (limited to 'internal/server/handlers/turbo_manager.go')
-rw-r--r--internal/server/handlers/turbo_manager.go102
1 files changed, 57 insertions, 45 deletions
diff --git a/internal/server/handlers/turbo_manager.go b/internal/server/handlers/turbo_manager.go
index deed383..4b4a883 100644
--- a/internal/server/handlers/turbo_manager.go
+++ b/internal/server/handlers/turbo_manager.go
@@ -195,7 +195,7 @@ func (t *turboManager) flush(user *user.User) {
// tryRead tries to serve data from turbo state and channels.
// Returns handled=false when caller should continue with normal path.
-func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool) {
+func (t *turboManager) tryRead(p []byte, user *user.User, shouldDropGeneration func(uint64) bool) (n int, handled bool) {
if !t.mode {
return 0, false
}
@@ -215,57 +215,69 @@ func (t *turboManager) tryRead(p []byte, user *user.User) (n int, handled bool)
channelLen := len(t.lines)
dlog.Server.Trace(user, "baseHandler.Read", "checking turboLines channel", "channelLen", channelLen)
- select {
- case turboData := <-t.lines:
- dlog.Server.Trace(user, "baseHandler.Read", "got data from turboLines", "dataLen", len(turboData))
- t.eofEmptySince = time.Time{}
- n = copy(p, turboData)
- if n < len(turboData) {
- t.buffer = turboData[n:]
- dlog.Server.Trace(user, "baseHandler.Read", "buffering remaining data", "bufferedLen", len(t.buffer))
- }
- return n, true
- default:
- if channelLen > 0 {
- dlog.Server.Trace(user, "baseHandler.Read", "channel has data but not available, waiting")
- time.Sleep(t.resolvedReadRetryInterval())
- select {
- case turboData := <-t.lines:
- dlog.Server.Trace(user, "baseHandler.Read", "got data after wait", "dataLen", len(turboData))
+ for {
+ select {
+ case turboData := <-t.lines:
+ generation, decodedData := decodeGeneratedBytes(turboData)
+ if shouldDropGeneration != nil && shouldDropGeneration(generation) {
t.eofEmptySince = time.Time{}
- n = copy(p, turboData)
- if n < len(turboData) {
- t.buffer = turboData[n:]
- }
- return n, true
- default:
- // Still no data.
+ continue
}
- }
-
- if t.eof != nil {
- select {
- case <-t.eof:
- if len(t.lines) > 0 {
+ dlog.Server.Trace(user, "baseHandler.Read", "got data from turboLines", "dataLen", len(decodedData))
+ t.eofEmptySince = time.Time{}
+ n = copy(p, decodedData)
+ if n < len(decodedData) {
+ t.buffer = decodedData[n:]
+ dlog.Server.Trace(user, "baseHandler.Read", "buffering remaining data", "bufferedLen", len(t.buffer))
+ }
+ return n, true
+ default:
+ if channelLen > 0 {
+ dlog.Server.Trace(user, "baseHandler.Read", "channel has data but not available, waiting")
+ time.Sleep(t.resolvedReadRetryInterval())
+ select {
+ case turboData := <-t.lines:
+ generation, decodedData := decodeGeneratedBytes(turboData)
+ if shouldDropGeneration != nil && shouldDropGeneration(generation) {
+ t.eofEmptySince = time.Time{}
+ continue
+ }
+ dlog.Server.Trace(user, "baseHandler.Read", "got data after wait", "dataLen", len(decodedData))
t.eofEmptySince = time.Time{}
- break
- }
-
- if t.eofEmptySince.IsZero() {
- t.eofEmptySince = time.Now()
- break
+ n = copy(p, decodedData)
+ if n < len(decodedData) {
+ t.buffer = decodedData[n:]
+ }
+ return n, true
+ default:
+ // Still no data.
}
+ }
- if time.Since(t.eofEmptySince) >= t.resolvedEOFAckQuietPeriod() {
- dlog.Server.Trace(user, "baseHandler.Read", "EOF acknowledged and channel stable-empty, disabling turbo mode")
- t.mode = false
- t.signalEOFAck()
+ if t.eof != nil {
+ select {
+ case <-t.eof:
+ if len(t.lines) > 0 {
+ t.eofEmptySince = time.Time{}
+ break
+ }
+
+ if t.eofEmptySince.IsZero() {
+ t.eofEmptySince = time.Now()
+ break
+ }
+
+ if time.Since(t.eofEmptySince) >= t.resolvedEOFAckQuietPeriod() {
+ dlog.Server.Trace(user, "baseHandler.Read", "EOF acknowledged and channel stable-empty, disabling turbo mode")
+ t.mode = false
+ t.signalEOFAck()
+ }
+ default:
}
- default:
}
- }
- dlog.Server.Trace(user, "baseHandler.Read", "no data in turboLines, falling through")
- return 0, false
+ dlog.Server.Trace(user, "baseHandler.Read", "no data in turboLines, falling through")
+ return 0, false
+ }
}
}