From c9c037275e4a5f2c8c87d736f3521c0817770798 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Tue, 17 Jun 2025 14:57:41 +0300 Subject: Fix channelless mode for DTail operations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Exclude TailClient operations from channelless processing to ensure proper real-time file monitoring - Add comprehensive MapReduce detection for both cat and tail commands with MAPREDUCE patterns and noop regex - Add IsNoop() method to Regex type for proper noop regex detection in CSV logformat operations - Update build instructions and testing guidance in CLAUDE.md All integration tests now pass with channelless mode enabled. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- internal/server/handlers/readcommand.go | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) (limited to 'internal/server/handlers/readcommand.go') diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 17054df..89bb757 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -48,13 +48,19 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext, } // Check if channelless mode is enabled - useChannelless := os.Getenv("DTAIL_USE_CHANNELLESS") == "yes" + // Note: MapReduce operations require the full channel-based aggregation infrastructure + // Note: Tail operations require continuous monitoring and real-time streaming + isMapReduceCmd := r.mode == omode.MapClient || r.isMapReduceCommand(re) + isTailCmd := r.mode == omode.TailClient + useChannelless := os.Getenv("DTAIL_USE_CHANNELLESS") == "yes" && !isMapReduceCmd && !isTailCmd if useChannelless { - dlog.Server.Debug("Using channelless processing mode") + dlog.Server.Debug("Using channelless processing mode for mode:", r.mode) r.startChannelless(ctx, ltx, args, re, retries) return } + + dlog.Server.Debug("Using channel-based processing mode for mode:", r.mode) // In serverless mode, can also read data from pipe // e.g.: grep foo bar.log | dmap 'from STATS select ...' @@ -335,6 +341,19 @@ func (r *readCommand) readChannellessStdin(ctx context.Context, ltx lcontext.LCo } } +// isMapReduceCommand checks if this is a command that's part of a MapReduce operation +func (r *readCommand) isMapReduceCommand(re regex.Regex) bool { + // Only cat and tail commands can be part of MapReduce operations + if r.mode != omode.CatClient && r.mode != omode.TailClient { + return false + } + + // Check if the regex contains MAPREDUCE pattern OR if it's a noop regex + // (noop regex is used for CSV logformat in MapReduce operations) + pattern := re.String() + return strings.Contains(pattern, "MAPREDUCE:") || re.IsNoop() +} + // createChannellessProcessor creates the appropriate processor based on command mode func (r *readCommand) createChannellessProcessor(re regex.Regex, ltx lcontext.LContext) fs.LineProcessor { hostname := r.server.hostname // Use server hostname -- cgit v1.2.3