From 2ebe7e9d63ba62c6f19749c39fe0a577d86ca775 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 12 Sep 2021 19:04:42 +0300 Subject: bugfix: dmap skipped the last couple of mapreduce lines --- internal/server/handlers/readcommand.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) (limited to 'internal/server/handlers/readcommand.go') diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 5bab26f..69dd4a5 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -8,6 +8,7 @@ import ( "time" "github.com/mimecast/dtail/internal/io/fs" + "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/regex" @@ -113,16 +114,20 @@ func (r *readCommand) readFile(ctx context.Context, path, globID string, re rege } lines := r.server.lines - - // Plug in mappreduce engine - if r.server.aggregate != nil { - lines = r.server.aggregate.Lines - } + aggregate := r.server.aggregate for { + if aggregate != nil { + lines = make(chan line.Line, 100) + aggregate.NextLinesCh <- lines + } if err := reader.Start(ctx, lines, re); err != nil { logger.Error(r.server.user, path, globID, err) } + if aggregate != nil { + // Also makes aggregate to Flush + close(lines) + } select { case <-ctx.Done(): -- cgit v1.2.3 From fe3e68afd99d8ea246be52893730f987e138ec24 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 19 Sep 2021 13:22:59 +0300 Subject: move args to config package logger package rewrite as dlog --- internal/server/handlers/readcommand.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) (limited to 'internal/server/handlers/readcommand.go') diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 69dd4a5..60ad2a0 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -9,7 +9,7 @@ import ( "github.com/mimecast/dtail/internal/io/fs" "github.com/mimecast/dtail/internal/io/line" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/regex" ) @@ -32,13 +32,13 @@ func (r *readCommand) Start(ctx context.Context, argc int, args []string, retrie if argc >= 4 { deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " ")) if err != nil { - r.server.sendServerMessage(logger.Error(r.server.user, commandParseWarning, err)) + r.server.sendServerMessage(dlog.Server.Error(r.server.user, commandParseWarning, err)) return } re = deserializedRegex } if argc < 3 { - r.server.sendServerWarnMessage(logger.Warn(r.server.user, commandParseWarning, args, argc)) + r.server.sendServerWarnMessage(dlog.Server.Warn(r.server.user, commandParseWarning, args, argc)) return } r.readGlob(ctx, args[1], re, retries) @@ -51,14 +51,14 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, for retryCount := 0; retryCount < retries; retryCount++ { paths, err := filepath.Glob(glob) if err != nil { - logger.Warn(r.server.user, glob, err) + dlog.Server.Warn(r.server.user, glob, err) time.Sleep(retryInterval) continue } if numPaths := len(paths); numPaths == 0 { - logger.Error(r.server.user, "No such file(s) to read", glob) - r.server.sendServerWarnMessage(logger.Warn(r.server.user, "Unable to read file(s), check server logs")) + dlog.Server.Error(r.server.user, "No such file(s) to read", glob) + r.server.sendServerWarnMessage(dlog.Server.Warn(r.server.user, "Unable to read file(s), check server logs")) select { case <-ctx.Done(): return @@ -72,7 +72,7 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, return } - r.server.sendServerWarnMessage(logger.Warn(r.server.user, "Giving up to read file(s)")) + r.server.sendServerWarnMessage(dlog.Server.Warn(r.server.user, "Giving up to read file(s)")) return } @@ -92,8 +92,8 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGr globID := r.makeGlobID(path, glob) if !r.server.user.HasFilePermission(path, "readfiles") { - logger.Error(r.server.user, "No permission to read file", path, globID) - r.server.sendServerWarnMessage(logger.Warn(r.server.user, "Unable to read file(s), check server logs")) + dlog.Server.Error(r.server.user, "No permission to read file", path, globID) + r.server.sendServerWarnMessage(dlog.Server.Warn(r.server.user, "Unable to read file(s), check server logs")) return } @@ -101,7 +101,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGr } func (r *readCommand) readFile(ctx context.Context, path, globID string, re regex.Regex) { - logger.Info(r.server.user, "Start reading file", path, globID) + dlog.Server.Info(r.server.user, "Start reading file", path, globID) var reader fs.FileReader switch r.mode { @@ -122,7 +122,7 @@ func (r *readCommand) readFile(ctx context.Context, path, globID string, re rege aggregate.NextLinesCh <- lines } if err := reader.Start(ctx, lines, re); err != nil { - logger.Error(r.server.user, path, globID, err) + dlog.Server.Error(r.server.user, path, globID, err) } if aggregate != nil { // Also makes aggregate to Flush @@ -139,7 +139,7 @@ func (r *readCommand) readFile(ctx context.Context, path, globID string, re rege } time.Sleep(time.Second * 2) - logger.Info(path, globID, "Reading file again") + dlog.Server.Info(path, globID, "Reading file again") } } @@ -161,6 +161,6 @@ func (r *readCommand) makeGlobID(path, glob string) string { return pathParts[len(pathParts)-1] } - r.server.sendServerWarnMessage(logger.Warn("Empty file path given?", path, glob)) + r.server.sendServerWarnMessage(dlog.Server.Warn("Empty file path given?", path, glob)) return "" } -- cgit v1.2.3 From fcaa94c7453efa0d74e330128c0f5c2cde8f11b3 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 26 Sep 2021 16:42:47 +0300 Subject: refactor config reader - also looks in additional search paths for config file unless NONE is specified --- internal/server/handlers/readcommand.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'internal/server/handlers/readcommand.go') diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 60ad2a0..c76ae2a 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -7,9 +7,9 @@ import ( "sync" "time" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/fs" "github.com/mimecast/dtail/internal/io/line" - "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/regex" ) -- cgit v1.2.3 From 12c79f68bb5bda6673819d7b754820ecfe6d08ff Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 2 Oct 2021 11:54:07 +0300 Subject: reduce logging in serverless mode --- internal/server/handlers/readcommand.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'internal/server/handlers/readcommand.go') diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index c76ae2a..6579018 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -32,13 +32,13 @@ func (r *readCommand) Start(ctx context.Context, argc int, args []string, retrie if argc >= 4 { deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " ")) if err != nil { - r.server.sendServerMessage(dlog.Server.Error(r.server.user, commandParseWarning, err)) + r.server.send(r.server.serverMessages, dlog.Server.Error(r.server.user, commandParseWarning, err)) return } re = deserializedRegex } if argc < 3 { - r.server.sendServerWarnMessage(dlog.Server.Warn(r.server.user, commandParseWarning, args, argc)) + r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, commandParseWarning, args, argc)) return } r.readGlob(ctx, args[1], re, retries) @@ -58,7 +58,7 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, if numPaths := len(paths); numPaths == 0 { dlog.Server.Error(r.server.user, "No such file(s) to read", glob) - r.server.sendServerWarnMessage(dlog.Server.Warn(r.server.user, "Unable to read file(s), check server logs")) + r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, "Unable to read file(s), check server logs")) select { case <-ctx.Done(): return @@ -72,7 +72,7 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, return } - r.server.sendServerWarnMessage(dlog.Server.Warn(r.server.user, "Giving up to read file(s)")) + r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, "Giving up to read file(s)")) return } @@ -93,7 +93,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGr if !r.server.user.HasFilePermission(path, "readfiles") { dlog.Server.Error(r.server.user, "No permission to read file", path, globID) - r.server.sendServerWarnMessage(dlog.Server.Warn(r.server.user, "Unable to read file(s), check server logs")) + r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, "Unable to read file(s), check server logs")) return } @@ -161,6 +161,6 @@ func (r *readCommand) makeGlobID(path, glob string) string { return pathParts[len(pathParts)-1] } - r.server.sendServerWarnMessage(dlog.Server.Warn("Empty file path given?", path, glob)) + r.server.send(r.server.serverMessages, dlog.Server.Warn("Empty file path given?", path, glob)) return "" } -- cgit v1.2.3 From f70622f307629a2542ea5eb128dea8c1043d3a40 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Tue, 5 Oct 2021 10:00:38 +0300 Subject: more on this --- internal/server/handlers/readcommand.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'internal/server/handlers/readcommand.go') diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 6579018..abc44c7 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -32,13 +32,13 @@ func (r *readCommand) Start(ctx context.Context, argc int, args []string, retrie if argc >= 4 { deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " ")) if err != nil { - r.server.send(r.server.serverMessages, dlog.Server.Error(r.server.user, commandParseWarning, err)) + r.server.send(r.server.serverMessages, dlog.Server.Error(r.server.user, "Unable to parse command", err)) return } re = deserializedRegex } if argc < 3 { - r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, commandParseWarning, args, argc)) + r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, "Unable to parse command", args, argc)) return } r.readGlob(ctx, args[1], re, retries) -- cgit v1.2.3 From 97747ea0f3178f7f5890512d483fdccaa82846b0 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 9 Oct 2021 21:10:29 +0300 Subject: vetting and linting and some code restyling --- internal/server/handlers/readcommand.go | 41 +++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 17 deletions(-) (limited to 'internal/server/handlers/readcommand.go') diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index abc44c7..384e966 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -26,25 +26,30 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand { } } -func (r *readCommand) Start(ctx context.Context, argc int, args []string, retries int) { - re := regex.NewNoop() +func (r *readCommand) Start(ctx context.Context, argc int, args []string, + retries int) { + re := regex.NewNoop() if argc >= 4 { deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " ")) if err != nil { - r.server.send(r.server.serverMessages, dlog.Server.Error(r.server.user, "Unable to parse command", err)) + r.server.send(r.server.serverMessages, dlog.Server.Error(r.server.user, + "Unable to parse command", err)) return } re = deserializedRegex } if argc < 3 { - r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, "Unable to parse command", args, argc)) + r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, + "Unable to parse command", args, argc)) return } r.readGlob(ctx, args[1], re, retries) } -func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, retries int) { +func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, + retries int) { + retryInterval := time.Second * 5 glob = filepath.Clean(glob) @@ -58,7 +63,8 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, if numPaths := len(paths); numPaths == 0 { dlog.Server.Error(r.server.user, "No such file(s) to read", glob) - r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, "Unable to read file(s), check server logs")) + r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, + "Unable to read file(s), check server logs")) select { case <-ctx.Done(): return @@ -72,31 +78,33 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, return } - r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, "Giving up to read file(s)")) + r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, + "Giving up to read file(s)")) return } -func (r *readCommand) readFiles(ctx context.Context, paths []string, glob string, re regex.Regex, retryInterval time.Duration) { +func (r *readCommand) readFiles(ctx context.Context, paths []string, glob string, + re regex.Regex, retryInterval time.Duration) { + var wg sync.WaitGroup wg.Add(len(paths)) - for _, path := range paths { go r.readFileIfPermissions(ctx, &wg, path, glob, re) } - wg.Wait() } -func (r *readCommand) readFileIfPermissions(ctx context.Context, wg *sync.WaitGroup, path, glob string, re regex.Regex) { +func (r *readCommand) readFileIfPermissions(ctx context.Context, + wg *sync.WaitGroup, path, glob string, re regex.Regex) { + defer wg.Done() globID := r.makeGlobID(path, glob) - if !r.server.user.HasFilePermission(path, "readfiles") { dlog.Server.Error(r.server.user, "No permission to read file", path, globID) - r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, "Unable to read file(s), check server logs")) + r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, + "Unable to read file(s), check server logs")) return } - r.readFile(ctx, path, globID, re) } @@ -137,7 +145,6 @@ func (r *readCommand) readFile(ctx context.Context, path, globID string, re rege return } } - time.Sleep(time.Second * 2) dlog.Server.Info(path, globID, "Reading file again") } @@ -156,11 +163,11 @@ func (r *readCommand) makeGlobID(path, glob string) string { if len(idParts) > 0 { return strings.Join(idParts, "/") } - if len(pathParts) > 0 { return pathParts[len(pathParts)-1] } - r.server.send(r.server.serverMessages, dlog.Server.Warn("Empty file path given?", path, glob)) + r.server.send(r.server.serverMessages, + dlog.Server.Warn("Empty file path given?", path, glob)) return "" } -- cgit v1.2.3 From 1dead22129a26e4f532e68c2c63fe4122b519506 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 13 Oct 2021 21:10:28 +0300 Subject: Merging grep context from master --- internal/server/handlers/readcommand.go | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) (limited to 'internal/server/handlers/readcommand.go') diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 384e966..4728a55 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -10,6 +10,7 @@ import ( "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/fs" "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/regex" ) @@ -26,8 +27,8 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand { } } -func (r *readCommand) Start(ctx context.Context, argc int, args []string, - retries int) { +func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext, + argc int, args []string, retries int) { re := regex.NewNoop() if argc >= 4 { @@ -44,11 +45,11 @@ func (r *readCommand) Start(ctx context.Context, argc int, args []string, "Unable to parse command", args, argc)) return } - r.readGlob(ctx, args[1], re, retries) + r.readGlob(ctx, ltx, args[1], re, retries) } -func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, - retries int) { +func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext, + glob string, re regex.Regex, retries int) { retryInterval := time.Second * 5 glob = filepath.Clean(glob) @@ -74,7 +75,7 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, continue } - r.readFiles(ctx, paths, glob, re, retryInterval) + r.readFiles(ctx, ltx, paths, glob, re, retryInterval) return } @@ -83,18 +84,18 @@ func (r *readCommand) readGlob(ctx context.Context, glob string, re regex.Regex, return } -func (r *readCommand) readFiles(ctx context.Context, paths []string, glob string, - re regex.Regex, retryInterval time.Duration) { +func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, + paths []string, glob string, re regex.Regex, retryInterval time.Duration) { var wg sync.WaitGroup wg.Add(len(paths)) for _, path := range paths { - go r.readFileIfPermissions(ctx, &wg, path, glob, re) + go r.readFileIfPermissions(ctx, ltx, &wg, path, glob, re) } wg.Wait() } -func (r *readCommand) readFileIfPermissions(ctx context.Context, +func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LContext, wg *sync.WaitGroup, path, glob string, re regex.Regex) { defer wg.Done() @@ -105,12 +106,13 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, "Unable to read file(s), check server logs")) return } - r.readFile(ctx, path, globID, re) + r.readFile(ctx, ltx, path, globID, re) } -func (r *readCommand) readFile(ctx context.Context, path, globID string, re regex.Regex) { - dlog.Server.Info(r.server.user, "Start reading file", path, globID) +func (r *readCommand) readFile(ctx context.Context, ltx lcontext.LContext, + path, globID string, re regex.Regex) { + dlog.Server.Info(r.server.user, "Start reading file", path, globID) var reader fs.FileReader switch r.mode { case omode.TailClient: @@ -129,7 +131,7 @@ func (r *readCommand) readFile(ctx context.Context, path, globID string, re rege lines = make(chan line.Line, 100) aggregate.NextLinesCh <- lines } - if err := reader.Start(ctx, lines, re); err != nil { + if err := reader.Start(ctx, ltx, lines, re); err != nil { dlog.Server.Error(r.server.user, path, globID, err) } if aggregate != nil { -- cgit v1.2.3