summaryrefslogtreecommitdiff
path: root/internal/server/handlers/lineprocessor.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-06-28 19:47:10 +0300
committerPaul Buetow <paul@buetow.org>2025-06-28 19:47:10 +0300
commitc75b6595f6cb0c94f4ecc05ca7c27ec0e83de368 (patch)
treeedc815d8e0e35eaad5fbfd201852b33cd074fc6d /internal/server/handlers/lineprocessor.go
parent408d6365383ecca294c3260df261f08092484aef (diff)
feat: implement channel-less grep for 62% performance improvement
- Add LineProcessor interface for direct line processing without channels - Implement channel-less file reading in readfile_processor.go - Add optimized reader with 256KB buffering for efficient I/O - Create GrepLineProcessor for direct writing without intermediate channels - Fix serverless mode hanging due to stdin pipe detection - Fix base64 decoding bug (was counting characters instead of arguments) - Fix message output formatting by adding proper newline handling Performance improvements: - Channel-based: 9.00s → Channel-less: 3.42s (62% faster on 100MB files) - Removed channel synchronization overhead and context switching - Reduced memory allocations with buffer pooling Environment variables: - DTAIL_CHANNELLESS_GREP=yes - Enable channel-less implementation - DTAIL_OPTIMIZED_READER=yes - Use optimized buffered reader Known limitation: Inverted grep with context (--invert with --before/--after) not fully implemented in channel-less mode. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server/handlers/lineprocessor.go')
-rw-r--r--internal/server/handlers/lineprocessor.go189
1 files changed, 189 insertions, 0 deletions
diff --git a/internal/server/handlers/lineprocessor.go b/internal/server/handlers/lineprocessor.go
new file mode 100644
index 0000000..f75b85b
--- /dev/null
+++ b/internal/server/handlers/lineprocessor.go
@@ -0,0 +1,189 @@
+package handlers
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "sync"
+
+ "github.com/mimecast/dtail/internal/io/pool"
+ "github.com/mimecast/dtail/internal/protocol"
+)
+
+// GrepLineProcessor processes lines for grep operations without using channels.
+// It writes directly to the output writer for better performance.
+type GrepLineProcessor struct {
+ writer io.Writer
+ hostname string
+ plain bool
+ serverless bool
+
+ // Buffering for efficiency
+ writeBuf bytes.Buffer
+ bufSize int
+ mutex sync.Mutex
+
+ // Stats
+ linesProcessed uint64
+ bytesWritten uint64
+}
+
+// HandlerWriter adapts a ServerHandler to implement io.Writer
+type HandlerWriter struct {
+ handler *ServerHandler
+ serverMessages chan<- string
+}
+
+// Write sends data through the server messages channel
+func (w *HandlerWriter) Write(p []byte) (n int, err error) {
+ // Convert bytes to string and send through serverMessages channel
+ // This will be picked up by the handler's Read() method
+ message := string(p)
+ select {
+ case w.serverMessages <- message:
+ return len(p), nil
+ default:
+ return 0, fmt.Errorf("server messages channel full")
+ }
+}
+
+// NewGrepLineProcessor creates a new processor for grep operations.
+func NewGrepLineProcessor(writer io.Writer, hostname string, plain, serverless bool) *GrepLineProcessor {
+ return &GrepLineProcessor{
+ writer: writer,
+ hostname: hostname,
+ plain: plain,
+ serverless: serverless,
+ bufSize: 64 * 1024, // 64KB buffer
+ }
+}
+
+// ProcessLine processes a single line and writes it to the output.
+func (p *GrepLineProcessor) ProcessLine(lineContent *bytes.Buffer, lineNum uint64, sourceID string) error {
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ // Build the output line
+ if !p.plain && !p.serverless {
+ p.writeBuf.WriteString("REMOTE")
+ p.writeBuf.WriteString(protocol.FieldDelimiter)
+ p.writeBuf.WriteString(p.hostname)
+ p.writeBuf.WriteString(protocol.FieldDelimiter)
+ // For grep, we don't have transmittedPerc, so use 100
+ p.writeBuf.WriteString("100")
+ p.writeBuf.WriteString(protocol.FieldDelimiter)
+ p.writeBuf.WriteString(fmt.Sprintf("%v", lineNum))
+ p.writeBuf.WriteString(protocol.FieldDelimiter)
+ p.writeBuf.WriteString(sourceID)
+ p.writeBuf.WriteString(protocol.FieldDelimiter)
+ }
+
+ // Write the actual line content
+ p.writeBuf.Write(lineContent.Bytes())
+ p.writeBuf.WriteByte(protocol.MessageDelimiter)
+
+ // Recycle the line buffer
+ pool.RecycleBytesBuffer(lineContent)
+
+ // Update stats
+ p.linesProcessed++
+ p.bytesWritten += uint64(p.writeBuf.Len())
+
+ // Flush if buffer is getting full
+ if p.writeBuf.Len() >= p.bufSize {
+ return p.flushBuffer()
+ }
+
+ return nil
+}
+
+// Flush writes any buffered data to the output.
+func (p *GrepLineProcessor) Flush() error {
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ return p.flushBuffer()
+}
+
+// flushBuffer writes the buffer content to the writer (must be called with mutex held).
+func (p *GrepLineProcessor) flushBuffer() error {
+ if p.writeBuf.Len() == 0 {
+ return nil
+ }
+
+ _, err := p.writer.Write(p.writeBuf.Bytes())
+ p.writeBuf.Reset()
+
+ return err
+}
+
+// Close cleans up the processor.
+func (p *GrepLineProcessor) Close() error {
+ // Flush any remaining data
+ return p.Flush()
+}
+
+// Stats returns processing statistics.
+func (p *GrepLineProcessor) Stats() (linesProcessed, bytesWritten uint64) {
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ return p.linesProcessed, p.bytesWritten
+}
+
+// ServerMessageProcessor handles server messages separately from line data.
+type ServerMessageProcessor struct {
+ writer io.Writer
+ hostname string
+ plain bool
+ serverless bool
+ mutex sync.Mutex
+}
+
+// NewServerMessageProcessor creates a processor for server messages.
+func NewServerMessageProcessor(writer io.Writer, hostname string, plain, serverless bool) *ServerMessageProcessor {
+ return &ServerMessageProcessor{
+ writer: writer,
+ hostname: hostname,
+ plain: plain,
+ serverless: serverless,
+ }
+}
+
+// SendMessage sends a server message.
+func (p *ServerMessageProcessor) SendMessage(message string) error {
+ if p.serverless {
+ return nil
+ }
+
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ var buf bytes.Buffer
+
+ // Skip empty server messages when in plain mode
+ if p.plain && (message == "" || message == "\n") {
+ return nil
+ }
+
+ // Handle hidden messages
+ if len(message) > 0 && message[0] == '.' {
+ buf.WriteString(message)
+ buf.WriteByte(protocol.MessageDelimiter)
+ _, err := p.writer.Write(buf.Bytes())
+ return err
+ }
+
+ // Handle normal server message
+ if !p.plain {
+ buf.WriteString("SERVER")
+ buf.WriteString(protocol.FieldDelimiter)
+ buf.WriteString(p.hostname)
+ buf.WriteString(protocol.FieldDelimiter)
+ }
+ buf.WriteString(message)
+ buf.WriteByte(protocol.MessageDelimiter)
+
+ _, err := p.writer.Write(buf.Bytes())
+ return err
+} \ No newline at end of file