summaryrefslogtreecommitdiff
path: root/internal/io/fs/readfile_processor.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-08 09:33:22 +0200
committerPaul Buetow <paul@buetow.org>2026-03-08 09:33:22 +0200
commit7179dba1f70f7fbdc8b89bf709bc2d5b643fe692 (patch)
tree7340de538cfcc583102aa5697a65801501ec32c4 /internal/io/fs/readfile_processor.go
parent91b83a9ffcabf7264888cf84b95f08b8cc88c832 (diff)
task: close compressed readers in file read paths (task 377)
Diffstat (limited to 'internal/io/fs/readfile_processor.go')
-rw-r--r--internal/io/fs/readfile_processor.go55
1 files changed, 31 insertions, 24 deletions
diff --git a/internal/io/fs/readfile_processor.go b/internal/io/fs/readfile_processor.go
index 4e636a4..94a28dc 100644
--- a/internal/io/fs/readfile_processor.go
+++ b/internal/io/fs/readfile_processor.go
@@ -21,10 +21,17 @@ import (
func (f *readFile) StartWithProcessor(ctx context.Context, ltx lcontext.LContext,
processor line.Processor, re regex.Regex) error {
- reader, fd, err := f.makeReader()
+ reader, fd, decompressor, err := f.makeReader()
if fd != nil {
defer fd.Close()
}
+ if decompressor != nil {
+ defer func() {
+ if closeErr := decompressor.Close(); closeErr != nil {
+ dlog.Common.Warn(f.filePath, "Unable to close compressed reader", closeErr)
+ }
+ }()
+ }
if err != nil {
return err
}
@@ -38,7 +45,7 @@ func (f *readFile) StartWithProcessor(ctx context.Context, ltx lcontext.LContext
// Process file with direct callbacks instead of channels
err = f.readWithProcessor(ctx, fd, reader, truncate, ltx, processor, re)
-
+
// Ensure any buffered data is flushed
if flushErr := processor.Flush(); flushErr != nil && err == nil {
err = flushErr
@@ -100,7 +107,7 @@ func (f *readFile) handleReadByteProcessor(ctx context.Context, b byte,
if err := processor.ProcessFilteredLine(message); err != nil {
return abortReading
}
-
+
f.warnedAboutLongLine = false
return continueReading
@@ -113,7 +120,7 @@ func (f *readFile) handleReadByteProcessor(ctx context.Context, b byte,
}
// Force a line break
message.WriteByte('\n')
-
+
// Process the line
f.updatePosition()
if err := processor.ProcessFilteredLine(message); err != nil {
@@ -164,19 +171,19 @@ type filteringProcessor struct {
ltx lcontext.LContext
stats *stats
globID string
-
+
// For local context handling
- beforeBuf []*bytes.Buffer
- afterCount int
- maxCount int
- maxReached bool
+ beforeBuf []*bytes.Buffer
+ afterCount int
+ maxCount int
+ maxReached bool
}
// ProcessFilteredLine applies regex filtering before passing to the underlying processor
func (fp *filteringProcessor) ProcessFilteredLine(rawLine *bytes.Buffer) error {
// Update stats
lineNum := fp.stats.totalLineCount()
-
+
// Simple case: no local context
if !fp.ltx.Has() {
if !fp.re.Match(rawLine.Bytes()) {
@@ -185,10 +192,10 @@ func (fp *filteringProcessor) ProcessFilteredLine(rawLine *bytes.Buffer) error {
pool.RecycleBytesBuffer(rawLine)
return nil
}
-
+
fp.stats.updateLineMatched()
fp.stats.updateLineTransmitted()
-
+
// Process the line
err := fp.processor.ProcessLine(rawLine, lineNum, fp.globID)
if err != nil {
@@ -196,7 +203,7 @@ func (fp *filteringProcessor) ProcessFilteredLine(rawLine *bytes.Buffer) error {
}
return err
}
-
+
// Complex case: handle local context (before/after/max)
return fp.processWithContext(rawLine, lineNum)
}
@@ -204,10 +211,10 @@ func (fp *filteringProcessor) ProcessFilteredLine(rawLine *bytes.Buffer) error {
// processWithContext handles lines when local context is enabled
func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum uint64) error {
matched := fp.re.Match(rawLine.Bytes())
-
+
if !matched {
fp.stats.updateLineNotMatched()
-
+
// Handle after context
if fp.ltx.AfterContext > 0 && fp.afterCount > 0 {
fp.afterCount--
@@ -218,7 +225,7 @@ func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum
}
return err
}
-
+
// Handle before context buffer
if fp.ltx.BeforeContext > 0 {
// Add to before buffer
@@ -231,20 +238,20 @@ func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum
} else {
pool.RecycleBytesBuffer(rawLine)
}
-
+
fp.stats.updateLineNotTransmitted()
return nil
}
-
+
// Line matched
fp.stats.updateLineMatched()
-
+
// Check if we've reached max count
if fp.maxReached {
pool.RecycleBytesBuffer(rawLine)
return io.EOF // Stop processing
}
-
+
// Process before context
if fp.ltx.BeforeContext > 0 && len(fp.beforeBuf) > 0 {
for i, buf := range fp.beforeBuf {
@@ -260,14 +267,14 @@ func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum
}
fp.beforeBuf = fp.beforeBuf[:0] // Clear the buffer
}
-
+
// Process the matched line
fp.stats.updateLineTransmitted()
if err := fp.processor.ProcessLine(rawLine, lineNum, fp.globID); err != nil {
pool.RecycleBytesBuffer(rawLine)
return err
}
-
+
// Update max count
if fp.ltx.MaxCount > 0 {
fp.maxCount++
@@ -278,11 +285,11 @@ func (fp *filteringProcessor) processWithContext(rawLine *bytes.Buffer, lineNum
fp.maxReached = true
}
}
-
+
// Reset after context
if fp.ltx.AfterContext > 0 {
fp.afterCount = fp.ltx.AfterContext
}
-
+
return nil
}