diff options
Diffstat (limited to 'internal/io/fs/readfile.go')
| -rw-r--r-- | internal/io/fs/readfile.go | 172 |
1 files changed, 43 insertions, 129 deletions
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 6757bd6..161e3f0 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -4,16 +4,15 @@ import ( "bufio" "compress/gzip" "context" - "errors" "fmt" "io" "os" "strings" - "sync" "time" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/regex" "github.com/DataDog/zstd" @@ -38,6 +37,28 @@ type readFile struct { limiter chan struct{} } +func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { + switch { + case strings.HasSuffix(f.FilePath(), ".gz"): + fallthrough + case strings.HasSuffix(f.FilePath(), ".gzip"): + logger.Info(f.FilePath(), "Detected gzip compression format") + var gzipReader *gzip.Reader + gzipReader, err = gzip.NewReader(fd) + if err != nil { + return + } + reader = bufio.NewReader(gzipReader) + case strings.HasSuffix(f.FilePath(), ".zst"): + logger.Info(f.FilePath(), "Detected zstd compression format") + reader = bufio.NewReader(zstd.NewReader(fd)) + default: + reader = bufio.NewReader(fd) + } + + return +} + // String returns the string representation of the readFile func (f readFile) String() string { return fmt.Sprintf("readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)", @@ -59,7 +80,7 @@ func (f readFile) Retry() bool { } // Start tailing a log file. -func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error { +func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error { logger.Debug("readFile", f) defer func() { select { @@ -90,58 +111,28 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Re } rawLines := make(chan []byte, 100) - truncate := make(chan struct{}) - - var wg sync.WaitGroup - wg.Add(1) - - go f.periodicTruncateCheck(ctx, truncate) - go f.filter(ctx, &wg, rawLines, lines, re) + readCtx, readCancel := context.WithCancel(ctx) + + filterDone := make(chan struct{}) + go func() { + f.filter(ctx, rawLines, lines, re, lContext) + close(filterDone) + // If the filter stopped, make the reader stop too, no need to read + // more data if there is nothing more the filter wants to filter for! + // E.g. it could be that we only want to filter N matches but not more. + readCancel() + }() - err = f.read(ctx, fd, rawLines, truncate) + err = f.read(readCtx, fd, rawLines) close(rawLines) - wg.Wait() - - return err -} -func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) { - for { - select { - case <-time.After(time.Second * 3): - select { - case truncate <- struct{}{}: - case <-ctx.Done(): - } - case <-ctx.Done(): - return - } - } -} - -func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { - switch { - case strings.HasSuffix(f.FilePath(), ".gz"): - fallthrough - case strings.HasSuffix(f.FilePath(), ".gzip"): - logger.Info(f.FilePath(), "Detected gzip compression format") - var gzipReader *gzip.Reader - gzipReader, err = gzip.NewReader(fd) - if err != nil { - return - } - reader = bufio.NewReader(gzipReader) - case strings.HasSuffix(f.FilePath(), ".zst"): - logger.Info(f.FilePath(), "Detected zstd compression format") - reader = bufio.NewReader(zstd.NewReader(fd)) - default: - reader = bufio.NewReader(fd) - } + // Filter may flushes some data still. So wait until it is done here. + <-filterDone - return + return err } -func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error { +func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte) error { var offset uint64 reader, err := f.makeReader(fd) @@ -153,6 +144,8 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t lineLengthThreshold := 1024 * 1024 // 1mb longLineWarning := false + checkTruncate := f.truncateTimer(ctx) + for { select { case <-ctx.Done(): @@ -161,7 +154,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t } select { - case <-truncate: + case <-checkTruncate: if isTruncated, err := f.truncated(fd); isTruncated { return err } @@ -225,82 +218,3 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t } } } - -// Filter log lines matching a given regular expression. -func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) { - defer wg.Done() - - for { - select { - case line, ok := <-rawLines: - f.updatePosition() - if !ok { - return - } - if filteredLine, ok := f.transmittable(line, len(lines), cap(lines), re); ok { - select { - case lines <- filteredLine: - case <-ctx.Done(): - return - } - } - } - } -} - -func (f readFile) transmittable(lineBytes []byte, length, capacity int, re regex.Regex) (line.Line, bool) { - var read line.Line - - if !re.Match(lineBytes) { - f.updateLineNotMatched() - f.updateLineNotTransmitted() - return read, false - } - f.updateLineMatched() - - // Can we actually send more messages, channel capacity reached? - if f.canSkipLines && length >= capacity { - f.updateLineNotTransmitted() - return read, false - } - f.updateLineTransmitted() - - read = line.Line{ - Content: lineBytes, - SourceID: f.globID, - Count: f.totalLineCount(), - TransmittedPerc: f.transmittedPerc(), - } - - return read, true -} - -// Check wether log file is truncated. Returns nil if not. -func (f readFile) truncated(fd *os.File) (bool, error) { - logger.Debug(f.filePath, "File truncation check") - - // Can not seek currently open FD. - curPos, err := fd.Seek(0, os.SEEK_CUR) - if err != nil { - return true, err - } - - // Can not open file at original path. - pathFd, err := os.Open(f.filePath) - if err != nil { - return true, err - } - defer pathFd.Close() - - // Can not seek file at original path. - pathPos, err := pathFd.Seek(0, io.SeekEnd) - if err != nil { - return true, err - } - - if curPos > pathPos { - return true, errors.New("File got truncated") - } - - return false, nil -} |
