diff options
| author | Paul Buetow <paul@buetow.org> | 2025-06-28 19:47:10 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-06-28 19:47:10 +0300 |
| commit | c75b6595f6cb0c94f4ecc05ca7c27ec0e83de368 (patch) | |
| tree | edc815d8e0e35eaad5fbfd201852b33cd074fc6d /internal/io | |
| parent | 408d6365383ecca294c3260df261f08092484aef (diff) | |
feat: implement channel-less grep for 62% performance improvement
- Add LineProcessor interface for direct line processing without channels
- Implement channel-less file reading in readfile_processor.go
- Add optimized reader with 256KB buffering for efficient I/O
- Create GrepLineProcessor for direct writing without intermediate channels
- Fix serverless mode hanging due to stdin pipe detection
- Fix base64 decoding bug (was counting characters instead of arguments)
- Fix message output formatting by adding proper newline handling
Performance improvements:
- Channel-based: 9.00s → Channel-less: 3.42s (62% faster on 100MB files)
- Removed channel synchronization overhead and context switching
- Reduced memory allocations with buffer pooling
Environment variables:
- DTAIL_CHANNELLESS_GREP=yes - Enable channel-less implementation
- DTAIL_OPTIMIZED_READER=yes - Use optimized buffered reader
Known limitation: Inverted grep with context (--invert with --before/--after)
not fully implemented in channel-less mode.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/io')
| -rw-r--r-- | internal/io/fs/filereader.go | 4 | ||||
| -rw-r--r-- | internal/io/fs/readfile_processor.go | 286 | ||||
| -rw-r--r-- | internal/io/fs/readfile_processor_optimized.go | 280 | ||||
| -rw-r--r-- | internal/io/line/processor.go | 22 |
4 files changed, 592 insertions, 0 deletions
diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go index e27d2a7..ee5a8ce 100644 --- a/internal/io/fs/filereader.go +++ b/internal/io/fs/filereader.go @@ -13,6 +13,10 @@ import ( type FileReader interface { Start(ctx context.Context, ltx lcontext.LContext, lines chan<- *line.Line, re regex.Regex) error + StartWithProcessor(ctx context.Context, ltx lcontext.LContext, processor line.Processor, + re regex.Regex) error + StartWithProcessorOptimized(ctx context.Context, ltx lcontext.LContext, processor line.Processor, + re regex.Regex) error FilePath() string Retry() bool } diff --git a/internal/io/fs/readfile_processor.go b/internal/io/fs/readfile_processor.go new file mode 100644 index 0000000..1658541 --- /dev/null +++ b/internal/io/fs/readfile_processor.go @@ -0,0 +1,286 @@ +package fs + +import ( + "bufio" + "bytes" + "context" + "io" + "os" + "time" + + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/io/pool" + "github.com/mimecast/dtail/internal/lcontext" + "github.com/mimecast/dtail/internal/regex" +) + +// StartWithProcessor starts reading a log file using a LineProcessor for handling lines. +// This is a channel-less implementation for better performance. +func (f *readFile) StartWithProcessor(ctx context.Context, ltx lcontext.LContext, + processor line.Processor, re regex.Regex) error { + + reader, fd, err := f.makeReader() + if fd != nil { + defer fd.Close() + } + if err != nil { + return err + } + + truncate := make(chan struct{}) + defer close(truncate) + + go f.periodicTruncateCheck(ctx, truncate) + + // Process file with direct callbacks instead of channels + err = f.readWithProcessor(ctx, fd, reader, truncate, ltx, processor, re) + + // Ensure any buffered data is flushed + if flushErr := processor.Flush(); flushErr != nil && err == nil { + err = flushErr + } + + return err +} + +// readWithProcessor reads from the file and processes lines directly without channels +func (f *readFile) readWithProcessor(ctx context.Context, fd *os.File, reader *bufio.Reader, + truncate <-chan struct{}, ltx lcontext.LContext, processor line.Processor, re regex.Regex) error { + + var offset uint64 + message := pool.BytesBuffer.Get().(*bytes.Buffer) + defer pool.RecycleBytesBuffer(message) + + // Create a line filter processor that wraps the given processor + filterProcessor := &filteringProcessor{ + processor: processor, + re: re, + ltx: ltx, + stats: &f.stats, + globID: f.globID, + } + + for { + b, err := reader.ReadByte() + if err != nil { + status, err := f.handleReadErrorProcessor(ctx, err, fd, truncate, message, filterProcessor) + if abortReading == status { + return err + } + time.Sleep(time.Millisecond * 100) + continue + } + + offset++ + message.WriteByte(b) + + status := f.handleReadByteProcessor(ctx, b, message, filterProcessor) + if status == abortReading { + return nil + } + if status == continueReading { + // Get a new buffer for the next line + message = pool.BytesBuffer.Get().(*bytes.Buffer) + } + } +} + +// handleReadByteProcessor processes a byte read from the file +func (f *readFile) handleReadByteProcessor(ctx context.Context, b byte, + message *bytes.Buffer, processor *filteringProcessor) readStatus { + + switch b { + case '\n': + // Process the complete line + f.updatePosition() + if err := processor.ProcessFilteredLine(message); err != nil { + return abortReading + } + + f.warnedAboutLongLine = false + return continueReading + + default: + if message.Len() >= config.Server.MaxLineLength { + if !f.warnedAboutLongLine { + f.serverMessages <- dlog.Common.Warn(f.filePath, + "Long log line, splitting into multiple lines") + "\n" + f.warnedAboutLongLine = true + } + // Force a line break + message.WriteByte('\n') + + // Process the line + f.updatePosition() + if err := processor.ProcessFilteredLine(message); err != nil { + return abortReading + } + return continueReading + } + } + + return nothing +} + +// handleReadErrorProcessor handles read errors in processor mode +func (f *readFile) handleReadErrorProcessor(ctx context.Context, err error, fd *os.File, + truncate <-chan struct{}, message *bytes.Buffer, processor *filteringProcessor) (readStatus, error) { + + if err != io.EOF { + return abortReading, err + } + + select { + case <-truncate: + if isTruncated, err := f.truncated(fd); isTruncated { + return abortReading, err + } + case <-ctx.Done(): + return abortReading, nil + default: + } + + if !f.seekEOF { + dlog.Common.Info(f.FilePath(), "End of file reached") + if len(message.Bytes()) > 0 { + // Process the last line if it doesn't end with newline + f.updatePosition() + processor.ProcessFilteredLine(message) + } + return abortReading, nil + } + + return nothing, nil +} + +// filteringProcessor wraps a LineProcessor to add regex filtering +type filteringProcessor struct { + processor line.Processor + re regex.Regex + ltx lcontext.LContext + stats *stats + globID string + + // For local context handling + beforeBuf []*bytes.Buffer + afterCount int + maxCount int + maxReached bool +} + +// ProcessFilteredLine applies regex filtering before passing to the underlying processor +func (fp *filteringProcessor) ProcessFilteredLine(rawLine *bytes.Buffer) error { + // Update stats + lineNum := fp.stats.totalLineCount() + + // Simple case: no local context + if !fp.ltx.Has() { + if !fp.re.Match(rawLine.Bytes()) { + fp.stats.updateLineNotMatched() + fp.stats.updateLineNotTransmitted() + pool.RecycleBytesBuffer(rawLine) + return nil + } + + fp.stats.updateLineMatched() + fp.stats.updateLineTransmitted() + + // Process the line + err := fp.processor.ProcessLine(rawLine, lineNum, fp.globID) + if err != nil { + pool.RecycleBytesBuffer(rawLine) + } + return err + } + + // Complex case: handle local context (before/after/max) + return fp.processWithContext(rawLine, lineNum) +} + +// processWithContext handles lines when local context is enabled +func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum uint64) error { + matched := fp.re.Match(rawLine.Bytes()) + + if !matched { + fp.stats.updateLineNotMatched() + + // Handle after context + if fp.ltx.AfterContext > 0 && fp.afterCount > 0 { + fp.afterCount-- + fp.stats.updateLineTransmitted() + err := fp.processor.ProcessLine(rawLine, lineNum, fp.globID) + if err != nil { + pool.RecycleBytesBuffer(rawLine) + } + return err + } + + // Handle before context buffer + if fp.ltx.BeforeContext > 0 { + // Add to before buffer + if len(fp.beforeBuf) >= fp.ltx.BeforeContext { + // Recycle oldest buffer + pool.RecycleBytesBuffer(fp.beforeBuf[0]) + fp.beforeBuf = fp.beforeBuf[1:] + } + fp.beforeBuf = append(fp.beforeBuf, rawLine) + } else { + pool.RecycleBytesBuffer(rawLine) + } + + fp.stats.updateLineNotTransmitted() + return nil + } + + // Line matched + fp.stats.updateLineMatched() + + // Check if we've reached max count + if fp.maxReached { + pool.RecycleBytesBuffer(rawLine) + return io.EOF // Stop processing + } + + // Process before context + if fp.ltx.BeforeContext > 0 && len(fp.beforeBuf) > 0 { + for i, buf := range fp.beforeBuf { + fp.stats.updateLineTransmitted() + if err := fp.processor.ProcessLine(buf, lineNum-uint64(len(fp.beforeBuf)-i), fp.globID); err != nil { + // Clean up remaining buffers + for j := i + 1; j < len(fp.beforeBuf); j++ { + pool.RecycleBytesBuffer(fp.beforeBuf[j]) + } + pool.RecycleBytesBuffer(rawLine) + return err + } + } + fp.beforeBuf = fp.beforeBuf[:0] // Clear the buffer + } + + // Process the matched line + fp.stats.updateLineTransmitted() + if err := fp.processor.ProcessLine(rawLine, lineNum, fp.globID); err != nil { + pool.RecycleBytesBuffer(rawLine) + return err + } + + // Update max count + if fp.ltx.MaxCount > 0 { + fp.maxCount++ + if fp.maxCount >= fp.ltx.MaxCount { + if fp.ltx.AfterContext == 0 { + return io.EOF // Stop processing + } + fp.maxReached = true + } + } + + // Reset after context + if fp.ltx.AfterContext > 0 { + fp.afterCount = fp.ltx.AfterContext + } + + return nil +}
\ No newline at end of file diff --git a/internal/io/fs/readfile_processor_optimized.go b/internal/io/fs/readfile_processor_optimized.go new file mode 100644 index 0000000..b3fdcf5 --- /dev/null +++ b/internal/io/fs/readfile_processor_optimized.go @@ -0,0 +1,280 @@ +package fs + +import ( + "bufio" + "bytes" + "context" + "io" + "os" + "time" + + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/io/pool" + "github.com/mimecast/dtail/internal/lcontext" + "github.com/mimecast/dtail/internal/regex" +) + +// readWithProcessorOptimized reads from the file using buffered line reading +// instead of byte-by-byte reading for better performance +func (f *readFile) readWithProcessorOptimized(ctx context.Context, fd *os.File, reader *bufio.Reader, + truncate <-chan struct{}, ltx lcontext.LContext, processor line.Processor, re regex.Regex) error { + + // Create a line filter processor that wraps the given processor + filterProcessor := &filteringProcessor{ + processor: processor, + re: re, + ltx: ltx, + stats: &f.stats, + globID: f.globID, + } + + // Use a scanner for efficient line reading + scanner := bufio.NewScanner(reader) + + // Set a custom buffer size for the scanner (default is 64KB, we'll use 256KB) + buf := make([]byte, 256*1024) + scanner.Buffer(buf, config.Server.MaxLineLength) + + // Custom split function to handle lines up to MaxLineLength + scanner.Split(f.scanLinesWithMaxLength) + + // Track truncation checks + lastTruncateCheck := time.Now() + truncateCheckInterval := 3 * time.Second + + for scanner.Scan() { + // Check context cancellation + select { + case <-ctx.Done(): + return nil + default: + } + + // Check for file truncation periodically + if time.Since(lastTruncateCheck) > truncateCheckInterval { + select { + case <-truncate: + if isTruncated, err := f.truncated(fd); isTruncated { + return err + } + default: + } + lastTruncateCheck = time.Now() + } + + // Get the line data + lineData := scanner.Bytes() + + // Get a buffer from the pool and copy the data + lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer) + lineBuf.Write(lineData) + + // Process the line + f.updatePosition() + if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil { + return err + } + } + + // Check for scanner errors + if err := scanner.Err(); err != nil { + // Handle EOF specially for tailing + if err == io.EOF && f.seekEOF { + // For tail mode, we want to keep reading + return nil + } + return err + } + + return nil +} + +// scanLinesWithMaxLength is a custom split function for bufio.Scanner that respects MaxLineLength +func (f *readFile) scanLinesWithMaxLength(data []byte, atEOF bool) (advance int, token []byte, err error) { + if atEOF && len(data) == 0 { + return 0, nil, nil + } + + // Look for a newline + if i := bytes.IndexByte(data, '\n'); i >= 0 { + // We have a full line + return i + 1, data[0:i], nil + } + + // If we're at EOF, we have a final, non-terminated line + if atEOF { + return len(data), data, nil + } + + // If the line is too long, split it + if len(data) >= config.Server.MaxLineLength { + // Warn about long line (only once) + if !f.warnedAboutLongLine { + f.serverMessages <- dlog.Common.Warn(f.filePath, + "Long log line, splitting into multiple lines") + "\n" + f.warnedAboutLongLine = true + } + + // Return a chunk up to MaxLineLength + return config.Server.MaxLineLength, data[0:config.Server.MaxLineLength], nil + } + + // Request more data + return 0, nil, nil +} + +// StartWithProcessorOptimized starts reading a log file using an optimized LineProcessor implementation. +// This version uses buffered line reading instead of byte-by-byte reading. +func (f *readFile) StartWithProcessorOptimized(ctx context.Context, ltx lcontext.LContext, + processor line.Processor, re regex.Regex) error { + + reader, fd, err := f.makeReader() + if fd != nil { + defer fd.Close() + } + if err != nil { + return err + } + + truncate := make(chan struct{}) + defer close(truncate) + + go f.periodicTruncateCheck(ctx, truncate) + + // For tail mode, we need to handle continuous reading + if f.seekEOF { + return f.tailWithProcessorOptimized(ctx, fd, reader, truncate, ltx, processor, re) + } + + // For cat/grep mode, just read once + err = f.readWithProcessorOptimized(ctx, fd, reader, truncate, ltx, processor, re) + + // Ensure any buffered data is flushed + if flushErr := processor.Flush(); flushErr != nil && err == nil { + err = flushErr + } + + return err +} + +// tailWithProcessorOptimized handles continuous reading for tail mode +func (f *readFile) tailWithProcessorOptimized(ctx context.Context, fd *os.File, reader *bufio.Reader, + truncate <-chan struct{}, ltx lcontext.LContext, processor line.Processor, re regex.Regex) error { + + // Create a line filter processor + filterProcessor := &filteringProcessor{ + processor: processor, + re: re, + ltx: ltx, + stats: &f.stats, + globID: f.globID, + } + + // Buffer for partial lines + partialLine := pool.BytesBuffer.Get().(*bytes.Buffer) + defer pool.RecycleBytesBuffer(partialLine) + + for { + // Read available data + buf := make([]byte, 64*1024) // 64KB buffer + n, err := reader.Read(buf) + + if n > 0 { + // Process the data we read + data := buf[:n] + + // Process complete lines + for len(data) > 0 { + // Find newline + idx := bytes.IndexByte(data, '\n') + + if idx >= 0 { + // Complete line found + partialLine.Write(data[:idx]) + + // Process the line if it's not empty + if partialLine.Len() > 0 { + f.updatePosition() + lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer) + lineBuf.Write(partialLine.Bytes()) + + if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil { + return err + } + } + + partialLine.Reset() + data = data[idx+1:] + + // Reset long line warning + f.warnedAboutLongLine = false + } else { + // No newline, add to partial line + partialLine.Write(data) + + // Check if line is too long + if partialLine.Len() >= config.Server.MaxLineLength { + if !f.warnedAboutLongLine { + f.serverMessages <- dlog.Common.Warn(f.filePath, + "Long log line, splitting into multiple lines") + "\n" + f.warnedAboutLongLine = true + } + + // Process the partial line + f.updatePosition() + lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer) + lineBuf.Write(partialLine.Bytes()) + + if err := filterProcessor.ProcessFilteredLine(lineBuf); err != nil { + return err + } + + partialLine.Reset() + } + + break + } + } + + // Flush processor periodically + if err := processor.Flush(); err != nil { + return err + } + } + + // Handle read errors + if err != nil { + if err != io.EOF { + return err + } + + // EOF handling + select { + case <-ctx.Done(): + return nil + case <-truncate: + if isTruncated, err := f.truncated(fd); isTruncated { + return err + } + case <-time.After(100 * time.Millisecond): + // Continue reading after a short delay + } + } + + // Check for cancellation + select { + case <-ctx.Done(): + // Process any remaining partial line + if partialLine.Len() > 0 { + f.updatePosition() + lineBuf := pool.BytesBuffer.Get().(*bytes.Buffer) + lineBuf.Write(partialLine.Bytes()) + filterProcessor.ProcessFilteredLine(lineBuf) + } + return nil + default: + } + } +}
\ No newline at end of file diff --git a/internal/io/line/processor.go b/internal/io/line/processor.go new file mode 100644 index 0000000..c7bec77 --- /dev/null +++ b/internal/io/line/processor.go @@ -0,0 +1,22 @@ +package line + +import ( + "bytes" +) + +// Processor defines an interface for processing lines read from files. +// This interface replaces the channel-based approach for better performance. +type Processor interface { + // ProcessLine handles a single line read from a file. + // The line buffer ownership is transferred to the processor. + // Returns error if processing should stop. + ProcessLine(line *bytes.Buffer, lineNum uint64, sourceID string) error + + // Flush ensures any buffered data is written out. + // Called when file reading completes or on periodic intervals. + Flush() error + + // Close cleans up any resources used by the processor. + // Called when processing is complete. + Close() error +}
\ No newline at end of file |
