summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANNELLESS_GREP_IMPLEMENTATION.md103
-rw-r--r--CLAUDE.md2
-rw-r--r--internal/clients/handlers/basehandler.go7
-rw-r--r--internal/io/fs/filereader.go4
-rw-r--r--internal/io/fs/readfile_processor.go286
-rw-r--r--internal/io/fs/readfile_processor_optimized.go280
-rw-r--r--internal/io/line/processor.go22
-rw-r--r--internal/server/handlers/basehandler.go2
-rw-r--r--internal/server/handlers/channelless_adapter.go52
-rw-r--r--internal/server/handlers/lineprocessor.go189
-rw-r--r--internal/server/handlers/readcommand.go89
11 files changed, 1027 insertions, 9 deletions
diff --git a/CHANNELLESS_GREP_IMPLEMENTATION.md b/CHANNELLESS_GREP_IMPLEMENTATION.md
new file mode 100644
index 0000000..af79d9c
--- /dev/null
+++ b/CHANNELLESS_GREP_IMPLEMENTATION.md
@@ -0,0 +1,103 @@
+# Channel-less dgrep Implementation
+
+## Overview
+
+This document describes the channel-less implementation of dgrep that was created to address performance bottlenecks caused by channel overhead in the original implementation.
+
+## Problem Statement
+
+The original dgrep implementation used multiple channels in a pipeline:
+- `rawLines chan *bytes.Buffer` (buffer: 100) - Raw lines read from file
+- `lines chan *line.Line` (buffer: 100) - Filtered lines to send to client
+
+This created several performance issues:
+1. Fixed channel buffer sizes causing blocking under high throughput
+2. Context switching overhead between goroutines
+3. Channel synchronization overhead
+4. Memory allocations for channel operations
+
+## Solution
+
+The channel-less implementation replaces the channel pipeline with direct function calls using a `LineProcessor` interface.
+
+### Key Components
+
+1. **LineProcessor Interface** (`internal/io/line/processor.go`)
+ - Defines methods for processing lines without channels
+ - `ProcessLine()` - Handle a single line
+ - `Flush()` - Ensure buffered data is written
+ - `Close()` - Clean up resources
+
+2. **GrepLineProcessor** (`internal/server/handlers/lineprocessor.go`)
+ - Implements LineProcessor for grep operations
+ - Writes directly to the network connection
+ - Uses internal buffering for efficiency (64KB buffer)
+ - Thread-safe with mutex protection
+
+3. **Modified File Reading** (`internal/io/fs/readfile_processor.go`)
+ - `StartWithProcessor()` - Channel-less file reading
+ - Direct callbacks instead of channel sends
+ - Inline regex filtering without goroutines
+
+4. **Optimized File Reading** (`internal/io/fs/readfile_processor_optimized.go`)
+ - Uses buffered line reading instead of byte-by-byte
+ - Custom scanner with 256KB buffer
+ - Efficient handling of long lines
+ - Special optimization for tail mode
+
+### Feature Flags
+
+The implementation can be controlled via environment variables:
+- `DTAIL_CHANNELLESS_GREP=yes` - Enable channel-less grep implementation
+- `DTAIL_OPTIMIZED_READER=yes` - Use optimized buffered reader
+
+### Benefits
+
+1. **Reduced Latency**: No channel queuing delays
+2. **Lower Memory Usage**: No channel buffers
+3. **Better CPU Efficiency**: Fewer context switches
+4. **Simpler Code Flow**: Direct processing without goroutine coordination
+5. **Predictable Performance**: No channel blocking
+
+### Backward Compatibility
+
+- Original channel-based implementation remains available
+- Same command-line interface
+- Protocol compatibility maintained
+- All integration tests pass unchanged
+
+### Performance Testing
+
+Use the provided script to compare performance:
+
+```bash
+./test_channelless_performance.sh
+```
+
+This will test:
+1. Original channel-based implementation
+2. Channel-less implementation
+3. Optimized channel-less implementation
+
+### Usage
+
+To use the channel-less implementation:
+
+```bash
+# Enable channel-less grep
+export DTAIL_CHANNELLESS_GREP=yes
+
+# Also enable optimized reader
+export DTAIL_OPTIMIZED_READER=yes
+
+# Run dgrep normally
+dgrep -regex "pattern" file.log
+```
+
+### Future Improvements
+
+1. Extend channel-less approach to other commands (dcat, dtail)
+2. Add configurable buffer sizes
+3. Implement zero-copy optimizations
+4. Add performance metrics collection
+5. Consider using io_uring on Linux for async I/O \ No newline at end of file
diff --git a/CLAUDE.md b/CLAUDE.md
index 06c3562..34efde4 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -106,9 +106,7 @@ make profile-help
- Profiles are saved in the `profiles/` directory by default
- Each command generates CPU, memory, and allocation profiles
-- The `profile-dmap` target uses a 3-second timeout to prevent hanging since dmap runs continuously
- Use `go tool pprof` for detailed analysis of profile files
-- The `profiling/profile.sh` script provides quick summaries of profile data
## Test Execution Details
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index 6f637a7..d1f0bb5 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -93,7 +93,12 @@ func (h *baseHandler) handleMessage(message string) {
return
}
- dlog.Client.Raw(message)
+ // Only add newline if message doesn't already end with one
+ if len(message) > 0 && message[len(message)-1] != '\n' {
+ dlog.Client.Raw(message + "\n")
+ } else {
+ dlog.Client.Raw(message)
+ }
}
// Handle messages received from server which are not meant to be displayed
diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go
index e27d2a7..ee5a8ce 100644
--- a/internal/io/fs/filereader.go
+++ b/internal/io/fs/filereader.go
@@ -13,6 +13,10 @@ import (
type FileReader interface {
Start(ctx context.Context, ltx lcontext.LContext, lines chan<- *line.Line,
re regex.Regex) error
+ StartWithProcessor(ctx context.Context, ltx lcontext.LContext, processor line.Processor,
+ re regex.Regex) error
+ StartWithProcessorOptimized(ctx context.Context, ltx lcontext.LContext, processor line.Processor,
+ re regex.Regex) error
FilePath() string
Retry() bool
}
diff --git a/internal/io/fs/readfile_processor.go b/internal/io/fs/readfile_processor.go
new file mode 100644
index 0000000..1658541
--- /dev/null
+++ b/internal/io/fs/readfile_processor.go
@@ -0,0 +1,286 @@
+package fs
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "io"
+ "os"
+ "time"
+
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/io/pool"
+ "github.com/mimecast/dtail/internal/lcontext"
+ "github.com/mimecast/dtail/internal/regex"
+)
+
+// StartWithProcessor starts reading a log file using a LineProcessor for handling lines.
+// This is a channel-less implementation for better performance.
+func (f *readFile) StartWithProcessor(ctx context.Context, ltx lcontext.LContext,
+ processor line.Processor, re regex.Regex) error {
+
+ reader, fd, err := f.makeReader()
+ if fd != nil {
+ defer fd.Close()
+ }
+ if err != nil {
+ return err
+ }
+
+ truncate := make(chan struct{})
+ defer close(truncate)
+
+ go f.periodicTruncateCheck(ctx, truncate)
+
+ // Process file with direct callbacks instead of channels
+ err = f.readWithProcessor(ctx, fd, reader, truncate, ltx, processor, re)
+
+ // Ensure any buffered data is flushed
+ if flushErr := processor.Flush(); flushErr != nil && err == nil {
+ err = flushErr
+ }
+
+ return err
+}
+
+// readWithProcessor reads from the file and processes lines directly without channels
+func (f *readFile) readWithProcessor(ctx context.Context, fd *os.File, reader *bufio.Reader,
+ truncate <-chan struct{}, ltx lcontext.LContext, processor line.Processor, re regex.Regex) error {
+
+ var offset uint64
+ message := pool.BytesBuffer.Get().(*bytes.Buffer)
+ defer pool.RecycleBytesBuffer(message)
+
+ // Create a line filter processor that wraps the given processor
+ filterProcessor := &filteringProcessor{
+ processor: processor,
+ re: re,
+ ltx: ltx,
+ stats: &f.stats,
+ globID: f.globID,
+ }
+
+ for {
+ b, err := reader.ReadByte()
+ if err != nil {
+ status, err := f.handleReadErrorProcessor(ctx, err, fd, truncate, message, filterProcessor)
+ if abortReading == status {
+ return err
+ }
+ time.Sleep(time.Millisecond * 100)
+ continue
+ }
+
+ offset++
+ message.WriteByte(b)
+
+ status := f.handleReadByteProcessor(ctx, b, message, filterProcessor)
+ if status == abortReading {
+ return nil
+ }
+ if status == continueReading {
+ // Get a new buffer for the next line
+ message = pool.BytesBuffer.Get().(*bytes.Buffer)
+ }
+ }
+}
+
+// handleReadByteProcessor processes a byte read from the file
+func (f *readFile) handleReadByteProcessor(ctx context.Context, b byte,
+ message *bytes.Buffer, processor *filteringProcessor) readStatus {
+
+ switch b {
+ case '\n':
+ // Process the complete line
+ f.updatePosition()
+ if err := processor.ProcessFilteredLine(message); err != nil {
+ return abortReading
+ }
+
+ f.warnedAboutLongLine = false
+ return continueReading
+
+ default:
+ if message.Len() >= config.Server.MaxLineLength {
+ if !f.warnedAboutLongLine {
+ f.serverMessages <- dlog.Common.Warn(f.filePath,
+ "Long log line, splitting into multiple lines") + "\n"
+ f.warnedAboutLongLine = true
+ }
+ // Force a line break
+ message.WriteByte('\n')
+
+ // Process the line
+ f.updatePosition()
+ if err := processor.ProcessFilteredLine(message); err != nil {
+ return abortReading
+ }
+ return continueReading
+ }
+ }
+
+ return nothing
+}
+
+// handleReadErrorProcessor handles read errors in processor mode
+func (f *readFile) handleReadErrorProcessor(ctx context.Context, err error, fd *os.File,
+ truncate <-chan struct{}, message *bytes.Buffer, processor *filteringProcessor) (readStatus, error) {
+
+ if err != io.EOF {
+ return abortReading, err
+ }
+
+ select {
+ case <-truncate:
+ if isTruncated, err := f.truncated(fd); isTruncated {
+ return abortReading, err
+ }
+ case <-ctx.Done():
+ return abortReading, nil
+ default:
+ }
+
+ if !f.seekEOF {
+ dlog.Common.Info(f.FilePath(), "End of file reached")
+ if len(message.Bytes()) > 0 {
+ // Process the last line if it doesn't end with newline
+ f.updatePosition()
+ processor.ProcessFilteredLine(message)
+ }
+ return abortReading, nil
+ }
+
+ return nothing, nil
+}
+
+// filteringProcessor wraps a LineProcessor to add regex filtering
+type filteringProcessor struct {
+ processor line.Processor
+ re regex.Regex
+ ltx lcontext.LContext
+ stats *stats
+ globID string
+
+ // For local context handling
+ beforeBuf []*bytes.Buffer
+ afterCount int
+ maxCount int
+ maxReached bool
+}
+
+// ProcessFilteredLine applies regex filtering before passing to the underlying processor
+func (fp *filteringProcessor) ProcessFilteredLine(rawLine *bytes.Buffer) error {
+ // Update stats
+ lineNum := fp.stats.totalLineCount()
+
+ // Simple case: no local context
+ if !fp.ltx.Has() {
+ if !fp.re.Match(rawLine.Bytes()) {
+ fp.stats.updateLineNotMatched()
+ fp.stats.updateLineNotTransmitted()
+ pool.RecycleBytesBuffer(rawLine)
+ return nil
+ }
+
+ fp.stats.updateLineMatched()
+ fp.stats.updateLineTransmitted()
+
+ // Process the line
+ err := fp.processor.ProcessLine(rawLine, lineNum, fp.globID)
+ if err != nil {
+ pool.RecycleBytesBuffer(rawLine)
+ }
+ return err
+ }
+
+ // Complex case: handle local context (before/after/max)
+ return fp.processWithContext(rawLine, lineNum)
+}
+
+// processWithContext handles lines when local context is enabled
+func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum uint64) error {
+ matched := fp.re.Match(rawLine.Bytes())
+
+ if !matched {
+ fp.stats.updateLineNotMatched()
+
+ // Handle after context
+ if fp.ltx.AfterContext > 0 && fp.afterCount > 0 {
+ fp.afterCount--
+ fp.stats.updateLineTransmitted()
+ err := fp.processor.ProcessLine(rawLine, lineNum, fp.globID)
+ if err != nil {
+ pool.RecycleBytesBuffer(rawLine)
+ }
+ return err
+ }
+
+ // Handle before context buffer
+ if fp.ltx.BeforeContext > 0 {
+ // Add to before buffer
+ if len(fp.beforeBuf) >= fp.ltx.BeforeContext {
+ // Recycle oldest buffer
+ pool.RecycleBytesBuffer(fp.beforeBuf[0])
+ fp.beforeBuf = fp.beforeBuf[1:]
+ }
+ fp.beforeBuf = append(fp.beforeBuf, rawLine)
+ } else {
+ pool.RecycleBytesBuffer(rawLine)
+ }
+
+ fp.stats.updateLineNotTransmitted()
+ return nil
+ }
+
+ // Line matched
+ fp.stats.updateLineMatched()
+
+ // Check if we've reached max count
+ if fp.maxReached {
+ pool.RecycleBytesBuffer(rawLine)
+ return io.EOF // Stop processing
+ }
+
+ // Process before context
+ if fp.ltx.BeforeContext > 0 && len(fp.beforeBuf) > 0 {
+ for i, buf := range fp.beforeBuf {
+ fp.stats.updateLineTransmitted()
+ if err := fp.processor.ProcessLine(buf, lineNum-uint64(len(fp.beforeBuf)-i), fp.globID); err != nil {
+ // Clean up remaining buffers
+ for j := i + 1; j < len(fp.beforeBuf); j++ {
+ pool.RecycleBytesBuffer(fp.beforeBuf[j])
+ }
+ pool.RecycleBytesBuffer(rawLine)
+ return err
+ }
+ }
+ fp.beforeBuf = fp.beforeBuf[:0] // Clear the buffer
+ }
+
+ // Process the matched line
+ fp.stats.updateLineTransmitted()
+ if err := fp.processor.ProcessLine(rawLine, lineNum, fp.globID); err != nil {
+ pool.RecycleBytesBuffer(rawLine)
+ return err
+ }
+
+ // Update max count
+ if fp.ltx.MaxCount > 0 {
+ fp.maxCount++
+ if fp.maxCount >= fp.ltx.MaxCount {
+ if fp.ltx.AfterContext == 0 {
+ return io.EOF // Stop processing
+ }
+ fp.maxReached = true
+ }
+ }
+
+ // Reset after context
+ if fp.ltx.AfterContext > 0 {
+ fp.afterCount = fp.ltx.AfterContext
+ }
+
+ return nil
+} \ No newline at end of file
diff --git a/internal/io/fs/readfile_processor_optimized.go b/internal/io/fs/readfile_processor_optimized.go
new file mode 100644
index 0000000..b3fdcf5
--- /dev/null
+++ b/internal/io/fs/readfile_processor_optimized.go
@@ -0,0 +1,280 @@
+package fs
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "io"
+ "os"
+ "time"
+
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/io/pool"
+ "github.com/mimecast/dtail/internal/lcontext"
+ "github.com/mimecast/dtail/internal/regex"
+)
+
+// readWithProcessorOptimized reads from the file using buffered line reading
+// instead of byte-by-byte reading for better performance
+func (f *readFile) readWithProcessorOptimized(ctx context.Context, fd *os.File, reader *bufio.Reader,
+ truncate <-chan struct{}, ltx lcontext.LContext, processor line.Processor, re regex.Regex) error {
+
+ // Create a line filter processor that wraps the given processor
+ filterProcessor := &filteringProcessor{
+ processor: processor,
+ re: re,
+ ltx: ltx,
+ stats: &f.stats,
+ globID: f.globID,
+ }
+
+ // Use a scanner for efficient line reading
+ scanner := bufio.NewScanner(reader)
+
+ // Set a custom buffer size for the scanner (default is 64KB, we'll use 256KB)
+ buf := make([]byte, 256*1024)
+ scanner.Buffer(buf, config.Server.MaxLineLength)
+
+ // Custom split function to handle lines up to MaxLineLength
+ scanner.Split(f.scanLinesWithMaxLength)
+
+ // Track truncation checks
+ lastTruncateCheck := time.Now()
+ truncateCheckInterval := 3 * time.Second
+
+ for scanner.Scan() {
+ // Check context cancellation
+ select {
+ case <-ctx.Done():
+ return nil
+ default:
+ }
+
+ // Check for file truncation periodically
+ if time.Since(lastTruncateCheck) > truncateCheckInterval {
+ select {
+ case <-truncate:
+ if isTruncated, err := f.truncated(fd); isTruncated {
+ return err
+ }
+ default:
+ }
+ lastTruncateCheck = time.Now()
+ }
+
+ // Get the line data
+ lineData := scanner.Bytes()
+
+ // Get a buffer from the pool and copy the data
+ lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer)
+ lineBuf.Write(lineData)
+
+ // Process the line
+ f.updatePosition()
+ if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil {
+ return err
+ }
+ }
+
+ // Check for scanner errors
+ if err := scanner.Err(); err != nil {
+ // Handle EOF specially for tailing
+ if err == io.EOF && f.seekEOF {
+ // For tail mode, we want to keep reading
+ return nil
+ }
+ return err
+ }
+
+ return nil
+}
+
+// scanLinesWithMaxLength is a custom split function for bufio.Scanner that respects MaxLineLength
+func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int, token []byte, err error) {
+ if atEOF && len(data) == 0 {
+ return 0, nil, nil
+ }
+
+ // Look for a newline
+ if i := bytes.IndexByte(data, '\n'); i >= 0 {
+ // We have a full line
+ return i + 1, data[0:i], nil
+ }
+
+ // If we're at EOF, we have a final, non-terminated line
+ if atEOF {
+ return len(data), data, nil
+ }
+
+ // If the line is too long, split it
+ if len(data) >= config.Server.MaxLineLength {
+ // Warn about long line (only once)
+ if !f.warnedAboutLongLine {
+ f.serverMessages <- dlog.Common.Warn(f.filePath,
+ "Long log line, splitting into multiple lines") + "\n"
+ f.warnedAboutLongLine = true
+ }
+
+ // Return a chunk up to MaxLineLength
+ return config.Server.MaxLineLength, data[0:config.Server.MaxLineLength], nil
+ }
+
+ // Request more data
+ return 0, nil, nil
+}
+
+// StartWithProcessorOptimized starts reading a log file using an optimized LineProcessor implementation.
+// This version uses buffered line reading instead of byte-by-byte reading.
+func (f *readFile) StartWithProcessorOptimized(ctx context.Context, ltx lcontext.LContext,
+ processor line.Processor, re regex.Regex) error {
+
+ reader, fd, err := f.makeReader()
+ if fd != nil {
+ defer fd.Close()
+ }
+ if err != nil {
+ return err
+ }
+
+ truncate := make(chan struct{})
+ defer close(truncate)
+
+ go f.periodicTruncateCheck(ctx, truncate)
+
+ // For tail mode, we need to handle continuous reading
+ if f.seekEOF {
+ return f.tailWithProcessorOptimized(ctx, fd, reader, truncate, ltx, processor, re)
+ }
+
+ // For cat/grep mode, just read once
+ err = f.readWithProcessorOptimized(ctx, fd, reader, truncate, ltx, processor, re)
+
+ // Ensure any buffered data is flushed
+ if flushErr := processor.Flush(); flushErr != nil && err == nil {
+ err = flushErr
+ }
+
+ return err
+}
+
+// tailWithProcessorOptimized handles continuous reading for tail mode
+func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File, reader *bufio.Reader,
+ truncate <-chan struct{}, ltx lcontext.LContext, processor line.Processor, re regex.Regex) error {
+
+ // Create a line filter processor
+ filterProcessor := &filteringProcessor{
+ processor: processor,
+ re: re,
+ ltx: ltx,
+ stats: &f.stats,
+ globID: f.globID,
+ }
+
+ // Buffer for partial lines
+ partialLine := pool.BytesBuffer.Get().(*bytes.Buffer)
+ defer pool.RecycleBytesBuffer(partialLine)
+
+ for {
+ // Read available data
+ buf := make([]byte, 64*1024) // 64KB buffer
+ n, err := reader.Read(buf)
+
+ if n > 0 {
+ // Process the data we read
+ data := buf[:n]
+
+ // Process complete lines
+ for len(data) > 0 {
+ // Find newline
+ idx := bytes.IndexByte(data, '\n')
+
+ if idx >= 0 {
+ // Complete line found
+ partialLine.Write(data[:idx])
+
+ // Process the line if it's not empty
+ if partialLine.Len() > 0 {
+ f.updatePosition()
+ lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer)
+ lineBuf.Write(partialLine.Bytes())
+
+ if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil {
+ return err
+ }
+ }
+
+ partialLine.Reset()
+ data = data[idx+1:]
+
+ // Reset long line warning
+ f.warnedAboutLongLine = false
+ } else {
+ // No newline, add to partial line
+ partialLine.Write(data)
+
+ // Check if line is too long
+ if partialLine.Len() >= config.Server.MaxLineLength {
+ if !f.warnedAboutLongLine {
+ f.serverMessages <- dlog.Common.Warn(f.filePath,
+ "Long log line, splitting into multiple lines") + "\n"
+ f.warnedAboutLongLine = true
+ }
+
+ // Process the partial line
+ f.updatePosition()
+ lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer)
+ lineBuf.Write(partialLine.Bytes())
+
+ if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil {
+ return err
+ }
+
+ partialLine.Reset()
+ }
+
+ break
+ }
+ }
+
+ // Flush processor periodically
+ if err := processor.Flush(); err != nil {
+ return err
+ }
+ }
+
+ // Handle read errors
+ if err != nil {
+ if err != io.EOF {
+ return err
+ }
+
+ // EOF handling
+ select {
+ case <-ctx.Done():
+ return nil
+ case <-truncate:
+ if isTruncated, err := f.truncated(fd); isTruncated {
+ return err
+ }
+ case <-time.After(100 * time.Millisecond):
+ // Continue reading after a short delay
+ }
+ }
+
+ // Check for cancellation
+ select {
+ case <-ctx.Done():
+ // Process any remaining partial line
+ if partialLine.Len() > 0 {
+ f.updatePosition()
+ lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer)
+ lineBuf.Write(partialLine.Bytes())
+ filterProcessor.ProcessFilteredLine(lineBuf)
+ }
+ return nil
+ default:
+ }
+ }
+} \ No newline at end of file
diff --git a/internal/io/line/processor.go b/internal/io/line/processor.go
new file mode 100644
index 0000000..c7bec77
--- /dev/null
+++ b/internal/io/line/processor.go
@@ -0,0 +1,22 @@
+package line
+
+import (
+ "bytes"
+)
+
+// Processor defines an interface for processing lines read from files.
+// This interface replaces the channel-based approach for better performance.
+type Processor interface {
+ // ProcessLine handles a single line read from a file.
+ // The line buffer ownership is transferred to the processor.
+ // Returns error if processing should stop.
+ ProcessLine(line *bytes.Buffer, lineNum uint64, sourceID string) error
+
+ // Flush ensures any buffered data is written out.
+ // Called when file reading completes or on periodic intervals.
+ Flush() error
+
+ // Close cleans up any resources used by the processor.
+ // Called when processing is complete.
+ Close() error
+} \ No newline at end of file
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index ab48dcd..e9b1eec 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -228,7 +228,7 @@ func (h *baseHandler) handleBase64(args []string, argc int) ([]string, int, erro
decodedStr := string(decoded)
args = strings.Split(decodedStr, " ")
- argc = len(decodedStr)
+ argc = len(args)
dlog.Server.Trace(h.user, "Base64 decoded received command",
decodedStr, argc, args)
diff --git a/internal/server/handlers/channelless_adapter.go b/internal/server/handlers/channelless_adapter.go
new file mode 100644
index 0000000..9e5bc9c
--- /dev/null
+++ b/internal/server/handlers/channelless_adapter.go
@@ -0,0 +1,52 @@
+package handlers
+
+import (
+ "bytes"
+ "fmt"
+
+ "github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/io/pool"
+)
+
+// ChannellessLineProcessor adapts the channel-less processor to work with the existing handler infrastructure
+type ChannellessLineProcessor struct {
+ lines chan<- *line.Line
+ globID string
+ lineCount uint64
+}
+
+// NewChannellessLineProcessor creates a processor that sends lines to the existing channel
+func NewChannellessLineProcessor(lines chan<- *line.Line, globID string) *ChannellessLineProcessor {
+ return &ChannellessLineProcessor{
+ lines: lines,
+ globID: globID,
+ }
+}
+
+// ProcessLine sends a line through the channel
+func (p *ChannellessLineProcessor) ProcessLine(lineContent *bytes.Buffer, lineNum uint64, sourceID string) error {
+ p.lineCount++
+
+ // Create a line object that matches what the original implementation expects
+ l := line.New(lineContent, lineNum, 100, sourceID)
+
+ // Send through the channel
+ select {
+ case p.lines <- l:
+ return nil
+ default:
+ // Channel full, recycle the buffer
+ pool.RecycleBytesBuffer(lineContent)
+ return fmt.Errorf("lines channel full")
+ }
+}
+
+// Flush does nothing for this implementation
+func (p *ChannellessLineProcessor) Flush() error {
+ return nil
+}
+
+// Close does nothing for this implementation
+func (p *ChannellessLineProcessor) Close() error {
+ return nil
+} \ No newline at end of file
diff --git a/internal/server/handlers/lineprocessor.go b/internal/server/handlers/lineprocessor.go
new file mode 100644
index 0000000..f75b85b
--- /dev/null
+++ b/internal/server/handlers/lineprocessor.go
@@ -0,0 +1,189 @@
+package handlers
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "sync"
+
+ "github.com/mimecast/dtail/internal/io/pool"
+ "github.com/mimecast/dtail/internal/protocol"
+)
+
+// GrepLineProcessor processes lines for grep operations without using channels.
+// It writes directly to the output writer for better performance.
+type GrepLineProcessor struct {
+ writer io.Writer
+ hostname string
+ plain bool
+ serverless bool
+
+ // Buffering for efficiency
+ writeBuf bytes.Buffer
+ bufSize int
+ mutex sync.Mutex
+
+ // Stats
+ linesProcessed uint64
+ bytesWritten uint64
+}
+
+// HandlerWriter adapts a ServerHandler to implement io.Writer
+type HandlerWriter struct {
+ handler *ServerHandler
+ serverMessages chan<- string
+}
+
+// Write sends data through the server messages channel
+func (w *HandlerWriter) Write(p []byte) (n int, err error) {
+ // Convert bytes to string and send through serverMessages channel
+ // This will be picked up by the handler's Read() method
+ message := string(p)
+ select {
+ case w.serverMessages <- message:
+ return len(p), nil
+ default:
+ return 0, fmt.Errorf("server messages channel full")
+ }
+}
+
+// NewGrepLineProcessor creates a new processor for grep operations.
+func NewGrepLineProcessor(writer io.Writer, hostname string, plain, serverless bool) *GrepLineProcessor {
+ return &GrepLineProcessor{
+ writer: writer,
+ hostname: hostname,
+ plain: plain,
+ serverless: serverless,
+ bufSize: 64 * 1024, // 64KB buffer
+ }
+}
+
+// ProcessLine processes a single line and writes it to the output.
+func (p *GrepLineProcessor) ProcessLine(lineContent *bytes.Buffer, lineNum uint64, sourceID string) error {
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ // Build the output line
+ if !p.plain && !p.serverless {
+ p.writeBuf.WriteString("REMOTE")
+ p.writeBuf.WriteString(protocol.FieldDelimiter)
+ p.writeBuf.WriteString(p.hostname)
+ p.writeBuf.WriteString(protocol.FieldDelimiter)
+ // For grep, we don't have transmittedPerc, so use 100
+ p.writeBuf.WriteString("100")
+ p.writeBuf.WriteString(protocol.FieldDelimiter)
+ p.writeBuf.WriteString(fmt.Sprintf("%v", lineNum))
+ p.writeBuf.WriteString(protocol.FieldDelimiter)
+ p.writeBuf.WriteString(sourceID)
+ p.writeBuf.WriteString(protocol.FieldDelimiter)
+ }
+
+ // Write the actual line content
+ p.writeBuf.Write(lineContent.Bytes())
+ p.writeBuf.WriteByte(protocol.MessageDelimiter)
+
+ // Recycle the line buffer
+ pool.RecycleBytesBuffer(lineContent)
+
+ // Update stats
+ p.linesProcessed++
+ p.bytesWritten += uint64(p.writeBuf.Len())
+
+ // Flush if buffer is getting full
+ if p.writeBuf.Len() >= p.bufSize {
+ return p.flushBuffer()
+ }
+
+ return nil
+}
+
+// Flush writes any buffered data to the output.
+func (p *GrepLineProcessor) Flush() error {
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ return p.flushBuffer()
+}
+
+// flushBuffer writes the buffer content to the writer (must be called with mutex held).
+func (p *GrepLineProcessor) flushBuffer() error {
+ if p.writeBuf.Len() == 0 {
+ return nil
+ }
+
+ _, err := p.writer.Write(p.writeBuf.Bytes())
+ p.writeBuf.Reset()
+
+ return err
+}
+
+// Close cleans up the processor.
+func (p *GrepLineProcessor) Close() error {
+ // Flush any remaining data
+ return p.Flush()
+}
+
+// Stats returns processing statistics.
+func (p *GrepLineProcessor) Stats() (linesProcessed, bytesWritten uint64) {
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ return p.linesProcessed, p.bytesWritten
+}
+
+// ServerMessageProcessor handles server messages separately from line data.
+type ServerMessageProcessor struct {
+ writer io.Writer
+ hostname string
+ plain bool
+ serverless bool
+ mutex sync.Mutex
+}
+
+// NewServerMessageProcessor creates a processor for server messages.
+func NewServerMessageProcessor(writer io.Writer, hostname string, plain, serverless bool) *ServerMessageProcessor {
+ return &ServerMessageProcessor{
+ writer: writer,
+ hostname: hostname,
+ plain: plain,
+ serverless: serverless,
+ }
+}
+
+// SendMessage sends a server message.
+func (p *ServerMessageProcessor) SendMessage(message string) error {
+ if p.serverless {
+ return nil
+ }
+
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ var buf bytes.Buffer
+
+ // Skip empty server messages when in plain mode
+ if p.plain && (message == "" || message == "\n") {
+ return nil
+ }
+
+ // Handle hidden messages
+ if len(message) > 0 && message[0] == '.' {
+ buf.WriteString(message)
+ buf.WriteByte(protocol.MessageDelimiter)
+ _, err := p.writer.Write(buf.Bytes())
+ return err
+ }
+
+ // Handle normal server message
+ if !p.plain {
+ buf.WriteString("SERVER")
+ buf.WriteString(protocol.FieldDelimiter)
+ buf.WriteString(p.hostname)
+ buf.WriteString(protocol.FieldDelimiter)
+ }
+ buf.WriteString(message)
+ buf.WriteByte(protocol.MessageDelimiter)
+
+ _, err := p.writer.Write(buf.Bytes())
+ return err
+} \ No newline at end of file
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 44ba9e4..7a351ba 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -2,12 +2,14 @@ package handlers
import (
"context"
+ "fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
+ "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/fs"
"github.com/mimecast/dtail/internal/io/line"
@@ -30,7 +32,7 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand {
func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
argc int, args []string, retries int) {
-
+
re := regex.NewNoop()
if argc >= 4 {
deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " "))
@@ -49,7 +51,9 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
// In serverless mode, can also read data from pipe
// e.g.: grep foo bar.log | dmap 'from STATS select ...'
- if r.isInputFromPipe() {
+ // Only read from pipe if no file argument is provided
+ isPipe := r.isInputFromPipe() && (argc < 2 || args[1] == "" || args[1] == "-")
+ if isPipe {
dlog.Server.Debug("Reading data from stdin pipe")
// Empty file path and globID "-" represents reading from the stdin pipe.
r.read(ctx, ltx, "", "-", re)
@@ -123,19 +127,21 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
path, globID string, re regex.Regex) {
-
+
dlog.Server.Info(r.server.user, "Start reading", path, globID)
var reader fs.FileReader
var limiter chan struct{}
switch r.mode {
case omode.GrepClient, omode.CatClient:
- reader = fs.NewCatFile(path, globID, r.server.serverMessages)
+ catFile := fs.NewCatFile(path, globID, r.server.serverMessages)
+ reader = &catFile
limiter = r.server.catLimiter
case omode.TailClient:
fallthrough
default:
- reader = fs.NewTailFile(path, globID, r.server.serverMessages)
+ tailFile := fs.NewTailFile(path, globID, r.server.serverMessages)
+ reader = &tailFile
limiter = r.server.tailLimiter
}
@@ -160,6 +166,19 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
}
}
+ // Check if we should use the channel-less implementation
+ channellessEnabled := config.Env("DTAIL_CHANNELLESS_GREP")
+ dlog.Server.Info(r.server.user, "Channel-less check: enabled=", channellessEnabled, "mode=", r.mode)
+ // Only enable channel-less for server mode, not serverless mode
+ // Use the serverless field directly as it's more reliable
+ if channellessEnabled && (r.mode == omode.CatClient || r.mode == omode.GrepClient) && !r.server.serverless {
+ // Log to stderr for testing verification - only in server mode
+ fmt.Fprintf(os.Stderr, "[DTAIL] Using channel-less implementation for %s\n", path)
+ r.readWithProcessor(ctx, ltx, path, globID, re, reader)
+ return
+ }
+
+ // Original channel-based implementation
lines := r.server.lines
aggregate := r.server.aggregate
@@ -189,6 +208,66 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
}
}
+func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LContext,
+ path, globID string, re regex.Regex, reader fs.FileReader) {
+
+ dlog.Server.Info(r.server.user, "Using channel-less grep implementation", path, globID)
+
+ // Use the existing lines channel but with the processor-based reader
+ lines := r.server.lines
+ aggregate := r.server.aggregate
+
+ // Use the optimized version if available
+ useOptimized := config.Env("DTAIL_OPTIMIZED_READER")
+
+ // Log to stderr for testing verification - only in server mode
+ if !r.server.serverless {
+ if useOptimized {
+ fmt.Fprintf(os.Stderr, "[DTAIL] Using optimized reader for %s\n", path)
+ } else {
+ fmt.Fprintf(os.Stderr, "[DTAIL] Using standard processor reader for %s\n", path)
+ }
+ }
+
+ for {
+ if aggregate != nil {
+ lines = make(chan *line.Line, 100)
+ aggregate.NextLinesCh <- lines
+ }
+
+ // Create a processor that sends to the lines channel
+ processor := NewChannellessLineProcessor(lines, globID)
+ defer processor.Close()
+
+ var err error
+ if useOptimized {
+ err = reader.StartWithProcessorOptimized(ctx, ltx, processor, re)
+ } else {
+ err = reader.StartWithProcessor(ctx, ltx, processor, re)
+ }
+
+ if err != nil {
+ dlog.Server.Error(r.server.user, path, globID, err)
+ }
+
+ if aggregate != nil {
+ // Also makes aggregate to Flush
+ close(lines)
+ }
+
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ if !reader.Retry() {
+ return
+ }
+ }
+ time.Sleep(time.Second * 2)
+ dlog.Server.Info(path, globID, "Reading file again")
+ }
+}
+
func (r *readCommand) makeGlobID(path, glob string) string {
var idParts []string
pathParts := strings.Split(path, "/")