summaryrefslogtreecommitdiff
path: root/internal/server/handlers/basehandler.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-13 09:13:51 +0200
committerPaul Buetow <paul@buetow.org>2026-03-13 09:13:51 +0200
commit7a79d0a8bf58b05dfbae331d00275739530b9584 (patch)
tree156a7c91984f11cb334a589649f337e8fa7c434d /internal/server/handlers/basehandler.go
parent9f6850fc202e048dcdbfa6ffb59589d4a851cd84 (diff)
task 682e6ae9: filter stale generation output
Diffstat (limited to 'internal/server/handlers/basehandler.go')
-rw-r--r--internal/server/handlers/basehandler.go166
1 files changed, 103 insertions, 63 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index 42cc4cc..66c2cb7 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -47,6 +47,8 @@ type baseHandler struct {
serverless bool
turbo turboManager
+
+ activeGeneration func() uint64
}
// Shutdown the handler.
@@ -73,83 +75,108 @@ func (h *baseHandler) Done() <-chan struct{} {
func (h *baseHandler) Read(p []byte) (n int, err error) {
defer h.readBuf.Reset()
- if n, handled := h.turbo.tryRead(p, h.user); handled {
- return n, nil
- }
-
- pollInterval := time.Second
- if h.turbo.enabled() {
- // Turbo reads require tighter wake-ups so we can continue draining the turbo channel.
- pollInterval = h.turbo.resolvedReadRetryInterval()
- }
- poll := time.After(pollInterval)
-
- select {
- case message := <-h.serverMessages:
- if len(message) > 0 && message[0] == '.' {
- // Handle hidden message (don't display to the user)
- h.readBuf.WriteString(message)
- h.readBuf.WriteByte(protocol.MessageDelimiter)
- n = copy(p, h.readBuf.Bytes())
- return
+ for {
+ if n, handled := h.turbo.tryRead(p, h.user, h.shouldDropGeneration); handled {
+ if n == 0 {
+ continue
+ }
+ return n, nil
}
- if h.serverless {
- return
+ pollInterval := time.Second
+ if h.turbo.enabled() {
+ // Turbo reads require tighter wake-ups so we can continue draining the turbo channel.
+ pollInterval = h.turbo.resolvedReadRetryInterval()
}
+ poll := time.After(pollInterval)
- // Skip empty server messages when in plain mode
- if h.plain && (message == "" || message == "\n") {
+ select {
+ case message := <-h.serverMessages:
+ generation, decodedMessage := decodeGeneratedMessage(message)
+ if h.shouldDropGeneration(generation) {
+ continue
+ }
+ message = decodedMessage
+ if len(message) > 0 && message[0] == '.' {
+ // Handle hidden message (don't display to the user)
+ h.readBuf.WriteString(message)
+ h.readBuf.WriteByte(protocol.MessageDelimiter)
+ n = copy(p, h.readBuf.Bytes())
+ return
+ }
+
+ if h.serverless {
+ return
+ }
+
+ // Skip empty server messages when in plain mode
+ if h.plain && (message == "" || message == "\n") {
+ return
+ }
+
+ // Handle normal server message (display to the user).
+ formatServerMessage(&h.readBuf, h.hostname, message, h.plain)
+ n = copy(p, h.readBuf.Bytes())
return
- }
- // Handle normal server message (display to the user).
- formatServerMessage(&h.readBuf, h.hostname, message, h.plain)
- n = copy(p, h.readBuf.Bytes())
-
- case message := <-h.maprMessages:
- // Send mapreduce-aggregated data as a message.
- h.readBuf.WriteString("AGGREGATE")
- h.readBuf.WriteString(protocol.FieldDelimiter)
- h.readBuf.WriteString(h.hostname)
- h.readBuf.WriteString(protocol.FieldDelimiter)
- h.readBuf.WriteString(message)
- h.readBuf.WriteByte(protocol.MessageDelimiter)
- n = copy(p, h.readBuf.Bytes())
-
- case line := <-h.lines:
- if h.plain {
- h.readBuf.Write(line.Content.Bytes())
+ case message := <-h.maprMessages:
+ generation, decodedMessage := decodeGeneratedMessage(message)
+ if h.shouldDropGeneration(generation) {
+ continue
+ }
+ message = decodedMessage
+ // Send mapreduce-aggregated data as a message.
+ h.readBuf.WriteString("AGGREGATE")
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(h.hostname)
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(message)
h.readBuf.WriteByte(protocol.MessageDelimiter)
- } else {
- formatRemoteLine(
- &h.readBuf,
- h.hostname,
- fmt.Sprintf("%3d", line.TransmittedPerc),
- line.Count,
- line.SourceID,
- line.Content.Bytes(),
- )
- }
- n = copy(p, h.readBuf.Bytes())
- pool.RecycleBytesBuffer(line.Content)
- line.Recycle()
+ n = copy(p, h.readBuf.Bytes())
+ return
- case <-h.done.Done():
- err = io.EOF
- return
+ case line := <-h.lines:
+ if line == nil {
+ continue
+ }
+ if h.shouldDropGeneration(line.Generation) {
+ pool.RecycleBytesBuffer(line.Content)
+ line.Recycle()
+ continue
+ }
+ if h.plain {
+ h.readBuf.Write(line.Content.Bytes())
+ h.readBuf.WriteByte(protocol.MessageDelimiter)
+ } else {
+ formatRemoteLine(
+ &h.readBuf,
+ h.hostname,
+ fmt.Sprintf("%3d", line.TransmittedPerc),
+ line.Count,
+ line.SourceID,
+ line.Content.Bytes(),
+ )
+ }
+ n = copy(p, h.readBuf.Bytes())
+ pool.RecycleBytesBuffer(line.Content)
+ line.Recycle()
+ return
- case <-poll:
- // Wake periodically so turbo mode transitions don't leave this read blocked forever.
- select {
case <-h.done.Done():
err = io.EOF
return
- default:
+
+ case <-poll:
+ // Wake periodically so turbo mode transitions don't leave this read blocked forever.
+ select {
+ case <-h.done.Done():
+ err = io.EOF
+ return
+ default:
+ }
+ return
}
- return
}
- return
}
// Write is to receive data from the dtail client via Writer interface.
@@ -288,6 +315,19 @@ func (h *baseHandler) sendln(ch chan<- string, message string) {
h.send(ch, message+"\n")
}
+func (h *baseHandler) shouldDropGeneration(generation uint64) bool {
+ if generation == 0 || h.activeGeneration == nil {
+ return false
+ }
+
+ activeGeneration := h.activeGeneration()
+ if activeGeneration == 0 {
+ return false
+ }
+
+ return activeGeneration != generation
+}
+
func (h *baseHandler) flush() {
dlog.Server.Trace(h.user, "flush()")
numUnsentMessages := func() int {