summaryrefslogtreecommitdiff
path: root/internal/server
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-06-30 23:22:56 +0300
committerPaul Buetow <paul@buetow.org>2025-06-30 23:22:56 +0300
commitb4ca43d97c83c3b9da7138b3b4d6f6cce6fed370 (patch)
tree1dba534b8c7b1784f712cec90ff945e3d7fb7a82 /internal/server
parent88886206c2c758bf619362aaa484dd3e254b8ed1 (diff)
fix: ensure complete data transmission in turbo mode for dtail operations
This commit fixes integration test failures in turbo mode where data was not being fully transmitted before the connection closed. The main issue was that readWithTurboProcessor was returning too quickly without ensuring all data had been written to the network stream. Key changes: - Add comprehensive trace logging to track data flow in turbo mode - Fix turbo channel draining mechanism in baseHandler.Read() to wait for all data - Add proper flushing in TurboNetworkWriter with channel drain synchronization - Increase flush timeout from 10 to 100 iterations for turbo mode data volumes - Fix color formatting in serverless mode by processing lines individually - Add synchronization delays to ensure data transmission completes The fixes ensure that all data is properly transmitted before connection closure, resolving TestDcat integration test failures when DTAIL_TURBOBOOST_ENABLE is set. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server')
-rw-r--r--internal/server/handlers/basehandler.go127
-rw-r--r--internal/server/handlers/readcommand.go102
-rw-r--r--internal/server/handlers/turbo_writer.go516
3 files changed, 720 insertions, 25 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index e9b1eec..b756201 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -46,6 +46,12 @@ type baseHandler struct {
quiet bool
plain bool
serverless bool
+
+ // Turbo mode support
+ turboMode bool
+ turboLines chan []byte // Pre-formatted lines for turbo mode
+ turboBuffer []byte // Buffer for partially sent turbo data
+ turboEOF chan struct{} // Signal when turbo data is complete
}
// Shutdown the handler.
@@ -62,6 +68,68 @@ func (h *baseHandler) Done() <-chan struct{} {
func (h *baseHandler) Read(p []byte) (n int, err error) {
defer h.readBuf.Reset()
+ // In turbo mode, check if we have buffered data first
+ if h.turboMode && len(h.turboBuffer) > 0 {
+ dlog.Server.Trace(h.user, "baseHandler.Read", "using buffered turbo data", "bufferedLen", len(h.turboBuffer))
+ n = copy(p, h.turboBuffer)
+ h.turboBuffer = h.turboBuffer[n:]
+ dlog.Server.Trace(h.user, "baseHandler.Read", "after buffer read", "copied", n, "remaining", len(h.turboBuffer))
+ return
+ }
+
+ // In turbo mode, prioritize pre-formatted turbo lines
+ if h.turboMode && h.turboLines != nil {
+ channelLen := len(h.turboLines)
+ dlog.Server.Trace(h.user, "baseHandler.Read", "checking turboLines channel", "channelLen", channelLen)
+
+ // Try to read from the channel
+ select {
+ case turboData := <-h.turboLines:
+ dlog.Server.Trace(h.user, "baseHandler.Read", "got data from turboLines", "dataLen", len(turboData))
+ n = copy(p, turboData)
+ // If we couldn't send all data, buffer the rest
+ if n < len(turboData) {
+ h.turboBuffer = turboData[n:]
+ dlog.Server.Trace(h.user, "baseHandler.Read", "buffering remaining data", "bufferedLen", len(h.turboBuffer))
+ }
+ return
+ default:
+ // No data immediately available
+ if channelLen > 0 {
+ // There's data in the channel but we couldn't get it immediately
+ // Wait a bit and try again
+ dlog.Server.Trace(h.user, "baseHandler.Read", "channel has data but not available, waiting")
+ time.Sleep(time.Millisecond)
+ select {
+ case turboData := <-h.turboLines:
+ dlog.Server.Trace(h.user, "baseHandler.Read", "got data after wait", "dataLen", len(turboData))
+ n = copy(p, turboData)
+ if n < len(turboData) {
+ h.turboBuffer = turboData[n:]
+ }
+ return
+ default:
+ // Still no data
+ }
+ }
+
+ // Channel is truly empty, check if we should continue in turbo mode
+ // Only disable turbo mode if we've been signaled to do so
+ if h.turboEOF != nil {
+ select {
+ case <-h.turboEOF:
+ dlog.Server.Trace(h.user, "baseHandler.Read", "EOF received and channel empty, disabling turbo mode")
+ h.turboMode = false
+ default:
+ // EOF not signaled yet, continue in turbo mode
+ }
+ }
+
+ dlog.Server.Trace(h.user, "baseHandler.Read", "no data in turboLines, falling through")
+ // Fall through to normal processing
+ }
+ }
+
select {
case message := <-h.serverMessages:
if len(message) > 0 && message[0] == '.' {
@@ -288,9 +356,17 @@ func (h *baseHandler) sendln(ch chan<- string, message string) {
func (h *baseHandler) flush() {
dlog.Server.Trace(h.user, "flush()")
numUnsentMessages := func() int {
- return len(h.lines) + len(h.serverMessages) + len(h.maprMessages)
+ lineCount := len(h.lines)
+ serverCount := len(h.serverMessages)
+ maprCount := len(h.maprMessages)
+ turboCount := 0
+ if h.turboLines != nil {
+ turboCount = len(h.turboLines)
+ }
+ dlog.Server.Trace(h.user, "flush", "lines", lineCount, "server", serverCount, "mapr", maprCount, "turbo", turboCount)
+ return lineCount + serverCount + maprCount + turboCount
}
- for i := 0; i < 10; i++ {
+ for i := 0; i < 100; i++ { // Increase iterations for turbo mode
if numUnsentMessages() == 0 {
dlog.Server.Debug(h.user, "ALL lines sent", fmt.Sprintf("%p", h))
return
@@ -329,3 +405,50 @@ func (h *baseHandler) decrementActiveCommands() int32 {
atomic.AddInt32(&h.activeCommands, -1)
return atomic.LoadInt32(&h.activeCommands)
}
+
+// EnableTurboMode enables turbo mode for direct line processing
+func (h *baseHandler) EnableTurboMode() {
+ h.turboMode = true
+ if h.turboLines == nil {
+ h.turboLines = make(chan []byte, 1000) // Large buffer for performance
+ }
+ if h.turboEOF == nil {
+ h.turboEOF = make(chan struct{})
+ }
+}
+
+// IsTurboMode returns true if turbo mode is enabled
+func (h *baseHandler) IsTurboMode() bool {
+ return h.turboMode
+}
+
+// flushTurboData ensures all turbo channel data is processed
+func (h *baseHandler) flushTurboData() {
+ if h.turboLines == nil {
+ return
+ }
+
+ dlog.Server.Debug(h.user, "Flushing turbo data", "channelLen", len(h.turboLines))
+
+ // Wait for turbo channel to drain with a timeout
+ timeout := time.After(2 * time.Second)
+ for {
+ select {
+ case <-timeout:
+ dlog.Server.Warn(h.user, "Timeout while flushing turbo data", "remaining", len(h.turboLines))
+ return
+ default:
+ if len(h.turboLines) == 0 {
+ dlog.Server.Debug(h.user, "Turbo channel drained successfully")
+ return
+ }
+ // Give the reader time to process
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+}
+
+// GetTurboChannel returns the turbo lines channel for direct writing
+func (h *baseHandler) GetTurboChannel() chan<- []byte {
+ return h.turboLines
+}
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 2245b7c..dc11aa9 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -2,7 +2,6 @@ package handlers
import (
"context"
- "fmt"
"os"
"path/filepath"
"strings"
@@ -32,7 +31,7 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand {
func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
argc int, args []string, retries int) {
-
+
re := regex.NewNoop()
if argc >= 4 {
deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " "))
@@ -53,7 +52,7 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
// e.g.: grep foo bar.log | dmap 'from STATS select ...'
// Only read from pipe if no file argument is provided
isPipe := r.isInputFromPipe() && (argc < 2 || args[1] == "" || args[1] == "-")
-
+
if isPipe {
dlog.Server.Debug("Reading data from stdin pipe")
// Empty file path and globID "-" represents reading from the stdin pipe.
@@ -110,7 +109,7 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
go r.readFileIfPermissions(ctx, ltx, &wg, path, glob, re)
}
wg.Wait()
-
+
// In turbo mode with aggregate, we don't close the shared channel here
// because it will be used across multiple invocations
// The aggregate will handle channel closure when it's done
@@ -132,7 +131,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
path, globID string, re regex.Regex) {
-
+
dlog.Server.Info(r.server.user, "Start reading", path, globID)
var reader fs.FileReader
var limiter chan struct{}
@@ -173,16 +172,11 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
// Check if we should use the turbo boost optimizations
turboBoostEnabled := config.Env("DTAIL_TURBOBOOST_ENABLE")
- dlog.Server.Info(r.server.user, "Turbo boost check: enabled=", turboBoostEnabled, "mode=", r.mode)
- // Only enable channel-less for server mode, not serverless mode
- // Use the serverless field directly as it's more reliable
// Enable turbo boost for cat/grep/tail modes, but NOT for aggregate (MapReduce) operations
// MapReduce requires the traditional channel-based approach to work correctly
- if turboBoostEnabled && !r.server.serverless && r.server.aggregate == nil &&
+ if turboBoostEnabled && r.server.aggregate == nil &&
(r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) {
- // Log to stderr for testing verification - only in server mode
- fmt.Fprintf(os.Stderr, "[DTAIL] Turbo boost enabled: using channel-less implementation for %s\n", path)
- r.readWithProcessor(ctx, ltx, path, globID, re, reader)
+ r.readWithTurboProcessor(ctx, ltx, path, globID, re, reader)
return
}
@@ -222,7 +216,6 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte
path, globID string, re regex.Regex, reader fs.FileReader) {
dlog.Server.Info(r.server.user, "Using channel-less grep implementation", path, globID)
-
// Use the existing lines channel but with the processor-based reader
lines := r.server.lines
@@ -230,15 +223,6 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte
// Use the optimized version if turbo boost is enabled
turboBoostEnabled := config.Env("DTAIL_TURBOBOOST_ENABLE")
-
- // Log to stderr for testing verification - only in server mode
- if !r.server.serverless {
- if turboBoostEnabled {
- fmt.Fprintf(os.Stderr, "[DTAIL] Turbo boost enabled: using optimized reader for %s\n", path)
- } else {
- fmt.Fprintf(os.Stderr, "[DTAIL] Using standard processor reader for %s\n", path)
- }
- }
for {
if aggregate != nil {
@@ -258,7 +242,7 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte
} else {
err = reader.StartWithProcessor(ctx, ltx, processor, re)
}
-
+
if err != nil {
dlog.Server.Error(r.server.user, path, globID, err)
}
@@ -281,6 +265,78 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte
}
}
+func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.LContext,
+ path, globID string, re regex.Regex, reader fs.FileReader) {
+
+ dlog.Server.Info(r.server.user, "Using turbo channel-less implementation", path, globID)
+
+ // Enable turbo mode if not already enabled
+ if !r.server.IsTurboMode() {
+ r.server.EnableTurboMode()
+ }
+
+ // Create a direct writer based on the mode
+ // In serverless mode, write directly to stdout
+ // In server mode, use the turbo channel
+ var writer TurboWriter
+ if r.server.serverless {
+ // In serverless mode, write directly to stdout
+ writer = NewDirectTurboWriter(os.Stdout, r.server.hostname, r.server.plain, r.server.serverless)
+ } else {
+ // In server mode, use the network writer with turbo channels
+ writer = &TurboNetworkWriter{
+ handler: &r.server.baseHandler,
+ hostname: r.server.hostname,
+ plain: r.server.plain,
+ serverless: r.server.serverless,
+ }
+ }
+
+ for {
+ dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> starting read loop iteration")
+
+ // Create a direct processor that writes without channels
+ processor := NewDirectLineProcessor(writer, globID)
+
+ // Use the optimized reader
+ dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> about to start")
+ err := reader.StartWithProcessorOptimized(ctx, ltx, processor, re)
+ dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> completed")
+ if err != nil {
+ dlog.Server.Error(r.server.user, path, globID, err)
+ }
+
+ // Ensure we flush before closing
+ dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> flushing processor")
+ if flushErr := processor.Flush(); flushErr != nil {
+ dlog.Server.Error(r.server.user, path, globID, "flush error", flushErr)
+ }
+
+ // Close the processor after each iteration
+ dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> closing processor")
+ processor.Close()
+ dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> processor closed")
+
+ // Give time for data to be transmitted
+ // This is crucial for integration tests to ensure all data is sent
+ if !r.server.serverless {
+ dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> waiting for data transmission")
+ time.Sleep(50 * time.Millisecond)
+ }
+
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ if !reader.Retry() {
+ return
+ }
+ }
+ time.Sleep(time.Second * 2)
+ dlog.Server.Info(path, globID, "Reading file again")
+ }
+}
+
func (r *readCommand) makeGlobID(path, glob string) string {
var idParts []string
pathParts := strings.Split(path, "/")
diff --git a/internal/server/handlers/turbo_writer.go b/internal/server/handlers/turbo_writer.go
new file mode 100644
index 0000000..01755a5
--- /dev/null
+++ b/internal/server/handlers/turbo_writer.go
@@ -0,0 +1,516 @@
+package handlers
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "sync"
+ "time"
+
+ "github.com/mimecast/dtail/internal/color/brush"
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/io/pool"
+ "github.com/mimecast/dtail/internal/protocol"
+)
+
+// TurboWriter defines the interface for direct writing in turbo mode
+type TurboWriter interface {
+ // WriteLineData writes formatted line data directly to output
+ WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error
+ // WriteServerMessage writes a server message
+ WriteServerMessage(message string) error
+ // Flush ensures all buffered data is written
+ Flush() error
+}
+
+// DirectTurboWriter implements TurboWriter for direct network writing
+type DirectTurboWriter struct {
+ writer io.Writer
+ hostname string
+ plain bool
+ serverless bool
+
+ // Buffering for efficiency
+ writeBuf bytes.Buffer
+ bufSize int
+ mutex sync.Mutex
+
+ // Stats
+ linesWritten uint64
+ bytesWritten uint64
+}
+
+// NewDirectTurboWriter creates a new turbo writer
+func NewDirectTurboWriter(writer io.Writer, hostname string, plain, serverless bool) *DirectTurboWriter {
+ return &DirectTurboWriter{
+ writer: writer,
+ hostname: hostname,
+ plain: plain,
+ serverless: serverless,
+ bufSize: 64 * 1024, // 64KB buffer
+ }
+}
+
+// WriteLineData writes formatted line data directly to output
+func (w *DirectTurboWriter) WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error {
+ w.mutex.Lock()
+ defer w.mutex.Unlock()
+
+ // In serverless mode with colors, write each line immediately
+ if w.serverless && !w.plain {
+ // Build the complete line in a temporary buffer
+ var lineBuf bytes.Buffer
+ lineBuf.WriteString("REMOTE")
+ lineBuf.WriteString(protocol.FieldDelimiter)
+ lineBuf.WriteString(w.hostname)
+ lineBuf.WriteString(protocol.FieldDelimiter)
+ lineBuf.WriteString("100")
+ lineBuf.WriteString(protocol.FieldDelimiter)
+ lineBuf.WriteString(fmt.Sprintf("%v", lineNum))
+ lineBuf.WriteString(protocol.FieldDelimiter)
+ lineBuf.WriteString(sourceID)
+ lineBuf.WriteString(protocol.FieldDelimiter)
+
+ // Remove trailing newline if present (it will be added back after coloring)
+ content := lineContent
+ if len(content) > 0 && content[len(content)-1] == '\n' {
+ content = content[:len(content)-1]
+ }
+ lineBuf.Write(content)
+
+ // Apply color formatting
+ coloredLine := brush.Colorfy(lineBuf.String())
+
+ // Write directly to output with newline
+ _, err := w.writer.Write([]byte(coloredLine + "\n"))
+ w.linesWritten++
+ w.bytesWritten += uint64(len(coloredLine) + 1)
+ return err
+ }
+
+ // Build the output line
+ if !w.plain {
+ w.writeBuf.WriteString("REMOTE")
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ w.writeBuf.WriteString(w.hostname)
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ // For direct writing, we don't have transmittedPerc, so use 100
+ w.writeBuf.WriteString("100")
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ w.writeBuf.WriteString(fmt.Sprintf("%v", lineNum))
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ w.writeBuf.WriteString(sourceID)
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ }
+
+ // Write the actual line content
+ w.writeBuf.Write(lineContent)
+
+ // In plain mode, ensure line has a newline if it doesn't already
+ if w.plain && len(lineContent) > 0 && lineContent[len(lineContent)-1] != '\n' {
+ w.writeBuf.WriteByte('\n')
+ }
+
+ // Only add message delimiter in non-plain, non-serverless mode
+ // In serverless mode, we output lines directly
+ if !w.plain && !w.serverless {
+ w.writeBuf.WriteByte(protocol.MessageDelimiter)
+ }
+
+ // Update stats
+ w.linesWritten++
+ w.bytesWritten += uint64(w.writeBuf.Len())
+
+ // Flush if buffer is getting full or in serverless mode
+ if w.writeBuf.Len() >= w.bufSize || w.serverless {
+ return w.flushBuffer()
+ }
+
+ return nil
+}
+
+// WriteServerMessage writes a server message
+func (w *DirectTurboWriter) WriteServerMessage(message string) error {
+ if w.serverless {
+ return nil
+ }
+
+ w.mutex.Lock()
+ defer w.mutex.Unlock()
+
+ // Skip empty server messages when in plain mode
+ if w.plain && (message == "" || message == "\n") {
+ return nil
+ }
+
+ // Handle hidden messages
+ if len(message) > 0 && message[0] == '.' {
+ w.writeBuf.WriteString(message)
+ w.writeBuf.WriteByte(protocol.MessageDelimiter)
+ return w.flushBuffer()
+ }
+
+ // Handle normal server message
+ if !w.plain {
+ w.writeBuf.WriteString("SERVER")
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ w.writeBuf.WriteString(w.hostname)
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ }
+ w.writeBuf.WriteString(message)
+ w.writeBuf.WriteByte(protocol.MessageDelimiter)
+
+ return w.flushBuffer()
+}
+
+// Flush ensures all buffered data is written
+func (w *DirectTurboWriter) Flush() error {
+ w.mutex.Lock()
+ defer w.mutex.Unlock()
+
+ // Force flush any remaining data
+ err := w.flushBuffer()
+
+ // For serverless mode, ensure everything is written to output
+ if w.serverless {
+ // Ensure writer is flushed if it supports it
+ if flusher, ok := w.writer.(interface{ Flush() error }); ok {
+ flusher.Flush()
+ }
+ }
+
+ return err
+}
+
+// flushBuffer writes the buffer content to the writer (must be called with mutex held)
+func (w *DirectTurboWriter) flushBuffer() error {
+ if w.writeBuf.Len() == 0 {
+ return nil
+ }
+
+ data := w.writeBuf.Bytes()
+
+ // In serverless mode with colors, data is already processed line by line
+ // so we don't need to do any additional formatting here
+
+ _, err := w.writer.Write(data)
+ w.writeBuf.Reset()
+
+ return err
+}
+
+// Stats returns writing statistics
+func (w *DirectTurboWriter) Stats() (linesWritten, bytesWritten uint64) {
+ w.mutex.Lock()
+ defer w.mutex.Unlock()
+
+ return w.linesWritten, w.bytesWritten
+}
+
+// TurboChannelWriter writes pre-formatted data to a turbo channel
+type TurboChannelWriter struct {
+ channel chan<- []byte
+ hostname string
+ plain bool
+ serverless bool
+
+ // Buffering for efficiency
+ writeBuf bytes.Buffer
+ bufSize int
+ mutex sync.Mutex
+
+ // Stats
+ linesWritten uint64
+ bytesWritten uint64
+}
+
+// NewTurboChannelWriter creates a writer that sends to a turbo channel
+func NewTurboChannelWriter(channel chan<- []byte, hostname string, plain, serverless bool) *TurboChannelWriter {
+ return &TurboChannelWriter{
+ channel: channel,
+ hostname: hostname,
+ plain: plain,
+ serverless: serverless,
+ bufSize: 64 * 1024, // 64KB buffer
+ }
+}
+
+// WriteLineData formats and writes line data to the turbo channel
+func (w *TurboChannelWriter) WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error {
+ w.mutex.Lock()
+ defer w.mutex.Unlock()
+
+ // Build the output line
+ if !w.plain && !w.serverless {
+ w.writeBuf.WriteString("REMOTE")
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ w.writeBuf.WriteString(w.hostname)
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ // For direct writing, we don't have transmittedPerc, so use 100
+ w.writeBuf.WriteString("100")
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ w.writeBuf.WriteString(fmt.Sprintf("%v", lineNum))
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ w.writeBuf.WriteString(sourceID)
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ }
+
+ // Write the actual line content (already includes line endings)
+ w.writeBuf.Write(lineContent)
+ w.writeBuf.WriteByte(protocol.MessageDelimiter)
+
+ // Update stats
+ w.linesWritten++
+ w.bytesWritten += uint64(w.writeBuf.Len())
+
+ // Send to channel
+ data := make([]byte, w.writeBuf.Len())
+ copy(data, w.writeBuf.Bytes())
+ w.writeBuf.Reset()
+
+ select {
+ case w.channel <- data:
+ return nil
+ default:
+ return fmt.Errorf("turbo channel full")
+ }
+}
+
+// WriteServerMessage writes a server message
+func (w *TurboChannelWriter) WriteServerMessage(message string) error {
+ if w.serverless {
+ return nil
+ }
+
+ w.mutex.Lock()
+ defer w.mutex.Unlock()
+
+ // Skip empty server messages when in plain mode
+ if w.plain && (message == "" || message == "\n") {
+ return nil
+ }
+
+ var buf bytes.Buffer
+
+ // Handle hidden messages
+ if len(message) > 0 && message[0] == '.' {
+ buf.WriteString(message)
+ buf.WriteByte(protocol.MessageDelimiter)
+ } else {
+ // Handle normal server message
+ if !w.plain {
+ buf.WriteString("SERVER")
+ buf.WriteString(protocol.FieldDelimiter)
+ buf.WriteString(w.hostname)
+ buf.WriteString(protocol.FieldDelimiter)
+ }
+ buf.WriteString(message)
+ buf.WriteByte(protocol.MessageDelimiter)
+ }
+
+ data := buf.Bytes()
+ select {
+ case w.channel <- data:
+ return nil
+ default:
+ return fmt.Errorf("turbo channel full")
+ }
+}
+
+// Flush is a no-op for channel writer as data is sent immediately
+func (w *TurboChannelWriter) Flush() error {
+ return nil
+}
+
+// Stats returns writing statistics
+func (w *TurboChannelWriter) Stats() (linesWritten, bytesWritten uint64) {
+ w.mutex.Lock()
+ defer w.mutex.Unlock()
+
+ return w.linesWritten, w.bytesWritten
+}
+
+// TurboNetworkWriter writes directly to the network connection bypassing channels
+type TurboNetworkWriter struct {
+ handler *baseHandler
+ hostname string
+ plain bool
+ serverless bool
+
+ // Internal buffer for batching writes
+ writeBuf bytes.Buffer
+ bufSize int
+ mutex sync.Mutex
+
+ // Direct output channel for turbo mode
+ outputChan chan []byte
+
+ // Stats
+ linesWritten uint64
+ bytesWritten uint64
+
+ // Track if we've signaled EOF
+ eofSignaled bool
+}
+
+// WriteLineData formats and writes line data directly
+func (w *TurboNetworkWriter) WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error {
+ w.mutex.Lock()
+ defer w.mutex.Unlock()
+
+ dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "lineNum", lineNum, "sourceID", sourceID, "contentLen", len(lineContent))
+
+ // Build the output line
+ if !w.plain && !w.serverless {
+ w.writeBuf.WriteString("REMOTE")
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ w.writeBuf.WriteString(w.hostname)
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ // For direct writing, we don't have transmittedPerc, so use 100
+ w.writeBuf.WriteString("100")
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ w.writeBuf.WriteString(fmt.Sprintf("%v", lineNum))
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ w.writeBuf.WriteString(sourceID)
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ }
+
+ // Write the actual line content (already includes line endings)
+ w.writeBuf.Write(lineContent)
+ w.writeBuf.WriteByte(protocol.MessageDelimiter)
+
+ // Update stats
+ w.linesWritten++
+ w.bytesWritten += uint64(w.writeBuf.Len())
+
+ dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "linesWritten", w.linesWritten, "bytesWritten", w.bytesWritten, "bufSize", w.writeBuf.Len())
+
+ // Write directly to the turbo channel
+ if w.handler.turboLines != nil {
+ data := make([]byte, w.writeBuf.Len())
+ copy(data, w.writeBuf.Bytes())
+
+ dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "sending to turboLines channel", "dataLen", len(data))
+
+ // Send data to turbo channel with a larger buffer
+ select {
+ case w.handler.turboLines <- data:
+ dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "sent to channel successfully")
+ w.writeBuf.Reset()
+ return nil
+ default:
+ // Channel full, wait a bit and retry
+ dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "channel full, waiting before retry")
+ time.Sleep(time.Millisecond)
+ w.handler.turboLines <- data
+ dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "sent to channel after retry")
+ w.writeBuf.Reset()
+ return nil
+ }
+ }
+
+ dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "turboLines channel is nil")
+ w.writeBuf.Reset()
+ return nil
+}
+
+// WriteServerMessage writes a server message
+func (w *TurboNetworkWriter) WriteServerMessage(message string) error {
+ // Server messages are less critical in turbo mode
+ // We can send them through the normal channel
+ if w.handler != nil && w.handler.serverMessages != nil {
+ select {
+ case w.handler.serverMessages <- message:
+ return nil
+ default:
+ return fmt.Errorf("server message channel full")
+ }
+ }
+ return nil
+}
+
+// Flush ensures all data is written
+func (w *TurboNetworkWriter) Flush() error {
+ dlog.Server.Trace("TurboNetworkWriter.Flush", "called", "eofSignaled", w.eofSignaled)
+
+ w.mutex.Lock()
+ defer w.mutex.Unlock()
+
+ // If we have any buffered data, send it now
+ if w.writeBuf.Len() > 0 {
+ dlog.Server.Trace("TurboNetworkWriter.Flush", "flushing buffered data", "bufSize", w.writeBuf.Len())
+
+ if w.handler.turboLines != nil {
+ data := make([]byte, w.writeBuf.Len())
+ copy(data, w.writeBuf.Bytes())
+
+ // Force send the data
+ w.handler.turboLines <- data
+ w.writeBuf.Reset()
+ dlog.Server.Trace("TurboNetworkWriter.Flush", "flushed data to channel")
+ }
+ }
+
+ // Wait for the channel to have space before signaling EOF
+ // This ensures data has been sent
+ if !w.eofSignaled && w.handler.turboEOF != nil {
+ // Wait until channel has been drained somewhat
+ for i := 0; i < 100 && len(w.handler.turboLines) > 900; i++ {
+ dlog.Server.Trace("TurboNetworkWriter.Flush", "waiting for channel to drain", "channelLen", len(w.handler.turboLines))
+ time.Sleep(10 * time.Millisecond)
+ }
+
+ dlog.Server.Trace("TurboNetworkWriter.Flush", "signaling EOF", "channelLen", len(w.handler.turboLines))
+ close(w.handler.turboEOF)
+ w.eofSignaled = true
+ }
+
+ // Wait a bit to ensure data is processed
+ // This is crucial for integration tests
+ time.Sleep(10 * time.Millisecond)
+ dlog.Server.Trace("TurboNetworkWriter.Flush", "completed")
+
+ return nil
+}
+
+// DirectLineProcessor processes lines directly without channels in turbo mode
+type DirectLineProcessor struct {
+ writer TurboWriter
+ globID string
+ lineCount uint64
+}
+
+// NewDirectLineProcessor creates a processor that writes directly
+func NewDirectLineProcessor(writer TurboWriter, globID string) *DirectLineProcessor {
+ return &DirectLineProcessor{
+ writer: writer,
+ globID: globID,
+ }
+}
+
+// ProcessLine writes a line directly to the output
+func (p *DirectLineProcessor) ProcessLine(lineContent *bytes.Buffer, lineNum uint64, sourceID string) error {
+ p.lineCount++
+
+ dlog.Server.Trace("DirectLineProcessor.ProcessLine", "lineCount", p.lineCount, "lineNum", lineNum, "sourceID", sourceID)
+
+ // Write directly to output
+ err := p.writer.WriteLineData(lineContent.Bytes(), lineNum, sourceID)
+
+ // Recycle the buffer
+ pool.RecycleBytesBuffer(lineContent)
+
+ return err
+}
+
+// Flush ensures all data is written
+func (p *DirectLineProcessor) Flush() error {
+ dlog.Server.Trace("DirectLineProcessor.Flush", "lineCount", p.lineCount)
+ return p.writer.Flush()
+}
+
+// Close flushes any remaining data
+func (p *DirectLineProcessor) Close() error {
+ dlog.Server.Trace("DirectLineProcessor.Close", "lineCount", p.lineCount)
+ return p.writer.Flush()
+}