diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-08 09:33:22 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-08 09:33:22 +0200 |
| commit | 7179dba1f70f7fbdc8b89bf709bc2d5b643fe692 (patch) | |
| tree | 7340de538cfcc583102aa5697a65801501ec32c4 /internal/io/fs/readfile_processor.go | |
| parent | 91b83a9ffcabf7264888cf84b95f08b8cc88c832 (diff) | |
task: close compressed readers in file read paths (task 377)
Diffstat (limited to 'internal/io/fs/readfile_processor.go')
| -rw-r--r-- | internal/io/fs/readfile_processor.go | 55 |
1 files changed, 31 insertions, 24 deletions
diff --git a/internal/io/fs/readfile_processor.go b/internal/io/fs/readfile_processor.go index 4e636a4..94a28dc 100644 --- a/internal/io/fs/readfile_processor.go +++ b/internal/io/fs/readfile_processor.go @@ -21,10 +21,17 @@ import ( func (f *readFile) StartWithProcessor(ctx context.Context, ltx lcontext.LContext, processor line.Processor, re regex.Regex) error { - reader, fd, err := f.makeReader() + reader, fd, decompressor, err := f.makeReader() if fd != nil { defer fd.Close() } + if decompressor != nil { + defer func() { + if closeErr := decompressor.Close(); closeErr != nil { + dlog.Common.Warn(f.filePath, "Unable to close compressed reader", closeErr) + } + }() + } if err != nil { return err } @@ -38,7 +45,7 @@ func (f *readFile) StartWithProcessor(ctx context.Context, ltx lcontext.LContext // Process file with direct callbacks instead of channels err = f.readWithProcessor(ctx, fd, reader, truncate, ltx, processor, re) - + // Ensure any buffered data is flushed if flushErr := processor.Flush(); flushErr != nil && err == nil { err = flushErr @@ -100,7 +107,7 @@ func (f *readFile) handleReadByteProcessor(ctx context.Context, b byte, if err := processor.ProcessFilteredLine(message); err != nil { return abortReading } - + f.warnedAboutLongLine = false return continueReading @@ -113,7 +120,7 @@ func (f *readFile) handleReadByteProcessor(ctx context.Context, b byte, } // Force a line break message.WriteByte('\n') - + // Process the line f.updatePosition() if err := processor.ProcessFilteredLine(message); err != nil { @@ -164,19 +171,19 @@ type filteringProcessor struct { ltx lcontext.LContext stats *stats globID string - + // For local context handling - beforeBuf []*bytes.Buffer - afterCount int - maxCount int - maxReached bool + beforeBuf []*bytes.Buffer + afterCount int + maxCount int + maxReached bool } // ProcessFilteredLine applies regex filtering before passing to the underlying processor func (fp *filteringProcessor) ProcessFilteredLine(rawLine *bytes.Buffer) error { // Update stats lineNum := fp.stats.totalLineCount() - + // Simple case: no local context if !fp.ltx.Has() { if !fp.re.Match(rawLine.Bytes()) { @@ -185,10 +192,10 @@ func (fp *filteringProcessor) ProcessFilteredLine(rawLine *bytes.Buffer) error { pool.RecycleBytesBuffer(rawLine) return nil } - + fp.stats.updateLineMatched() fp.stats.updateLineTransmitted() - + // Process the line err := fp.processor.ProcessLine(rawLine, lineNum, fp.globID) if err != nil { @@ -196,7 +203,7 @@ func (fp *filteringProcessor) ProcessFilteredLine(rawLine *bytes.Buffer) error { } return err } - + // Complex case: handle local context (before/after/max) return fp.processWithContext(rawLine, lineNum) } @@ -204,10 +211,10 @@ func (fp *filteringProcessor) ProcessFilteredLine(rawLine *bytes.Buffer) error { // processWithContext handles lines when local context is enabled func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum uint64) error { matched := fp.re.Match(rawLine.Bytes()) - + if !matched { fp.stats.updateLineNotMatched() - + // Handle after context if fp.ltx.AfterContext > 0 && fp.afterCount > 0 { fp.afterCount-- @@ -218,7 +225,7 @@ func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum } return err } - + // Handle before context buffer if fp.ltx.BeforeContext > 0 { // Add to before buffer @@ -231,20 +238,20 @@ func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum } else { pool.RecycleBytesBuffer(rawLine) } - + fp.stats.updateLineNotTransmitted() return nil } - + // Line matched fp.stats.updateLineMatched() - + // Check if we've reached max count if fp.maxReached { pool.RecycleBytesBuffer(rawLine) return io.EOF // Stop processing } - + // Process before context if fp.ltx.BeforeContext > 0 && len(fp.beforeBuf) > 0 { for i, buf := range fp.beforeBuf { @@ -260,14 +267,14 @@ func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum } fp.beforeBuf = fp.beforeBuf[:0] // Clear the buffer } - + // Process the matched line fp.stats.updateLineTransmitted() if err := fp.processor.ProcessLine(rawLine, lineNum, fp.globID); err != nil { pool.RecycleBytesBuffer(rawLine) return err } - + // Update max count if fp.ltx.MaxCount > 0 { fp.maxCount++ @@ -278,11 +285,11 @@ func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum fp.maxReached = true } } - + // Reset after context if fp.ltx.AfterContext > 0 { fp.afterCount = fp.ltx.AfterContext } - + return nil } |
