diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-02 09:15:26 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-02 09:15:26 +0200 |
| commit | e411633018ff2f9ad01037cc14b946f6885e5c0c (patch) | |
| tree | c6f6edca932714b9c6008209bf4f9ffc994df75d | |
| parent | 174bd919ab58e15a1841df428025ea9cc8ef7e3a (diff) | |
Consolidate read command paths via strategy loop (task 333)
| -rw-r--r-- | internal/server/handlers/readcommand.go | 209 |
1 files changed, 72 insertions, 137 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 3410499..f5fbfb9 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -24,6 +24,8 @@ type readCommand struct { mode omode.Mode } +type readStrategy func(context.Context, lcontext.LContext, fs.FileReader, regex.Regex) error + func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand { return &readCommand{ server: server, @@ -211,15 +213,7 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, path, globID string, re regex.Regex) { dlog.Server.Info(r.server.user, "Start reading", path, globID) - - // Log if grep is using literal mode optimization - if r.mode == omode.GrepClient { - if re.IsLiteral() { - dlog.Server.Info(r.server.user, "Using optimized literal string matching for pattern:", re.Pattern()) - } else { - dlog.Server.Info(r.server.user, "Using regex matching for pattern:", re.Pattern()) - } - } + r.logRegexMode(re) var reader fs.FileReader var limiter chan struct{} @@ -276,94 +270,49 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, return } - // Original channel-based implementation - aggregate := r.server.aggregate - var lines chan *line.Line - - for { - if aggregate != nil { - // For MapReduce operations, create a new channel that goes only to the aggregate - // This prevents lines from being sent to the client - lines = make(chan *line.Line, 10000) - aggregate.NextLinesCh <- lines - } else { - // For non-MapReduce operations, use the server's lines channel - lines = r.server.lines - } - - if err := reader.Start(ctx, ltx, lines, re); err != nil { - dlog.Server.Error(r.server.user, path, globID, err) - } - if aggregate != nil { - // Also makes aggregate to Flush - close(lines) - } - - select { - case <-ctx.Done(): - return - default: - if !reader.Retry() { - return - } - } - time.Sleep(time.Second * 2) - dlog.Server.Info(path, globID, "Reading file again") - } + r.executeReadLoop(ctx, ltx, path, globID, re, reader, r.readViaChannels()) } -func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LContext, +func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.LContext, path, globID string, re regex.Regex, reader fs.FileReader) { - dlog.Server.Info(r.server.user, "Using channel-less grep implementation", path, globID) + dlog.Server.Info(r.server.user, "Using turbo channel-less implementation", path, globID) + r.logRegexMode(re) - // Log if grep is using literal mode optimization - if r.mode == omode.GrepClient { - if re.IsLiteral() { - dlog.Server.Info(r.server.user, "Using optimized literal string matching for pattern:", re.Pattern()) - } else { - dlog.Server.Info(r.server.user, "Using regex matching for pattern:", re.Pattern()) - } + // Enable turbo mode if not already enabled + if !r.server.IsTurboMode() { + r.server.EnableTurboMode() } - // Use the existing lines channel but with the processor-based reader - aggregate := r.server.aggregate - var lines chan *line.Line - - // Use the optimized version if turbo boost is not disabled (enabled by default) - turboBoostEnabled := !r.server.serverCfg.TurboBoostDisable - - for { - if aggregate != nil { - // For MapReduce operations, create a new channel that goes only to the aggregate - // This prevents lines from being sent to the client - lines = make(chan *line.Line, 10000) - aggregate.NextLinesCh <- lines - } else { - // For non-MapReduce operations, use the server's lines channel - lines = r.server.lines + // Create a direct writer based on the mode + // Each file gets its own writer instance to avoid race conditions + // when multiple files are processed concurrently + var writer TurboWriter + if r.server.serverless { + // In serverless mode, write directly to stdout + writer = NewDirectTurboWriter(os.Stdout, r.server.hostname, r.server.plain, r.server.serverless) + } else { + // In server mode, use the network writer with turbo channels + // Create a new instance for each file to ensure thread safety + writer = &TurboNetworkWriter{ + handler: &r.server.baseHandler, + hostname: r.server.hostname, + plain: r.server.plain, + serverless: r.server.serverless, } + } - // Create a processor that sends to the lines channel - processor := NewChannellessLineProcessor(lines, globID) - defer processor.Close() + r.executeReadLoop(ctx, ltx, path, globID, re, reader, r.readViaTurboProcessor(path, globID, writer)) +} - var err error - if turboBoostEnabled { - err = reader.StartWithProcessorOptimized(ctx, ltx, processor, re) - } else { - err = reader.StartWithProcessor(ctx, ltx, processor, re) - } +func (r *readCommand) executeReadLoop(ctx context.Context, ltx lcontext.LContext, + path, globID string, re regex.Regex, reader fs.FileReader, strategy readStrategy) { - if err != nil { + for { + if err := strategy(ctx, ltx, reader, re); err != nil { dlog.Server.Error(r.server.user, path, globID, err) } - if aggregate != nil { - // Also makes aggregate to Flush - close(lines) - } - select { case <-ctx.Done(): return @@ -372,52 +321,40 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte return } } + time.Sleep(time.Second * 2) dlog.Server.Info(path, globID, "Reading file again") } } -func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.LContext, - path, globID string, re regex.Regex, reader fs.FileReader) { +func (r *readCommand) readViaChannels() readStrategy { + return func(ctx context.Context, ltx lcontext.LContext, reader fs.FileReader, re regex.Regex) error { + aggregate := r.server.aggregate + var linesCh chan *line.Line - dlog.Server.Info(r.server.user, "Using turbo channel-less implementation", path, globID) - - // Log if grep is using literal mode optimization - if r.mode == omode.GrepClient { - if re.IsLiteral() { - dlog.Server.Info(r.server.user, "Using optimized literal string matching for pattern:", re.Pattern()) + if aggregate != nil { + // For MapReduce operations, create a new channel that goes only to the aggregate. + linesCh = make(chan *line.Line, 10000) + aggregate.NextLinesCh <- linesCh } else { - dlog.Server.Info(r.server.user, "Using regex matching for pattern:", re.Pattern()) + // For non-MapReduce operations, use the server's shared lines channel. + linesCh = r.server.lines } - } - // Enable turbo mode if not already enabled - if !r.server.IsTurboMode() { - r.server.EnableTurboMode() - } - - // Create a direct writer based on the mode - // Each file gets its own writer instance to avoid race conditions - // when multiple files are processed concurrently - var writer TurboWriter - if r.server.serverless { - // In serverless mode, write directly to stdout - writer = NewDirectTurboWriter(os.Stdout, r.server.hostname, r.server.plain, r.server.serverless) - } else { - // In server mode, use the network writer with turbo channels - // Create a new instance for each file to ensure thread safety - writer = &TurboNetworkWriter{ - handler: &r.server.baseHandler, - hostname: r.server.hostname, - plain: r.server.plain, - serverless: r.server.serverless, + err := reader.Start(ctx, ltx, linesCh, re) + if aggregate != nil { + // Closing the aggregate line channel triggers flush. + close(linesCh) } + + return err } +} - for { +func (r *readCommand) readViaTurboProcessor(path, globID string, writer TurboWriter) readStrategy { + return func(ctx context.Context, ltx lcontext.LContext, reader fs.FileReader, re regex.Regex) error { dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> starting read loop iteration") - // Create a processor based on whether we're doing MapReduce or not var processor interface { ProcessLine(*bytes.Buffer, uint64, string) error Flush() error @@ -425,34 +362,30 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L } if r.server.turboAggregate != nil { - // Use turbo aggregate processor for MapReduce operations + // Use turbo aggregate processor for MapReduce operations. dlog.Server.Info(r.server.user, "Using turbo aggregate processor for MapReduce", path, globID) processor = server.NewTurboAggregateProcessor(r.server.turboAggregate, globID) } else { - // Use direct line processor for cat/grep/tail + // Use direct line processor for cat/grep/tail. processor = NewDirectLineProcessor(writer, globID) } - // Use the optimized reader dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> about to start") - err := reader.StartWithProcessorOptimized(ctx, ltx, processor, re) + startErr := reader.StartWithProcessorOptimized(ctx, ltx, processor, re) dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> completed") - if err != nil { - dlog.Server.Error(r.server.user, path, globID, err) - } - // Ensure we flush before closing + // Ensure we flush and close the processor before retry checks. dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> flushing processor") if flushErr := processor.Flush(); flushErr != nil { dlog.Server.Error(r.server.user, path, globID, "flush error", flushErr) } - - // Close the processor after each iteration dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> closing processor") - processor.Close() + if closeErr := processor.Close(); closeErr != nil { + dlog.Server.Error(r.server.user, path, globID, "close error", closeErr) + } dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> processor closed") - // Give time for data to be transmitted + // Give time for data to be transmitted. // This is crucial for integration tests to ensure all data is sent // Skip this delay in serverless mode since data is written directly to stdout if !r.server.serverless { @@ -460,16 +393,18 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L time.Sleep(50 * time.Millisecond) } - select { - case <-ctx.Done(): - return - default: - if !reader.Retry() { - return - } - } - time.Sleep(time.Second * 2) - dlog.Server.Info(path, globID, "Reading file again") + return startErr + } +} + +func (r *readCommand) logRegexMode(re regex.Regex) { + if r.mode != omode.GrepClient { + return + } + if re.IsLiteral() { + dlog.Server.Info(r.server.user, "Using optimized literal string matching for pattern:", re.Pattern()) + } else { + dlog.Server.Info(r.server.user, "Using regex matching for pattern:", re.Pattern()) } } |
