diff options
Diffstat (limited to 'internal/server/handlers/readcommand.go')
| -rw-r--r-- | internal/server/handlers/readcommand.go | 89 |
1 files changed, 84 insertions, 5 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 44ba9e4..7a351ba 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -2,12 +2,14 @@ package handlers import ( "context" + "fmt" "os" "path/filepath" "strings" "sync" "time" + "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/fs" "github.com/mimecast/dtail/internal/io/line" @@ -30,7 +32,7 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand { func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext, argc int, args []string, retries int) { - + re := regex.NewNoop() if argc >= 4 { deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " ")) @@ -49,7 +51,9 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext, // In serverless mode, can also read data from pipe // e.g.: grep foo bar.log | dmap 'from STATS select ...' - if r.isInputFromPipe() { + // Only read from pipe if no file argument is provided + isPipe := r.isInputFromPipe() && (argc < 2 || args[1] == "" || args[1] == "-") + if isPipe { dlog.Server.Debug("Reading data from stdin pipe") // Empty file path and globID "-" represents reading from the stdin pipe. r.read(ctx, ltx, "", "-", re) @@ -123,19 +127,21 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, path, globID string, re regex.Regex) { - + dlog.Server.Info(r.server.user, "Start reading", path, globID) var reader fs.FileReader var limiter chan struct{} switch r.mode { case omode.GrepClient, omode.CatClient: - reader = fs.NewCatFile(path, globID, r.server.serverMessages) + catFile := fs.NewCatFile(path, globID, r.server.serverMessages) + reader = &catFile limiter = r.server.catLimiter case omode.TailClient: fallthrough default: - reader = fs.NewTailFile(path, globID, r.server.serverMessages) + tailFile := fs.NewTailFile(path, globID, r.server.serverMessages) + reader = &tailFile limiter = r.server.tailLimiter } @@ -160,6 +166,19 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, } } + // Check if we should use the channel-less implementation + channellessEnabled := config.Env("DTAIL_CHANNELLESS_GREP") + dlog.Server.Info(r.server.user, "Channel-less check: enabled=", channellessEnabled, "mode=", r.mode) + // Only enable channel-less for server mode, not serverless mode + // Use the serverless field directly as it's more reliable + if channellessEnabled && (r.mode == omode.CatClient || r.mode == omode.GrepClient) && !r.server.serverless { + // Log to stderr for testing verification - only in server mode + fmt.Fprintf(os.Stderr, "[DTAIL] Using channel-less implementation for %s\n", path) + r.readWithProcessor(ctx, ltx, path, globID, re, reader) + return + } + + // Original channel-based implementation lines := r.server.lines aggregate := r.server.aggregate @@ -189,6 +208,66 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, } } +func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LContext, + path, globID string, re regex.Regex, reader fs.FileReader) { + + dlog.Server.Info(r.server.user, "Using channel-less grep implementation", path, globID) + + // Use the existing lines channel but with the processor-based reader + lines := r.server.lines + aggregate := r.server.aggregate + + // Use the optimized version if available + useOptimized := config.Env("DTAIL_OPTIMIZED_READER") + + // Log to stderr for testing verification - only in server mode + if !r.server.serverless { + if useOptimized { + fmt.Fprintf(os.Stderr, "[DTAIL] Using optimized reader for %s\n", path) + } else { + fmt.Fprintf(os.Stderr, "[DTAIL] Using standard processor reader for %s\n", path) + } + } + + for { + if aggregate != nil { + lines = make(chan *line.Line, 100) + aggregate.NextLinesCh <- lines + } + + // Create a processor that sends to the lines channel + processor := NewChannellessLineProcessor(lines, globID) + defer processor.Close() + + var err error + if useOptimized { + err = reader.StartWithProcessorOptimized(ctx, ltx, processor, re) + } else { + err = reader.StartWithProcessor(ctx, ltx, processor, re) + } + + if err != nil { + dlog.Server.Error(r.server.user, path, globID, err) + } + + if aggregate != nil { + // Also makes aggregate to Flush + close(lines) + } + + select { + case <-ctx.Done(): + return + default: + if !reader.Retry() { + return + } + } + time.Sleep(time.Second * 2) + dlog.Server.Info(path, globID, "Reading file again") + } +} + func (r *readCommand) makeGlobID(path, glob string) string { var idParts []string pathParts := strings.Split(path, "/") |
