summaryrefslogtreecommitdiff
path: root/internal/server/handlers/readcommand.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/server/handlers/readcommand.go')
-rw-r--r--internal/server/handlers/readcommand.go89
1 files changed, 84 insertions, 5 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 44ba9e4..7a351ba 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -2,12 +2,14 @@ package handlers
import (
"context"
+ "fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
+ "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/fs"
"github.com/mimecast/dtail/internal/io/line"
@@ -30,7 +32,7 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand {
func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
argc int, args []string, retries int) {
-
+
re := regex.NewNoop()
if argc >= 4 {
deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " "))
@@ -49,7 +51,9 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
// In serverless mode, can also read data from pipe
// e.g.: grep foo bar.log | dmap 'from STATS select ...'
- if r.isInputFromPipe() {
+ // Only read from pipe if no file argument is provided
+ isPipe := r.isInputFromPipe() && (argc < 2 || args[1] == "" || args[1] == "-")
+ if isPipe {
dlog.Server.Debug("Reading data from stdin pipe")
// Empty file path and globID "-" represents reading from the stdin pipe.
r.read(ctx, ltx, "", "-", re)
@@ -123,19 +127,21 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
path, globID string, re regex.Regex) {
-
+
dlog.Server.Info(r.server.user, "Start reading", path, globID)
var reader fs.FileReader
var limiter chan struct{}
switch r.mode {
case omode.GrepClient, omode.CatClient:
- reader = fs.NewCatFile(path, globID, r.server.serverMessages)
+ catFile := fs.NewCatFile(path, globID, r.server.serverMessages)
+ reader = &catFile
limiter = r.server.catLimiter
case omode.TailClient:
fallthrough
default:
- reader = fs.NewTailFile(path, globID, r.server.serverMessages)
+ tailFile := fs.NewTailFile(path, globID, r.server.serverMessages)
+ reader = &tailFile
limiter = r.server.tailLimiter
}
@@ -160,6 +166,19 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
}
}
+ // Check if we should use the channel-less implementation
+ channellessEnabled := config.Env("DTAIL_CHANNELLESS_GREP")
+ dlog.Server.Info(r.server.user, "Channel-less check: enabled=", channellessEnabled, "mode=", r.mode)
+ // Only enable channel-less for server mode, not serverless mode
+ // Use the serverless field directly as it's more reliable
+ if channellessEnabled && (r.mode == omode.CatClient || r.mode == omode.GrepClient) && !r.server.serverless {
+ // Log to stderr for testing verification - only in server mode
+ fmt.Fprintf(os.Stderr, "[DTAIL] Using channel-less implementation for %s\n", path)
+ r.readWithProcessor(ctx, ltx, path, globID, re, reader)
+ return
+ }
+
+ // Original channel-based implementation
lines := r.server.lines
aggregate := r.server.aggregate
@@ -189,6 +208,66 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
}
}
+func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LContext,
+ path, globID string, re regex.Regex, reader fs.FileReader) {
+
+ dlog.Server.Info(r.server.user, "Using channel-less grep implementation", path, globID)
+
+ // Use the existing lines channel but with the processor-based reader
+ lines := r.server.lines
+ aggregate := r.server.aggregate
+
+ // Use the optimized version if available
+ useOptimized := config.Env("DTAIL_OPTIMIZED_READER")
+
+ // Log to stderr for testing verification - only in server mode
+ if !r.server.serverless {
+ if useOptimized {
+ fmt.Fprintf(os.Stderr, "[DTAIL] Using optimized reader for %s\n", path)
+ } else {
+ fmt.Fprintf(os.Stderr, "[DTAIL] Using standard processor reader for %s\n", path)
+ }
+ }
+
+ for {
+ if aggregate != nil {
+ lines = make(chan *line.Line, 100)
+ aggregate.NextLinesCh <- lines
+ }
+
+ // Create a processor that sends to the lines channel
+ processor := NewChannellessLineProcessor(lines, globID)
+ defer processor.Close()
+
+ var err error
+ if useOptimized {
+ err = reader.StartWithProcessorOptimized(ctx, ltx, processor, re)
+ } else {
+ err = reader.StartWithProcessor(ctx, ltx, processor, re)
+ }
+
+ if err != nil {
+ dlog.Server.Error(r.server.user, path, globID, err)
+ }
+
+ if aggregate != nil {
+ // Also makes aggregate to Flush
+ close(lines)
+ }
+
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ if !reader.Retry() {
+ return
+ }
+ }
+ time.Sleep(time.Second * 2)
+ dlog.Server.Info(path, globID, "Reading file again")
+ }
+}
+
func (r *readCommand) makeGlobID(path, glob string) string {
var idParts []string
pathParts := strings.Split(path, "/")