diff options
Diffstat (limited to 'internal/server')
| -rw-r--r-- | internal/server/handlers/basehandler.go | 127 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 102 | ||||
| -rw-r--r-- | internal/server/handlers/turbo_writer.go | 516 |
3 files changed, 720 insertions, 25 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index e9b1eec..b756201 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -46,6 +46,12 @@ type baseHandler struct { quiet bool plain bool serverless bool + + // Turbo mode support + turboMode bool + turboLines chan []byte // Pre-formatted lines for turbo mode + turboBuffer []byte // Buffer for partially sent turbo data + turboEOF chan struct{} // Signal when turbo data is complete } // Shutdown the handler. @@ -62,6 +68,68 @@ func (h *baseHandler) Done() <-chan struct{} { func (h *baseHandler) Read(p []byte) (n int, err error) { defer h.readBuf.Reset() + // In turbo mode, check if we have buffered data first + if h.turboMode && len(h.turboBuffer) > 0 { + dlog.Server.Trace(h.user, "baseHandler.Read", "using buffered turbo data", "bufferedLen", len(h.turboBuffer)) + n = copy(p, h.turboBuffer) + h.turboBuffer = h.turboBuffer[n:] + dlog.Server.Trace(h.user, "baseHandler.Read", "after buffer read", "copied", n, "remaining", len(h.turboBuffer)) + return + } + + // In turbo mode, prioritize pre-formatted turbo lines + if h.turboMode && h.turboLines != nil { + channelLen := len(h.turboLines) + dlog.Server.Trace(h.user, "baseHandler.Read", "checking turboLines channel", "channelLen", channelLen) + + // Try to read from the channel + select { + case turboData := <-h.turboLines: + dlog.Server.Trace(h.user, "baseHandler.Read", "got data from turboLines", "dataLen", len(turboData)) + n = copy(p, turboData) + // If we couldn't send all data, buffer the rest + if n < len(turboData) { + h.turboBuffer = turboData[n:] + dlog.Server.Trace(h.user, "baseHandler.Read", "buffering remaining data", "bufferedLen", len(h.turboBuffer)) + } + return + default: + // No data immediately available + if channelLen > 0 { + // There's data in the channel but we couldn't get it immediately + // Wait a bit and try again + dlog.Server.Trace(h.user, "baseHandler.Read", "channel has data but not available, waiting") + time.Sleep(time.Millisecond) + select { + case turboData := <-h.turboLines: + dlog.Server.Trace(h.user, "baseHandler.Read", "got data after wait", "dataLen", len(turboData)) + n = copy(p, turboData) + if n < len(turboData) { + h.turboBuffer = turboData[n:] + } + return + default: + // Still no data + } + } + + // Channel is truly empty, check if we should continue in turbo mode + // Only disable turbo mode if we've been signaled to do so + if h.turboEOF != nil { + select { + case <-h.turboEOF: + dlog.Server.Trace(h.user, "baseHandler.Read", "EOF received and channel empty, disabling turbo mode") + h.turboMode = false + default: + // EOF not signaled yet, continue in turbo mode + } + } + + dlog.Server.Trace(h.user, "baseHandler.Read", "no data in turboLines, falling through") + // Fall through to normal processing + } + } + select { case message := <-h.serverMessages: if len(message) > 0 && message[0] == '.' { @@ -288,9 +356,17 @@ func (h *baseHandler) sendln(ch chan<- string, message string) { func (h *baseHandler) flush() { dlog.Server.Trace(h.user, "flush()") numUnsentMessages := func() int { - return len(h.lines) + len(h.serverMessages) + len(h.maprMessages) + lineCount := len(h.lines) + serverCount := len(h.serverMessages) + maprCount := len(h.maprMessages) + turboCount := 0 + if h.turboLines != nil { + turboCount = len(h.turboLines) + } + dlog.Server.Trace(h.user, "flush", "lines", lineCount, "server", serverCount, "mapr", maprCount, "turbo", turboCount) + return lineCount + serverCount + maprCount + turboCount } - for i := 0; i < 10; i++ { + for i := 0; i < 100; i++ { // Increase iterations for turbo mode if numUnsentMessages() == 0 { dlog.Server.Debug(h.user, "ALL lines sent", fmt.Sprintf("%p", h)) return @@ -329,3 +405,50 @@ func (h *baseHandler) decrementActiveCommands() int32 { atomic.AddInt32(&h.activeCommands, -1) return atomic.LoadInt32(&h.activeCommands) } + +// EnableTurboMode enables turbo mode for direct line processing +func (h *baseHandler) EnableTurboMode() { + h.turboMode = true + if h.turboLines == nil { + h.turboLines = make(chan []byte, 1000) // Large buffer for performance + } + if h.turboEOF == nil { + h.turboEOF = make(chan struct{}) + } +} + +// IsTurboMode returns true if turbo mode is enabled +func (h *baseHandler) IsTurboMode() bool { + return h.turboMode +} + +// flushTurboData ensures all turbo channel data is processed +func (h *baseHandler) flushTurboData() { + if h.turboLines == nil { + return + } + + dlog.Server.Debug(h.user, "Flushing turbo data", "channelLen", len(h.turboLines)) + + // Wait for turbo channel to drain with a timeout + timeout := time.After(2 * time.Second) + for { + select { + case <-timeout: + dlog.Server.Warn(h.user, "Timeout while flushing turbo data", "remaining", len(h.turboLines)) + return + default: + if len(h.turboLines) == 0 { + dlog.Server.Debug(h.user, "Turbo channel drained successfully") + return + } + // Give the reader time to process + time.Sleep(10 * time.Millisecond) + } + } +} + +// GetTurboChannel returns the turbo lines channel for direct writing +func (h *baseHandler) GetTurboChannel() chan<- []byte { + return h.turboLines +} diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 2245b7c..dc11aa9 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -2,7 +2,6 @@ package handlers import ( "context" - "fmt" "os" "path/filepath" "strings" @@ -32,7 +31,7 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand { func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext, argc int, args []string, retries int) { - + re := regex.NewNoop() if argc >= 4 { deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " ")) @@ -53,7 +52,7 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext, // e.g.: grep foo bar.log | dmap 'from STATS select ...' // Only read from pipe if no file argument is provided isPipe := r.isInputFromPipe() && (argc < 2 || args[1] == "" || args[1] == "-") - + if isPipe { dlog.Server.Debug("Reading data from stdin pipe") // Empty file path and globID "-" represents reading from the stdin pipe. @@ -110,7 +109,7 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, go r.readFileIfPermissions(ctx, ltx, &wg, path, glob, re) } wg.Wait() - + // In turbo mode with aggregate, we don't close the shared channel here // because it will be used across multiple invocations // The aggregate will handle channel closure when it's done @@ -132,7 +131,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, path, globID string, re regex.Regex) { - + dlog.Server.Info(r.server.user, "Start reading", path, globID) var reader fs.FileReader var limiter chan struct{} @@ -173,16 +172,11 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, // Check if we should use the turbo boost optimizations turboBoostEnabled := config.Env("DTAIL_TURBOBOOST_ENABLE") - dlog.Server.Info(r.server.user, "Turbo boost check: enabled=", turboBoostEnabled, "mode=", r.mode) - // Only enable channel-less for server mode, not serverless mode - // Use the serverless field directly as it's more reliable // Enable turbo boost for cat/grep/tail modes, but NOT for aggregate (MapReduce) operations // MapReduce requires the traditional channel-based approach to work correctly - if turboBoostEnabled && !r.server.serverless && r.server.aggregate == nil && + if turboBoostEnabled && r.server.aggregate == nil && (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) { - // Log to stderr for testing verification - only in server mode - fmt.Fprintf(os.Stderr, "[DTAIL] Turbo boost enabled: using channel-less implementation for %s\n", path) - r.readWithProcessor(ctx, ltx, path, globID, re, reader) + r.readWithTurboProcessor(ctx, ltx, path, globID, re, reader) return } @@ -222,7 +216,6 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte path, globID string, re regex.Regex, reader fs.FileReader) { dlog.Server.Info(r.server.user, "Using channel-less grep implementation", path, globID) - // Use the existing lines channel but with the processor-based reader lines := r.server.lines @@ -230,15 +223,6 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte // Use the optimized version if turbo boost is enabled turboBoostEnabled := config.Env("DTAIL_TURBOBOOST_ENABLE") - - // Log to stderr for testing verification - only in server mode - if !r.server.serverless { - if turboBoostEnabled { - fmt.Fprintf(os.Stderr, "[DTAIL] Turbo boost enabled: using optimized reader for %s\n", path) - } else { - fmt.Fprintf(os.Stderr, "[DTAIL] Using standard processor reader for %s\n", path) - } - } for { if aggregate != nil { @@ -258,7 +242,7 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte } else { err = reader.StartWithProcessor(ctx, ltx, processor, re) } - + if err != nil { dlog.Server.Error(r.server.user, path, globID, err) } @@ -281,6 +265,78 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte } } +func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.LContext, + path, globID string, re regex.Regex, reader fs.FileReader) { + + dlog.Server.Info(r.server.user, "Using turbo channel-less implementation", path, globID) + + // Enable turbo mode if not already enabled + if !r.server.IsTurboMode() { + r.server.EnableTurboMode() + } + + // Create a direct writer based on the mode + // In serverless mode, write directly to stdout + // In server mode, use the turbo channel + var writer TurboWriter + if r.server.serverless { + // In serverless mode, write directly to stdout + writer = NewDirectTurboWriter(os.Stdout, r.server.hostname, r.server.plain, r.server.serverless) + } else { + // In server mode, use the network writer with turbo channels + writer = &TurboNetworkWriter{ + handler: &r.server.baseHandler, + hostname: r.server.hostname, + plain: r.server.plain, + serverless: r.server.serverless, + } + } + + for { + dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> starting read loop iteration") + + // Create a direct processor that writes without channels + processor := NewDirectLineProcessor(writer, globID) + + // Use the optimized reader + dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> about to start") + err := reader.StartWithProcessorOptimized(ctx, ltx, processor, re) + dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> completed") + if err != nil { + dlog.Server.Error(r.server.user, path, globID, err) + } + + // Ensure we flush before closing + dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> flushing processor") + if flushErr := processor.Flush(); flushErr != nil { + dlog.Server.Error(r.server.user, path, globID, "flush error", flushErr) + } + + // Close the processor after each iteration + dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> closing processor") + processor.Close() + dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> processor closed") + + // Give time for data to be transmitted + // This is crucial for integration tests to ensure all data is sent + if !r.server.serverless { + dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> waiting for data transmission") + time.Sleep(50 * time.Millisecond) + } + + select { + case <-ctx.Done(): + return + default: + if !reader.Retry() { + return + } + } + time.Sleep(time.Second * 2) + dlog.Server.Info(path, globID, "Reading file again") + } +} + func (r *readCommand) makeGlobID(path, glob string) string { var idParts []string pathParts := strings.Split(path, "/") diff --git a/internal/server/handlers/turbo_writer.go b/internal/server/handlers/turbo_writer.go new file mode 100644 index 0000000..01755a5 --- /dev/null +++ b/internal/server/handlers/turbo_writer.go @@ -0,0 +1,516 @@ +package handlers + +import ( + "bytes" + "fmt" + "io" + "sync" + "time" + + "github.com/mimecast/dtail/internal/color/brush" + "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/io/pool" + "github.com/mimecast/dtail/internal/protocol" +) + +// TurboWriter defines the interface for direct writing in turbo mode +type TurboWriter interface { + // WriteLineData writes formatted line data directly to output + WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error + // WriteServerMessage writes a server message + WriteServerMessage(message string) error + // Flush ensures all buffered data is written + Flush() error +} + +// DirectTurboWriter implements TurboWriter for direct network writing +type DirectTurboWriter struct { + writer io.Writer + hostname string + plain bool + serverless bool + + // Buffering for efficiency + writeBuf bytes.Buffer + bufSize int + mutex sync.Mutex + + // Stats + linesWritten uint64 + bytesWritten uint64 +} + +// NewDirectTurboWriter creates a new turbo writer +func NewDirectTurboWriter(writer io.Writer, hostname string, plain, serverless bool) *DirectTurboWriter { + return &DirectTurboWriter{ + writer: writer, + hostname: hostname, + plain: plain, + serverless: serverless, + bufSize: 64 * 1024, // 64KB buffer + } +} + +// WriteLineData writes formatted line data directly to output +func (w *DirectTurboWriter) WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error { + w.mutex.Lock() + defer w.mutex.Unlock() + + // In serverless mode with colors, write each line immediately + if w.serverless && !w.plain { + // Build the complete line in a temporary buffer + var lineBuf bytes.Buffer + lineBuf.WriteString("REMOTE") + lineBuf.WriteString(protocol.FieldDelimiter) + lineBuf.WriteString(w.hostname) + lineBuf.WriteString(protocol.FieldDelimiter) + lineBuf.WriteString("100") + lineBuf.WriteString(protocol.FieldDelimiter) + lineBuf.WriteString(fmt.Sprintf("%v", lineNum)) + lineBuf.WriteString(protocol.FieldDelimiter) + lineBuf.WriteString(sourceID) + lineBuf.WriteString(protocol.FieldDelimiter) + + // Remove trailing newline if present (it will be added back after coloring) + content := lineContent + if len(content) > 0 && content[len(content)-1] == '\n' { + content = content[:len(content)-1] + } + lineBuf.Write(content) + + // Apply color formatting + coloredLine := brush.Colorfy(lineBuf.String()) + + // Write directly to output with newline + _, err := w.writer.Write([]byte(coloredLine + "\n")) + w.linesWritten++ + w.bytesWritten += uint64(len(coloredLine) + 1) + return err + } + + // Build the output line + if !w.plain { + w.writeBuf.WriteString("REMOTE") + w.writeBuf.WriteString(protocol.FieldDelimiter) + w.writeBuf.WriteString(w.hostname) + w.writeBuf.WriteString(protocol.FieldDelimiter) + // For direct writing, we don't have transmittedPerc, so use 100 + w.writeBuf.WriteString("100") + w.writeBuf.WriteString(protocol.FieldDelimiter) + w.writeBuf.WriteString(fmt.Sprintf("%v", lineNum)) + w.writeBuf.WriteString(protocol.FieldDelimiter) + w.writeBuf.WriteString(sourceID) + w.writeBuf.WriteString(protocol.FieldDelimiter) + } + + // Write the actual line content + w.writeBuf.Write(lineContent) + + // In plain mode, ensure line has a newline if it doesn't already + if w.plain && len(lineContent) > 0 && lineContent[len(lineContent)-1] != '\n' { + w.writeBuf.WriteByte('\n') + } + + // Only add message delimiter in non-plain, non-serverless mode + // In serverless mode, we output lines directly + if !w.plain && !w.serverless { + w.writeBuf.WriteByte(protocol.MessageDelimiter) + } + + // Update stats + w.linesWritten++ + w.bytesWritten += uint64(w.writeBuf.Len()) + + // Flush if buffer is getting full or in serverless mode + if w.writeBuf.Len() >= w.bufSize || w.serverless { + return w.flushBuffer() + } + + return nil +} + +// WriteServerMessage writes a server message +func (w *DirectTurboWriter) WriteServerMessage(message string) error { + if w.serverless { + return nil + } + + w.mutex.Lock() + defer w.mutex.Unlock() + + // Skip empty server messages when in plain mode + if w.plain && (message == "" || message == "\n") { + return nil + } + + // Handle hidden messages + if len(message) > 0 && message[0] == '.' { + w.writeBuf.WriteString(message) + w.writeBuf.WriteByte(protocol.MessageDelimiter) + return w.flushBuffer() + } + + // Handle normal server message + if !w.plain { + w.writeBuf.WriteString("SERVER") + w.writeBuf.WriteString(protocol.FieldDelimiter) + w.writeBuf.WriteString(w.hostname) + w.writeBuf.WriteString(protocol.FieldDelimiter) + } + w.writeBuf.WriteString(message) + w.writeBuf.WriteByte(protocol.MessageDelimiter) + + return w.flushBuffer() +} + +// Flush ensures all buffered data is written +func (w *DirectTurboWriter) Flush() error { + w.mutex.Lock() + defer w.mutex.Unlock() + + // Force flush any remaining data + err := w.flushBuffer() + + // For serverless mode, ensure everything is written to output + if w.serverless { + // Ensure writer is flushed if it supports it + if flusher, ok := w.writer.(interface{ Flush() error }); ok { + flusher.Flush() + } + } + + return err +} + +// flushBuffer writes the buffer content to the writer (must be called with mutex held) +func (w *DirectTurboWriter) flushBuffer() error { + if w.writeBuf.Len() == 0 { + return nil + } + + data := w.writeBuf.Bytes() + + // In serverless mode with colors, data is already processed line by line + // so we don't need to do any additional formatting here + + _, err := w.writer.Write(data) + w.writeBuf.Reset() + + return err +} + +// Stats returns writing statistics +func (w *DirectTurboWriter) Stats() (linesWritten, bytesWritten uint64) { + w.mutex.Lock() + defer w.mutex.Unlock() + + return w.linesWritten, w.bytesWritten +} + +// TurboChannelWriter writes pre-formatted data to a turbo channel +type TurboChannelWriter struct { + channel chan<- []byte + hostname string + plain bool + serverless bool + + // Buffering for efficiency + writeBuf bytes.Buffer + bufSize int + mutex sync.Mutex + + // Stats + linesWritten uint64 + bytesWritten uint64 +} + +// NewTurboChannelWriter creates a writer that sends to a turbo channel +func NewTurboChannelWriter(channel chan<- []byte, hostname string, plain, serverless bool) *TurboChannelWriter { + return &TurboChannelWriter{ + channel: channel, + hostname: hostname, + plain: plain, + serverless: serverless, + bufSize: 64 * 1024, // 64KB buffer + } +} + +// WriteLineData formats and writes line data to the turbo channel +func (w *TurboChannelWriter) WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error { + w.mutex.Lock() + defer w.mutex.Unlock() + + // Build the output line + if !w.plain && !w.serverless { + w.writeBuf.WriteString("REMOTE") + w.writeBuf.WriteString(protocol.FieldDelimiter) + w.writeBuf.WriteString(w.hostname) + w.writeBuf.WriteString(protocol.FieldDelimiter) + // For direct writing, we don't have transmittedPerc, so use 100 + w.writeBuf.WriteString("100") + w.writeBuf.WriteString(protocol.FieldDelimiter) + w.writeBuf.WriteString(fmt.Sprintf("%v", lineNum)) + w.writeBuf.WriteString(protocol.FieldDelimiter) + w.writeBuf.WriteString(sourceID) + w.writeBuf.WriteString(protocol.FieldDelimiter) + } + + // Write the actual line content (already includes line endings) + w.writeBuf.Write(lineContent) + w.writeBuf.WriteByte(protocol.MessageDelimiter) + + // Update stats + w.linesWritten++ + w.bytesWritten += uint64(w.writeBuf.Len()) + + // Send to channel + data := make([]byte, w.writeBuf.Len()) + copy(data, w.writeBuf.Bytes()) + w.writeBuf.Reset() + + select { + case w.channel <- data: + return nil + default: + return fmt.Errorf("turbo channel full") + } +} + +// WriteServerMessage writes a server message +func (w *TurboChannelWriter) WriteServerMessage(message string) error { + if w.serverless { + return nil + } + + w.mutex.Lock() + defer w.mutex.Unlock() + + // Skip empty server messages when in plain mode + if w.plain && (message == "" || message == "\n") { + return nil + } + + var buf bytes.Buffer + + // Handle hidden messages + if len(message) > 0 && message[0] == '.' { + buf.WriteString(message) + buf.WriteByte(protocol.MessageDelimiter) + } else { + // Handle normal server message + if !w.plain { + buf.WriteString("SERVER") + buf.WriteString(protocol.FieldDelimiter) + buf.WriteString(w.hostname) + buf.WriteString(protocol.FieldDelimiter) + } + buf.WriteString(message) + buf.WriteByte(protocol.MessageDelimiter) + } + + data := buf.Bytes() + select { + case w.channel <- data: + return nil + default: + return fmt.Errorf("turbo channel full") + } +} + +// Flush is a no-op for channel writer as data is sent immediately +func (w *TurboChannelWriter) Flush() error { + return nil +} + +// Stats returns writing statistics +func (w *TurboChannelWriter) Stats() (linesWritten, bytesWritten uint64) { + w.mutex.Lock() + defer w.mutex.Unlock() + + return w.linesWritten, w.bytesWritten +} + +// TurboNetworkWriter writes directly to the network connection bypassing channels +type TurboNetworkWriter struct { + handler *baseHandler + hostname string + plain bool + serverless bool + + // Internal buffer for batching writes + writeBuf bytes.Buffer + bufSize int + mutex sync.Mutex + + // Direct output channel for turbo mode + outputChan chan []byte + + // Stats + linesWritten uint64 + bytesWritten uint64 + + // Track if we've signaled EOF + eofSignaled bool +} + +// WriteLineData formats and writes line data directly +func (w *TurboNetworkWriter) WriteLineData(lineContent []byte, lineNum uint64, sourceID string) error { + w.mutex.Lock() + defer w.mutex.Unlock() + + dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "lineNum", lineNum, "sourceID", sourceID, "contentLen", len(lineContent)) + + // Build the output line + if !w.plain && !w.serverless { + w.writeBuf.WriteString("REMOTE") + w.writeBuf.WriteString(protocol.FieldDelimiter) + w.writeBuf.WriteString(w.hostname) + w.writeBuf.WriteString(protocol.FieldDelimiter) + // For direct writing, we don't have transmittedPerc, so use 100 + w.writeBuf.WriteString("100") + w.writeBuf.WriteString(protocol.FieldDelimiter) + w.writeBuf.WriteString(fmt.Sprintf("%v", lineNum)) + w.writeBuf.WriteString(protocol.FieldDelimiter) + w.writeBuf.WriteString(sourceID) + w.writeBuf.WriteString(protocol.FieldDelimiter) + } + + // Write the actual line content (already includes line endings) + w.writeBuf.Write(lineContent) + w.writeBuf.WriteByte(protocol.MessageDelimiter) + + // Update stats + w.linesWritten++ + w.bytesWritten += uint64(w.writeBuf.Len()) + + dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "linesWritten", w.linesWritten, "bytesWritten", w.bytesWritten, "bufSize", w.writeBuf.Len()) + + // Write directly to the turbo channel + if w.handler.turboLines != nil { + data := make([]byte, w.writeBuf.Len()) + copy(data, w.writeBuf.Bytes()) + + dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "sending to turboLines channel", "dataLen", len(data)) + + // Send data to turbo channel with a larger buffer + select { + case w.handler.turboLines <- data: + dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "sent to channel successfully") + w.writeBuf.Reset() + return nil + default: + // Channel full, wait a bit and retry + dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "channel full, waiting before retry") + time.Sleep(time.Millisecond) + w.handler.turboLines <- data + dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "sent to channel after retry") + w.writeBuf.Reset() + return nil + } + } + + dlog.Server.Trace("TurboNetworkWriter.WriteLineData", "turboLines channel is nil") + w.writeBuf.Reset() + return nil +} + +// WriteServerMessage writes a server message +func (w *TurboNetworkWriter) WriteServerMessage(message string) error { + // Server messages are less critical in turbo mode + // We can send them through the normal channel + if w.handler != nil && w.handler.serverMessages != nil { + select { + case w.handler.serverMessages <- message: + return nil + default: + return fmt.Errorf("server message channel full") + } + } + return nil +} + +// Flush ensures all data is written +func (w *TurboNetworkWriter) Flush() error { + dlog.Server.Trace("TurboNetworkWriter.Flush", "called", "eofSignaled", w.eofSignaled) + + w.mutex.Lock() + defer w.mutex.Unlock() + + // If we have any buffered data, send it now + if w.writeBuf.Len() > 0 { + dlog.Server.Trace("TurboNetworkWriter.Flush", "flushing buffered data", "bufSize", w.writeBuf.Len()) + + if w.handler.turboLines != nil { + data := make([]byte, w.writeBuf.Len()) + copy(data, w.writeBuf.Bytes()) + + // Force send the data + w.handler.turboLines <- data + w.writeBuf.Reset() + dlog.Server.Trace("TurboNetworkWriter.Flush", "flushed data to channel") + } + } + + // Wait for the channel to have space before signaling EOF + // This ensures data has been sent + if !w.eofSignaled && w.handler.turboEOF != nil { + // Wait until channel has been drained somewhat + for i := 0; i < 100 && len(w.handler.turboLines) > 900; i++ { + dlog.Server.Trace("TurboNetworkWriter.Flush", "waiting for channel to drain", "channelLen", len(w.handler.turboLines)) + time.Sleep(10 * time.Millisecond) + } + + dlog.Server.Trace("TurboNetworkWriter.Flush", "signaling EOF", "channelLen", len(w.handler.turboLines)) + close(w.handler.turboEOF) + w.eofSignaled = true + } + + // Wait a bit to ensure data is processed + // This is crucial for integration tests + time.Sleep(10 * time.Millisecond) + dlog.Server.Trace("TurboNetworkWriter.Flush", "completed") + + return nil +} + +// DirectLineProcessor processes lines directly without channels in turbo mode +type DirectLineProcessor struct { + writer TurboWriter + globID string + lineCount uint64 +} + +// NewDirectLineProcessor creates a processor that writes directly +func NewDirectLineProcessor(writer TurboWriter, globID string) *DirectLineProcessor { + return &DirectLineProcessor{ + writer: writer, + globID: globID, + } +} + +// ProcessLine writes a line directly to the output +func (p *DirectLineProcessor) ProcessLine(lineContent *bytes.Buffer, lineNum uint64, sourceID string) error { + p.lineCount++ + + dlog.Server.Trace("DirectLineProcessor.ProcessLine", "lineCount", p.lineCount, "lineNum", lineNum, "sourceID", sourceID) + + // Write directly to output + err := p.writer.WriteLineData(lineContent.Bytes(), lineNum, sourceID) + + // Recycle the buffer + pool.RecycleBytesBuffer(lineContent) + + return err +} + +// Flush ensures all data is written +func (p *DirectLineProcessor) Flush() error { + dlog.Server.Trace("DirectLineProcessor.Flush", "lineCount", p.lineCount) + return p.writer.Flush() +} + +// Close flushes any remaining data +func (p *DirectLineProcessor) Close() error { + dlog.Server.Trace("DirectLineProcessor.Close", "lineCount", p.lineCount) + return p.writer.Flush() +} |
