summaryrefslogtreecommitdiff
path: root/internal/server/handlers/readcommand.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/server/handlers/readcommand.go')
-rw-r--r--internal/server/handlers/readcommand.go83
1 files changed, 48 insertions, 35 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index b659c06..4728a55 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -7,8 +7,9 @@ import (
"sync"
"time"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/fs"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/regex"
@@ -26,39 +27,45 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand {
}
}
-func (r *readCommand) Start(ctx context.Context, lContext lcontext.LContext, argc int, args []string, retries int) {
- re := regex.NewNoop()
+func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
+ argc int, args []string, retries int) {
+ re := regex.NewNoop()
if argc >= 4 {
deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " "))
if err != nil {
- r.server.sendServerMessage(logger.Error(r.server.user, commandParseWarning, err))
+ r.server.send(r.server.serverMessages, dlog.Server.Error(r.server.user,
+ "Unable to parse command", err))
return
}
re = deserializedRegex
}
if argc < 3 {
- r.server.sendServerWarnMessage(logger.Warn(r.server.user, commandParseWarning, args, argc))
+ r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user,
+ "Unable to parse command", args, argc))
return
}
- r.readGlob(ctx, lContext, args[1], re, retries)
+ r.readGlob(ctx, ltx, args[1], re, retries)
}
-func (r *readCommand) readGlob(ctx context.Context, lContext lcontext.LContext, glob string, re regex.Regex, retries int) {
+func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext,
+ glob string, re regex.Regex, retries int) {
+
retryInterval := time.Second * 5
glob = filepath.Clean(glob)
for retryCount := 0; retryCount < retries; retryCount++ {
paths, err := filepath.Glob(glob)
if err != nil {
- logger.Warn(r.server.user, glob, err)
+ dlog.Server.Warn(r.server.user, glob, err)
time.Sleep(retryInterval)
continue
}
if numPaths := len(paths); numPaths == 0 {
- logger.Error(r.server.user, "No such file(s) to read", glob)
- r.server.sendServerWarnMessage(logger.Warn(r.server.user, "Unable to read file(s), check server logs"))
+ dlog.Server.Error(r.server.user, "No such file(s) to read", glob)
+ r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user,
+ "Unable to read file(s), check server logs"))
select {
case <-ctx.Done():
return
@@ -68,41 +75,44 @@ func (r *readCommand) readGlob(ctx context.Context, lContext lcontext.LContext,
continue
}
- r.readFiles(ctx, lContext, paths, glob, re, retryInterval)
+ r.readFiles(ctx, ltx, paths, glob, re, retryInterval)
return
}
- r.server.sendServerWarnMessage(logger.Warn(r.server.user, "Giving up to read file(s)"))
+ r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user,
+ "Giving up to read file(s)"))
return
}
-func (r *readCommand) readFiles(ctx context.Context, lContext lcontext.LContext, paths []string, glob string, re regex.Regex, retryInterval time.Duration) {
+func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
+ paths []string, glob string, re regex.Regex, retryInterval time.Duration) {
+
var wg sync.WaitGroup
wg.Add(len(paths))
-
for _, path := range paths {
- go r.readFileIfPermissions(ctx, lContext, &wg, path, glob, re)
+ go r.readFileIfPermissions(ctx, ltx, &wg, path, glob, re)
}
-
wg.Wait()
}
-func (r *readCommand) readFileIfPermissions(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, path, glob string, re regex.Regex) {
+func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LContext,
+ wg *sync.WaitGroup, path, glob string, re regex.Regex) {
+
defer wg.Done()
globID := r.makeGlobID(path, glob)
-
if !r.server.user.HasFilePermission(path, "readfiles") {
- logger.Error(r.server.user, "No permission to read file", path, globID)
- r.server.sendServerWarnMessage(logger.Warn(r.server.user, "Unable to read file(s), check server logs"))
+ dlog.Server.Error(r.server.user, "No permission to read file", path, globID)
+ r.server.send(r.server.serverMessages, dlog.Server.Warn(r.server.user,
+ "Unable to read file(s), check server logs"))
return
}
-
- r.readFile(ctx, lContext, path, globID, re)
+ r.readFile(ctx, ltx, path, globID, re)
}
-func (r *readCommand) readFile(ctx context.Context, lContext lcontext.LContext, path, globID string, re regex.Regex) {
- logger.Info(r.server.user, "Start reading file", path, globID)
+func (r *readCommand) readFile(ctx context.Context, ltx lcontext.LContext,
+ path, globID string, re regex.Regex) {
+ dlog.Server.Info(r.server.user, "Start reading file", path, globID)
var reader fs.FileReader
switch r.mode {
case omode.TailClient:
@@ -114,15 +124,19 @@ func (r *readCommand) readFile(ctx context.Context, lContext lcontext.LContext,
}
lines := r.server.lines
-
- // Plug in mappreduce engine
- if r.server.aggregate != nil {
- lines = r.server.aggregate.Lines
- }
+ aggregate := r.server.aggregate
for {
- if err := reader.Start(ctx, lContext, lines, re); err != nil {
- logger.Error(r.server.user, path, globID, err)
+ if aggregate != nil {
+ lines = make(chan line.Line, 100)
+ aggregate.NextLinesCh <- lines
+ }
+ if err := reader.Start(ctx, ltx, lines, re); err != nil {
+ dlog.Server.Error(r.server.user, path, globID, err)
+ }
+ if aggregate != nil {
+ // Also makes aggregate to Flush
+ close(lines)
}
select {
@@ -133,9 +147,8 @@ func (r *readCommand) readFile(ctx context.Context, lContext lcontext.LContext,
return
}
}
-
time.Sleep(time.Second * 2)
- logger.Info(path, globID, "Reading file again")
+ dlog.Server.Info(path, globID, "Reading file again")
}
}
@@ -152,11 +165,11 @@ func (r *readCommand) makeGlobID(path, glob string) string {
if len(idParts) > 0 {
return strings.Join(idParts, "/")
}
-
if len(pathParts) > 0 {
return pathParts[len(pathParts)-1]
}
- r.server.sendServerWarnMessage(logger.Warn("Empty file path given?", path, glob))
+ r.server.send(r.server.serverMessages,
+ dlog.Server.Warn("Empty file path given?", path, glob))
return ""
}