summaryrefslogtreecommitdiff
path: root/internal/io/fs/readfile.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/io/fs/readfile.go')
-rw-r--r--internal/io/fs/readfile.go172
1 files changed, 43 insertions, 129 deletions
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index 6757bd6..161e3f0 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -4,16 +4,15 @@ import (
"bufio"
"compress/gzip"
"context"
- "errors"
"fmt"
"io"
"os"
"strings"
- "sync"
"time"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/regex"
"github.com/DataDog/zstd"
@@ -38,6 +37,28 @@ type readFile struct {
limiter chan struct{}
}
+func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
+ switch {
+ case strings.HasSuffix(f.FilePath(), ".gz"):
+ fallthrough
+ case strings.HasSuffix(f.FilePath(), ".gzip"):
+ logger.Info(f.FilePath(), "Detected gzip compression format")
+ var gzipReader *gzip.Reader
+ gzipReader, err = gzip.NewReader(fd)
+ if err != nil {
+ return
+ }
+ reader = bufio.NewReader(gzipReader)
+ case strings.HasSuffix(f.FilePath(), ".zst"):
+ logger.Info(f.FilePath(), "Detected zstd compression format")
+ reader = bufio.NewReader(zstd.NewReader(fd))
+ default:
+ reader = bufio.NewReader(fd)
+ }
+
+ return
+}
+
// String returns the string representation of the readFile
func (f readFile) String() string {
return fmt.Sprintf("readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)",
@@ -59,7 +80,7 @@ func (f readFile) Retry() bool {
}
// Start tailing a log file.
-func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error {
+func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error {
logger.Debug("readFile", f)
defer func() {
select {
@@ -90,58 +111,28 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Re
}
rawLines := make(chan []byte, 100)
- truncate := make(chan struct{})
-
- var wg sync.WaitGroup
- wg.Add(1)
-
- go f.periodicTruncateCheck(ctx, truncate)
- go f.filter(ctx, &wg, rawLines, lines, re)
+ readCtx, readCancel := context.WithCancel(ctx)
+
+ filterDone := make(chan struct{})
+ go func() {
+ f.filter(ctx, rawLines, lines, re, lContext)
+ close(filterDone)
+ // If the filter stopped, make the reader stop too, no need to read
+ // more data if there is nothing more the filter wants to filter for!
+ // E.g. it could be that we only want to filter N matches but not more.
+ readCancel()
+ }()
- err = f.read(ctx, fd, rawLines, truncate)
+ err = f.read(readCtx, fd, rawLines)
close(rawLines)
- wg.Wait()
-
- return err
-}
-func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) {
- for {
- select {
- case <-time.After(time.Second * 3):
- select {
- case truncate <- struct{}{}:
- case <-ctx.Done():
- }
- case <-ctx.Done():
- return
- }
- }
-}
-
-func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
- switch {
- case strings.HasSuffix(f.FilePath(), ".gz"):
- fallthrough
- case strings.HasSuffix(f.FilePath(), ".gzip"):
- logger.Info(f.FilePath(), "Detected gzip compression format")
- var gzipReader *gzip.Reader
- gzipReader, err = gzip.NewReader(fd)
- if err != nil {
- return
- }
- reader = bufio.NewReader(gzipReader)
- case strings.HasSuffix(f.FilePath(), ".zst"):
- logger.Info(f.FilePath(), "Detected zstd compression format")
- reader = bufio.NewReader(zstd.NewReader(fd))
- default:
- reader = bufio.NewReader(fd)
- }
+ // Filter may flushes some data still. So wait until it is done here.
+ <-filterDone
- return
+ return err
}
-func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error {
+func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte) error {
var offset uint64
reader, err := f.makeReader(fd)
@@ -153,6 +144,8 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
lineLengthThreshold := 1024 * 1024 // 1mb
longLineWarning := false
+ checkTruncate := f.truncateTimer(ctx)
+
for {
select {
case <-ctx.Done():
@@ -161,7 +154,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
}
select {
- case <-truncate:
+ case <-checkTruncate:
if isTruncated, err := f.truncated(fd); isTruncated {
return err
}
@@ -225,82 +218,3 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
}
}
}
-
-// Filter log lines matching a given regular expression.
-func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) {
- defer wg.Done()
-
- for {
- select {
- case line, ok := <-rawLines:
- f.updatePosition()
- if !ok {
- return
- }
- if filteredLine, ok := f.transmittable(line, len(lines), cap(lines), re); ok {
- select {
- case lines <- filteredLine:
- case <-ctx.Done():
- return
- }
- }
- }
- }
-}
-
-func (f readFile) transmittable(lineBytes []byte, length, capacity int, re regex.Regex) (line.Line, bool) {
- var read line.Line
-
- if !re.Match(lineBytes) {
- f.updateLineNotMatched()
- f.updateLineNotTransmitted()
- return read, false
- }
- f.updateLineMatched()
-
- // Can we actually send more messages, channel capacity reached?
- if f.canSkipLines && length >= capacity {
- f.updateLineNotTransmitted()
- return read, false
- }
- f.updateLineTransmitted()
-
- read = line.Line{
- Content: lineBytes,
- SourceID: f.globID,
- Count: f.totalLineCount(),
- TransmittedPerc: f.transmittedPerc(),
- }
-
- return read, true
-}
-
-// Check wether log file is truncated. Returns nil if not.
-func (f readFile) truncated(fd *os.File) (bool, error) {
- logger.Debug(f.filePath, "File truncation check")
-
- // Can not seek currently open FD.
- curPos, err := fd.Seek(0, os.SEEK_CUR)
- if err != nil {
- return true, err
- }
-
- // Can not open file at original path.
- pathFd, err := os.Open(f.filePath)
- if err != nil {
- return true, err
- }
- defer pathFd.Close()
-
- // Can not seek file at original path.
- pathPos, err := pathFd.Seek(0, io.SeekEnd)
- if err != nil {
- return true, err
- }
-
- if curPos > pathPos {
- return true, errors.New("File got truncated")
- }
-
- return false, nil
-}