summaryrefslogtreecommitdiff
path: root/internal/server/handlers/basehandler.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-06-30 23:22:56 +0300
committerPaul Buetow <paul@buetow.org>2025-06-30 23:22:56 +0300
commitb4ca43d97c83c3b9da7138b3b4d6f6cce6fed370 (patch)
tree1dba534b8c7b1784f712cec90ff945e3d7fb7a82 /internal/server/handlers/basehandler.go
parent88886206c2c758bf619362aaa484dd3e254b8ed1 (diff)
fix: ensure complete data transmission in turbo mode for dtail operations
This commit fixes integration test failures in turbo mode where data was not being fully transmitted before the connection closed. The main issue was that readWithTurboProcessor was returning too quickly without ensuring all data had been written to the network stream. Key changes: - Add comprehensive trace logging to track data flow in turbo mode - Fix turbo channel draining mechanism in baseHandler.Read() to wait for all data - Add proper flushing in TurboNetworkWriter with channel drain synchronization - Increase flush timeout from 10 to 100 iterations for turbo mode data volumes - Fix color formatting in serverless mode by processing lines individually - Add synchronization delays to ensure data transmission completes The fixes ensure that all data is properly transmitted before connection closure, resolving TestDcat integration test failures when DTAIL_TURBOBOOST_ENABLE is set. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server/handlers/basehandler.go')
-rw-r--r--internal/server/handlers/basehandler.go127
1 files changed, 125 insertions, 2 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index e9b1eec..b756201 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -46,6 +46,12 @@ type baseHandler struct {
quiet bool
plain bool
serverless bool
+
+ // Turbo mode support
+ turboMode bool
+ turboLines chan []byte // Pre-formatted lines for turbo mode
+ turboBuffer []byte // Buffer for partially sent turbo data
+ turboEOF chan struct{} // Signal when turbo data is complete
}
// Shutdown the handler.
@@ -62,6 +68,68 @@ func (h *baseHandler) Done() <-chan struct{} {
func (h *baseHandler) Read(p []byte) (n int, err error) {
defer h.readBuf.Reset()
+ // In turbo mode, check if we have buffered data first
+ if h.turboMode && len(h.turboBuffer) > 0 {
+ dlog.Server.Trace(h.user, "baseHandler.Read", "using buffered turbo data", "bufferedLen", len(h.turboBuffer))
+ n = copy(p, h.turboBuffer)
+ h.turboBuffer = h.turboBuffer[n:]
+ dlog.Server.Trace(h.user, "baseHandler.Read", "after buffer read", "copied", n, "remaining", len(h.turboBuffer))
+ return
+ }
+
+ // In turbo mode, prioritize pre-formatted turbo lines
+ if h.turboMode && h.turboLines != nil {
+ channelLen := len(h.turboLines)
+ dlog.Server.Trace(h.user, "baseHandler.Read", "checking turboLines channel", "channelLen", channelLen)
+
+ // Try to read from the channel
+ select {
+ case turboData := <-h.turboLines:
+ dlog.Server.Trace(h.user, "baseHandler.Read", "got data from turboLines", "dataLen", len(turboData))
+ n = copy(p, turboData)
+ // If we couldn't send all data, buffer the rest
+ if n < len(turboData) {
+ h.turboBuffer = turboData[n:]
+ dlog.Server.Trace(h.user, "baseHandler.Read", "buffering remaining data", "bufferedLen", len(h.turboBuffer))
+ }
+ return
+ default:
+ // No data immediately available
+ if channelLen > 0 {
+ // There's data in the channel but we couldn't get it immediately
+ // Wait a bit and try again
+ dlog.Server.Trace(h.user, "baseHandler.Read", "channel has data but not available, waiting")
+ time.Sleep(time.Millisecond)
+ select {
+ case turboData := <-h.turboLines:
+ dlog.Server.Trace(h.user, "baseHandler.Read", "got data after wait", "dataLen", len(turboData))
+ n = copy(p, turboData)
+ if n < len(turboData) {
+ h.turboBuffer = turboData[n:]
+ }
+ return
+ default:
+ // Still no data
+ }
+ }
+
+ // Channel is truly empty, check if we should continue in turbo mode
+ // Only disable turbo mode if we've been signaled to do so
+ if h.turboEOF != nil {
+ select {
+ case <-h.turboEOF:
+ dlog.Server.Trace(h.user, "baseHandler.Read", "EOF received and channel empty, disabling turbo mode")
+ h.turboMode = false
+ default:
+ // EOF not signaled yet, continue in turbo mode
+ }
+ }
+
+ dlog.Server.Trace(h.user, "baseHandler.Read", "no data in turboLines, falling through")
+ // Fall through to normal processing
+ }
+ }
+
select {
case message := <-h.serverMessages:
if len(message) > 0 && message[0] == '.' {
@@ -288,9 +356,17 @@ func (h *baseHandler) sendln(ch chan<- string, message string) {
func (h *baseHandler) flush() {
dlog.Server.Trace(h.user, "flush()")
numUnsentMessages := func() int {
- return len(h.lines) + len(h.serverMessages) + len(h.maprMessages)
+ lineCount := len(h.lines)
+ serverCount := len(h.serverMessages)
+ maprCount := len(h.maprMessages)
+ turboCount := 0
+ if h.turboLines != nil {
+ turboCount = len(h.turboLines)
+ }
+ dlog.Server.Trace(h.user, "flush", "lines", lineCount, "server", serverCount, "mapr", maprCount, "turbo", turboCount)
+ return lineCount + serverCount + maprCount + turboCount
}
- for i := 0; i < 10; i++ {
+ for i := 0; i < 100; i++ { // Increase iterations for turbo mode
if numUnsentMessages() == 0 {
dlog.Server.Debug(h.user, "ALL lines sent", fmt.Sprintf("%p", h))
return
@@ -329,3 +405,50 @@ func (h *baseHandler) decrementActiveCommands() int32 {
atomic.AddInt32(&h.activeCommands, -1)
return atomic.LoadInt32(&h.activeCommands)
}
+
+// EnableTurboMode enables turbo mode for direct line processing
+func (h *baseHandler) EnableTurboMode() {
+ h.turboMode = true
+ if h.turboLines == nil {
+ h.turboLines = make(chan []byte, 1000) // Large buffer for performance
+ }
+ if h.turboEOF == nil {
+ h.turboEOF = make(chan struct{})
+ }
+}
+
+// IsTurboMode returns true if turbo mode is enabled
+func (h *baseHandler) IsTurboMode() bool {
+ return h.turboMode
+}
+
+// flushTurboData ensures all turbo channel data is processed
+func (h *baseHandler) flushTurboData() {
+ if h.turboLines == nil {
+ return
+ }
+
+ dlog.Server.Debug(h.user, "Flushing turbo data", "channelLen", len(h.turboLines))
+
+ // Wait for turbo channel to drain with a timeout
+ timeout := time.After(2 * time.Second)
+ for {
+ select {
+ case <-timeout:
+ dlog.Server.Warn(h.user, "Timeout while flushing turbo data", "remaining", len(h.turboLines))
+ return
+ default:
+ if len(h.turboLines) == 0 {
+ dlog.Server.Debug(h.user, "Turbo channel drained successfully")
+ return
+ }
+ // Give the reader time to process
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+}
+
+// GetTurboChannel returns the turbo lines channel for direct writing
+func (h *baseHandler) GetTurboChannel() chan<- []byte {
+ return h.turboLines
+}