diff options
Diffstat (limited to 'internal/io')
| -rw-r--r-- | internal/io/fs/filereader.go | 3 | ||||
| -rw-r--r-- | internal/io/fs/filter.go | 167 | ||||
| -rw-r--r-- | internal/io/fs/readfile.go | 172 | ||||
| -rw-r--r-- | internal/io/fs/truncate.go | 61 |
4 files changed, 273 insertions, 130 deletions
diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go index 0774837..efd410e 100644 --- a/internal/io/fs/filereader.go +++ b/internal/io/fs/filereader.go @@ -4,12 +4,13 @@ import ( "context" "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/regex" ) // FileReader is the interface used on the dtail server to read/cat/grep/mapr... a file. type FileReader interface { - Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error + Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error FilePath() string Retry() bool } diff --git a/internal/io/fs/filter.go b/internal/io/fs/filter.go new file mode 100644 index 0000000..c4f605e --- /dev/null +++ b/internal/io/fs/filter.go @@ -0,0 +1,167 @@ +package fs + +import ( + "context" + + "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/lcontext" + "github.com/mimecast/dtail/internal/regex" +) + +func (f readFile) filter(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex, lContext lcontext.LContext) { + // Do we have any kind of local context settings? If so then run the more complex + // filterWithLContext method. + if lContext.Has() { + // We can not skip transmitting any lines to the client with a local + // grep context specified. + f.canSkipLines = false + f.filterWithLContext(ctx, rawLines, lines, re, lContext) + return + } + + f.filterWithoutLContext(ctx, rawLines, lines, re) +} + +// Filter log lines matching a given regular expression, however with local grep context. +func (f readFile) filterWithLContext(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex, lContext lcontext.LContext) { + // Scenario 1: Finish once maxCount hits found + maxCount := lContext.MaxCount + processMaxCount := maxCount > 0 + maxReached := false + + // Scenario 2: Print prev. N lines when current line matches. + before := lContext.BeforeContext + processBefore := before > 0 + var beforeBuf chan []byte + if processBefore { + beforeBuf = make(chan []byte, before) + } + + // Screnario 3: Print next N lines when current line matches. + after := 0 + processAfter := lContext.AfterContext > 0 + + for rawLine := range rawLines { + // logger.Debug("rawLine", string(rawLine)) + f.updatePosition() + + if !re.Match(rawLine) { + f.updateLineNotMatched() + + if processAfter && after > 0 { + after-- + myLine := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: 100} + select { + case lines <- myLine: + case <-ctx.Done(): + return + } + + } else if processBefore { + // Keep last num BeforeContext raw messages. + select { + case beforeBuf <- rawLine: + default: + <-beforeBuf + beforeBuf <- rawLine + } + } + continue + } + + f.updateLineMatched() + + if processAfter { + if maxReached { + return + } + after = lContext.AfterContext + } + + if processBefore { + i := uint64(len(beforeBuf)) + for { + select { + case myRawLine := <-beforeBuf: + myLine := line.Line{Content: myRawLine, SourceID: f.globID, Count: f.totalLineCount() - i, TransmittedPerc: 100} + i-- + select { + case lines <- myLine: + case <-ctx.Done(): + return + } + default: + // beforeBuf is now empty. + } + if len(beforeBuf) == 0 { + break + } + } + } + + line := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: 100} + + select { + case lines <- line: + if processMaxCount { + maxCount-- + if maxCount == 0 { + if !processAfter || after == 0 { + return + } + // Unfortunatley we have to continue filter, as there might be more lines to print + maxReached = true + } + } + case <-ctx.Done(): + return + } + } +} + +// Filter log lines matching a given regular expression, there is no local grep context specified. +func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) { + for { + select { + case rawLine, ok := <-rawLines: + f.updatePosition() + if !ok { + return + } + + if f.lineUntransmittable(rawLine, len(lines), cap(lines), re) { + continue + } + + line := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: f.transmittedPerc()} + + select { + case lines <- line: + continue + case <-ctx.Done(): + return + } + } + } +} + +func (f readFile) lineUntransmittable(rawLine []byte, length, capacity int, re regex.Regex) bool { + if !re.Match(rawLine) { + f.updateLineNotMatched() + f.updateLineNotTransmitted() + // Regex dosn't match, so not interested in it. + return true + } + f.updateLineMatched() + + // Can we actually send more messages, channel capacity reached? + if f.canSkipLines && length >= capacity { + f.updateLineNotTransmitted() + // Matching, not transmittable + return true + } + f.updateLineTransmitted() + + // Matching, transmittable + return false +} diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 6757bd6..161e3f0 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -4,16 +4,15 @@ import ( "bufio" "compress/gzip" "context" - "errors" "fmt" "io" "os" "strings" - "sync" "time" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/regex" "github.com/DataDog/zstd" @@ -38,6 +37,28 @@ type readFile struct { limiter chan struct{} } +func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { + switch { + case strings.HasSuffix(f.FilePath(), ".gz"): + fallthrough + case strings.HasSuffix(f.FilePath(), ".gzip"): + logger.Info(f.FilePath(), "Detected gzip compression format") + var gzipReader *gzip.Reader + gzipReader, err = gzip.NewReader(fd) + if err != nil { + return + } + reader = bufio.NewReader(gzipReader) + case strings.HasSuffix(f.FilePath(), ".zst"): + logger.Info(f.FilePath(), "Detected zstd compression format") + reader = bufio.NewReader(zstd.NewReader(fd)) + default: + reader = bufio.NewReader(fd) + } + + return +} + // String returns the string representation of the readFile func (f readFile) String() string { return fmt.Sprintf("readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)", @@ -59,7 +80,7 @@ func (f readFile) Retry() bool { } // Start tailing a log file. -func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error { +func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error { logger.Debug("readFile", f) defer func() { select { @@ -90,58 +111,28 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Re } rawLines := make(chan []byte, 100) - truncate := make(chan struct{}) - - var wg sync.WaitGroup - wg.Add(1) - - go f.periodicTruncateCheck(ctx, truncate) - go f.filter(ctx, &wg, rawLines, lines, re) + readCtx, readCancel := context.WithCancel(ctx) + + filterDone := make(chan struct{}) + go func() { + f.filter(ctx, rawLines, lines, re, lContext) + close(filterDone) + // If the filter stopped, make the reader stop too, no need to read + // more data if there is nothing more the filter wants to filter for! + // E.g. it could be that we only want to filter N matches but not more. + readCancel() + }() - err = f.read(ctx, fd, rawLines, truncate) + err = f.read(readCtx, fd, rawLines) close(rawLines) - wg.Wait() - - return err -} -func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) { - for { - select { - case <-time.After(time.Second * 3): - select { - case truncate <- struct{}{}: - case <-ctx.Done(): - } - case <-ctx.Done(): - return - } - } -} - -func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { - switch { - case strings.HasSuffix(f.FilePath(), ".gz"): - fallthrough - case strings.HasSuffix(f.FilePath(), ".gzip"): - logger.Info(f.FilePath(), "Detected gzip compression format") - var gzipReader *gzip.Reader - gzipReader, err = gzip.NewReader(fd) - if err != nil { - return - } - reader = bufio.NewReader(gzipReader) - case strings.HasSuffix(f.FilePath(), ".zst"): - logger.Info(f.FilePath(), "Detected zstd compression format") - reader = bufio.NewReader(zstd.NewReader(fd)) - default: - reader = bufio.NewReader(fd) - } + // Filter may flushes some data still. So wait until it is done here. + <-filterDone - return + return err } -func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error { +func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte) error { var offset uint64 reader, err := f.makeReader(fd) @@ -153,6 +144,8 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t lineLengthThreshold := 1024 * 1024 // 1mb longLineWarning := false + checkTruncate := f.truncateTimer(ctx) + for { select { case <-ctx.Done(): @@ -161,7 +154,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t } select { - case <-truncate: + case <-checkTruncate: if isTruncated, err := f.truncated(fd); isTruncated { return err } @@ -225,82 +218,3 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t } } } - -// Filter log lines matching a given regular expression. -func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) { - defer wg.Done() - - for { - select { - case line, ok := <-rawLines: - f.updatePosition() - if !ok { - return - } - if filteredLine, ok := f.transmittable(line, len(lines), cap(lines), re); ok { - select { - case lines <- filteredLine: - case <-ctx.Done(): - return - } - } - } - } -} - -func (f readFile) transmittable(lineBytes []byte, length, capacity int, re regex.Regex) (line.Line, bool) { - var read line.Line - - if !re.Match(lineBytes) { - f.updateLineNotMatched() - f.updateLineNotTransmitted() - return read, false - } - f.updateLineMatched() - - // Can we actually send more messages, channel capacity reached? - if f.canSkipLines && length >= capacity { - f.updateLineNotTransmitted() - return read, false - } - f.updateLineTransmitted() - - read = line.Line{ - Content: lineBytes, - SourceID: f.globID, - Count: f.totalLineCount(), - TransmittedPerc: f.transmittedPerc(), - } - - return read, true -} - -// Check wether log file is truncated. Returns nil if not. -func (f readFile) truncated(fd *os.File) (bool, error) { - logger.Debug(f.filePath, "File truncation check") - - // Can not seek currently open FD. - curPos, err := fd.Seek(0, os.SEEK_CUR) - if err != nil { - return true, err - } - - // Can not open file at original path. - pathFd, err := os.Open(f.filePath) - if err != nil { - return true, err - } - defer pathFd.Close() - - // Can not seek file at original path. - pathPos, err := pathFd.Seek(0, io.SeekEnd) - if err != nil { - return true, err - } - - if curPos > pathPos { - return true, errors.New("File got truncated") - } - - return false, nil -} diff --git a/internal/io/fs/truncate.go b/internal/io/fs/truncate.go new file mode 100644 index 0000000..a8d59ac --- /dev/null +++ b/internal/io/fs/truncate.go @@ -0,0 +1,61 @@ +package fs + +import ( + "context" + "errors" + "io" + "os" + "time" + + "github.com/mimecast/dtail/internal/io/logger" +) + +func (f readFile) truncateTimer(ctx context.Context) (checkTruncate chan struct{}) { + checkTruncate = make(chan struct{}) + + go func() { + for { + select { + case <-time.After(time.Second * 3): + select { + case checkTruncate <- struct{}{}: + case <-ctx.Done(): + } + case <-ctx.Done(): + return + } + } + }() + + return +} + +// Check wether log file is truncated. Returns nil if not. +func (f readFile) truncated(fd *os.File) (bool, error) { + logger.Debug(f.filePath, "File truncation check") + + // Can not seek currently open FD. + curPos, err := fd.Seek(0, os.SEEK_CUR) + if err != nil { + return true, err + } + + // Can not open file at original path. + pathFd, err := os.Open(f.filePath) + if err != nil { + return true, err + } + defer pathFd.Close() + + // Can not seek file at original path. + pathPos, err := pathFd.Seek(0, io.SeekEnd) + if err != nil { + return true, err + } + + if curPos > pathPos { + return true, errors.New("File got truncated") + } + + return false, nil +} |
