summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <git@mx.buetow.org>2021-03-23 09:37:02 +0000
committerPaul Buetow <git@mx.buetow.org>2021-03-23 20:12:09 +0000
commit2b47630c2f68794a95d5065a7989d489990f7a19 (patch)
tree389166c157ebc7cec690a967b02255d9337c6988
parentd6dd896805faa074960f17bd1e8c516420e27f0d (diff)
context aware grep with -max -after and -before not work
-rw-r--r--docker/Dockerfile3
-rw-r--r--docker/Makefile3
-rw-r--r--internal/io/fs/filter.go167
-rw-r--r--internal/io/fs/readfile.go216
-rw-r--r--internal/io/fs/truncate.go61
-rw-r--r--internal/lcontext/lcontext.go14
-rw-r--r--internal/server/handlers/serverhandler.go2
-rw-r--r--internal/version/version.go4
8 files changed, 290 insertions, 180 deletions
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 8632832..61a1f7d 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -14,5 +14,6 @@ USER dserver
WORKDIR /var/run/dserver
EXPOSE 2222/tcp
+EXPOSE 8080/tcp
-CMD ["/usr/local/bin/dserver", "-relaxedAuth", "-cfg", "/etc/dserver/dtail.json"]
+CMD ["/usr/local/bin/dserver", "-relaxedAuth", "-cfg", "/etc/dserver/dtail.json", "-pprof", "8080"]
diff --git a/docker/Makefile b/docker/Makefile
index 029adf6..f09d9e0 100644
--- a/docker/Makefile
+++ b/docker/Makefile
@@ -3,7 +3,8 @@ all:
docker build . -t dserver:develop
rm ./dserver
run:
- docker run -p 2222:2222 dserver:develop
+ # http://localhost:8080/debug/pprof/goroutines?debug=1
+ docker run -p 2222:2222 -p 8080:8080 dserver:develop
spinup:
./spinup.sh 10
spindown:
diff --git a/internal/io/fs/filter.go b/internal/io/fs/filter.go
new file mode 100644
index 0000000..c4f605e
--- /dev/null
+++ b/internal/io/fs/filter.go
@@ -0,0 +1,167 @@
+package fs
+
+import (
+ "context"
+
+ "github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/lcontext"
+ "github.com/mimecast/dtail/internal/regex"
+)
+
+func (f readFile) filter(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex, lContext lcontext.LContext) {
+ // Do we have any kind of local context settings? If so then run the more complex
+ // filterWithLContext method.
+ if lContext.Has() {
+ // We can not skip transmitting any lines to the client with a local
+ // grep context specified.
+ f.canSkipLines = false
+ f.filterWithLContext(ctx, rawLines, lines, re, lContext)
+ return
+ }
+
+ f.filterWithoutLContext(ctx, rawLines, lines, re)
+}
+
+// Filter log lines matching a given regular expression, however with local grep context.
+func (f readFile) filterWithLContext(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex, lContext lcontext.LContext) {
+ // Scenario 1: Finish once maxCount hits found
+ maxCount := lContext.MaxCount
+ processMaxCount := maxCount > 0
+ maxReached := false
+
+ // Scenario 2: Print prev. N lines when current line matches.
+ before := lContext.BeforeContext
+ processBefore := before > 0
+ var beforeBuf chan []byte
+ if processBefore {
+ beforeBuf = make(chan []byte, before)
+ }
+
+ // Screnario 3: Print next N lines when current line matches.
+ after := 0
+ processAfter := lContext.AfterContext > 0
+
+ for rawLine := range rawLines {
+ // logger.Debug("rawLine", string(rawLine))
+ f.updatePosition()
+
+ if !re.Match(rawLine) {
+ f.updateLineNotMatched()
+
+ if processAfter && after > 0 {
+ after--
+ myLine := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: 100}
+ select {
+ case lines <- myLine:
+ case <-ctx.Done():
+ return
+ }
+
+ } else if processBefore {
+ // Keep last num BeforeContext raw messages.
+ select {
+ case beforeBuf <- rawLine:
+ default:
+ <-beforeBuf
+ beforeBuf <- rawLine
+ }
+ }
+ continue
+ }
+
+ f.updateLineMatched()
+
+ if processAfter {
+ if maxReached {
+ return
+ }
+ after = lContext.AfterContext
+ }
+
+ if processBefore {
+ i := uint64(len(beforeBuf))
+ for {
+ select {
+ case myRawLine := <-beforeBuf:
+ myLine := line.Line{Content: myRawLine, SourceID: f.globID, Count: f.totalLineCount() - i, TransmittedPerc: 100}
+ i--
+ select {
+ case lines <- myLine:
+ case <-ctx.Done():
+ return
+ }
+ default:
+ // beforeBuf is now empty.
+ }
+ if len(beforeBuf) == 0 {
+ break
+ }
+ }
+ }
+
+ line := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: 100}
+
+ select {
+ case lines <- line:
+ if processMaxCount {
+ maxCount--
+ if maxCount == 0 {
+ if !processAfter || after == 0 {
+ return
+ }
+ // Unfortunatley we have to continue filter, as there might be more lines to print
+ maxReached = true
+ }
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+// Filter log lines matching a given regular expression, there is no local grep context specified.
+func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) {
+ for {
+ select {
+ case rawLine, ok := <-rawLines:
+ f.updatePosition()
+ if !ok {
+ return
+ }
+
+ if f.lineUntransmittable(rawLine, len(lines), cap(lines), re) {
+ continue
+ }
+
+ line := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: f.transmittedPerc()}
+
+ select {
+ case lines <- line:
+ continue
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
+}
+
+func (f readFile) lineUntransmittable(rawLine []byte, length, capacity int, re regex.Regex) bool {
+ if !re.Match(rawLine) {
+ f.updateLineNotMatched()
+ f.updateLineNotTransmitted()
+ // Regex dosn't match, so not interested in it.
+ return true
+ }
+ f.updateLineMatched()
+
+ // Can we actually send more messages, channel capacity reached?
+ if f.canSkipLines && length >= capacity {
+ f.updateLineNotTransmitted()
+ // Matching, not transmittable
+ return true
+ }
+ f.updateLineTransmitted()
+
+ // Matching, transmittable
+ return false
+}
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index 4ac82d8..161e3f0 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -4,12 +4,10 @@ import (
"bufio"
"compress/gzip"
"context"
- "errors"
"fmt"
"io"
"os"
"strings"
- "sync"
"time"
"github.com/mimecast/dtail/internal/io/line"
@@ -39,6 +37,28 @@ type readFile struct {
limiter chan struct{}
}
+func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
+ switch {
+ case strings.HasSuffix(f.FilePath(), ".gz"):
+ fallthrough
+ case strings.HasSuffix(f.FilePath(), ".gzip"):
+ logger.Info(f.FilePath(), "Detected gzip compression format")
+ var gzipReader *gzip.Reader
+ gzipReader, err = gzip.NewReader(fd)
+ if err != nil {
+ return
+ }
+ reader = bufio.NewReader(gzipReader)
+ case strings.HasSuffix(f.FilePath(), ".zst"):
+ logger.Info(f.FilePath(), "Detected zstd compression format")
+ reader = bufio.NewReader(zstd.NewReader(fd))
+ default:
+ reader = bufio.NewReader(fd)
+ }
+
+ return
+}
+
// String returns the string representation of the readFile
func (f readFile) String() string {
return fmt.Sprintf("readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)",
@@ -91,58 +111,28 @@ func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines c
}
rawLines := make(chan []byte, 100)
- truncate := make(chan struct{})
-
- var wg sync.WaitGroup
- wg.Add(1)
-
- go f.periodicTruncateCheck(ctx, truncate)
- go f.filter(ctx, lContext, &wg, rawLines, lines, re)
+ readCtx, readCancel := context.WithCancel(ctx)
+
+ filterDone := make(chan struct{})
+ go func() {
+ f.filter(ctx, rawLines, lines, re, lContext)
+ close(filterDone)
+ // If the filter stopped, make the reader stop too, no need to read
+ // more data if there is nothing more the filter wants to filter for!
+ // E.g. it could be that we only want to filter N matches but not more.
+ readCancel()
+ }()
- err = f.read(ctx, fd, rawLines, truncate)
+ err = f.read(readCtx, fd, rawLines)
close(rawLines)
- wg.Wait()
- return err
-}
-
-func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) {
- for {
- select {
- case <-time.After(time.Second * 3):
- select {
- case truncate <- struct{}{}:
- case <-ctx.Done():
- }
- case <-ctx.Done():
- return
- }
- }
-}
-
-func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
- switch {
- case strings.HasSuffix(f.FilePath(), ".gz"):
- fallthrough
- case strings.HasSuffix(f.FilePath(), ".gzip"):
- logger.Info(f.FilePath(), "Detected gzip compression format")
- var gzipReader *gzip.Reader
- gzipReader, err = gzip.NewReader(fd)
- if err != nil {
- return
- }
- reader = bufio.NewReader(gzipReader)
- case strings.HasSuffix(f.FilePath(), ".zst"):
- logger.Info(f.FilePath(), "Detected zstd compression format")
- reader = bufio.NewReader(zstd.NewReader(fd))
- default:
- reader = bufio.NewReader(fd)
- }
+ // Filter may flushes some data still. So wait until it is done here.
+ <-filterDone
- return
+ return err
}
-func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error {
+func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte) error {
var offset uint64
reader, err := f.makeReader(fd)
@@ -154,6 +144,8 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
lineLengthThreshold := 1024 * 1024 // 1mb
longLineWarning := false
+ checkTruncate := f.truncateTimer(ctx)
+
for {
select {
case <-ctx.Done():
@@ -162,7 +154,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
}
select {
- case <-truncate:
+ case <-checkTruncate:
if isTruncated, err := f.truncated(fd); isTruncated {
return err
}
@@ -226,129 +218,3 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
}
}
}
-
-// Filter log lines matching a given regular expression.
-func (f readFile) filter(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) {
- defer wg.Done()
-
- for {
- select {
- case rawLine, ok := <-rawLines:
- f.updatePosition()
- if !ok {
- return
- }
-
- line, _, transmittable := f.lineTransmittable(rawLine, len(lines), cap(lines), re)
- if transmittable {
- select {
- case lines <- line:
- continue
- case <-ctx.Done():
- return
- }
- }
- }
- }
-}
-
-// Filter log lines matching a given regular expression.
-func (f readFile) filterWithLContext(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) {
- defer wg.Done()
-
- var bPos, bCount int
- before := make([]*[]byte, lContext.BeforeContext)
-
- for {
- select {
- case rawLine, ok := <-rawLines:
- f.updatePosition()
- if !ok {
- return
- }
-
- if lContext.BeforeContext > 0 {
- before[bPos] = &rawLine
- bPos = (bPos + 1) % lContext.BeforeContext
- if bCount < lContext.BeforeContext {
- bCount++
- }
- }
-
- line, _, transmittable := f.lineTransmittable(rawLine, len(lines), cap(lines), re)
- if transmittable {
- if lContext.BeforeContext > 0 {
- for bCount > 0 {
- bCount--
- }
- }
- select {
- case lines <- line:
- continue
- case <-ctx.Done():
- return
- }
- }
- // before[bPos] = line
- // bPos = (bPos+1) % lContext.BeforeContext
- // bCount = (bCount+1) % lContext.BeforeContext
- }
- }
-}
-
-func (f readFile) lineTransmittable(rawLine []byte, length, capacity int, re regex.Regex) (line.Line, bool, bool) {
- var read line.Line
-
- if !re.Match(rawLine) {
- f.updateLineNotMatched()
- f.updateLineNotTransmitted()
- return read, false, false
- }
- f.updateLineMatched()
-
- // Can we actually send more messages, channel capacity reached?
- if f.canSkipLines && length >= capacity {
- f.updateLineNotTransmitted()
- return read, true, false
- }
- f.updateLineTransmitted()
-
- read = line.Line{
- Content: rawLine,
- SourceID: f.globID,
- Count: f.totalLineCount(),
- TransmittedPerc: f.transmittedPerc(),
- }
-
- return read, true, true
-}
-
-// Check wether log file is truncated. Returns nil if not.
-func (f readFile) truncated(fd *os.File) (bool, error) {
- logger.Debug(f.filePath, "File truncation check")
-
- // Can not seek currently open FD.
- curPos, err := fd.Seek(0, os.SEEK_CUR)
- if err != nil {
- return true, err
- }
-
- // Can not open file at original path.
- pathFd, err := os.Open(f.filePath)
- if err != nil {
- return true, err
- }
- defer pathFd.Close()
-
- // Can not seek file at original path.
- pathPos, err := pathFd.Seek(0, io.SeekEnd)
- if err != nil {
- return true, err
- }
-
- if curPos > pathPos {
- return true, errors.New("File got truncated")
- }
-
- return false, nil
-}
diff --git a/internal/io/fs/truncate.go b/internal/io/fs/truncate.go
new file mode 100644
index 0000000..a8d59ac
--- /dev/null
+++ b/internal/io/fs/truncate.go
@@ -0,0 +1,61 @@
+package fs
+
+import (
+ "context"
+ "errors"
+ "io"
+ "os"
+ "time"
+
+ "github.com/mimecast/dtail/internal/io/logger"
+)
+
+func (f readFile) truncateTimer(ctx context.Context) (checkTruncate chan struct{}) {
+ checkTruncate = make(chan struct{})
+
+ go func() {
+ for {
+ select {
+ case <-time.After(time.Second * 3):
+ select {
+ case checkTruncate <- struct{}{}:
+ case <-ctx.Done():
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
+ return
+}
+
+// Check wether log file is truncated. Returns nil if not.
+func (f readFile) truncated(fd *os.File) (bool, error) {
+ logger.Debug(f.filePath, "File truncation check")
+
+ // Can not seek currently open FD.
+ curPos, err := fd.Seek(0, os.SEEK_CUR)
+ if err != nil {
+ return true, err
+ }
+
+ // Can not open file at original path.
+ pathFd, err := os.Open(f.filePath)
+ if err != nil {
+ return true, err
+ }
+ defer pathFd.Close()
+
+ // Can not seek file at original path.
+ pathPos, err := pathFd.Seek(0, io.SeekEnd)
+ if err != nil {
+ return true, err
+ }
+
+ if curPos > pathPos {
+ return true, errors.New("File got truncated")
+ }
+
+ return false, nil
+}
diff --git a/internal/lcontext/lcontext.go b/internal/lcontext/lcontext.go
index bd51d94..89cb7c3 100644
--- a/internal/lcontext/lcontext.go
+++ b/internal/lcontext/lcontext.go
@@ -6,3 +6,17 @@ type LContext struct {
BeforeContext int
MaxCount int
}
+
+// Has returns true if it has any parameter set.
+func (c LContext) Has() bool {
+ if c.AfterContext > 0 {
+ return true
+ }
+ if c.BeforeContext > 0 {
+ return true
+ }
+ if c.MaxCount > 0 {
+ return true
+ }
+ return false
+}
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 07933d0..7da6012 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -401,7 +401,7 @@ func readOptions(opts []string) (map[string]string, lcontext.LContext, error) {
for _, o := range opts {
kv := strings.SplitN(o, "=", 2)
if len(kv) != 2 {
- return options, lContext, fmt.Errorf("Unable to parse options: %v", kv)
+ continue
}
key := kv[0]
val := kv[1]
diff --git a/internal/version/version.go b/internal/version/version.go
index a6d6b05..f9acd56 100644
--- a/internal/version/version.go
+++ b/internal/version/version.go
@@ -11,9 +11,9 @@ const (
// Name of DTail.
Name string = "DTail"
// Version of DTail.
- Version string = "3.2.2"
+ Version string = "3.3.0"
// Additional information for DTail
- Additional string = ""
+ Additional string = "develop"
// ProtocolCompat -ibility version.
ProtocolCompat string = "3"
)