summaryrefslogtreecommitdiff
path: root/internal/io/fs/readfile.go
diff options
context:
space:
mode:
authorPaul Buetow <git@mx.buetow.org>2021-03-23 09:37:02 +0000
committerPaul Buetow <git@mx.buetow.org>2021-03-23 20:12:09 +0000
commit2b47630c2f68794a95d5065a7989d489990f7a19 (patch)
tree389166c157ebc7cec690a967b02255d9337c6988 /internal/io/fs/readfile.go
parentd6dd896805faa074960f17bd1e8c516420e27f0d (diff)
context aware grep with -max -after and -before not work
Diffstat (limited to 'internal/io/fs/readfile.go')
-rw-r--r--internal/io/fs/readfile.go216
1 files changed, 41 insertions, 175 deletions
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index 4ac82d8..161e3f0 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -4,12 +4,10 @@ import (
"bufio"
"compress/gzip"
"context"
- "errors"
"fmt"
"io"
"os"
"strings"
- "sync"
"time"
"github.com/mimecast/dtail/internal/io/line"
@@ -39,6 +37,28 @@ type readFile struct {
limiter chan struct{}
}
+func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
+ switch {
+ case strings.HasSuffix(f.FilePath(), ".gz"):
+ fallthrough
+ case strings.HasSuffix(f.FilePath(), ".gzip"):
+ logger.Info(f.FilePath(), "Detected gzip compression format")
+ var gzipReader *gzip.Reader
+ gzipReader, err = gzip.NewReader(fd)
+ if err != nil {
+ return
+ }
+ reader = bufio.NewReader(gzipReader)
+ case strings.HasSuffix(f.FilePath(), ".zst"):
+ logger.Info(f.FilePath(), "Detected zstd compression format")
+ reader = bufio.NewReader(zstd.NewReader(fd))
+ default:
+ reader = bufio.NewReader(fd)
+ }
+
+ return
+}
+
// String returns the string representation of the readFile
func (f readFile) String() string {
return fmt.Sprintf("readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)",
@@ -91,58 +111,28 @@ func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines c
}
rawLines := make(chan []byte, 100)
- truncate := make(chan struct{})
-
- var wg sync.WaitGroup
- wg.Add(1)
-
- go f.periodicTruncateCheck(ctx, truncate)
- go f.filter(ctx, lContext, &wg, rawLines, lines, re)
+ readCtx, readCancel := context.WithCancel(ctx)
+
+ filterDone := make(chan struct{})
+ go func() {
+ f.filter(ctx, rawLines, lines, re, lContext)
+ close(filterDone)
+ // If the filter stopped, make the reader stop too, no need to read
+ // more data if there is nothing more the filter wants to filter for!
+ // E.g. it could be that we only want to filter N matches but not more.
+ readCancel()
+ }()
- err = f.read(ctx, fd, rawLines, truncate)
+ err = f.read(readCtx, fd, rawLines)
close(rawLines)
- wg.Wait()
- return err
-}
-
-func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) {
- for {
- select {
- case <-time.After(time.Second * 3):
- select {
- case truncate <- struct{}{}:
- case <-ctx.Done():
- }
- case <-ctx.Done():
- return
- }
- }
-}
-
-func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
- switch {
- case strings.HasSuffix(f.FilePath(), ".gz"):
- fallthrough
- case strings.HasSuffix(f.FilePath(), ".gzip"):
- logger.Info(f.FilePath(), "Detected gzip compression format")
- var gzipReader *gzip.Reader
- gzipReader, err = gzip.NewReader(fd)
- if err != nil {
- return
- }
- reader = bufio.NewReader(gzipReader)
- case strings.HasSuffix(f.FilePath(), ".zst"):
- logger.Info(f.FilePath(), "Detected zstd compression format")
- reader = bufio.NewReader(zstd.NewReader(fd))
- default:
- reader = bufio.NewReader(fd)
- }
+ // Filter may flushes some data still. So wait until it is done here.
+ <-filterDone
- return
+ return err
}
-func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error {
+func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte) error {
var offset uint64
reader, err := f.makeReader(fd)
@@ -154,6 +144,8 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
lineLengthThreshold := 1024 * 1024 // 1mb
longLineWarning := false
+ checkTruncate := f.truncateTimer(ctx)
+
for {
select {
case <-ctx.Done():
@@ -162,7 +154,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
}
select {
- case <-truncate:
+ case <-checkTruncate:
if isTruncated, err := f.truncated(fd); isTruncated {
return err
}
@@ -226,129 +218,3 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
}
}
}
-
-// Filter log lines matching a given regular expression.
-func (f readFile) filter(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) {
- defer wg.Done()
-
- for {
- select {
- case rawLine, ok := <-rawLines:
- f.updatePosition()
- if !ok {
- return
- }
-
- line, _, transmittable := f.lineTransmittable(rawLine, len(lines), cap(lines), re)
- if transmittable {
- select {
- case lines <- line:
- continue
- case <-ctx.Done():
- return
- }
- }
- }
- }
-}
-
-// Filter log lines matching a given regular expression.
-func (f readFile) filterWithLContext(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) {
- defer wg.Done()
-
- var bPos, bCount int
- before := make([]*[]byte, lContext.BeforeContext)
-
- for {
- select {
- case rawLine, ok := <-rawLines:
- f.updatePosition()
- if !ok {
- return
- }
-
- if lContext.BeforeContext > 0 {
- before[bPos] = &rawLine
- bPos = (bPos + 1) % lContext.BeforeContext
- if bCount < lContext.BeforeContext {
- bCount++
- }
- }
-
- line, _, transmittable := f.lineTransmittable(rawLine, len(lines), cap(lines), re)
- if transmittable {
- if lContext.BeforeContext > 0 {
- for bCount > 0 {
- bCount--
- }
- }
- select {
- case lines <- line:
- continue
- case <-ctx.Done():
- return
- }
- }
- // before[bPos] = line
- // bPos = (bPos+1) % lContext.BeforeContext
- // bCount = (bCount+1) % lContext.BeforeContext
- }
- }
-}
-
-func (f readFile) lineTransmittable(rawLine []byte, length, capacity int, re regex.Regex) (line.Line, bool, bool) {
- var read line.Line
-
- if !re.Match(rawLine) {
- f.updateLineNotMatched()
- f.updateLineNotTransmitted()
- return read, false, false
- }
- f.updateLineMatched()
-
- // Can we actually send more messages, channel capacity reached?
- if f.canSkipLines && length >= capacity {
- f.updateLineNotTransmitted()
- return read, true, false
- }
- f.updateLineTransmitted()
-
- read = line.Line{
- Content: rawLine,
- SourceID: f.globID,
- Count: f.totalLineCount(),
- TransmittedPerc: f.transmittedPerc(),
- }
-
- return read, true, true
-}
-
-// Check wether log file is truncated. Returns nil if not.
-func (f readFile) truncated(fd *os.File) (bool, error) {
- logger.Debug(f.filePath, "File truncation check")
-
- // Can not seek currently open FD.
- curPos, err := fd.Seek(0, os.SEEK_CUR)
- if err != nil {
- return true, err
- }
-
- // Can not open file at original path.
- pathFd, err := os.Open(f.filePath)
- if err != nil {
- return true, err
- }
- defer pathFd.Close()
-
- // Can not seek file at original path.
- pathPos, err := pathFd.Seek(0, io.SeekEnd)
- if err != nil {
- return true, err
- }
-
- if curPos > pathPos {
- return true, errors.New("File got truncated")
- }
-
- return false, nil
-}