diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-08 09:33:22 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-08 09:33:22 +0200 |
| commit | 7179dba1f70f7fbdc8b89bf709bc2d5b643fe692 (patch) | |
| tree | 7340de538cfcc583102aa5697a65801501ec32c4 /internal/io/fs/readfile.go | |
| parent | 91b83a9ffcabf7264888cf84b95f08b8cc88c832 (diff) | |
task: close compressed readers in file read paths (task 377)
Diffstat (limited to 'internal/io/fs/readfile.go')
| -rw-r--r-- | internal/io/fs/readfile.go | 28 |
1 files changed, 19 insertions, 9 deletions
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index fab933b..ee486bc 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -76,10 +76,17 @@ func (f readFile) Retry() bool { func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, lines chan<- *line.Line, re regex.Regex) error { - reader, fd, err := f.makeReader() + reader, fd, decompressor, err := f.makeReader() if fd != nil { defer fd.Close() } + if decompressor != nil { + defer func() { + if closeErr := decompressor.Close(); closeErr != nil { + dlog.Common.Warn(f.filePath, "Unable to close compressed reader", closeErr) + } + }() + } if err != nil { return err } @@ -109,14 +116,14 @@ func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, return err } -func (f *readFile) makeReader() (*bufio.Reader, *os.File, error) { +func (f *readFile) makeReader() (*bufio.Reader, *os.File, io.Closer, error) { if f.filePath == "" && f.globID == "-" { return f.makePipeReader() } return f.makeFileReader() } -func (f *readFile) makeFileReader() (reader *bufio.Reader, fd *os.File, err error) { +func (f *readFile) makeFileReader() (reader *bufio.Reader, fd *os.File, decompressor io.Closer, err error) { if fd, err = os.Open(f.filePath); err != nil { return } @@ -127,18 +134,18 @@ func (f *readFile) makeFileReader() (reader *bufio.Reader, fd *os.File, err erro } } - reader, err = f.makeCompressedFileReader(fd) + reader, decompressor, err = f.makeCompressedFileReader(fd) return } -func (f *readFile) makePipeReader() (*bufio.Reader, *os.File, error) { - return bufio.NewReader(os.Stdin), nil, nil +func (f *readFile) makePipeReader() (*bufio.Reader, *os.File, io.Closer, error) { + return bufio.NewReader(os.Stdin), nil, nil, nil } func (f *readFile) periodicTruncateCheck(ctx context.Context, truncate chan<- struct{}) { ticker := time.NewTicker(time.Second * 3) defer ticker.Stop() - + for { select { case <-ticker.C: @@ -153,7 +160,7 @@ func (f *readFile) periodicTruncateCheck(ctx context.Context, truncate chan<- st } } -func (f *readFile) makeCompressedFileReader(fd *os.File) (reader *bufio.Reader, err error) { +func (f *readFile) makeCompressedFileReader(fd *os.File) (reader *bufio.Reader, decompressor io.Closer, err error) { switch { case strings.HasSuffix(f.FilePath(), ".gz"): fallthrough @@ -164,10 +171,13 @@ func (f *readFile) makeCompressedFileReader(fd *os.File) (reader *bufio.Reader, if err != nil { return } + decompressor = gzipReader reader = bufio.NewReader(gzipReader) case strings.HasSuffix(f.FilePath(), ".zst"): dlog.Common.Info(f.FilePath(), "Detected zstd compression format") - reader = bufio.NewReader(zstd.NewReader(fd)) + zstdReader := zstd.NewReader(fd) + decompressor = zstdReader + reader = bufio.NewReader(zstdReader) default: reader = bufio.NewReader(fd) } |
