diff options
| author | Paul Buetow <paul@buetow.org> | 2021-10-13 21:10:28 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2021-10-14 20:11:23 +0300 |
| commit | 1dead22129a26e4f532e68c2c63fe4122b519506 (patch) | |
| tree | b5ab02d2400767410b8dec5183aa9f83d092f0d4 | |
| parent | 5f3e6b8569b5b71853208949506bbcd3c44488b5 (diff) | |
Merging grep context from master
| -rw-r--r-- | TODO.md | 1 | ||||
| -rw-r--r-- | cmd/dgrep/main.go | 3 | ||||
| -rw-r--r-- | cmd/dtail/main.go | 3 | ||||
| -rw-r--r-- | internal/config/args.go | 73 | ||||
| -rw-r--r-- | internal/io/fs/filereader.go | 4 | ||||
| -rw-r--r-- | internal/io/fs/readfile.go | 149 | ||||
| -rw-r--r-- | internal/lcontext/lcontext.go | 22 | ||||
| -rw-r--r-- | internal/server/handlers/basehandler.go | 12 | ||||
| -rw-r--r-- | internal/server/handlers/healthhandler.go | 5 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 30 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 9 |
11 files changed, 269 insertions, 42 deletions
@@ -3,6 +3,7 @@ TODO This is a loose list of what to do. Maybe for the next releae or maybe for a later one. +[ ] Rename dlog package to l [ ] Manually add back all changes from mimecast master to this branch. [ ] Client 4.x should print an error and exit when trying to connect to a 3.x server. [ ] Client 3.x should print an error and exit when trying to connect to a 4.x server. diff --git a/cmd/dgrep/main.go b/cmd/dgrep/main.go index da3f7a4..2d7e53b 100644 --- a/cmd/dgrep/main.go +++ b/cmd/dgrep/main.go @@ -30,6 +30,9 @@ func main() { flag.BoolVar(&displayVersion, "version", false, "Display version") flag.IntVar(&args.ConnectionsPerCPU, "cpc", config.DefaultConnectionsPerCPU, "How many connections established per CPU core concurrently") + flag.IntVar(&args.LContext.AfterContext, "after", 0, "Print lines of trailing context after matching lines") + flag.IntVar(&args.LContext.BeforeContext, "before", 0, "Print lines of leading context before matching lines") + flag.IntVar(&args.LContext.MaxCount, "max", 0, "Stop reading file after NUM matching lines") flag.IntVar(&args.SSHPort, "port", config.DefaultSSHPort, "SSH server port") flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path") flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method") diff --git a/cmd/dtail/main.go b/cmd/dtail/main.go index bc4236c..ee019ae 100644 --- a/cmd/dtail/main.go +++ b/cmd/dtail/main.go @@ -46,6 +46,9 @@ func main() { flag.BoolVar(&displayVersion, "version", false, "Display version") flag.IntVar(&args.ConnectionsPerCPU, "cpc", config.DefaultConnectionsPerCPU, "How many connections established per CPU core concurrently") + flag.IntVar(&args.LContext.AfterContext, "after", 0, "Print lines of trailing context after matching lines") + flag.IntVar(&args.LContext.BeforeContext, "before", 0, "Print lines of leading context before matching lines") + flag.IntVar(&args.LContext.MaxCount, "max", 0, "Stop reading file after NUM matching lines") flag.IntVar(&args.SSHPort, "port", config.DefaultSSHPort, "SSH server port") flag.IntVar(&args.Timeout, "timeout", 0, "Max time dtail server will collect data until disconnection") flag.IntVar(&pprof, "pprof", -1, "Start PProf server this port") diff --git a/internal/config/args.go b/internal/config/args.go index f721390..f94e0a9 100644 --- a/internal/config/args.go +++ b/internal/config/args.go @@ -3,8 +3,10 @@ package config import ( "encoding/base64" "fmt" + "strconv" "strings" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/omode" gossh "golang.org/x/crypto/ssh" @@ -12,6 +14,7 @@ import ( // Args is a helper struct to summarize common client arguments. type Args struct { + lcontext.LContext Arguments []string ConfigFile string ConnectionsPerCPU int @@ -74,17 +77,50 @@ func (a *Args) String() string { // SerializeOptions returns a string ready to be sent over the wire to the server. func (a *Args) SerializeOptions() string { - return fmt.Sprintf("quiet=%v:spartan=%v:serverless=%v", a.Quiet, a.Spartan, - a.Serverless) + options := make(map[string]string) + + if a.Quiet { + options["quiet"] = fmt.Sprintf("%v", a.Quiet) + } + if a.Spartan { + options["spartan"] = fmt.Sprintf("%v", a.Spartan) + } + if a.Serverless { + options["serverless"] = fmt.Sprintf("%v", a.Serverless) + } + if a.LContext.MaxCount != 0 { + options["max"] = fmt.Sprintf("%d", a.LContext.MaxCount) + } + if a.LContext.BeforeContext != 0 { + options["before"] = fmt.Sprintf("%d", a.LContext.BeforeContext) + } + if a.LContext.AfterContext != 0 { + options["after"] = fmt.Sprintf("%d", a.LContext.AfterContext) + } + + var sb strings.Builder + var i int + for k, v := range options { + if i > 0 { + sb.WriteString(":") + } + sb.WriteString(k) + sb.WriteString("=") + sb.WriteString(v) + i++ + } + return sb.String() } // DeserializeOptions deserializes the options, but into a map. -func DeserializeOptions(opts []string) (map[string]string, error) { +func DeserializeOptions(opts []string) (map[string]string, lcontext.LContext, error) { options := make(map[string]string, len(opts)) + var ltx lcontext.LContext + for _, o := range opts { kv := strings.SplitN(o, "=", 2) if len(kv) != 2 { - return options, fmt.Errorf("Unable to parse options: %v", kv) + return options, ltx, fmt.Errorf("Unable to parse options: %v", kv) } key := kv[0] val := kv[1] @@ -93,11 +129,34 @@ func DeserializeOptions(opts []string) (map[string]string, error) { s := strings.SplitN(val, "%", 2) decoded, err := base64.StdEncoding.DecodeString(s[1]) if err != nil { - return options, err + return options, ltx, err } val = string(decoded) } - options[key] = val + + switch key { + case "before": + iVal, err := strconv.Atoi(val) + if err != nil { + return options, ltx, err + } + ltx.BeforeContext = iVal + case "after": + iVal, err := strconv.Atoi(val) + if err != nil { + return options, ltx, err + } + ltx.AfterContext = iVal + case "max": + iVal, err := strconv.Atoi(val) + if err != nil { + return options, ltx, err + } + ltx.MaxCount = iVal + default: + options[key] = val + } } - return options, nil + + return options, ltx, nil } diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go index 7773142..b05fd39 100644 --- a/internal/io/fs/filereader.go +++ b/internal/io/fs/filereader.go @@ -4,13 +4,15 @@ import ( "context" "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/regex" ) // FileReader is the interface used on the dtail server to read/cat/grep/mapr... // a file. type FileReader interface { - Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error + Start(ctx context.Context, ltx lcontext.LContext, lines chan<- line.Line, + re regex.Regex) error FilePath() string Retry() bool } diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 92f85b6..88d467e 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -16,6 +16,7 @@ import ( "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/pool" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/regex" "github.com/DataDog/zstd" @@ -62,8 +63,8 @@ func (f readFile) Retry() bool { } // Start tailing a log file. -func (f readFile) Start(ctx context.Context, lines chan<- line.Line, - re regex.Regex) error { +func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, + lines chan<- line.Line, re regex.Regex) error { dlog.Common.Debug("readFile", f) defer func() { @@ -102,7 +103,7 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line, wg.Add(1) go f.periodicTruncateCheck(ctx, truncate) - go f.filter(ctx, &wg, rawLines, lines, re) + go f.filter(ctx, ltx, &wg, rawLines, lines, re) err = f.read(ctx, fd, rawLines, truncate) close(rawLines) @@ -213,10 +214,27 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu } // Filter log lines matching a given regular expression. -func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, - rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { +func (f readFile) filter(ctx context.Context, ltx lcontext.LContext, + wg *sync.WaitGroup, rawLines <-chan *bytes.Buffer, lines chan<- line.Line, + re regex.Regex) { defer wg.Done() + // Do we have any kind of local context settings? If so then run the more complex + // filterWithLContext method. + if ltx.Has() { + // We can not skip transmitting any lines to the client with a local + // grep context specified. + f.canSkipLines = false + f.filterWithLContext(ctx, ltx, rawLines, lines, re) + return + } + + f.filterWithoutLContext(ctx, rawLines, lines, re) +} + +func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *bytes.Buffer, + lines chan<- line.Line, re regex.Regex) { + for { select { case line, ok := <-rawLines: @@ -235,11 +253,126 @@ func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, } } -func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int, +// Filter log lines matching a given regular expression, however with local grep context. +func (f readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext, + rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { + + // Scenario 1: Finish once maxCount hits found + maxCount := ltx.MaxCount + processMaxCount := maxCount > 0 + maxReached := false + + // Scenario 2: Print prev. N lines when current line matches. + before := ltx.BeforeContext + processBefore := before > 0 + var beforeBuf chan *bytes.Buffer + if processBefore { + beforeBuf = make(chan *bytes.Buffer, before) + } + + // Screnario 3: Print next N lines when current line matches. + after := 0 + processAfter := ltx.AfterContext > 0 + + for lineBytesBuffer := range rawLines { + f.updatePosition() + + if !re.Match(lineBytesBuffer.Bytes()) { + f.updateLineNotMatched() + + if processAfter && after > 0 { + after-- + myLine := line.Line{ + Content: lineBytesBuffer, + SourceID: f.globID, + Count: f.totalLineCount(), + TransmittedPerc: 100, + } + + select { + case lines <- myLine: + case <-ctx.Done(): + return + } + + } else if processBefore { + // Keep last num BeforeContext raw messages. + select { + case beforeBuf <- lineBytesBuffer: + default: + pool.RecycleBytesBuffer(<-beforeBuf) + beforeBuf <- lineBytesBuffer + } + } + continue + } + + f.updateLineMatched() + + if processAfter { + if maxReached { + return + } + after = ltx.AfterContext + } + + if processBefore { + i := uint64(len(beforeBuf)) + for { + select { + case lineBytesBuffer := <-beforeBuf: + myLine := line.Line{ + Content: lineBytesBuffer, + SourceID: f.globID, + Count: f.totalLineCount() - i, + TransmittedPerc: 100, + } + i-- + + select { + case lines <- myLine: + case <-ctx.Done(): + return + } + default: + // beforeBuf is now empty. + } + if len(beforeBuf) == 0 { + break + } + } + } + + line := line.Line{ + Content: lineBytesBuffer, + SourceID: f.globID, + Count: f.totalLineCount(), + TransmittedPerc: 100, + } + + select { + case lines <- line: + if processMaxCount { + maxCount-- + if maxCount == 0 { + if !processAfter || after == 0 { + return + } + // Unfortunatley we have to continue filter, as there might be more lines to print + maxReached = true + } + } + case <-ctx.Done(): + return + } + } +} + +func (f readFile) transmittable(lineBytesBuffer *bytes.Buffer, length, capacity int, re regex.Regex) (line.Line, bool) { var read line.Line - if !re.Match(lineBytes.Bytes()) { + if !re.Match(lineBytesBuffer.Bytes()) { f.updateLineNotMatched() f.updateLineNotTransmitted() return read, false @@ -254,7 +387,7 @@ func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int, f.updateLineTransmitted() read = line.Line{ - Content: lineBytes, + Content: lineBytesBuffer, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: f.transmittedPerc(), diff --git a/internal/lcontext/lcontext.go b/internal/lcontext/lcontext.go new file mode 100644 index 0000000..183ceb5 --- /dev/null +++ b/internal/lcontext/lcontext.go @@ -0,0 +1,22 @@ +package lcontext + +// LContext stands for line context (used by context aware grep queries e.g.) +type LContext struct { + AfterContext int + BeforeContext int + MaxCount int +} + +// Has returns true if it has any parameter set. +func (c LContext) Has() bool { + if c.AfterContext > 0 { + return true + } + if c.BeforeContext > 0 { + return true + } + if c.MaxCount > 0 { + return true + } + return false +} diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index 6bc8268..c25f85a 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -18,12 +18,13 @@ import ( "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/pool" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/mapr/server" "github.com/mimecast/dtail/internal/protocol" user "github.com/mimecast/dtail/internal/user/server" ) -type handleCommandCb func(context.Context, int, []string, string) +type handleCommandCb func(context.Context, lcontext.LContext, int, []string, string) type baseHandler struct { done *internal.Done @@ -160,14 +161,13 @@ func (h *baseHandler) handleCommand(commandStr string) { splitted := strings.Split(args[0], ":") commandName := splitted[0] - options, err := config.DeserializeOptions(splitted[1:]) + options, ltx, err := config.DeserializeOptions(splitted[1:]) if err != nil { h.send(h.serverMessages, dlog.Server.Error(h.user, err)) return } - h.setOptions(options) - - h.handleCommandCb(ctx, argc, args, commandName) + h.handleOptions(options) + h.handleCommandCb(ctx, ltx, argc, args, commandName) } func (h *baseHandler) handleProtocolVersion(args []string) ([]string, int, string, error) { @@ -238,7 +238,7 @@ func (h *baseHandler) handleAckCommand(argc int, args []string) { } } -func (h *baseHandler) setOptions(options map[string]string) { +func (h *baseHandler) handleOptions(options map[string]string) { // We have to make sure that this block is executed only once. h.mutex.Lock() defer h.mutex.Unlock() diff --git a/internal/server/handlers/healthhandler.go b/internal/server/handlers/healthhandler.go index 0425696..6dd9872 100644 --- a/internal/server/handlers/healthhandler.go +++ b/internal/server/handlers/healthhandler.go @@ -8,6 +8,7 @@ import ( "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/lcontext" user "github.com/mimecast/dtail/internal/user/server" ) @@ -40,8 +41,8 @@ func NewHealthHandler(user *user.User) *HealthHandler { return &h } -func (h *HealthHandler) handleHealthCommand(ctx context.Context, argc int, - args []string, commandName string) { +func (h *HealthHandler) handleHealthCommand(ctx context.Context, + ltx lcontext.LContext, argc int, args []string, commandName string) { dlog.Server.Debug(h.user, "Handling health command", argc, args) switch commandName { diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 384e966..4728a55 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -10,6 +10,7 @@ import ( "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/fs" "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/regex" ) @@ -26,8 +27,8 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand { } } -func (r *readCommand) Start(ctx context.Context, argc int, args []string, - retries int) { +func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext, + argc int, args []string, retries int) { re := regex.NewNoop() if argc >= 4 { @@ -44,11 +45,11 @@ func (r *readCommand) Start(ctx context.Context, argc int, args []string, "Unable to parse command", args, argc)) return } - r.readGlob(ctx, args[1], re, retries) + r.readGlob(ctx, ltx, args[1], re, retries) } -func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, - retries int) { +func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext, + glob string, re regex.Regex, retries int) { retryInterval := time.Second * 5 glob = filepath.Clean(glob) @@ -74,7 +75,7 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, continue } - r.readFiles(ctx, paths, glob, re, retryInterval) + r.readFiles(ctx, ltx, paths, glob, re, retryInterval) return } @@ -83,18 +84,18 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, return } -func (r *readCommand) readFiles(ctx context.Context, paths []string, glob string, - re regex.Regex, retryInterval time.Duration) { +func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, + paths []string, glob string, re regex.Regex, retryInterval time.Duration) { var wg sync.WaitGroup wg.Add(len(paths)) for _, path := range paths { - go r.readFileIfPermissions(ctx, &wg, path, glob, re) + go r.readFileIfPermissions(ctx, ltx, &wg, path, glob, re) } wg.Wait() } -func (r *readCommand) readFileIfPermissions(ctx context.Context, +func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LContext, wg *sync.WaitGroup, path, glob string, re regex.Regex) { defer wg.Done() @@ -105,12 +106,13 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, "Unable to read file(s), check server logs")) return } - r.readFile(ctx, path, globID, re) + r.readFile(ctx, ltx, path, globID, re) } -func (r *readCommand) readFile(ctx context.Context, path, globID string, re regex.Regex) { - dlog.Server.Info(r.server.user, "Start reading file", path, globID) +func (r *readCommand) readFile(ctx context.Context, ltx lcontext.LContext, + path, globID string, re regex.Regex) { + dlog.Server.Info(r.server.user, "Start reading file", path, globID) var reader fs.FileReader switch r.mode { case omode.TailClient: @@ -129,7 +131,7 @@ func (r *readCommand) readFile(ctx context.Context, path, globID string, re rege lines = make(chan line.Line, 100) aggregate.NextLinesCh <- lines } - if err := reader.Start(ctx, lines, re); err != nil { + if err := reader.Start(ctx, ltx, lines, re); err != nil { dlog.Server.Error(r.server.user, path, globID, err) } if aggregate != nil { diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 52c4570..36574a9 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -8,6 +8,7 @@ import ( "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/omode" user "github.com/mimecast/dtail/internal/user/server" ) @@ -53,8 +54,8 @@ func NewServerHandler(user *user.User, catLimiter, return &h } -func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, - args []string, commandName string) { +func (h *ServerHandler) handleUserCommand(ctx context.Context, ltx lcontext.LContext, + argc int, args []string, commandName string) { dlog.Server.Debug(h.user, "Handling user command", argc, args) h.incrementActiveCommands() @@ -68,13 +69,13 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, case "grep", "cat": command := newReadCommand(h, omode.CatClient) go func() { - command.Start(ctx, argc, args, 1) + command.Start(ctx, ltx, argc, args, 1) commandFinished() }() case "tail": command := newReadCommand(h, omode.TailClient) go func() { - command.Start(ctx, argc, args, 10) + command.Start(ctx, ltx, argc, args, 10) commandFinished() }() case "map": |
