diff options
| author | Paul Buetow <git@mx.buetow.org> | 2021-03-23 09:37:02 +0000 |
|---|---|---|
| committer | Paul Buetow <git@mx.buetow.org> | 2021-03-23 20:12:09 +0000 |
| commit | 2b47630c2f68794a95d5065a7989d489990f7a19 (patch) | |
| tree | 389166c157ebc7cec690a967b02255d9337c6988 | |
| parent | d6dd896805faa074960f17bd1e8c516420e27f0d (diff) | |
context aware grep with -max -after and -before not work
| -rw-r--r-- | docker/Dockerfile | 3 | ||||
| -rw-r--r-- | docker/Makefile | 3 | ||||
| -rw-r--r-- | internal/io/fs/filter.go | 167 | ||||
| -rw-r--r-- | internal/io/fs/readfile.go | 216 | ||||
| -rw-r--r-- | internal/io/fs/truncate.go | 61 | ||||
| -rw-r--r-- | internal/lcontext/lcontext.go | 14 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 2 | ||||
| -rw-r--r-- | internal/version/version.go | 4 |
8 files changed, 290 insertions, 180 deletions
diff --git a/docker/Dockerfile b/docker/Dockerfile index 8632832..61a1f7d 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -14,5 +14,6 @@ USER dserver WORKDIR /var/run/dserver EXPOSE 2222/tcp +EXPOSE 8080/tcp -CMD ["/usr/local/bin/dserver", "-relaxedAuth", "-cfg", "/etc/dserver/dtail.json"] +CMD ["/usr/local/bin/dserver", "-relaxedAuth", "-cfg", "/etc/dserver/dtail.json", "-pprof", "8080"] diff --git a/docker/Makefile b/docker/Makefile index 029adf6..f09d9e0 100644 --- a/docker/Makefile +++ b/docker/Makefile @@ -3,7 +3,8 @@ all: docker build . -t dserver:develop rm ./dserver run: - docker run -p 2222:2222 dserver:develop + # http://localhost:8080/debug/pprof/goroutines?debug=1 + docker run -p 2222:2222 -p 8080:8080 dserver:develop spinup: ./spinup.sh 10 spindown: diff --git a/internal/io/fs/filter.go b/internal/io/fs/filter.go new file mode 100644 index 0000000..c4f605e --- /dev/null +++ b/internal/io/fs/filter.go @@ -0,0 +1,167 @@ +package fs + +import ( + "context" + + "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/lcontext" + "github.com/mimecast/dtail/internal/regex" +) + +func (f readFile) filter(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex, lContext lcontext.LContext) { + // Do we have any kind of local context settings? If so then run the more complex + // filterWithLContext method. + if lContext.Has() { + // We can not skip transmitting any lines to the client with a local + // grep context specified. + f.canSkipLines = false + f.filterWithLContext(ctx, rawLines, lines, re, lContext) + return + } + + f.filterWithoutLContext(ctx, rawLines, lines, re) +} + +// Filter log lines matching a given regular expression, however with local grep context. +func (f readFile) filterWithLContext(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex, lContext lcontext.LContext) { + // Scenario 1: Finish once maxCount hits found + maxCount := lContext.MaxCount + processMaxCount := maxCount > 0 + maxReached := false + + // Scenario 2: Print prev. N lines when current line matches. + before := lContext.BeforeContext + processBefore := before > 0 + var beforeBuf chan []byte + if processBefore { + beforeBuf = make(chan []byte, before) + } + + // Screnario 3: Print next N lines when current line matches. + after := 0 + processAfter := lContext.AfterContext > 0 + + for rawLine := range rawLines { + // logger.Debug("rawLine", string(rawLine)) + f.updatePosition() + + if !re.Match(rawLine) { + f.updateLineNotMatched() + + if processAfter && after > 0 { + after-- + myLine := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: 100} + select { + case lines <- myLine: + case <-ctx.Done(): + return + } + + } else if processBefore { + // Keep last num BeforeContext raw messages. + select { + case beforeBuf <- rawLine: + default: + <-beforeBuf + beforeBuf <- rawLine + } + } + continue + } + + f.updateLineMatched() + + if processAfter { + if maxReached { + return + } + after = lContext.AfterContext + } + + if processBefore { + i := uint64(len(beforeBuf)) + for { + select { + case myRawLine := <-beforeBuf: + myLine := line.Line{Content: myRawLine, SourceID: f.globID, Count: f.totalLineCount() - i, TransmittedPerc: 100} + i-- + select { + case lines <- myLine: + case <-ctx.Done(): + return + } + default: + // beforeBuf is now empty. + } + if len(beforeBuf) == 0 { + break + } + } + } + + line := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: 100} + + select { + case lines <- line: + if processMaxCount { + maxCount-- + if maxCount == 0 { + if !processAfter || after == 0 { + return + } + // Unfortunatley we have to continue filter, as there might be more lines to print + maxReached = true + } + } + case <-ctx.Done(): + return + } + } +} + +// Filter log lines matching a given regular expression, there is no local grep context specified. +func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) { + for { + select { + case rawLine, ok := <-rawLines: + f.updatePosition() + if !ok { + return + } + + if f.lineUntransmittable(rawLine, len(lines), cap(lines), re) { + continue + } + + line := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: f.transmittedPerc()} + + select { + case lines <- line: + continue + case <-ctx.Done(): + return + } + } + } +} + +func (f readFile) lineUntransmittable(rawLine []byte, length, capacity int, re regex.Regex) bool { + if !re.Match(rawLine) { + f.updateLineNotMatched() + f.updateLineNotTransmitted() + // Regex dosn't match, so not interested in it. + return true + } + f.updateLineMatched() + + // Can we actually send more messages, channel capacity reached? + if f.canSkipLines && length >= capacity { + f.updateLineNotTransmitted() + // Matching, not transmittable + return true + } + f.updateLineTransmitted() + + // Matching, transmittable + return false +} diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 4ac82d8..161e3f0 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -4,12 +4,10 @@ import ( "bufio" "compress/gzip" "context" - "errors" "fmt" "io" "os" "strings" - "sync" "time" "github.com/mimecast/dtail/internal/io/line" @@ -39,6 +37,28 @@ type readFile struct { limiter chan struct{} } +func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { + switch { + case strings.HasSuffix(f.FilePath(), ".gz"): + fallthrough + case strings.HasSuffix(f.FilePath(), ".gzip"): + logger.Info(f.FilePath(), "Detected gzip compression format") + var gzipReader *gzip.Reader + gzipReader, err = gzip.NewReader(fd) + if err != nil { + return + } + reader = bufio.NewReader(gzipReader) + case strings.HasSuffix(f.FilePath(), ".zst"): + logger.Info(f.FilePath(), "Detected zstd compression format") + reader = bufio.NewReader(zstd.NewReader(fd)) + default: + reader = bufio.NewReader(fd) + } + + return +} + // String returns the string representation of the readFile func (f readFile) String() string { return fmt.Sprintf("readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)", @@ -91,58 +111,28 @@ func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines c } rawLines := make(chan []byte, 100) - truncate := make(chan struct{}) - - var wg sync.WaitGroup - wg.Add(1) - - go f.periodicTruncateCheck(ctx, truncate) - go f.filter(ctx, lContext, &wg, rawLines, lines, re) + readCtx, readCancel := context.WithCancel(ctx) + + filterDone := make(chan struct{}) + go func() { + f.filter(ctx, rawLines, lines, re, lContext) + close(filterDone) + // If the filter stopped, make the reader stop too, no need to read + // more data if there is nothing more the filter wants to filter for! + // E.g. it could be that we only want to filter N matches but not more. + readCancel() + }() - err = f.read(ctx, fd, rawLines, truncate) + err = f.read(readCtx, fd, rawLines) close(rawLines) - wg.Wait() - return err -} - -func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) { - for { - select { - case <-time.After(time.Second * 3): - select { - case truncate <- struct{}{}: - case <-ctx.Done(): - } - case <-ctx.Done(): - return - } - } -} - -func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { - switch { - case strings.HasSuffix(f.FilePath(), ".gz"): - fallthrough - case strings.HasSuffix(f.FilePath(), ".gzip"): - logger.Info(f.FilePath(), "Detected gzip compression format") - var gzipReader *gzip.Reader - gzipReader, err = gzip.NewReader(fd) - if err != nil { - return - } - reader = bufio.NewReader(gzipReader) - case strings.HasSuffix(f.FilePath(), ".zst"): - logger.Info(f.FilePath(), "Detected zstd compression format") - reader = bufio.NewReader(zstd.NewReader(fd)) - default: - reader = bufio.NewReader(fd) - } + // Filter may flushes some data still. So wait until it is done here. + <-filterDone - return + return err } -func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error { +func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte) error { var offset uint64 reader, err := f.makeReader(fd) @@ -154,6 +144,8 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t lineLengthThreshold := 1024 * 1024 // 1mb longLineWarning := false + checkTruncate := f.truncateTimer(ctx) + for { select { case <-ctx.Done(): @@ -162,7 +154,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t } select { - case <-truncate: + case <-checkTruncate: if isTruncated, err := f.truncated(fd); isTruncated { return err } @@ -226,129 +218,3 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t } } } - -// Filter log lines matching a given regular expression. -func (f readFile) filter(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) { - defer wg.Done() - - for { - select { - case rawLine, ok := <-rawLines: - f.updatePosition() - if !ok { - return - } - - line, _, transmittable := f.lineTransmittable(rawLine, len(lines), cap(lines), re) - if transmittable { - select { - case lines <- line: - continue - case <-ctx.Done(): - return - } - } - } - } -} - -// Filter log lines matching a given regular expression. -func (f readFile) filterWithLContext(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) { - defer wg.Done() - - var bPos, bCount int - before := make([]*[]byte, lContext.BeforeContext) - - for { - select { - case rawLine, ok := <-rawLines: - f.updatePosition() - if !ok { - return - } - - if lContext.BeforeContext > 0 { - before[bPos] = &rawLine - bPos = (bPos + 1) % lContext.BeforeContext - if bCount < lContext.BeforeContext { - bCount++ - } - } - - line, _, transmittable := f.lineTransmittable(rawLine, len(lines), cap(lines), re) - if transmittable { - if lContext.BeforeContext > 0 { - for bCount > 0 { - bCount-- - } - } - select { - case lines <- line: - continue - case <-ctx.Done(): - return - } - } - // before[bPos] = line - // bPos = (bPos+1) % lContext.BeforeContext - // bCount = (bCount+1) % lContext.BeforeContext - } - } -} - -func (f readFile) lineTransmittable(rawLine []byte, length, capacity int, re regex.Regex) (line.Line, bool, bool) { - var read line.Line - - if !re.Match(rawLine) { - f.updateLineNotMatched() - f.updateLineNotTransmitted() - return read, false, false - } - f.updateLineMatched() - - // Can we actually send more messages, channel capacity reached? - if f.canSkipLines && length >= capacity { - f.updateLineNotTransmitted() - return read, true, false - } - f.updateLineTransmitted() - - read = line.Line{ - Content: rawLine, - SourceID: f.globID, - Count: f.totalLineCount(), - TransmittedPerc: f.transmittedPerc(), - } - - return read, true, true -} - -// Check wether log file is truncated. Returns nil if not. -func (f readFile) truncated(fd *os.File) (bool, error) { - logger.Debug(f.filePath, "File truncation check") - - // Can not seek currently open FD. - curPos, err := fd.Seek(0, os.SEEK_CUR) - if err != nil { - return true, err - } - - // Can not open file at original path. - pathFd, err := os.Open(f.filePath) - if err != nil { - return true, err - } - defer pathFd.Close() - - // Can not seek file at original path. - pathPos, err := pathFd.Seek(0, io.SeekEnd) - if err != nil { - return true, err - } - - if curPos > pathPos { - return true, errors.New("File got truncated") - } - - return false, nil -} diff --git a/internal/io/fs/truncate.go b/internal/io/fs/truncate.go new file mode 100644 index 0000000..a8d59ac --- /dev/null +++ b/internal/io/fs/truncate.go @@ -0,0 +1,61 @@ +package fs + +import ( + "context" + "errors" + "io" + "os" + "time" + + "github.com/mimecast/dtail/internal/io/logger" +) + +func (f readFile) truncateTimer(ctx context.Context) (checkTruncate chan struct{}) { + checkTruncate = make(chan struct{}) + + go func() { + for { + select { + case <-time.After(time.Second * 3): + select { + case checkTruncate <- struct{}{}: + case <-ctx.Done(): + } + case <-ctx.Done(): + return + } + } + }() + + return +} + +// Check wether log file is truncated. Returns nil if not. +func (f readFile) truncated(fd *os.File) (bool, error) { + logger.Debug(f.filePath, "File truncation check") + + // Can not seek currently open FD. + curPos, err := fd.Seek(0, os.SEEK_CUR) + if err != nil { + return true, err + } + + // Can not open file at original path. + pathFd, err := os.Open(f.filePath) + if err != nil { + return true, err + } + defer pathFd.Close() + + // Can not seek file at original path. + pathPos, err := pathFd.Seek(0, io.SeekEnd) + if err != nil { + return true, err + } + + if curPos > pathPos { + return true, errors.New("File got truncated") + } + + return false, nil +} diff --git a/internal/lcontext/lcontext.go b/internal/lcontext/lcontext.go index bd51d94..89cb7c3 100644 --- a/internal/lcontext/lcontext.go +++ b/internal/lcontext/lcontext.go @@ -6,3 +6,17 @@ type LContext struct { BeforeContext int MaxCount int } + +// Has returns true if it has any parameter set. +func (c LContext) Has() bool { + if c.AfterContext > 0 { + return true + } + if c.BeforeContext > 0 { + return true + } + if c.MaxCount > 0 { + return true + } + return false +} diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 07933d0..7da6012 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -401,7 +401,7 @@ func readOptions(opts []string) (map[string]string, lcontext.LContext, error) { for _, o := range opts { kv := strings.SplitN(o, "=", 2) if len(kv) != 2 { - return options, lContext, fmt.Errorf("Unable to parse options: %v", kv) + continue } key := kv[0] val := kv[1] diff --git a/internal/version/version.go b/internal/version/version.go index a6d6b05..f9acd56 100644 --- a/internal/version/version.go +++ b/internal/version/version.go @@ -11,9 +11,9 @@ const ( // Name of DTail. Name string = "DTail" // Version of DTail. - Version string = "3.2.2" + Version string = "3.3.0" // Additional information for DTail - Additional string = "" + Additional string = "develop" // ProtocolCompat -ibility version. ProtocolCompat string = "3" ) |
