summaryrefslogtreecommitdiff
path: root/internal/server
diff options
context:
space:
mode:
Diffstat (limited to 'internal/server')
-rw-r--r--internal/server/handlers/basehandler.go127
-rw-r--r--internal/server/handlers/readcommand.go102
-rw-r--r--internal/server/handlers/turbo_writer.go516
3 files changed, 720 insertions, 25 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index e9b1eec..b756201 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -46,6 +46,12 @@ type baseHandler struct {
quiet bool
plain bool
serverless bool
+
+ // Turbo mode support
+ turboMode bool
+ turboLines chan []byte // Pre-formatted lines for turbo mode
+ turboBuffer []byte // Buffer for partially sent turbo data
+ turboEOF chan struct{} // Signal when turbo data is complete
}
// Shutdown the handler.
@@ -62,6 +68,68 @@ func (h *baseHandler) Done() <-chan struct{} {
func (h *baseHandler) Read(p []byte) (n int, err error) {
defer h.readBuf.Reset()
+ // In turbo mode, check if we have buffered data first
+ if h.turboMode && len(h.turboBuffer) > 0 {
+ dlog.Server.Trace(h.user, "baseHandler.Read", "using buffered turbo data", "bufferedLen", len(h.turboBuffer))
+ n = copy(p, h.turboBuffer)
+ h.turboBuffer = h.turboBuffer[n:]
+ dlog.Server.Trace(h.user, "baseHandler.Read", "after buffer read", "copied", n, "remaining", len(h.turboBuffer))
+ return
+ }
+
+ // In turbo mode, prioritize pre-formatted turbo lines
+ if h.turboMode && h.turboLines != nil {
+ channelLen := len(h.turboLines)
+ dlog.Server.Trace(h.user, "baseHandler.Read", "checking turboLines channel", "channelLen", channelLen)
+
+ // Try to read from the channel
+ select {
+ case turboData := <-h.turboLines:
+ dlog.Server.Trace(h.user, "baseHandler.Read", "got data from turboLines", "dataLen", len(turboData))
+ n = copy(p, turboData)
+ // If we couldn't send all data, buffer the rest
+ if n < len(turboData) {
+ h.turboBuffer = turboData[n:]
+ dlog.Server.Trace(h.user, "baseHandler.Read", "buffering remaining data", "bufferedLen", len(h.turboBuffer))
+ }
+ return
+ default:
+ // No data immediately available
+ if channelLen > 0 {
+ // There's data in the channel but we couldn't get it immediately
+ // Wait a bit and try again
+ dlog.Server.Trace(h.user, "baseHandler.Read", "channel has data but not available, waiting")
+ time.Sleep(time.Millisecond)
+ select {
+ case turboData := <-h.turboLines:
+ dlog.Server.Trace(h.user, "baseHandler.Read", "got data after wait", "dataLen", len(turboData))
+ n = copy(p, turboData)
+ if n < len(turboData) {
+ h.turboBuffer = turboData[n:]
+ }
+ return
+ default:
+ // Still no data
+ }
+ }
+
+ // Channel is truly empty, check if we should continue in turbo mode
+ // Only disable turbo mode if we've been signaled to do so
+ if h.turboEOF != nil {
+ select {
+ case <-h.turboEOF:
+ dlog.Server.Trace(h.user, "baseHandler.Read", "EOF received and channel empty, disabling turbo mode")
+ h.turboMode = false
+ default:
+ // EOF not signaled yet, continue in turbo mode
+ }
+ }
+
+ dlog.Server.Trace(h.user, "baseHandler.Read", "no data in turboLines, falling through")
+ // Fall through to normal processing
+ }
+ }
+
select {
case message := <-h.serverMessages:
if len(message) > 0 && message[0] == '.' {
@@ -288,9 +356,17 @@ func (h *baseHandler) sendln(ch chan<- string, message string) {
func (h *baseHandler) flush() {
dlog.Server.Trace(h.user, "flush()")
numUnsentMessages := func() int {
- return len(h.lines) + len(h.serverMessages) + len(h.maprMessages)
+ lineCount := len(h.lines)
+ serverCount := len(h.serverMessages)
+ maprCount := len(h.maprMessages)
+ turboCount := 0
+ if h.turboLines != nil {
+ turboCount = len(h.turboLines)
+ }
+ dlog.Server.Trace(h.user, "flush", "lines", lineCount, "server", serverCount, "mapr", maprCount, "turbo", turboCount)
+ return lineCount + serverCount + maprCount + turboCount
}
- for i := 0; i < 10; i++ {
+ for i := 0; i < 100; i++ { // Increase iterations for turbo mode
if numUnsentMessages() == 0 {
dlog.Server.Debug(h.user, "ALL lines sent", fmt.Sprintf("%p", h))
return
@@ -329,3 +405,50 @@ func (h *baseHandler) decrementActiveCommands() int32 {
atomic.AddInt32(&h.activeCommands, -1)
return atomic.LoadInt32(&h.activeCommands)
}
+
+// EnableTurboMode enables turbo mode for direct line processing
+func (h *baseHandler) EnableTurboMode() {
+ h.turboMode = true
+ if h.turboLines == nil {
+ h.turboLines = make(chan []byte, 1000) // Large buffer for performance
+ }
+ if h.turboEOF == nil {
+ h.turboEOF = make(chan struct{})
+ }
+}
+
+// IsTurboMode returns true if turbo mode is enabled
+func (h *baseHandler) IsTurboMode() bool {
+ return h.turboMode
+}
+
+// flushTurboData ensures all turbo channel data is processed
+func (h *baseHandler) flushTurboData() {
+ if h.turboLines == nil {
+ return
+ }
+
+ dlog.Server.Debug(h.user, "Flushing turbo data", "channelLen", len(h.turboLines))
+
+ // Wait for turbo channel to drain with a timeout
+ timeout := time.After(2 * time.Second)
+ for {
+ select {
+ case <-timeout:
+ dlog.Server.Warn(h.user, "Timeout while flushing turbo data", "remaining", len(h.turboLines))
+ return
+ default:
+ if len(h.turboLines) == 0 {
+ dlog.Server.Debug(h.user, "Turbo channel drained successfully")
+ return
+ }
+ // Give the reader time to process
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+}
+
+// GetTurboChannel returns the turbo lines channel for direct writing
+func (h *baseHandler) GetTurboChannel() chan<- []byte {
+ return h.turboLines
+}
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 2245b7c..dc11aa9 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -2,7 +2,6 @@ package handlers
import (
"context"
- "fmt"
"os"
"path/filepath"
"strings"
@@ -32,7 +31,7 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand {
func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
argc int, args []string, retries int) {
-
+
re := regex.NewNoop()
if argc >= 4 {
deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " "))
@@ -53,7 +52,7 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
// e.g.: grep foo bar.log | dmap 'from STATS select ...'
// Only read from pipe if no file argument is provided
isPipe := r.isInputFromPipe() && (argc < 2 || args[1] == "" || args[1] == "-")
-
+
if isPipe {
dlog.Server.Debug("Reading data from stdin pipe")
// Empty file path and globID "-" represents reading from the stdin pipe.
@@ -110,7 +109,7 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
go r.readFileIfPermissions(ctx, ltx, &wg, path, glob, re)
}
wg.Wait()
-
+
// In turbo mode with aggregate, we don't close the shared channel here
// because it will be used across multiple invocations
// The aggregate will handle channel closure when it's done
@@ -132,7 +131,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
path, globID string, re regex.Regex) {
-
+
dlog.Server.Info(r.server.user, "Start reading", path, globID)
var reader fs.FileReader
var limiter chan struct{}
@@ -173,16 +172,11 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
// Check if we should use the turbo boost optimizations
turboBoostEnabled := config.Env("DTAIL_TURBOBOOST_ENABLE")
- dlog.Server.Info(r.server.user, "Turbo boost check: enabled=", turboBoostEnabled, "mode=", r.mode)
- // Only enable channel-less for server mode, not serverless mode
- // Use the serverless field directly as it's more reliable
// Enable turbo boost for cat/grep/tail modes, but NOT for aggregate (MapReduce) operations
// MapReduce requires the traditional channel-based approach to work correctly
- if turboBoostEnabled && !r.server.serverless && r.server.aggregate == nil &&
+ if turboBoostEnabled && r.server.aggregate == nil &&
(r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) {
- // Log to stderr for testing verification - only in server mode
- fmt.Fprintf(os.Stderr, "[DTAIL] Turbo boost enabled: using channel-less implementation for %s\n", path)
- r.readWithProcessor(ctx, ltx, path, globID, re, reader)
+ r.readWithTurboProcessor(ctx, ltx, path, globID, re, reader)
return
}
@@ -222,7 +216,6 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte
path, globID string, re regex.Regex, reader fs.FileReader) {
dlog.Server.Info(r.server.user, "Using channel-less grep implementation", path, globID)
-
// Use the existing lines channel but with the processor-based reader
lines := r.server.lines
@@ -230,15 +223,6 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte
// Use the optimized version if turbo boost is enabled
turboBoostEnabled := config.Env("DTAIL_TURBOBOOST_ENABLE")
-
- // Log to stderr for testing verification - only in server mode
- if !r.server.serverless {
- if turboBoostEnabled {
- fmt.Fprintf(os.Stderr, "[DTAIL] Turbo boost enabled: using optimized reader for %s\n", path)
- } else {
- fmt.Fprintf(os.Stderr, "[DTAIL] Using standard processor reader for %s\n", path)
- }
- }
for {
if aggregate != nil {
@@ -258,7 +242,7 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte
} else {
err = reader.StartWithProcessor(ctx, ltx, processor, re)
}
-
+
if err != nil {
dlog.Server.Error(r.server.user, path, globID, err)
}
@@ -281,6 +265,78 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte
}
}
+func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.LContext,
+ path, globID string, re regex.Regex, reader fs.FileReader) {
+
+ dlog.Server.Info(r.server.user, "Using turbo channel-less implementation", path, globID)
+
+ // Enable turbo mode if not already enabled
+ if !r.server.IsTurboMode() {
+ r.server.EnableTurboMode()
+ }
+
+ // Create a direct writer based on the mode
+ // In serverless mode, write directly to stdout
+ // In server mode, use the turbo channel
+ var writer TurboWriter
+ if r.server.serverless {
+ // In serverless mode, write directly to stdout
+ writer = NewDirectTurboWriter(os.Stdout, r.server.hostname, r.server.plain, r.server.serverless)
+ } else {
+ // In server mode, use the network writer with turbo channels
+ writer = &TurboNetworkWriter{
+ handler: &r.server.baseHandler,
+ hostname: r.server.hostname,
+ plain: r.server.plain,
+ serverless: r.server.serverless,
+ }
+ }
+
+ for {
+ dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> starting read loop iteration")
+
+ // Create a direct processor that writes without channels
+ processor := NewDirectLineProcessor(writer, globID)
+
+ // Use the optimized reader
+ dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> about to start")
+ err := reader.StartWithProcessorOptimized(ctx, ltx, processor, re)
+ dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> completed")
+ if err != nil {
+ dlog.Server.Error(r.server.user, path, globID, err)
+ }
+
+ // Ensure we flush before closing
+ dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> flushing processor")
+ if flushErr := processor.Flush(); flushErr != nil {
+ dlog.Server.Error(r.server.user, path, globID, "flush error", flushErr)
+ }
+
+ // Close the processor after each iteration
+ dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> closing processor")
+ processor.Close()
+ dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> processor closed")
+
+ // Give time for data to be transmitted
+ // This is crucial for integration tests to ensure all data is sent
+ if !r.server.serverless {
+ dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> waiting for data transmission")
+ time.Sleep(50 * time.Millisecond)
+ }
+
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ if !reader.Retry() {
+ return
+ }
+ }
+ time.Sleep(time.Second * 2)
+ dlog.Server.Info(path, globID, "Reading file again")
+ }
+}
+
func (r *readCommand) makeGlobID(path, glob string) string {
var idParts []string
pathParts := strings.Split(path, "/")
diff --git a/internal/server/handlers/turbo_writer.go b/internal/server/handlers/turbo_writer.go
new file mode 100644
index 0000000..01755a5
--- /dev/null
+++ b/internal/server/handlers/turbo_writer.go
@@ -0,0 +1,516 @@
+package handlers
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "sync"
+ "time"
+
+ "github.com/mimecast/dtail/internal/color/brush"
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/io/pool"
+ "github.com/mimecast/dtail/internal/protocol"
+)
+
+// TurboWriter defines the interface for direct writing in turbo mode
+type TurboWriter interface {
+ // WriteLineData writes formatted line data directly to output
+ WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error
+ // WriteServerMessage writes a server message
+ WriteServerMessage(message string) error
+ // Flush ensures all buffered data is written
+ Flush() error
+}
+
+// DirectTurboWriter implements TurboWriter for direct network writing
+type DirectTurboWriter struct {
+ writer io.Writer
+ hostname string
+ plain bool
+ serverless bool
+
+ // Buffering for efficiency
+ writeBuf bytes.Buffer
+ bufSize int
+ mutex sync.Mutex
+
+ // Stats
+ linesWritten uint64
+ bytesWritten uint64
+}
+
+// NewDirectTurboWriter creates a new turbo writer
+func NewDirectTurboWriter(writer io.Writer, hostname string, plain, serverless bool) *DirectTurboWriter {
+ return &DirectTurboWriter{
+ writer: writer,
+ hostname: hostname,
+ plain: plain,
+ serverless: serverless,
+ bufSize: 64 * 1024, // 64KB buffer
+ }
+}
+
+// WriteLineData writes formatted line data directly to output
+func (w *DirectTurboWriter) WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error {
+ w.mutex.Lock()
+ defer w.mutex.Unlock()
+
+ // In serverless mode with colors, write each line immediately
+ if w.serverless && !w.plain {
+ // Build the complete line in a temporary buffer
+ var lineBuf bytes.Buffer
+ lineBuf.WriteString("REMOTE")
+ lineBuf.WriteString(protocol.FieldDelimiter)
+ lineBuf.WriteString(w.hostname)
+ lineBuf.WriteString(protocol.FieldDelimiter)
+ lineBuf.WriteString("100")
+ lineBuf.WriteString(protocol.FieldDelimiter)
+ lineBuf.WriteString(fmt.Sprintf("%v", lineNum))
+ lineBuf.WriteString(protocol.FieldDelimiter)
+ lineBuf.WriteString(sourceID)
+ lineBuf.WriteString(protocol.FieldDelimiter)
+
+ // Remove trailing newline if present (it will be added back after coloring)
+ content := lineContent
+ if len(content) > 0 && content[len(content)-1] == '\n' {
+ content = content[:len(content)-1]
+ }
+ lineBuf.Write(content)
+
+ // Apply color formatting
+ coloredLine := brush.Colorfy(lineBuf.String())
+
+ // Write directly to output with newline
+ _, err := w.writer.Write([]byte(coloredLine + "\n"))
+ w.linesWritten++
+ w.bytesWritten += uint64(len(coloredLine) + 1)
+ return err
+ }
+
+ // Build the output line
+ if !w.plain {
+ w.writeBuf.WriteString("REMOTE")
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ w.writeBuf.WriteString(w.hostname)
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ // For direct writing, we don't have transmittedPerc, so use 100
+ w.writeBuf.WriteString("100")
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ w.writeBuf.WriteString(fmt.Sprintf("%v", lineNum))
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ w.writeBuf.WriteString(sourceID)
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ }
+
+ // Write the actual line content
+ w.writeBuf.Write(lineContent)
+
+ // In plain mode, ensure line has a newline if it doesn't already
+ if w.plain && len(lineContent) > 0 && lineContent[len(lineContent)-1] != '\n' {
+ w.writeBuf.WriteByte('\n')
+ }
+
+ // Only add message delimiter in non-plain, non-serverless mode
+ // In serverless mode, we output lines directly
+ if !w.plain && !w.serverless {
+ w.writeBuf.WriteByte(protocol.MessageDelimiter)
+ }
+
+ // Update stats
+ w.linesWritten++
+ w.bytesWritten += uint64(w.writeBuf.Len())
+
+ // Flush if buffer is getting full or in serverless mode
+ if w.writeBuf.Len() >= w.bufSize || w.serverless {
+ return w.flushBuffer()
+ }
+
+ return nil
+}
+
+// WriteServerMessage writes a server message
+func (w *DirectTurboWriter) WriteServerMessage(message string) error {
+ if w.serverless {
+ return nil
+ }
+
+ w.mutex.Lock()
+ defer w.mutex.Unlock()
+
+ // Skip empty server messages when in plain mode
+ if w.plain && (message == "" || message == "\n") {
+ return nil
+ }
+
+ // Handle hidden messages
+ if len(message) > 0 && message[0] == '.' {
+ w.writeBuf.WriteString(message)
+ w.writeBuf.WriteByte(protocol.MessageDelimiter)
+ return w.flushBuffer()
+ }
+
+ // Handle normal server message
+ if !w.plain {
+ w.writeBuf.WriteString("SERVER")
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ w.writeBuf.WriteString(w.hostname)
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ }
+ w.writeBuf.WriteString(message)
+ w.writeBuf.WriteByte(protocol.MessageDelimiter)
+
+ return w.flushBuffer()
+}
+
+// Flush ensures all buffered data is written
+func (w *DirectTurboWriter) Flush() error {
+ w.mutex.Lock()
+ defer w.mutex.Unlock()
+
+ // Force flush any remaining data
+ err := w.flushBuffer()
+
+ // For serverless mode, ensure everything is written to output
+ if w.serverless {
+ // Ensure writer is flushed if it supports it
+ if flusher, ok := w.writer.(interface{ Flush() error }); ok {
+ flusher.Flush()
+ }
+ }
+
+ return err
+}
+
+// flushBuffer writes the buffer content to the writer (must be called with mutex held)
+func (w *DirectTurboWriter) flushBuffer() error {
+ if w.writeBuf.Len() == 0 {
+ return nil
+ }
+
+ data := w.writeBuf.Bytes()
+
+ // In serverless mode with colors, data is already processed line by line
+ // so we don't need to do any additional formatting here
+
+ _, err := w.writer.Write(data)
+ w.writeBuf.Reset()
+
+ return err
+}
+
+// Stats returns writing statistics
+func (w *DirectTurboWriter) Stats() (linesWritten, bytesWritten uint64) {
+ w.mutex.Lock()
+ defer w.mutex.Unlock()
+
+ return w.linesWritten, w.bytesWritten
+}
+
+// TurboChannelWriter writes pre-formatted data to a turbo channel
+type TurboChannelWriter struct {
+ channel chan<- []byte
+ hostname string
+ plain bool
+ serverless bool
+
+ // Buffering for efficiency
+ writeBuf bytes.Buffer
+ bufSize int
+ mutex sync.Mutex
+
+ // Stats
+ linesWritten uint64
+ bytesWritten uint64
+}
+
+// NewTurboChannelWriter creates a writer that sends to a turbo channel
+func NewTurboChannelWriter(channel chan<- []byte, hostname string, plain, serverless bool) *TurboChannelWriter {
+ return &TurboChannelWriter{
+ channel: channel,
+ hostname: hostname,
+ plain: plain,
+ serverless: serverless,
+ bufSize: 64 * 1024, // 64KB buffer
+ }
+}
+
+// WriteLineData formats and writes line data to the turbo channel
+func (w *TurboChannelWriter) WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error {
+ w.mutex.Lock()
+ defer w.mutex.Unlock()
+
+ // Build the output line
+ if !w.plain && !w.serverless {
+ w.writeBuf.WriteString("REMOTE")
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ w.writeBuf.WriteString(w.hostname)
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ // For direct writing, we don't have transmittedPerc, so use 100
+ w.writeBuf.WriteString("100")
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ w.writeBuf.WriteString(fmt.Sprintf("%v", lineNum))
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ w.writeBuf.WriteString(sourceID)
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ }
+
+ // Write the actual line content (already includes line endings)
+ w.writeBuf.Write(lineContent)
+ w.writeBuf.WriteByte(protocol.MessageDelimiter)
+
+ // Update stats
+ w.linesWritten++
+ w.bytesWritten += uint64(w.writeBuf.Len())
+
+ // Send to channel
+ data := make([]byte, w.writeBuf.Len())
+ copy(data, w.writeBuf.Bytes())
+ w.writeBuf.Reset()
+
+ select {
+ case w.channel <- data:
+ return nil
+ default:
+ return fmt.Errorf("turbo channel full")
+ }
+}
+
+// WriteServerMessage writes a server message
+func (w *TurboChannelWriter) WriteServerMessage(message string) error {
+ if w.serverless {
+ return nil
+ }
+
+ w.mutex.Lock()
+ defer w.mutex.Unlock()
+
+ // Skip empty server messages when in plain mode
+ if w.plain && (message == "" || message == "\n") {
+ return nil
+ }
+
+ var buf bytes.Buffer
+
+ // Handle hidden messages
+ if len(message) > 0 && message[0] == '.' {
+ buf.WriteString(message)
+ buf.WriteByte(protocol.MessageDelimiter)
+ } else {
+ // Handle normal server message
+ if !w.plain {
+ buf.WriteString("SERVER")
+ buf.WriteString(protocol.FieldDelimiter)
+ buf.WriteString(w.hostname)
+ buf.WriteString(protocol.FieldDelimiter)
+ }
+ buf.WriteString(message)
+ buf.WriteByte(protocol.MessageDelimiter)
+ }
+
+ data := buf.Bytes()
+ select {
+ case w.channel <- data:
+ return nil
+ default:
+ return fmt.Errorf("turbo channel full")
+ }
+}
+
+// Flush is a no-op for channel writer as data is sent immediately
+func (w *TurboChannelWriter) Flush() error {
+ return nil
+}
+
+// Stats returns writing statistics
+func (w *TurboChannelWriter) Stats() (linesWritten, bytesWritten uint64) {
+ w.mutex.Lock()
+ defer w.mutex.Unlock()
+
+ return w.linesWritten, w.bytesWritten
+}
+
+// TurboNetworkWriter writes directly to the network connection bypassing channels
+type TurboNetworkWriter struct {
+ handler *baseHandler
+ hostname string
+ plain bool
+ serverless bool
+
+ // Internal buffer for batching writes
+ writeBuf bytes.Buffer
+ bufSize int
+ mutex sync.Mutex
+
+ // Direct output channel for turbo mode
+ outputChan chan []byte
+
+ // Stats
+ linesWritten uint64
+ bytesWritten uint64
+
+ // Track if we've signaled EOF
+ eofSignaled bool
+}
+
+// WriteLineData formats and writes line data directly
+func (w *TurboNetworkWriter) WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error {
+ w.mutex.Lock()
+ defer w.mutex.Unlock()
+
+ dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "lineNum", lineNum, "sourceID", sourceID, "contentLen", len(lineContent))
+
+ // Build the output line
+ if !w.plain && !w.serverless {
+ w.writeBuf.WriteString("REMOTE")
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ w.writeBuf.WriteString(w.hostname)
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ // For direct writing, we don't have transmittedPerc, so use 100
+ w.writeBuf.WriteString("100")
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ w.writeBuf.WriteString(fmt.Sprintf("%v", lineNum))
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ w.writeBuf.WriteString(sourceID)
+ w.writeBuf.WriteString(protocol.FieldDelimiter)
+ }
+
+ // Write the actual line content (already includes line endings)
+ w.writeBuf.Write(lineContent)
+ w.writeBuf.WriteByte(protocol.MessageDelimiter)
+
+ // Update stats
+ w.linesWritten++
+ w.bytesWritten += uint64(w.writeBuf.Len())
+
+ dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "linesWritten", w.linesWritten, "bytesWritten", w.bytesWritten, "bufSize", w.writeBuf.Len())
+
+ // Write directly to the turbo channel
+ if w.handler.turboLines != nil {
+ data := make([]byte, w.writeBuf.Len())
+ copy(data, w.writeBuf.Bytes())
+
+ dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "sending to turboLines channel", "dataLen", len(data))
+
+ // Send data to turbo channel with a larger buffer
+ select {
+ case w.handler.turboLines <- data:
+ dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "sent to channel successfully")
+ w.writeBuf.Reset()
+ return nil
+ default:
+ // Channel full, wait a bit and retry
+ dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "channel full, waiting before retry")
+ time.Sleep(time.Millisecond)
+ w.handler.turboLines <- data
+ dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "sent to channel after retry")
+ w.writeBuf.Reset()
+ return nil
+ }
+ }
+
+ dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "turboLines channel is nil")
+ w.writeBuf.Reset()
+ return nil
+}
+
+// WriteServerMessage writes a server message
+func (w *TurboNetworkWriter) WriteServerMessage(message string) error {
+ // Server messages are less critical in turbo mode
+ // We can send them through the normal channel
+ if w.handler != nil && w.handler.serverMessages != nil {
+ select {
+ case w.handler.serverMessages <- message:
+ return nil
+ default:
+ return fmt.Errorf("server message channel full")
+ }
+ }
+ return nil
+}
+
+// Flush ensures all data is written
+func (w *TurboNetworkWriter) Flush() error {
+ dlog.Server.Trace("TurboNetworkWriter.Flush", "called", "eofSignaled", w.eofSignaled)
+
+ w.mutex.Lock()
+ defer w.mutex.Unlock()
+
+ // If we have any buffered data, send it now
+ if w.writeBuf.Len() > 0 {
+ dlog.Server.Trace("TurboNetworkWriter.Flush", "flushing buffered data", "bufSize", w.writeBuf.Len())
+
+ if w.handler.turboLines != nil {
+ data := make([]byte, w.writeBuf.Len())
+ copy(data, w.writeBuf.Bytes())
+
+ // Force send the data
+ w.handler.turboLines <- data
+ w.writeBuf.Reset()
+ dlog.Server.Trace("TurboNetworkWriter.Flush", "flushed data to channel")
+ }
+ }
+
+ // Wait for the channel to have space before signaling EOF
+ // This ensures data has been sent
+ if !w.eofSignaled && w.handler.turboEOF != nil {
+ // Wait until channel has been drained somewhat
+ for i := 0; i < 100 && len(w.handler.turboLines) > 900; i++ {
+ dlog.Server.Trace("TurboNetworkWriter.Flush", "waiting for channel to drain", "channelLen", len(w.handler.turboLines))
+ time.Sleep(10 * time.Millisecond)
+ }
+
+ dlog.Server.Trace("TurboNetworkWriter.Flush", "signaling EOF", "channelLen", len(w.handler.turboLines))
+ close(w.handler.turboEOF)
+ w.eofSignaled = true
+ }
+
+ // Wait a bit to ensure data is processed
+ // This is crucial for integration tests
+ time.Sleep(10 * time.Millisecond)
+ dlog.Server.Trace("TurboNetworkWriter.Flush", "completed")
+
+ return nil
+}
+
+// DirectLineProcessor processes lines directly without channels in turbo mode
+type DirectLineProcessor struct {
+ writer TurboWriter
+ globID string
+ lineCount uint64
+}
+
+// NewDirectLineProcessor creates a processor that writes directly
+func NewDirectLineProcessor(writer TurboWriter, globID string) *DirectLineProcessor {
+ return &DirectLineProcessor{
+ writer: writer,
+ globID: globID,
+ }
+}
+
+// ProcessLine writes a line directly to the output
+func (p *DirectLineProcessor) ProcessLine(lineContent *bytes.Buffer, lineNum uint64, sourceID string) error {
+ p.lineCount++
+
+ dlog.Server.Trace("DirectLineProcessor.ProcessLine", "lineCount", p.lineCount, "lineNum", lineNum, "sourceID", sourceID)
+
+ // Write directly to output
+ err := p.writer.WriteLineData(lineContent.Bytes(), lineNum, sourceID)
+
+ // Recycle the buffer
+ pool.RecycleBytesBuffer(lineContent)
+
+ return err
+}
+
+// Flush ensures all data is written
+func (p *DirectLineProcessor) Flush() error {
+ dlog.Server.Trace("DirectLineProcessor.Flush", "lineCount", p.lineCount)
+ return p.writer.Flush()
+}
+
+// Close flushes any remaining data
+func (p *DirectLineProcessor) Close() error {
+ dlog.Server.Trace("DirectLineProcessor.Close", "lineCount", p.lineCount)
+ return p.writer.Flush()
+}