diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-13 09:13:51 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-13 09:13:51 +0200 |
| commit | 7a79d0a8bf58b05dfbae331d00275739530b9584 (patch) | |
| tree | 156a7c91984f11cb334a589649f337e8fa7c434d /internal/server/handlers/basehandler.go | |
| parent | 9f6850fc202e048dcdbfa6ffb59589d4a851cd84 (diff) | |
task 682e6ae9: filter stale generation output
Diffstat (limited to 'internal/server/handlers/basehandler.go')
| -rw-r--r-- | internal/server/handlers/basehandler.go | 166 |
1 files changed, 103 insertions, 63 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index 42cc4cc..66c2cb7 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -47,6 +47,8 @@ type baseHandler struct { serverless bool turbo turboManager + + activeGeneration func() uint64 } // Shutdown the handler. @@ -73,83 +75,108 @@ func (h *baseHandler) Done() <-chan struct{} { func (h *baseHandler) Read(p []byte) (n int, err error) { defer h.readBuf.Reset() - if n, handled := h.turbo.tryRead(p, h.user); handled { - return n, nil - } - - pollInterval := time.Second - if h.turbo.enabled() { - // Turbo reads require tighter wake-ups so we can continue draining the turbo channel. - pollInterval = h.turbo.resolvedReadRetryInterval() - } - poll := time.After(pollInterval) - - select { - case message := <-h.serverMessages: - if len(message) > 0 && message[0] == '.' { - // Handle hidden message (don't display to the user) - h.readBuf.WriteString(message) - h.readBuf.WriteByte(protocol.MessageDelimiter) - n = copy(p, h.readBuf.Bytes()) - return + for { + if n, handled := h.turbo.tryRead(p, h.user, h.shouldDropGeneration); handled { + if n == 0 { + continue + } + return n, nil } - if h.serverless { - return + pollInterval := time.Second + if h.turbo.enabled() { + // Turbo reads require tighter wake-ups so we can continue draining the turbo channel. + pollInterval = h.turbo.resolvedReadRetryInterval() } + poll := time.After(pollInterval) - // Skip empty server messages when in plain mode - if h.plain && (message == "" || message == "\n") { + select { + case message := <-h.serverMessages: + generation, decodedMessage := decodeGeneratedMessage(message) + if h.shouldDropGeneration(generation) { + continue + } + message = decodedMessage + if len(message) > 0 && message[0] == '.' { + // Handle hidden message (don't display to the user) + h.readBuf.WriteString(message) + h.readBuf.WriteByte(protocol.MessageDelimiter) + n = copy(p, h.readBuf.Bytes()) + return + } + + if h.serverless { + return + } + + // Skip empty server messages when in plain mode + if h.plain && (message == "" || message == "\n") { + return + } + + // Handle normal server message (display to the user). + formatServerMessage(&h.readBuf, h.hostname, message, h.plain) + n = copy(p, h.readBuf.Bytes()) return - } - // Handle normal server message (display to the user). - formatServerMessage(&h.readBuf, h.hostname, message, h.plain) - n = copy(p, h.readBuf.Bytes()) - - case message := <-h.maprMessages: - // Send mapreduce-aggregated data as a message. - h.readBuf.WriteString("AGGREGATE") - h.readBuf.WriteString(protocol.FieldDelimiter) - h.readBuf.WriteString(h.hostname) - h.readBuf.WriteString(protocol.FieldDelimiter) - h.readBuf.WriteString(message) - h.readBuf.WriteByte(protocol.MessageDelimiter) - n = copy(p, h.readBuf.Bytes()) - - case line := <-h.lines: - if h.plain { - h.readBuf.Write(line.Content.Bytes()) + case message := <-h.maprMessages: + generation, decodedMessage := decodeGeneratedMessage(message) + if h.shouldDropGeneration(generation) { + continue + } + message = decodedMessage + // Send mapreduce-aggregated data as a message. + h.readBuf.WriteString("AGGREGATE") + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(h.hostname) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(message) h.readBuf.WriteByte(protocol.MessageDelimiter) - } else { - formatRemoteLine( - &h.readBuf, - h.hostname, - fmt.Sprintf("%3d", line.TransmittedPerc), - line.Count, - line.SourceID, - line.Content.Bytes(), - ) - } - n = copy(p, h.readBuf.Bytes()) - pool.RecycleBytesBuffer(line.Content) - line.Recycle() + n = copy(p, h.readBuf.Bytes()) + return - case <-h.done.Done(): - err = io.EOF - return + case line := <-h.lines: + if line == nil { + continue + } + if h.shouldDropGeneration(line.Generation) { + pool.RecycleBytesBuffer(line.Content) + line.Recycle() + continue + } + if h.plain { + h.readBuf.Write(line.Content.Bytes()) + h.readBuf.WriteByte(protocol.MessageDelimiter) + } else { + formatRemoteLine( + &h.readBuf, + h.hostname, + fmt.Sprintf("%3d", line.TransmittedPerc), + line.Count, + line.SourceID, + line.Content.Bytes(), + ) + } + n = copy(p, h.readBuf.Bytes()) + pool.RecycleBytesBuffer(line.Content) + line.Recycle() + return - case <-poll: - // Wake periodically so turbo mode transitions don't leave this read blocked forever. - select { case <-h.done.Done(): err = io.EOF return - default: + + case <-poll: + // Wake periodically so turbo mode transitions don't leave this read blocked forever. + select { + case <-h.done.Done(): + err = io.EOF + return + default: + } + return } - return } - return } // Write is to receive data from the dtail client via Writer interface. @@ -288,6 +315,19 @@ func (h *baseHandler) sendln(ch chan<- string, message string) { h.send(ch, message+"\n") } +func (h *baseHandler) shouldDropGeneration(generation uint64) bool { + if generation == 0 || h.activeGeneration == nil { + return false + } + + activeGeneration := h.activeGeneration() + if activeGeneration == 0 { + return false + } + + return activeGeneration != generation +} + func (h *baseHandler) flush() { dlog.Server.Trace(h.user, "flush()") numUnsentMessages := func() int { |
