diff options
| author | Paul Buetow <git@mx.buetow.org> | 2021-02-05 19:16:52 +0000 |
|---|---|---|
| committer | Paul Buetow <git@mx.buetow.org> | 2021-02-05 19:16:52 +0000 |
| commit | 742e6c444f7236ca3c9953050b0704bc88283ed3 (patch) | |
| tree | 48a3f3f6174a732e84a483c6079ae0615a230cd5 | |
| parent | 6f093ff69c83526279b9f039aca079162c2b68d5 (diff) | |
cann parse local context to server file reader
| -rw-r--r-- | internal/io/fs/filereader.go | 3 | ||||
| -rw-r--r-- | internal/io/fs/readfile.go | 12 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 21 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 47 |
4 files changed, 58 insertions, 25 deletions
diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go index 0774837..efd410e 100644 --- a/internal/io/fs/filereader.go +++ b/internal/io/fs/filereader.go @@ -4,12 +4,13 @@ import ( "context" "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/regex" ) // FileReader is the interface used on the dtail server to read/cat/grep/mapr... a file. type FileReader interface { - Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error + Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error FilePath() string Retry() bool } diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 6757bd6..4b2af7c 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -14,6 +14,7 @@ import ( "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/regex" "github.com/DataDog/zstd" @@ -59,7 +60,7 @@ func (f readFile) Retry() bool { } // Start tailing a log file. -func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error { +func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error { logger.Debug("readFile", f) defer func() { select { @@ -96,7 +97,7 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Re wg.Add(1) go f.periodicTruncateCheck(ctx, truncate) - go f.filter(ctx, &wg, rawLines, lines, re) + go f.filter(ctx, lContext, &wg, rawLines, lines, re) err = f.read(ctx, fd, rawLines, truncate) close(rawLines) @@ -227,9 +228,14 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t } // Filter log lines matching a given regular expression. -func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) { +func (f readFile) filter(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) { defer wg.Done() + /* + beforeContext := make([]string, lContext.BeforeContext) + afterContext := make([]string, lContext.AfterContext) + */ + for { select { case line, ok := <-rawLines: diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 5bab26f..b659c06 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -9,6 +9,7 @@ import ( "github.com/mimecast/dtail/internal/io/fs" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/regex" ) @@ -25,7 +26,7 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand { } } -func (r *readCommand) Start(ctx context.Context, argc int, args []string, retries int) { +func (r *readCommand) Start(ctx context.Context, lContext lcontext.LContext, argc int, args []string, retries int) { re := regex.NewNoop() if argc >= 4 { @@ -40,10 +41,10 @@ func (r *readCommand) Start(ctx context.Context, argc int, args []string, retrie r.server.sendServerWarnMessage(logger.Warn(r.server.user, commandParseWarning, args, argc)) return } - r.readGlob(ctx, args[1], re, retries) + r.readGlob(ctx, lContext, args[1], re, retries) } -func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, retries int) { +func (r *readCommand) readGlob(ctx context.Context, lContext lcontext.LContext, glob string, re regex.Regex, retries int) { retryInterval := time.Second * 5 glob = filepath.Clean(glob) @@ -67,7 +68,7 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, continue } - r.readFiles(ctx, paths, glob, re, retryInterval) + r.readFiles(ctx, lContext, paths, glob, re, retryInterval) return } @@ -75,18 +76,18 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, return } -func (r *readCommand) readFiles(ctx context.Context, paths []string, glob string, re regex.Regex, retryInterval time.Duration) { +func (r *readCommand) readFiles(ctx context.Context, lContext lcontext.LContext, paths []string, glob string, re regex.Regex, retryInterval time.Duration) { var wg sync.WaitGroup wg.Add(len(paths)) for _, path := range paths { - go r.readFileIfPermissions(ctx, &wg, path, glob, re) + go r.readFileIfPermissions(ctx, lContext, &wg, path, glob, re) } wg.Wait() } -func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGroup, path, glob string, re regex.Regex) { +func (r *readCommand) readFileIfPermissions(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, path, glob string, re regex.Regex) { defer wg.Done() globID := r.makeGlobID(path, glob) @@ -96,10 +97,10 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGr return } - r.readFile(ctx, path, globID, re) + r.readFile(ctx, lContext, path, globID, re) } -func (r *readCommand) readFile(ctx context.Context, path, globID string, re regex.Regex) { +func (r *readCommand) readFile(ctx context.Context, lContext lcontext.LContext, path, globID string, re regex.Regex) { logger.Info(r.server.user, "Start reading file", path, globID) var reader fs.FileReader @@ -120,7 +121,7 @@ func (r *readCommand) readFile(ctx context.Context, path, globID string, re rege } for { - if err := reader.Start(ctx, lines, re); err != nil { + if err := reader.Start(ctx, lContext, lines, re); err != nil { logger.Error(r.server.user, path, globID, err) } diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 169c1eb..07933d0 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "os" + "strconv" "strings" "sync/atomic" "time" @@ -15,6 +16,7 @@ import ( "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/mapr/server" "github.com/mimecast/dtail/internal/omode" user "github.com/mimecast/dtail/internal/user/server" @@ -240,7 +242,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] splitted := strings.Split(args[0], ":") commandName := splitted[0] - options, err := readOptions(splitted[1:]) + options, lContext, err := readOptions(splitted[1:]) if err != nil { h.sendServerMessage(logger.Error(h.user, err)) commandFinished() @@ -258,7 +260,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] command := newReadCommand(h, omode.CatClient) go func() { h.incrementActiveReaders() - command.Start(ctx, argc, args, 1) + command.Start(ctx, lContext, argc, args, 1) readerFinished() commandFinished() }() @@ -267,7 +269,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] command := newReadCommand(h, omode.TailClient) go func() { h.incrementActiveReaders() - command.Start(ctx, argc, args, 10) + command.Start(ctx, lContext, argc, args, 10) readerFinished() commandFinished() }() @@ -391,16 +393,15 @@ func (h *ServerHandler) decrementActiveReaders() int32 { } // TODO: All options related code should be in its own package (client + server) -// Maybe we could move internal.clients.Args to internal.options.Options and -// Use struct tagging to determine which ones should be serialized over the wire -// from the client to the server. -func readOptions(opts []string) (map[string]string, error) { +func readOptions(opts []string) (map[string]string, lcontext.LContext, error) { options := make(map[string]string, len(opts)) + // Local search context + var lContext lcontext.LContext for _, o := range opts { kv := strings.SplitN(o, "=", 2) if len(kv) != 2 { - return options, fmt.Errorf("Unable to parse options: %v", kv) + return options, lContext, fmt.Errorf("Unable to parse options: %v", kv) } key := kv[0] val := kv[1] @@ -409,13 +410,37 @@ func readOptions(opts []string) (map[string]string, error) { s := strings.SplitN(val, "%", 2) decoded, err := base64.StdEncoding.DecodeString(s[1]) if err != nil { - return options, err + return options, lContext, err } val = string(decoded) } - options[key] = val + switch key { + case "before": + iVal, err := strconv.Atoi(val) + if err != nil { + logger.Error(err) + continue + } + lContext.BeforeContext = iVal + case "after": + iVal, err := strconv.Atoi(val) + if err != nil { + logger.Error(err) + continue + } + lContext.AfterContext = iVal + case "max": + iVal, err := strconv.Atoi(val) + if err != nil { + logger.Error(err) + continue + } + lContext.MaxCount = iVal + default: + options[key] = val + } } - return options, nil + return options, lContext, nil } |
