summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--TODO.md1
-rw-r--r--cmd/dgrep/main.go3
-rw-r--r--cmd/dtail/main.go3
-rw-r--r--internal/config/args.go73
-rw-r--r--internal/io/fs/filereader.go4
-rw-r--r--internal/io/fs/readfile.go149
-rw-r--r--internal/lcontext/lcontext.go22
-rw-r--r--internal/server/handlers/basehandler.go12
-rw-r--r--internal/server/handlers/healthhandler.go5
-rw-r--r--internal/server/handlers/readcommand.go30
-rw-r--r--internal/server/handlers/serverhandler.go9
11 files changed, 269 insertions, 42 deletions
diff --git a/TODO.md b/TODO.md
index ec9ca2d..7cf897e 100644
--- a/TODO.md
+++ b/TODO.md
@@ -3,6 +3,7 @@ TODO
This is a loose list of what to do. Maybe for the next releae or maybe for a later one.
+[ ] Rename dlog package to l
[ ] Manually add back all changes from mimecast master to this branch.
[ ] Client 4.x should print an error and exit when trying to connect to a 3.x server.
[ ] Client 3.x should print an error and exit when trying to connect to a 4.x server.
diff --git a/cmd/dgrep/main.go b/cmd/dgrep/main.go
index da3f7a4..2d7e53b 100644
--- a/cmd/dgrep/main.go
+++ b/cmd/dgrep/main.go
@@ -30,6 +30,9 @@ func main() {
flag.BoolVar(&displayVersion, "version", false, "Display version")
flag.IntVar(&args.ConnectionsPerCPU, "cpc", config.DefaultConnectionsPerCPU,
"How many connections established per CPU core concurrently")
+ flag.IntVar(&args.LContext.AfterContext, "after", 0, "Print lines of trailing context after matching lines")
+ flag.IntVar(&args.LContext.BeforeContext, "before", 0, "Print lines of leading context before matching lines")
+ flag.IntVar(&args.LContext.MaxCount, "max", 0, "Stop reading file after NUM matching lines")
flag.IntVar(&args.SSHPort, "port", config.DefaultSSHPort, "SSH server port")
flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path")
flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method")
diff --git a/cmd/dtail/main.go b/cmd/dtail/main.go
index bc4236c..ee019ae 100644
--- a/cmd/dtail/main.go
+++ b/cmd/dtail/main.go
@@ -46,6 +46,9 @@ func main() {
flag.BoolVar(&displayVersion, "version", false, "Display version")
flag.IntVar(&args.ConnectionsPerCPU, "cpc", config.DefaultConnectionsPerCPU,
"How many connections established per CPU core concurrently")
+ flag.IntVar(&args.LContext.AfterContext, "after", 0, "Print lines of trailing context after matching lines")
+ flag.IntVar(&args.LContext.BeforeContext, "before", 0, "Print lines of leading context before matching lines")
+ flag.IntVar(&args.LContext.MaxCount, "max", 0, "Stop reading file after NUM matching lines")
flag.IntVar(&args.SSHPort, "port", config.DefaultSSHPort, "SSH server port")
flag.IntVar(&args.Timeout, "timeout", 0, "Max time dtail server will collect data until disconnection")
flag.IntVar(&pprof, "pprof", -1, "Start PProf server this port")
diff --git a/internal/config/args.go b/internal/config/args.go
index f721390..f94e0a9 100644
--- a/internal/config/args.go
+++ b/internal/config/args.go
@@ -3,8 +3,10 @@ package config
import (
"encoding/base64"
"fmt"
+ "strconv"
"strings"
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/omode"
gossh "golang.org/x/crypto/ssh"
@@ -12,6 +14,7 @@ import (
// Args is a helper struct to summarize common client arguments.
type Args struct {
+ lcontext.LContext
Arguments []string
ConfigFile string
ConnectionsPerCPU int
@@ -74,17 +77,50 @@ func (a *Args) String() string {
// SerializeOptions returns a string ready to be sent over the wire to the server.
func (a *Args) SerializeOptions() string {
- return fmt.Sprintf("quiet=%v:spartan=%v:serverless=%v", a.Quiet, a.Spartan,
- a.Serverless)
+ options := make(map[string]string)
+
+ if a.Quiet {
+ options["quiet"] = fmt.Sprintf("%v", a.Quiet)
+ }
+ if a.Spartan {
+ options["spartan"] = fmt.Sprintf("%v", a.Spartan)
+ }
+ if a.Serverless {
+ options["serverless"] = fmt.Sprintf("%v", a.Serverless)
+ }
+ if a.LContext.MaxCount != 0 {
+ options["max"] = fmt.Sprintf("%d", a.LContext.MaxCount)
+ }
+ if a.LContext.BeforeContext != 0 {
+ options["before"] = fmt.Sprintf("%d", a.LContext.BeforeContext)
+ }
+ if a.LContext.AfterContext != 0 {
+ options["after"] = fmt.Sprintf("%d", a.LContext.AfterContext)
+ }
+
+ var sb strings.Builder
+ var i int
+ for k, v := range options {
+ if i > 0 {
+ sb.WriteString(":")
+ }
+ sb.WriteString(k)
+ sb.WriteString("=")
+ sb.WriteString(v)
+ i++
+ }
+ return sb.String()
}
// DeserializeOptions deserializes the options, but into a map.
-func DeserializeOptions(opts []string) (map[string]string, error) {
+func DeserializeOptions(opts []string) (map[string]string, lcontext.LContext, error) {
options := make(map[string]string, len(opts))
+ var ltx lcontext.LContext
+
for _, o := range opts {
kv := strings.SplitN(o, "=", 2)
if len(kv) != 2 {
- return options, fmt.Errorf("Unable to parse options: %v", kv)
+ return options, ltx, fmt.Errorf("Unable to parse options: %v", kv)
}
key := kv[0]
val := kv[1]
@@ -93,11 +129,34 @@ func DeserializeOptions(opts []string) (map[string]string, error) {
s := strings.SplitN(val, "%", 2)
decoded, err := base64.StdEncoding.DecodeString(s[1])
if err != nil {
- return options, err
+ return options, ltx, err
}
val = string(decoded)
}
- options[key] = val
+
+ switch key {
+ case "before":
+ iVal, err := strconv.Atoi(val)
+ if err != nil {
+ return options, ltx, err
+ }
+ ltx.BeforeContext = iVal
+ case "after":
+ iVal, err := strconv.Atoi(val)
+ if err != nil {
+ return options, ltx, err
+ }
+ ltx.AfterContext = iVal
+ case "max":
+ iVal, err := strconv.Atoi(val)
+ if err != nil {
+ return options, ltx, err
+ }
+ ltx.MaxCount = iVal
+ default:
+ options[key] = val
+ }
}
- return options, nil
+
+ return options, ltx, nil
}
diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go
index 7773142..b05fd39 100644
--- a/internal/io/fs/filereader.go
+++ b/internal/io/fs/filereader.go
@@ -4,13 +4,15 @@ import (
"context"
"github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/regex"
)
// FileReader is the interface used on the dtail server to read/cat/grep/mapr...
// a file.
type FileReader interface {
- Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error
+ Start(ctx context.Context, ltx lcontext.LContext, lines chan<- line.Line,
+ re regex.Regex) error
FilePath() string
Retry() bool
}
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index 92f85b6..88d467e 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -16,6 +16,7 @@ import (
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/pool"
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/regex"
"github.com/DataDog/zstd"
@@ -62,8 +63,8 @@ func (f readFile) Retry() bool {
}
// Start tailing a log file.
-func (f readFile) Start(ctx context.Context, lines chan<- line.Line,
- re regex.Regex) error {
+func (f readFile) Start(ctx context.Context, ltx lcontext.LContext,
+ lines chan<- line.Line, re regex.Regex) error {
dlog.Common.Debug("readFile", f)
defer func() {
@@ -102,7 +103,7 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line,
wg.Add(1)
go f.periodicTruncateCheck(ctx, truncate)
- go f.filter(ctx, &wg, rawLines, lines, re)
+ go f.filter(ctx, ltx, &wg, rawLines, lines, re)
err = f.read(ctx, fd, rawLines, truncate)
close(rawLines)
@@ -213,10 +214,27 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu
}
// Filter log lines matching a given regular expression.
-func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup,
- rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) {
+func (f readFile) filter(ctx context.Context, ltx lcontext.LContext,
+ wg *sync.WaitGroup, rawLines <-chan *bytes.Buffer, lines chan<- line.Line,
+ re regex.Regex) {
defer wg.Done()
+ // Do we have any kind of local context settings? If so then run the more complex
+ // filterWithLContext method.
+ if ltx.Has() {
+ // We can not skip transmitting any lines to the client with a local
+ // grep context specified.
+ f.canSkipLines = false
+ f.filterWithLContext(ctx, ltx, rawLines, lines, re)
+ return
+ }
+
+ f.filterWithoutLContext(ctx, rawLines, lines, re)
+}
+
+func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *bytes.Buffer,
+ lines chan<- line.Line, re regex.Regex) {
+
for {
select {
case line, ok := <-rawLines:
@@ -235,11 +253,126 @@ func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup,
}
}
-func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int,
+// Filter log lines matching a given regular expression, however with local grep context.
+func (f readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext,
+ rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) {
+
+ // Scenario 1: Finish once maxCount hits found
+ maxCount := ltx.MaxCount
+ processMaxCount := maxCount > 0
+ maxReached := false
+
+ // Scenario 2: Print prev. N lines when current line matches.
+ before := ltx.BeforeContext
+ processBefore := before > 0
+ var beforeBuf chan *bytes.Buffer
+ if processBefore {
+ beforeBuf = make(chan *bytes.Buffer, before)
+ }
+
+ // Screnario 3: Print next N lines when current line matches.
+ after := 0
+ processAfter := ltx.AfterContext > 0
+
+ for lineBytesBuffer := range rawLines {
+ f.updatePosition()
+
+ if !re.Match(lineBytesBuffer.Bytes()) {
+ f.updateLineNotMatched()
+
+ if processAfter && after > 0 {
+ after--
+ myLine := line.Line{
+ Content: lineBytesBuffer,
+ SourceID: f.globID,
+ Count: f.totalLineCount(),
+ TransmittedPerc: 100,
+ }
+
+ select {
+ case lines <- myLine:
+ case <-ctx.Done():
+ return
+ }
+
+ } else if processBefore {
+ // Keep last num BeforeContext raw messages.
+ select {
+ case beforeBuf <- lineBytesBuffer:
+ default:
+ pool.RecycleBytesBuffer(<-beforeBuf)
+ beforeBuf <- lineBytesBuffer
+ }
+ }
+ continue
+ }
+
+ f.updateLineMatched()
+
+ if processAfter {
+ if maxReached {
+ return
+ }
+ after = ltx.AfterContext
+ }
+
+ if processBefore {
+ i := uint64(len(beforeBuf))
+ for {
+ select {
+ case lineBytesBuffer := <-beforeBuf:
+ myLine := line.Line{
+ Content: lineBytesBuffer,
+ SourceID: f.globID,
+ Count: f.totalLineCount() - i,
+ TransmittedPerc: 100,
+ }
+ i--
+
+ select {
+ case lines <- myLine:
+ case <-ctx.Done():
+ return
+ }
+ default:
+ // beforeBuf is now empty.
+ }
+ if len(beforeBuf) == 0 {
+ break
+ }
+ }
+ }
+
+ line := line.Line{
+ Content: lineBytesBuffer,
+ SourceID: f.globID,
+ Count: f.totalLineCount(),
+ TransmittedPerc: 100,
+ }
+
+ select {
+ case lines <- line:
+ if processMaxCount {
+ maxCount--
+ if maxCount == 0 {
+ if !processAfter || after == 0 {
+ return
+ }
+ // Unfortunatley we have to continue filter, as there might be more lines to print
+ maxReached = true
+ }
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+func (f readFile) transmittable(lineBytesBuffer *bytes.Buffer, length, capacity int,
re regex.Regex) (line.Line, bool) {
var read line.Line
- if !re.Match(lineBytes.Bytes()) {
+ if !re.Match(lineBytesBuffer.Bytes()) {
f.updateLineNotMatched()
f.updateLineNotTransmitted()
return read, false
@@ -254,7 +387,7 @@ func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int,
f.updateLineTransmitted()
read = line.Line{
- Content: lineBytes,
+ Content: lineBytesBuffer,
SourceID: f.globID,
Count: f.totalLineCount(),
TransmittedPerc: f.transmittedPerc(),
diff --git a/internal/lcontext/lcontext.go b/internal/lcontext/lcontext.go
new file mode 100644
index 0000000..183ceb5
--- /dev/null
+++ b/internal/lcontext/lcontext.go
@@ -0,0 +1,22 @@
+package lcontext
+
+// LContext stands for line context (used by context aware grep queries e.g.)
+type LContext struct {
+ AfterContext int
+ BeforeContext int
+ MaxCount int
+}
+
+// Has returns true if it has any parameter set.
+func (c LContext) Has() bool {
+ if c.AfterContext > 0 {
+ return true
+ }
+ if c.BeforeContext > 0 {
+ return true
+ }
+ if c.MaxCount > 0 {
+ return true
+ }
+ return false
+}
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index 6bc8268..c25f85a 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -18,12 +18,13 @@ import (
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/pool"
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/mapr/server"
"github.com/mimecast/dtail/internal/protocol"
user "github.com/mimecast/dtail/internal/user/server"
)
-type handleCommandCb func(context.Context, int, []string, string)
+type handleCommandCb func(context.Context, lcontext.LContext, int, []string, string)
type baseHandler struct {
done *internal.Done
@@ -160,14 +161,13 @@ func (h *baseHandler) handleCommand(commandStr string) {
splitted := strings.Split(args[0], ":")
commandName := splitted[0]
- options, err := config.DeserializeOptions(splitted[1:])
+ options, ltx, err := config.DeserializeOptions(splitted[1:])
if err != nil {
h.send(h.serverMessages, dlog.Server.Error(h.user, err))
return
}
- h.setOptions(options)
-
- h.handleCommandCb(ctx, argc, args, commandName)
+ h.handleOptions(options)
+ h.handleCommandCb(ctx, ltx, argc, args, commandName)
}
func (h *baseHandler) handleProtocolVersion(args []string) ([]string, int, string, error) {
@@ -238,7 +238,7 @@ func (h *baseHandler) handleAckCommand(argc int, args []string) {
}
}
-func (h *baseHandler) setOptions(options map[string]string) {
+func (h *baseHandler) handleOptions(options map[string]string) {
// We have to make sure that this block is executed only once.
h.mutex.Lock()
defer h.mutex.Unlock()
diff --git a/internal/server/handlers/healthhandler.go b/internal/server/handlers/healthhandler.go
index 0425696..6dd9872 100644
--- a/internal/server/handlers/healthhandler.go
+++ b/internal/server/handlers/healthhandler.go
@@ -8,6 +8,7 @@ import (
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/lcontext"
user "github.com/mimecast/dtail/internal/user/server"
)
@@ -40,8 +41,8 @@ func NewHealthHandler(user *user.User) *HealthHandler {
return &h
}
-func (h *HealthHandler) handleHealthCommand(ctx context.Context, argc int,
- args []string, commandName string) {
+func (h *HealthHandler) handleHealthCommand(ctx context.Context,
+ ltx lcontext.LContext, argc int, args []string, commandName string) {
dlog.Server.Debug(h.user, "Handling health command", argc, args)
switch commandName {
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 384e966..4728a55 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -10,6 +10,7 @@ import (
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/fs"
"github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/regex"
)
@@ -26,8 +27,8 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand {
}
}
-func (r *readCommand) Start(ctx context.Context, argc int, args []string,
- retries int) {
+func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
+ argc int, args []string, retries int) {
re := regex.NewNoop()
if argc >= 4 {
@@ -44,11 +45,11 @@ func (r *readCommand) Start(ctx context.Context, argc int, args []string,
"Unable to parse command", args, argc))
return
}
- r.readGlob(ctx, args[1], re, retries)
+ r.readGlob(ctx, ltx, args[1], re, retries)
}
-func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex,
- retries int) {
+func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext,
+ glob string, re regex.Regex, retries int) {
retryInterval := time.Second * 5
glob = filepath.Clean(glob)
@@ -74,7 +75,7 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex,
continue
}
- r.readFiles(ctx, paths, glob, re, retryInterval)
+ r.readFiles(ctx, ltx, paths, glob, re, retryInterval)
return
}
@@ -83,18 +84,18 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex,
return
}
-func (r *readCommand) readFiles(ctx context.Context, paths []string, glob string,
- re regex.Regex, retryInterval time.Duration) {
+func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
+ paths []string, glob string, re regex.Regex, retryInterval time.Duration) {
var wg sync.WaitGroup
wg.Add(len(paths))
for _, path := range paths {
- go r.readFileIfPermissions(ctx, &wg, path, glob, re)
+ go r.readFileIfPermissions(ctx, ltx, &wg, path, glob, re)
}
wg.Wait()
}
-func (r *readCommand) readFileIfPermissions(ctx context.Context,
+func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LContext,
wg *sync.WaitGroup, path, glob string, re regex.Regex) {
defer wg.Done()
@@ -105,12 +106,13 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context,
"Unable to read file(s), check server logs"))
return
}
- r.readFile(ctx, path, globID, re)
+ r.readFile(ctx, ltx, path, globID, re)
}
-func (r *readCommand) readFile(ctx context.Context, path, globID string, re regex.Regex) {
- dlog.Server.Info(r.server.user, "Start reading file", path, globID)
+func (r *readCommand) readFile(ctx context.Context, ltx lcontext.LContext,
+ path, globID string, re regex.Regex) {
+ dlog.Server.Info(r.server.user, "Start reading file", path, globID)
var reader fs.FileReader
switch r.mode {
case omode.TailClient:
@@ -129,7 +131,7 @@ func (r *readCommand) readFile(ctx context.Context, path, globID string, re rege
lines = make(chan line.Line, 100)
aggregate.NextLinesCh <- lines
}
- if err := reader.Start(ctx, lines, re); err != nil {
+ if err := reader.Start(ctx, ltx, lines, re); err != nil {
dlog.Server.Error(r.server.user, path, globID, err)
}
if aggregate != nil {
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 52c4570..36574a9 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -8,6 +8,7 @@ import (
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/omode"
user "github.com/mimecast/dtail/internal/user/server"
)
@@ -53,8 +54,8 @@ func NewServerHandler(user *user.User, catLimiter,
return &h
}
-func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int,
- args []string, commandName string) {
+func (h *ServerHandler) handleUserCommand(ctx context.Context, ltx lcontext.LContext,
+ argc int, args []string, commandName string) {
dlog.Server.Debug(h.user, "Handling user command", argc, args)
h.incrementActiveCommands()
@@ -68,13 +69,13 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int,
case "grep", "cat":
command := newReadCommand(h, omode.CatClient)
go func() {
- command.Start(ctx, argc, args, 1)
+ command.Start(ctx, ltx, argc, args, 1)
commandFinished()
}()
case "tail":
command := newReadCommand(h, omode.TailClient)
go func() {
- command.Start(ctx, argc, args, 10)
+ command.Start(ctx, ltx, argc, args, 10)
commandFinished()
}()
case "map":