diff options
Diffstat (limited to 'internal/server/handlers/readcommand.go')
| -rw-r--r-- | internal/server/handlers/readcommand.go | 83 |
1 files changed, 48 insertions, 35 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index b659c06..4728a55 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -7,8 +7,9 @@ import ( "sync" "time" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/fs" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/regex" @@ -26,39 +27,45 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand { } } -func (r *readCommand) Start(ctx context.Context, lContext lcontext.LContext, argc int, args []string, retries int) { - re := regex.NewNoop() +func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext, + argc int, args []string, retries int) { + re := regex.NewNoop() if argc >= 4 { deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " ")) if err != nil { - r.server.sendServerMessage(logger.Error(r.server.user, commandParseWarning, err)) + r.server.send(r.server.serverMessages, dlog.Server.Error(r.server.user, + "Unable to parse command", err)) return } re = deserializedRegex } if argc < 3 { - r.server.sendServerWarnMessage(logger.Warn(r.server.user, commandParseWarning, args, argc)) + r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, + "Unable to parse command", args, argc)) return } - r.readGlob(ctx, lContext, args[1], re, retries) + r.readGlob(ctx, ltx, args[1], re, retries) } -func (r *readCommand) readGlob(ctx context.Context, lContext lcontext.LContext, glob string, re regex.Regex, retries int) { +func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext, + glob string, re regex.Regex, retries int) { + retryInterval := time.Second * 5 glob = filepath.Clean(glob) for retryCount := 0; retryCount < retries; retryCount++ { paths, err := filepath.Glob(glob) if err != nil { - logger.Warn(r.server.user, glob, err) + dlog.Server.Warn(r.server.user, glob, err) time.Sleep(retryInterval) continue } if numPaths := len(paths); numPaths == 0 { - logger.Error(r.server.user, "No such file(s) to read", glob) - r.server.sendServerWarnMessage(logger.Warn(r.server.user, "Unable to read file(s), check server logs")) + dlog.Server.Error(r.server.user, "No such file(s) to read", glob) + r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, + "Unable to read file(s), check server logs")) select { case <-ctx.Done(): return @@ -68,41 +75,44 @@ func (r *readCommand) readGlob(ctx context.Context, lContext lcontext.LContext, continue } - r.readFiles(ctx, lContext, paths, glob, re, retryInterval) + r.readFiles(ctx, ltx, paths, glob, re, retryInterval) return } - r.server.sendServerWarnMessage(logger.Warn(r.server.user, "Giving up to read file(s)")) + r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, + "Giving up to read file(s)")) return } -func (r *readCommand) readFiles(ctx context.Context, lContext lcontext.LContext, paths []string, glob string, re regex.Regex, retryInterval time.Duration) { +func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, + paths []string, glob string, re regex.Regex, retryInterval time.Duration) { + var wg sync.WaitGroup wg.Add(len(paths)) - for _, path := range paths { - go r.readFileIfPermissions(ctx, lContext, &wg, path, glob, re) + go r.readFileIfPermissions(ctx, ltx, &wg, path, glob, re) } - wg.Wait() } -func (r *readCommand) readFileIfPermissions(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, path, glob string, re regex.Regex) { +func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LContext, + wg *sync.WaitGroup, path, glob string, re regex.Regex) { + defer wg.Done() globID := r.makeGlobID(path, glob) - if !r.server.user.HasFilePermission(path, "readfiles") { - logger.Error(r.server.user, "No permission to read file", path, globID) - r.server.sendServerWarnMessage(logger.Warn(r.server.user, "Unable to read file(s), check server logs")) + dlog.Server.Error(r.server.user, "No permission to read file", path, globID) + r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user, + "Unable to read file(s), check server logs")) return } - - r.readFile(ctx, lContext, path, globID, re) + r.readFile(ctx, ltx, path, globID, re) } -func (r *readCommand) readFile(ctx context.Context, lContext lcontext.LContext, path, globID string, re regex.Regex) { - logger.Info(r.server.user, "Start reading file", path, globID) +func (r *readCommand) readFile(ctx context.Context, ltx lcontext.LContext, + path, globID string, re regex.Regex) { + dlog.Server.Info(r.server.user, "Start reading file", path, globID) var reader fs.FileReader switch r.mode { case omode.TailClient: @@ -114,15 +124,19 @@ func (r *readCommand) readFile(ctx context.Context, lContext lcontext.LContext, } lines := r.server.lines - - // Plug in mappreduce engine - if r.server.aggregate != nil { - lines = r.server.aggregate.Lines - } + aggregate := r.server.aggregate for { - if err := reader.Start(ctx, lContext, lines, re); err != nil { - logger.Error(r.server.user, path, globID, err) + if aggregate != nil { + lines = make(chan line.Line, 100) + aggregate.NextLinesCh <- lines + } + if err := reader.Start(ctx, ltx, lines, re); err != nil { + dlog.Server.Error(r.server.user, path, globID, err) + } + if aggregate != nil { + // Also makes aggregate to Flush + close(lines) } select { @@ -133,9 +147,8 @@ func (r *readCommand) readFile(ctx context.Context, lContext lcontext.LContext, return } } - time.Sleep(time.Second * 2) - logger.Info(path, globID, "Reading file again") + dlog.Server.Info(path, globID, "Reading file again") } } @@ -152,11 +165,11 @@ func (r *readCommand) makeGlobID(path, glob string) string { if len(idParts) > 0 { return strings.Join(idParts, "/") } - if len(pathParts) > 0 { return pathParts[len(pathParts)-1] } - r.server.sendServerWarnMessage(logger.Warn("Empty file path given?", path, glob)) + r.server.send(r.server.serverMessages, + dlog.Server.Warn("Empty file path given?", path, glob)) return "" } |
