summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <git@mx.buetow.org>2021-02-05 19:16:52 +0000
committerPaul Buetow <git@mx.buetow.org>2021-02-05 19:16:52 +0000
commit742e6c444f7236ca3c9953050b0704bc88283ed3 (patch)
tree48a3f3f6174a732e84a483c6079ae0615a230cd5
parent6f093ff69c83526279b9f039aca079162c2b68d5 (diff)
cann parse local context to server file reader
-rw-r--r--internal/io/fs/filereader.go3
-rw-r--r--internal/io/fs/readfile.go12
-rw-r--r--internal/server/handlers/readcommand.go21
-rw-r--r--internal/server/handlers/serverhandler.go47
4 files changed, 58 insertions, 25 deletions
diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go
index 0774837..efd410e 100644
--- a/internal/io/fs/filereader.go
+++ b/internal/io/fs/filereader.go
@@ -4,12 +4,13 @@ import (
"context"
"github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/regex"
)
// FileReader is the interface used on the dtail server to read/cat/grep/mapr... a file.
type FileReader interface {
- Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error
+ Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error
FilePath() string
Retry() bool
}
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index 6757bd6..4b2af7c 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -14,6 +14,7 @@ import (
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/regex"
"github.com/DataDog/zstd"
@@ -59,7 +60,7 @@ func (f readFile) Retry() bool {
}
// Start tailing a log file.
-func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Regex) error {
+func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error {
logger.Debug("readFile", f)
defer func() {
select {
@@ -96,7 +97,7 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Re
wg.Add(1)
go f.periodicTruncateCheck(ctx, truncate)
- go f.filter(ctx, &wg, rawLines, lines, re)
+ go f.filter(ctx, lContext, &wg, rawLines, lines, re)
err = f.read(ctx, fd, rawLines, truncate)
close(rawLines)
@@ -227,9 +228,14 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
}
// Filter log lines matching a given regular expression.
-func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) {
+func (f readFile) filter(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) {
defer wg.Done()
+ /*
+ beforeContext := make([]string, lContext.BeforeContext)
+ afterContext := make([]string, lContext.AfterContext)
+ */
+
for {
select {
case line, ok := <-rawLines:
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 5bab26f..b659c06 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -9,6 +9,7 @@ import (
"github.com/mimecast/dtail/internal/io/fs"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/regex"
)
@@ -25,7 +26,7 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand {
}
}
-func (r *readCommand) Start(ctx context.Context, argc int, args []string, retries int) {
+func (r *readCommand) Start(ctx context.Context, lContext lcontext.LContext, argc int, args []string, retries int) {
re := regex.NewNoop()
if argc >= 4 {
@@ -40,10 +41,10 @@ func (r *readCommand) Start(ctx context.Context, argc int, args []string, retrie
r.server.sendServerWarnMessage(logger.Warn(r.server.user, commandParseWarning, args, argc))
return
}
- r.readGlob(ctx, args[1], re, retries)
+ r.readGlob(ctx, lContext, args[1], re, retries)
}
-func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, retries int) {
+func (r *readCommand) readGlob(ctx context.Context, lContext lcontext.LContext, glob string, re regex.Regex, retries int) {
retryInterval := time.Second * 5
glob = filepath.Clean(glob)
@@ -67,7 +68,7 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex,
continue
}
- r.readFiles(ctx, paths, glob, re, retryInterval)
+ r.readFiles(ctx, lContext, paths, glob, re, retryInterval)
return
}
@@ -75,18 +76,18 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex,
return
}
-func (r *readCommand) readFiles(ctx context.Context, paths []string, glob string, re regex.Regex, retryInterval time.Duration) {
+func (r *readCommand) readFiles(ctx context.Context, lContext lcontext.LContext, paths []string, glob string, re regex.Regex, retryInterval time.Duration) {
var wg sync.WaitGroup
wg.Add(len(paths))
for _, path := range paths {
- go r.readFileIfPermissions(ctx, &wg, path, glob, re)
+ go r.readFileIfPermissions(ctx, lContext, &wg, path, glob, re)
}
wg.Wait()
}
-func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGroup, path, glob string, re regex.Regex) {
+func (r *readCommand) readFileIfPermissions(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, path, glob string, re regex.Regex) {
defer wg.Done()
globID := r.makeGlobID(path, glob)
@@ -96,10 +97,10 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGr
return
}
- r.readFile(ctx, path, globID, re)
+ r.readFile(ctx, lContext, path, globID, re)
}
-func (r *readCommand) readFile(ctx context.Context, path, globID string, re regex.Regex) {
+func (r *readCommand) readFile(ctx context.Context, lContext lcontext.LContext, path, globID string, re regex.Regex) {
logger.Info(r.server.user, "Start reading file", path, globID)
var reader fs.FileReader
@@ -120,7 +121,7 @@ func (r *readCommand) readFile(ctx context.Context, path, globID string, re rege
}
for {
- if err := reader.Start(ctx, lines, re); err != nil {
+ if err := reader.Start(ctx, lContext, lines, re); err != nil {
logger.Error(r.server.user, path, globID, err)
}
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 169c1eb..07933d0 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"os"
+ "strconv"
"strings"
"sync/atomic"
"time"
@@ -15,6 +16,7 @@ import (
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/mapr/server"
"github.com/mimecast/dtail/internal/omode"
user "github.com/mimecast/dtail/internal/user/server"
@@ -240,7 +242,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
splitted := strings.Split(args[0], ":")
commandName := splitted[0]
- options, err := readOptions(splitted[1:])
+ options, lContext, err := readOptions(splitted[1:])
if err != nil {
h.sendServerMessage(logger.Error(h.user, err))
commandFinished()
@@ -258,7 +260,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
command := newReadCommand(h, omode.CatClient)
go func() {
h.incrementActiveReaders()
- command.Start(ctx, argc, args, 1)
+ command.Start(ctx, lContext, argc, args, 1)
readerFinished()
commandFinished()
}()
@@ -267,7 +269,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
command := newReadCommand(h, omode.TailClient)
go func() {
h.incrementActiveReaders()
- command.Start(ctx, argc, args, 10)
+ command.Start(ctx, lContext, argc, args, 10)
readerFinished()
commandFinished()
}()
@@ -391,16 +393,15 @@ func (h *ServerHandler) decrementActiveReaders() int32 {
}
// TODO: All options related code should be in its own package (client + server)
-// Maybe we could move internal.clients.Args to internal.options.Options and
-// Use struct tagging to determine which ones should be serialized over the wire
-// from the client to the server.
-func readOptions(opts []string) (map[string]string, error) {
+func readOptions(opts []string) (map[string]string, lcontext.LContext, error) {
options := make(map[string]string, len(opts))
+ // Local search context
+ var lContext lcontext.LContext
for _, o := range opts {
kv := strings.SplitN(o, "=", 2)
if len(kv) != 2 {
- return options, fmt.Errorf("Unable to parse options: %v", kv)
+ return options, lContext, fmt.Errorf("Unable to parse options: %v", kv)
}
key := kv[0]
val := kv[1]
@@ -409,13 +410,37 @@ func readOptions(opts []string) (map[string]string, error) {
s := strings.SplitN(val, "%", 2)
decoded, err := base64.StdEncoding.DecodeString(s[1])
if err != nil {
- return options, err
+ return options, lContext, err
}
val = string(decoded)
}
- options[key] = val
+ switch key {
+ case "before":
+ iVal, err := strconv.Atoi(val)
+ if err != nil {
+ logger.Error(err)
+ continue
+ }
+ lContext.BeforeContext = iVal
+ case "after":
+ iVal, err := strconv.Atoi(val)
+ if err != nil {
+ logger.Error(err)
+ continue
+ }
+ lContext.AfterContext = iVal
+ case "max":
+ iVal, err := strconv.Atoi(val)
+ if err != nil {
+ logger.Error(err)
+ continue
+ }
+ lContext.MaxCount = iVal
+ default:
+ options[key] = val
+ }
}
- return options, nil
+ return options, lContext, nil
}