summary | refs | log | tree | commit | diff
path: root/internal/server/handlers/readcommand.go
diff options
context:
space:
mode:
author: Paul Buetow <paul@buetow.org> 2025-06-30 23:22:56 +0300
committer: Paul Buetow <paul@buetow.org> 2025-06-30 23:22:56 +0300
commit: b4ca43d97c83c3b9da7138b3b4d6f6cce6fed370 (patch)
tree: 1dba534b8c7b1784f712cec90ff945e3d7fb7a82 /internal/server/handlers/readcommand.go
parent: 88886206c2c758bf619362aaa484dd3e254b8ed1 (diff)
fix: ensure complete data transmission in turbo mode for dtail operations
This commit fixes integration test failures in turbo mode where data was not being fully transmitted before the connection closed. The main issue was that readWithTurboProcessor was returning too quickly without ensuring all data had been written to the network stream.

Key changes:
- Add comprehensive trace logging to track data flow in turbo mode
- Fix turbo channel draining mechanism in baseHandler.Read() to wait for all data
- Add proper flushing in TurboNetworkWriter with channel drain synchronization
- Increase flush timeout from 10 to 100 iterations for turbo mode data volumes
- Fix color formatting in serverless mode by processing lines individually
- Add synchronization delays to ensure data transmission completes

The fixes ensure that all data is properly transmitted before connection closure, resolving TestDcat integration test failures when DTAIL_TURBOBOOST_ENABLE is set.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server/handlers/readcommand.go')
-rw-r--r-- internal/server/handlers/readcommand.go 102
1 files changed, 79 insertions, 23 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 2245b7c..dc11aa9 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -2,7 +2,6 @@ package handlers
import (
"context"
- "fmt"
"os"
"path/filepath"
"strings"
@@ -32,7 +31,7 @@ func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand {
func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
argc int, args []string, retries int) {
-
+
re := regex.NewNoop()
if argc >= 4 {
deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " "))
@@ -53,7 +52,7 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
// e.g.: grep foo bar.log | dmap 'from STATS select ...'
// Only read from pipe if no file argument is provided
isPipe := r.isInputFromPipe() && (argc < 2 || args[1] == "" || args[1] == "-")
-
+
if isPipe {
dlog.Server.Debug("Reading data from stdin pipe")
// Empty file path and globID "-" represents reading from the stdin pipe.
@@ -110,7 +109,7 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
go r.readFileIfPermissions(ctx, ltx, &wg, path, glob, re)
}
wg.Wait()
-
+
// In turbo mode with aggregate, we don't close the shared channel here
// because it will be used across multiple invocations
// The aggregate will handle channel closure when it's done
@@ -132,7 +131,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
path, globID string, re regex.Regex) {
-
+
dlog.Server.Info(r.server.user, "Start reading", path, globID)
var reader fs.FileReader
var limiter chan struct{}
@@ -173,16 +172,11 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
// Check if we should use the turbo boost optimizations
turboBoostEnabled := config.Env("DTAIL_TURBOBOOST_ENABLE")
- dlog.Server.Info(r.server.user, "Turbo boost check: enabled=", turboBoostEnabled, "mode=", r.mode)
- // Only enable channel-less for server mode, not serverless mode
- // Use the serverless field directly as it's more reliable
// Enable turbo boost for cat/grep/tail modes, but NOT for aggregate (MapReduce) operations
// MapReduce requires the traditional channel-based approach to work correctly
- if turboBoostEnabled && !r.server.serverless && r.server.aggregate == nil &&
+ if turboBoostEnabled && r.server.aggregate == nil &&
(r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) {
- // Log to stderr for testing verification - only in server mode
- fmt.Fprintf(os.Stderr, "[DTAIL] Turbo boost enabled: using channel-less implementation for %s\n", path)
- r.readWithProcessor(ctx, ltx, path, globID, re, reader)
+ r.readWithTurboProcessor(ctx, ltx, path, globID, re, reader)
return
}
@@ -222,7 +216,6 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte
path, globID string, re regex.Regex, reader fs.FileReader) {
dlog.Server.Info(r.server.user, "Using channel-less grep implementation", path, globID)
-
// Use the existing lines channel but with the processor-based reader
lines := r.server.lines
@@ -230,15 +223,6 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte
// Use the optimized version if turbo boost is enabled
turboBoostEnabled := config.Env("DTAIL_TURBOBOOST_ENABLE")
-
- // Log to stderr for testing verification - only in server mode
- if !r.server.serverless {
- if turboBoostEnabled {
- fmt.Fprintf(os.Stderr, "[DTAIL] Turbo boost enabled: using optimized reader for %s\n", path)
- } else {
- fmt.Fprintf(os.Stderr, "[DTAIL] Using standard processor reader for %s\n", path)
- }
- }
for {
if aggregate != nil {
@@ -258,7 +242,7 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte
} else {
err = reader.StartWithProcessor(ctx, ltx, processor, re)
}
-
+
if err != nil {
dlog.Server.Error(r.server.user, path, globID, err)
}
@@ -281,6 +265,78 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte
}
}
+func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.LContext,
+ path, globID string, re regex.Regex, reader fs.FileReader) {
+
+ dlog.Server.Info(r.server.user, "Using turbo channel-less implementation", path, globID)
+
+ // Enable turbo mode if not already enabled
+ if !r.server.IsTurboMode() {
+ r.server.EnableTurboMode()
+ }
+
+ // Create a direct writer based on the mode
+ // In serverless mode, write directly to stdout
+ // In server mode, use the turbo channel
+ var writer TurboWriter
+ if r.server.serverless {
+ // In serverless mode, write directly to stdout
+ writer = NewDirectTurboWriter(os.Stdout, r.server.hostname, r.server.plain, r.server.serverless)
+ } else {
+ // In server mode, use the network writer with turbo channels
+ writer = &TurboNetworkWriter{
+ handler: &r.server.baseHandler,
+ hostname: r.server.hostname,
+ plain: r.server.plain,
+ serverless: r.server.serverless,
+ }
+ }
+
+ for {
+ dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> starting read loop iteration")
+
+ // Create a direct processor that writes without channels
+ processor := NewDirectLineProcessor(writer, globID)
+
+ // Use the optimized reader
+ dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> about to start")
+ err := reader.StartWithProcessorOptimized(ctx, ltx, processor, re)
+ dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> completed")
+ if err != nil {
+ dlog.Server.Error(r.server.user, path, globID, err)
+ }
+
+ // Ensure we flush before closing
+ dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> flushing processor")
+ if flushErr := processor.Flush(); flushErr != nil {
+ dlog.Server.Error(r.server.user, path, globID, "flush error", flushErr)
+ }
+
+ // Close the processor after each iteration
+ dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> closing processor")
+ processor.Close()
+ dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> processor closed")
+
+ // Give time for data to be transmitted
+ // This is crucial for integration tests to ensure all data is sent
+ if !r.server.serverless {
+ dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> waiting for data transmission")
+ time.Sleep(50 * time.Millisecond)
+ }
+
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ if !reader.Retry() {
+ return
+ }
+ }
+ time.Sleep(time.Second * 2)
+ dlog.Server.Info(path, globID, "Reading file again")
+ }
+}
+
func (r *readCommand) makeGlobID(path, glob string) string {
var idParts []string
pathParts := strings.Split(path, "/")