diff options
| author | Paul Buetow <git@mx.buetow.org> | 2021-03-23 09:37:02 +0000 |
|---|---|---|
| committer | Paul Buetow <git@mx.buetow.org> | 2021-03-23 20:12:09 +0000 |
| commit | 2b47630c2f68794a95d5065a7989d489990f7a19 (patch) | |
| tree | 389166c157ebc7cec690a967b02255d9337c6988 /internal/io/fs/readfile.go | |
| parent | d6dd896805faa074960f17bd1e8c516420e27f0d (diff) | |
Context-aware grep with -max, -after and -before now works
Diffstat (limited to 'internal/io/fs/readfile.go')
| -rw-r--r-- | internal/io/fs/readfile.go | 216 |
1 files changed, 41 insertions, 175 deletions
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 4ac82d8..161e3f0 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -4,12 +4,10 @@ import ( "bufio" "compress/gzip" "context" - "errors" "fmt" "io" "os" "strings" - "sync" "time" "github.com/mimecast/dtail/internal/io/line" @@ -39,6 +37,28 @@ type readFile struct { limiter chan struct{} } +func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { + switch { + case strings.HasSuffix(f.FilePath(), ".gz"): + fallthrough + case strings.HasSuffix(f.FilePath(), ".gzip"): + logger.Info(f.FilePath(), "Detected gzip compression format") + var gzipReader *gzip.Reader + gzipReader, err = gzip.NewReader(fd) + if err != nil { + return + } + reader = bufio.NewReader(gzipReader) + case strings.HasSuffix(f.FilePath(), ".zst"): + logger.Info(f.FilePath(), "Detected zstd compression format") + reader = bufio.NewReader(zstd.NewReader(fd)) + default: + reader = bufio.NewReader(fd) + } + + return +} + // String returns the string representation of the readFile func (f readFile) String() string { return fmt.Sprintf("readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)", @@ -91,58 +111,28 @@ func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines c } rawLines := make(chan []byte, 100) - truncate := make(chan struct{}) - - var wg sync.WaitGroup - wg.Add(1) - - go f.periodicTruncateCheck(ctx, truncate) - go f.filter(ctx, lContext, &wg, rawLines, lines, re) + readCtx, readCancel := context.WithCancel(ctx) + + filterDone := make(chan struct{}) + go func() { + f.filter(ctx, rawLines, lines, re, lContext) + close(filterDone) + // If the filter stopped, make the reader stop too, no need to read + // more data if there is nothing more the filter wants to filter for! + // E.g. it could be that we only want to filter N matches but not more. 
+ readCancel() + }() - err = f.read(ctx, fd, rawLines, truncate) + err = f.read(readCtx, fd, rawLines) close(rawLines) - wg.Wait() - return err -} - -func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) { - for { - select { - case <-time.After(time.Second * 3): - select { - case truncate <- struct{}{}: - case <-ctx.Done(): - } - case <-ctx.Done(): - return - } - } -} - -func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { - switch { - case strings.HasSuffix(f.FilePath(), ".gz"): - fallthrough - case strings.HasSuffix(f.FilePath(), ".gzip"): - logger.Info(f.FilePath(), "Detected gzip compression format") - var gzipReader *gzip.Reader - gzipReader, err = gzip.NewReader(fd) - if err != nil { - return - } - reader = bufio.NewReader(gzipReader) - case strings.HasSuffix(f.FilePath(), ".zst"): - logger.Info(f.FilePath(), "Detected zstd compression format") - reader = bufio.NewReader(zstd.NewReader(fd)) - default: - reader = bufio.NewReader(fd) - } + // Filter may flushes some data still. So wait until it is done here. 
+ <-filterDone - return + return err } -func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error { +func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte) error { var offset uint64 reader, err := f.makeReader(fd) @@ -154,6 +144,8 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t lineLengthThreshold := 1024 * 1024 // 1mb longLineWarning := false + checkTruncate := f.truncateTimer(ctx) + for { select { case <-ctx.Done(): @@ -162,7 +154,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t } select { - case <-truncate: + case <-checkTruncate: if isTruncated, err := f.truncated(fd); isTruncated { return err } @@ -226,129 +218,3 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t } } } - -// Filter log lines matching a given regular expression. -func (f readFile) filter(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) { - defer wg.Done() - - for { - select { - case rawLine, ok := <-rawLines: - f.updatePosition() - if !ok { - return - } - - line, _, transmittable := f.lineTransmittable(rawLine, len(lines), cap(lines), re) - if transmittable { - select { - case lines <- line: - continue - case <-ctx.Done(): - return - } - } - } - } -} - -// Filter log lines matching a given regular expression. 
-func (f readFile) filterWithLContext(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) { - defer wg.Done() - - var bPos, bCount int - before := make([]*[]byte, lContext.BeforeContext) - - for { - select { - case rawLine, ok := <-rawLines: - f.updatePosition() - if !ok { - return - } - - if lContext.BeforeContext > 0 { - before[bPos] = &rawLine - bPos = (bPos + 1) % lContext.BeforeContext - if bCount < lContext.BeforeContext { - bCount++ - } - } - - line, _, transmittable := f.lineTransmittable(rawLine, len(lines), cap(lines), re) - if transmittable { - if lContext.BeforeContext > 0 { - for bCount > 0 { - bCount-- - } - } - select { - case lines <- line: - continue - case <-ctx.Done(): - return - } - } - // before[bPos] = line - // bPos = (bPos+1) % lContext.BeforeContext - // bCount = (bCount+1) % lContext.BeforeContext - } - } -} - -func (f readFile) lineTransmittable(rawLine []byte, length, capacity int, re regex.Regex) (line.Line, bool, bool) { - var read line.Line - - if !re.Match(rawLine) { - f.updateLineNotMatched() - f.updateLineNotTransmitted() - return read, false, false - } - f.updateLineMatched() - - // Can we actually send more messages, channel capacity reached? - if f.canSkipLines && length >= capacity { - f.updateLineNotTransmitted() - return read, true, false - } - f.updateLineTransmitted() - - read = line.Line{ - Content: rawLine, - SourceID: f.globID, - Count: f.totalLineCount(), - TransmittedPerc: f.transmittedPerc(), - } - - return read, true, true -} - -// Check wether log file is truncated. Returns nil if not. -func (f readFile) truncated(fd *os.File) (bool, error) { - logger.Debug(f.filePath, "File truncation check") - - // Can not seek currently open FD. - curPos, err := fd.Seek(0, os.SEEK_CUR) - if err != nil { - return true, err - } - - // Can not open file at original path. 
- pathFd, err := os.Open(f.filePath) - if err != nil { - return true, err - } - defer pathFd.Close() - - // Can not seek file at original path. - pathPos, err := pathFd.Seek(0, io.SeekEnd) - if err != nil { - return true, err - } - - if curPos > pathPos { - return true, errors.New("File got truncated") - } - - return false, nil -} |
