summaryrefslogtreecommitdiff
path: root/internal/server/handlers/readcommand.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-06-17 14:57:41 +0300
committerPaul Buetow <paul@buetow.org>2025-06-17 14:57:41 +0300
commitc9c037275e4a5f2c8c87d736f3521c0817770798 (patch)
tree80911b418f51dfc903dd74e769f6b2120b3fdee3 /internal/server/handlers/readcommand.go
parentb2cb4ca0563cc73af20460fe3b319263a96a6989 (diff)
Fix channelless mode for DTail operations
- Exclude TailClient operations from channelless processing to ensure proper real-time file monitoring - Add comprehensive MapReduce detection for both cat and tail commands with MAPREDUCE patterns and noop regex - Add IsNoop() method to Regex type for proper noop regex detection in CSV logformat operations - Update build instructions and testing guidance in CLAUDE.md All integration tests now pass with channelless mode enabled. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server/handlers/readcommand.go')
-rw-r--r--internal/server/handlers/readcommand.go23
1 files changed, 21 insertions, 2 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 17054df..89bb757 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -48,13 +48,19 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
}
// Check if channelless mode is enabled
- useChannelless := os.Getenv("DTAIL_USE_CHANNELLESS") == "yes"
+ // Note: MapReduce operations require the full channel-based aggregation infrastructure
+ // Note: Tail operations require continuous monitoring and real-time streaming
+ isMapReduceCmd := r.mode == omode.MapClient || r.isMapReduceCommand(re)
+ isTailCmd := r.mode == omode.TailClient
+ useChannelless := os.Getenv("DTAIL_USE_CHANNELLESS") == "yes" && !isMapReduceCmd && !isTailCmd
if useChannelless {
- dlog.Server.Debug("Using channelless processing mode")
+ dlog.Server.Debug("Using channelless processing mode for mode:", r.mode)
r.startChannelless(ctx, ltx, args, re, retries)
return
}
+
+ dlog.Server.Debug("Using channel-based processing mode for mode:", r.mode)
// In serverless mode, can also read data from pipe
// e.g.: grep foo bar.log | dmap 'from STATS select ...'
@@ -335,6 +341,19 @@ func (r *readCommand) readChannellessStdin(ctx context.Context, ltx lcontext.LCo
}
}
+// isMapReduceCommand checks if this is a command that's part of a MapReduce operation
+func (r *readCommand) isMapReduceCommand(re regex.Regex) bool {
+ // Only cat and tail commands can be part of MapReduce operations
+ if r.mode != omode.CatClient && r.mode != omode.TailClient {
+ return false
+ }
+
+ // Check if the regex contains MAPREDUCE pattern OR if it's a noop regex
+ // (noop regex is used for CSV logformat in MapReduce operations)
+ pattern := re.String()
+ return strings.Contains(pattern, "MAPREDUCE:") || re.IsNoop()
+}
+
// createChannellessProcessor creates the appropriate processor based on command mode
func (r *readCommand) createChannellessProcessor(re regex.Regex, ltx lcontext.LContext) fs.LineProcessor {
hostname := r.server.hostname // Use server hostname