summaryrefslogtreecommitdiff
path: root/internal/server
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-02 09:15:26 +0200
committerPaul Buetow <paul@buetow.org>2026-03-02 09:15:26 +0200
commite411633018ff2f9ad01037cc14b946f6885e5c0c (patch)
treec6f6edca932714b9c6008209bf4f9ffc994df75d /internal/server
parent174bd919ab58e15a1841df428025ea9cc8ef7e3a (diff)
Consolidate read command paths via strategy loop (task 333)
Diffstat (limited to 'internal/server')
-rw-r--r--internal/server/handlers/readcommand.go209
1 files changed, 72 insertions, 137 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 3410499..f5fbfb9 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -24,6 +24,8 @@ type readCommand struct {
mode omode.Mode
}
+type readStrategy func(context.Context, lcontext.LContext, fs.FileReader, regex.Regex) error
+
func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand {
return &readCommand{
server: server,
@@ -211,15 +213,7 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
path, globID string, re regex.Regex) {
dlog.Server.Info(r.server.user, "Start reading", path, globID)
-
- // Log if grep is using literal mode optimization
- if r.mode == omode.GrepClient {
- if re.IsLiteral() {
- dlog.Server.Info(r.server.user, "Using optimized literal string matching for pattern:", re.Pattern())
- } else {
- dlog.Server.Info(r.server.user, "Using regex matching for pattern:", re.Pattern())
- }
- }
+ r.logRegexMode(re)
var reader fs.FileReader
var limiter chan struct{}
@@ -276,94 +270,49 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
return
}
- // Original channel-based implementation
- aggregate := r.server.aggregate
- var lines chan *line.Line
-
- for {
- if aggregate != nil {
- // For MapReduce operations, create a new channel that goes only to the aggregate
- // This prevents lines from being sent to the client
- lines = make(chan *line.Line, 10000)
- aggregate.NextLinesCh <- lines
- } else {
- // For non-MapReduce operations, use the server's lines channel
- lines = r.server.lines
- }
-
- if err := reader.Start(ctx, ltx, lines, re); err != nil {
- dlog.Server.Error(r.server.user, path, globID, err)
- }
- if aggregate != nil {
- // Also makes aggregate to Flush
- close(lines)
- }
-
- select {
- case <-ctx.Done():
- return
- default:
- if !reader.Retry() {
- return
- }
- }
- time.Sleep(time.Second * 2)
- dlog.Server.Info(path, globID, "Reading file again")
- }
+ r.executeReadLoop(ctx, ltx, path, globID, re, reader, r.readViaChannels())
}
-func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LContext,
+func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.LContext,
path, globID string, re regex.Regex, reader fs.FileReader) {
- dlog.Server.Info(r.server.user, "Using channel-less grep implementation", path, globID)
+ dlog.Server.Info(r.server.user, "Using turbo channel-less implementation", path, globID)
+ r.logRegexMode(re)
- // Log if grep is using literal mode optimization
- if r.mode == omode.GrepClient {
- if re.IsLiteral() {
- dlog.Server.Info(r.server.user, "Using optimized literal string matching for pattern:", re.Pattern())
- } else {
- dlog.Server.Info(r.server.user, "Using regex matching for pattern:", re.Pattern())
- }
+ // Enable turbo mode if not already enabled
+ if !r.server.IsTurboMode() {
+ r.server.EnableTurboMode()
}
- // Use the existing lines channel but with the processor-based reader
- aggregate := r.server.aggregate
- var lines chan *line.Line
-
- // Use the optimized version if turbo boost is not disabled (enabled by default)
- turboBoostEnabled := !r.server.serverCfg.TurboBoostDisable
-
- for {
- if aggregate != nil {
- // For MapReduce operations, create a new channel that goes only to the aggregate
- // This prevents lines from being sent to the client
- lines = make(chan *line.Line, 10000)
- aggregate.NextLinesCh <- lines
- } else {
- // For non-MapReduce operations, use the server's lines channel
- lines = r.server.lines
+ // Create a direct writer based on the mode
+ // Each file gets its own writer instance to avoid race conditions
+ // when multiple files are processed concurrently
+ var writer TurboWriter
+ if r.server.serverless {
+ // In serverless mode, write directly to stdout
+ writer = NewDirectTurboWriter(os.Stdout, r.server.hostname, r.server.plain, r.server.serverless)
+ } else {
+ // In server mode, use the network writer with turbo channels
+ // Create a new instance for each file to ensure thread safety
+ writer = &TurboNetworkWriter{
+ handler: &r.server.baseHandler,
+ hostname: r.server.hostname,
+ plain: r.server.plain,
+ serverless: r.server.serverless,
}
+ }
- // Create a processor that sends to the lines channel
- processor := NewChannellessLineProcessor(lines, globID)
- defer processor.Close()
+ r.executeReadLoop(ctx, ltx, path, globID, re, reader, r.readViaTurboProcessor(path, globID, writer))
+}
- var err error
- if turboBoostEnabled {
- err = reader.StartWithProcessorOptimized(ctx, ltx, processor, re)
- } else {
- err = reader.StartWithProcessor(ctx, ltx, processor, re)
- }
+func (r *readCommand) executeReadLoop(ctx context.Context, ltx lcontext.LContext,
+ path, globID string, re regex.Regex, reader fs.FileReader, strategy readStrategy) {
- if err != nil {
+ for {
+ if err := strategy(ctx, ltx, reader, re); err != nil {
dlog.Server.Error(r.server.user, path, globID, err)
}
- if aggregate != nil {
- // Also makes aggregate to Flush
- close(lines)
- }
-
select {
case <-ctx.Done():
return
@@ -372,52 +321,40 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte
return
}
}
+
time.Sleep(time.Second * 2)
dlog.Server.Info(path, globID, "Reading file again")
}
}
-func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.LContext,
- path, globID string, re regex.Regex, reader fs.FileReader) {
+func (r *readCommand) readViaChannels() readStrategy {
+ return func(ctx context.Context, ltx lcontext.LContext, reader fs.FileReader, re regex.Regex) error {
+ aggregate := r.server.aggregate
+ var linesCh chan *line.Line
- dlog.Server.Info(r.server.user, "Using turbo channel-less implementation", path, globID)
-
- // Log if grep is using literal mode optimization
- if r.mode == omode.GrepClient {
- if re.IsLiteral() {
- dlog.Server.Info(r.server.user, "Using optimized literal string matching for pattern:", re.Pattern())
+ if aggregate != nil {
+ // For MapReduce operations, create a new channel that goes only to the aggregate.
+ linesCh = make(chan *line.Line, 10000)
+ aggregate.NextLinesCh <- linesCh
} else {
- dlog.Server.Info(r.server.user, "Using regex matching for pattern:", re.Pattern())
+ // For non-MapReduce operations, use the server's shared lines channel.
+ linesCh = r.server.lines
}
- }
- // Enable turbo mode if not already enabled
- if !r.server.IsTurboMode() {
- r.server.EnableTurboMode()
- }
-
- // Create a direct writer based on the mode
- // Each file gets its own writer instance to avoid race conditions
- // when multiple files are processed concurrently
- var writer TurboWriter
- if r.server.serverless {
- // In serverless mode, write directly to stdout
- writer = NewDirectTurboWriter(os.Stdout, r.server.hostname, r.server.plain, r.server.serverless)
- } else {
- // In server mode, use the network writer with turbo channels
- // Create a new instance for each file to ensure thread safety
- writer = &TurboNetworkWriter{
- handler: &r.server.baseHandler,
- hostname: r.server.hostname,
- plain: r.server.plain,
- serverless: r.server.serverless,
+ err := reader.Start(ctx, ltx, linesCh, re)
+ if aggregate != nil {
+ // Closing the aggregate line channel triggers flush.
+ close(linesCh)
}
+
+ return err
}
+}
- for {
+func (r *readCommand) readViaTurboProcessor(path, globID string, writer TurboWriter) readStrategy {
+ return func(ctx context.Context, ltx lcontext.LContext, reader fs.FileReader, re regex.Regex) error {
dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> starting read loop iteration")
- // Create a processor based on whether we're doing MapReduce or not
var processor interface {
ProcessLine(*bytes.Buffer, uint64, string) error
Flush() error
@@ -425,34 +362,30 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L
}
if r.server.turboAggregate != nil {
- // Use turbo aggregate processor for MapReduce operations
+ // Use turbo aggregate processor for MapReduce operations.
dlog.Server.Info(r.server.user, "Using turbo aggregate processor for MapReduce", path, globID)
processor = server.NewTurboAggregateProcessor(r.server.turboAggregate, globID)
} else {
- // Use direct line processor for cat/grep/tail
+ // Use direct line processor for cat/grep/tail.
processor = NewDirectLineProcessor(writer, globID)
}
- // Use the optimized reader
dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> about to start")
- err := reader.StartWithProcessorOptimized(ctx, ltx, processor, re)
+ startErr := reader.StartWithProcessorOptimized(ctx, ltx, processor, re)
dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> completed")
- if err != nil {
- dlog.Server.Error(r.server.user, path, globID, err)
- }
- // Ensure we flush before closing
+ // Ensure we flush and close the processor before retry checks.
dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> flushing processor")
if flushErr := processor.Flush(); flushErr != nil {
dlog.Server.Error(r.server.user, path, globID, "flush error", flushErr)
}
-
- // Close the processor after each iteration
dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> closing processor")
- processor.Close()
+ if closeErr := processor.Close(); closeErr != nil {
+ dlog.Server.Error(r.server.user, path, globID, "close error", closeErr)
+ }
dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> processor closed")
- // Give time for data to be transmitted
+ // Give time for data to be transmitted.
// This is crucial for integration tests to ensure all data is sent
// Skip this delay in serverless mode since data is written directly to stdout
if !r.server.serverless {
@@ -460,16 +393,18 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L
time.Sleep(50 * time.Millisecond)
}
- select {
- case <-ctx.Done():
- return
- default:
- if !reader.Retry() {
- return
- }
- }
- time.Sleep(time.Second * 2)
- dlog.Server.Info(path, globID, "Reading file again")
+ return startErr
+ }
+}
+
+func (r *readCommand) logRegexMode(re regex.Regex) {
+ if r.mode != omode.GrepClient {
+ return
+ }
+ if re.IsLiteral() {
+ dlog.Server.Info(r.server.user, "Using optimized literal string matching for pattern:", re.Pattern())
+ } else {
+ dlog.Server.Info(r.server.user, "Using regex matching for pattern:", re.Pattern())
}
}