summaryrefslogtreecommitdiff
path: root/internal/io
diff options
context:
space:
mode:
Diffstat (limited to 'internal/io')
-rw-r--r--internal/io/fs/filereader.go3
-rw-r--r--internal/io/fs/filter.go167
-rw-r--r--internal/io/fs/readfile.go172
-rw-r--r--internal/io/fs/truncate.go61
4 files changed, 273 insertions, 130 deletions
diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go
index 0774837..efd410e 100644
--- a/internal/io/fs/filereader.go
+++ b/internal/io/fs/filereader.go
@@ -4,12 +4,13 @@ import (
"context"
"github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/regex"
)
// FileReader is the interface used on the dtail server to read/cat/grep/mapr... a file.
type FileReader interface {
- Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error
+ Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error
FilePath() string
Retry() bool
}
diff --git a/internal/io/fs/filter.go b/internal/io/fs/filter.go
new file mode 100644
index 0000000..c4f605e
--- /dev/null
+++ b/internal/io/fs/filter.go
@@ -0,0 +1,167 @@
+package fs
+
+import (
+ "context"
+
+ "github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/lcontext"
+ "github.com/mimecast/dtail/internal/regex"
+)
+
+// filter reads raw lines from rawLines and forwards the ones selected by re to
+// lines. When lContext carries any local grep context setting the work is
+// delegated to filterWithLContext, otherwise to filterWithoutLContext.
+func (f readFile) filter(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex, lContext lcontext.LContext) {
+	if !lContext.Has() {
+		// No local context requested, the simple filter is sufficient.
+		f.filterWithoutLContext(ctx, rawLines, lines, re)
+		return
+	}
+
+	// With a local grep context no line may be skipped on its way to the
+	// client. f is a value receiver, so this only changes the copy that is
+	// handed to filterWithLContext below.
+	f.canSkipLines = false
+	f.filterWithLContext(ctx, rawLines, lines, re, lContext)
+}
+
+// filterWithLContext filters log lines matching the given regular expression
+// while honouring the local grep context settings found in lContext:
+//
+//   - MaxCount: stop once that many matching lines have been sent.
+//   - BeforeContext: also send up to that many lines preceding a match.
+//   - AfterContext: also send up to that many lines following a match.
+//
+// The method returns when rawLines is closed, when ctx is done, or as soon as
+// the MaxCount limit has been reached and the trailing context of the last
+// permitted match has been sent. Every line is sent with TransmittedPerc 100.
+func (f readFile) filterWithLContext(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex, lContext lcontext.LContext) {
+	// send delivers one line to the client. It reports false once ctx is done,
+	// in which case the caller has to stop filtering.
+	send := func(content []byte, count uint64) bool {
+		select {
+		case lines <- line.Line{Content: content, SourceID: f.globID, Count: count, TransmittedPerc: 100}:
+			return true
+		case <-ctx.Done():
+			return false
+		}
+	}
+
+	// Scenario 1: Finish once maxCount hits were found.
+	maxCount := lContext.MaxCount
+	processMaxCount := maxCount > 0
+	maxReached := false
+
+	// Scenario 2: Print the previous N lines when the current line matches.
+	// The buffered channel is used as a FIFO holding the last N unsent lines.
+	before := lContext.BeforeContext
+	processBefore := before > 0
+	var beforeBuf chan []byte
+	if processBefore {
+		beforeBuf = make(chan []byte, before)
+	}
+
+	// Scenario 3: Print the next N lines when the current line matches.
+	// after counts how many trailing context lines are still owed.
+	after := 0
+	processAfter := lContext.AfterContext > 0
+
+	for rawLine := range rawLines {
+		f.updatePosition()
+
+		if !re.Match(rawLine) {
+			f.updateLineNotMatched()
+
+			switch {
+			case processAfter && after > 0:
+				// Trailing context of a previous match.
+				after--
+				if !send(rawLine, f.totalLineCount()) {
+					return
+				}
+				if maxReached && after == 0 {
+					// The last permitted match has received its complete
+					// trailing context, so nothing further can be sent.
+					// Stop now rather than reading on until the next
+					// match or the end of the input.
+					return
+				}
+			case processBefore:
+				// Keep the last BeforeContext raw lines, dropping the
+				// oldest one once the buffer is full.
+				select {
+				case beforeBuf <- rawLine:
+				default:
+					<-beforeBuf
+					beforeBuf <- rawLine
+				}
+			}
+			continue
+		}
+
+		f.updateLineMatched()
+
+		if processAfter {
+			if maxReached {
+				// A further match while only trailing context was still
+				// owed: the limit is exhausted, so stop here.
+				return
+			}
+			after = lContext.AfterContext
+		}
+
+		if processBefore {
+			// Flush the buffered leading context, oldest line first. Only
+			// this goroutine touches beforeBuf, so its length is stable.
+			for pending := uint64(len(beforeBuf)); pending > 0; pending-- {
+				if !send(<-beforeBuf, f.totalLineCount()-pending) {
+					return
+				}
+			}
+		}
+
+		if !send(rawLine, f.totalLineCount()) {
+			return
+		}
+
+		if processMaxCount {
+			maxCount--
+			if maxCount == 0 {
+				if !processAfter || after == 0 {
+					return
+				}
+				// Trailing context lines are still owed for this match.
+				maxReached = true
+			}
+		}
+	}
+}
+
+// filterWithoutLContext forwards the lines from rawLines which
+// lineUntransmittable does not reject. It is used when no local grep context
+// is specified and ends when rawLines is closed or ctx is done.
+func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) {
+	for {
+		rawLine, open := <-rawLines
+		// The position is updated for every receive, including the final
+		// one that reports the closed channel.
+		f.updatePosition()
+		if !open {
+			return
+		}
+
+		if f.lineUntransmittable(rawLine, len(lines), cap(lines), re) {
+			continue
+		}
+
+		outgoing := line.Line{
+			Content:         rawLine,
+			SourceID:        f.globID,
+			Count:           f.totalLineCount(),
+			TransmittedPerc: f.transmittedPerc(),
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+		case lines <- outgoing:
+		}
+	}
+}
+
+// lineUntransmittable reports whether rawLine must not be sent to the client.
+// That is the case when the line does not match re, or when it matches but
+// lines may be skipped and length has reached capacity. The matched and
+// transmitted counters are updated along the way.
+func (f readFile) lineUntransmittable(rawLine []byte, length, capacity int, re regex.Regex) bool {
+	if matched := re.Match(rawLine); !matched {
+		// Not matching the regex, so of no interest at all.
+		f.updateLineNotMatched()
+		f.updateLineNotTransmitted()
+		return true
+	}
+	f.updateLineMatched()
+
+	// Is there still room to send more messages?
+	channelFull := length >= capacity
+	if !(f.canSkipLines && channelFull) {
+		// Matching and transmittable.
+		f.updateLineTransmitted()
+		return false
+	}
+
+	// Matching, but skipped because the channel capacity is reached.
+	f.updateLineNotTransmitted()
+	return true
+}
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index 6757bd6..161e3f0 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -4,16 +4,15 @@ import (
"bufio"
"compress/gzip"
"context"
- "errors"
"fmt"
"io"
"os"
"strings"
- "sync"
"time"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/regex"
"github.com/DataDog/zstd"
@@ -38,6 +37,28 @@ type readFile struct {
limiter chan struct{}
}
+// makeReader wraps fd into a buffered reader. Depending on the suffix of the
+// file path the content is decompressed on the fly: ".gz" and ".gzip" use
+// gzip, ".zst" uses zstd, everything else is read as is. An error is only
+// returned when the gzip reader can not be created.
+func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
+	path := f.FilePath()
+
+	switch {
+	case strings.HasSuffix(path, ".gz"), strings.HasSuffix(path, ".gzip"):
+		logger.Info(path, "Detected gzip compression format")
+		gzipReader, gzipErr := gzip.NewReader(fd)
+		if gzipErr != nil {
+			return nil, gzipErr
+		}
+		return bufio.NewReader(gzipReader), nil
+
+	case strings.HasSuffix(path, ".zst"):
+		logger.Info(path, "Detected zstd compression format")
+		return bufio.NewReader(zstd.NewReader(fd)), nil
+	}
+
+	return bufio.NewReader(fd), nil
+}
+
+
// String returns the string representation of the readFile
func (f readFile) String() string {
return fmt.Sprintf("readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)",
@@ -59,7 +80,7 @@ func (f readFile) Retry() bool {
}
// Start tailing a log file.
-func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error {
+func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error {
logger.Debug("readFile", f)
defer func() {
select {
@@ -90,58 +111,28 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Re
}
rawLines := make(chan []byte, 100)
- truncate := make(chan struct{})
-
- var wg sync.WaitGroup
- wg.Add(1)
-
- go f.periodicTruncateCheck(ctx, truncate)
- go f.filter(ctx, &wg, rawLines, lines, re)
+ readCtx, readCancel := context.WithCancel(ctx)
+
+ filterDone := make(chan struct{})
+ go func() {
+ f.filter(ctx, rawLines, lines, re, lContext)
+ close(filterDone)
+ // If the filter stopped, make the reader stop too, no need to read
+ // more data if there is nothing more the filter wants to filter for!
+ // E.g. it could be that we only want to filter N matches but not more.
+ readCancel()
+ }()
- err = f.read(ctx, fd, rawLines, truncate)
+ err = f.read(readCtx, fd, rawLines)
close(rawLines)
- wg.Wait()
-
- return err
-}
-func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) {
- for {
- select {
- case <-time.After(time.Second * 3):
- select {
- case truncate <- struct{}{}:
- case <-ctx.Done():
- }
- case <-ctx.Done():
- return
- }
- }
-}
-
-func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
- switch {
- case strings.HasSuffix(f.FilePath(), ".gz"):
- fallthrough
- case strings.HasSuffix(f.FilePath(), ".gzip"):
- logger.Info(f.FilePath(), "Detected gzip compression format")
- var gzipReader *gzip.Reader
- gzipReader, err = gzip.NewReader(fd)
- if err != nil {
- return
- }
- reader = bufio.NewReader(gzipReader)
- case strings.HasSuffix(f.FilePath(), ".zst"):
- logger.Info(f.FilePath(), "Detected zstd compression format")
- reader = bufio.NewReader(zstd.NewReader(fd))
- default:
- reader = bufio.NewReader(fd)
- }
+	// The filter may still flush some data, so wait here until it is done.
+ <-filterDone
- return
+ return err
}
-func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error {
+func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte) error {
var offset uint64
reader, err := f.makeReader(fd)
@@ -153,6 +144,8 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
lineLengthThreshold := 1024 * 1024 // 1mb
longLineWarning := false
+ checkTruncate := f.truncateTimer(ctx)
+
for {
select {
case <-ctx.Done():
@@ -161,7 +154,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
}
select {
- case <-truncate:
+ case <-checkTruncate:
if isTruncated, err := f.truncated(fd); isTruncated {
return err
}
@@ -225,82 +218,3 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
}
}
}
-
-// Filter log lines matching a given regular expression.
-func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) {
- defer wg.Done()
-
- for {
- select {
- case line, ok := <-rawLines:
- f.updatePosition()
- if !ok {
- return
- }
- if filteredLine, ok := f.transmittable(line, len(lines), cap(lines), re); ok {
- select {
- case lines <- filteredLine:
- case <-ctx.Done():
- return
- }
- }
- }
- }
-}
-
-func (f readFile) transmittable(lineBytes []byte, length, capacity int, re regex.Regex) (line.Line, bool) {
- var read line.Line
-
- if !re.Match(lineBytes) {
- f.updateLineNotMatched()
- f.updateLineNotTransmitted()
- return read, false
- }
- f.updateLineMatched()
-
- // Can we actually send more messages, channel capacity reached?
- if f.canSkipLines && length >= capacity {
- f.updateLineNotTransmitted()
- return read, false
- }
- f.updateLineTransmitted()
-
- read = line.Line{
- Content: lineBytes,
- SourceID: f.globID,
- Count: f.totalLineCount(),
- TransmittedPerc: f.transmittedPerc(),
- }
-
- return read, true
-}
-
-// Check wether log file is truncated. Returns nil if not.
-func (f readFile) truncated(fd *os.File) (bool, error) {
- logger.Debug(f.filePath, "File truncation check")
-
- // Can not seek currently open FD.
- curPos, err := fd.Seek(0, os.SEEK_CUR)
- if err != nil {
- return true, err
- }
-
- // Can not open file at original path.
- pathFd, err := os.Open(f.filePath)
- if err != nil {
- return true, err
- }
- defer pathFd.Close()
-
- // Can not seek file at original path.
- pathPos, err := pathFd.Seek(0, io.SeekEnd)
- if err != nil {
- return true, err
- }
-
- if curPos > pathPos {
- return true, errors.New("File got truncated")
- }
-
- return false, nil
-}
diff --git a/internal/io/fs/truncate.go b/internal/io/fs/truncate.go
new file mode 100644
index 0000000..a8d59ac
--- /dev/null
+++ b/internal/io/fs/truncate.go
@@ -0,0 +1,61 @@
+package fs
+
+import (
+ "context"
+ "errors"
+ "io"
+ "os"
+ "time"
+
+ "github.com/mimecast/dtail/internal/io/logger"
+)
+
+// truncateTimer starts a goroutine which offers a value on the returned
+// unbuffered channel three seconds after the previous one was taken (or after
+// the start), signalling that a truncation check is due. The goroutine ends
+// once ctx is done; the channel is never closed.
+func (f readFile) truncateTimer(ctx context.Context) (checkTruncate chan struct{}) {
+	const interval = 3 * time.Second
+	checkTruncate = make(chan struct{})
+
+	go func() {
+		for {
+			// Wait for the next check to become due.
+			select {
+			case <-ctx.Done():
+				return
+			case <-time.After(interval):
+			}
+
+			// Hand the signal over, unless the context ends first.
+			select {
+			case checkTruncate <- struct{}{}:
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	return checkTruncate
+}
+
+// truncated checks whether the log file got truncated. It compares the
+// current offset of the open fd with the size of the file found at the
+// original path. It returns (false, nil) when no truncation was detected.
+// It returns true together with an error when the file at the path is
+// shorter than the current offset, or when one of the seek/open calls
+// needed for the check fails.
+func (f readFile) truncated(fd *os.File) (bool, error) {
+	logger.Debug(f.filePath, "File truncation check")
+
+	// Can not seek currently open FD.
+	curPos, err := fd.Seek(0, io.SeekCurrent)
+	if err != nil {
+		return true, err
+	}
+
+	// Can not open file at original path.
+	pathFd, err := os.Open(f.filePath)
+	if err != nil {
+		return true, err
+	}
+	defer pathFd.Close()
+
+	// Can not seek file at original path. Seeking to the end yields the
+	// current size of that file.
+	pathPos, err := pathFd.Seek(0, io.SeekEnd)
+	if err != nil {
+		return true, err
+	}
+
+	// The open FD is positioned beyond the end of the file at the path.
+	if curPos > pathPos {
+		return true, errors.New("File got truncated")
+	}
+
+	return false, nil
+}