diff options
| author | Paul Buetow <paul@buetow.org> | 2025-06-17 14:57:41 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-06-17 14:57:41 +0300 |
| commit | c9c037275e4a5f2c8c87d736f3521c0817770798 (patch) | |
| tree | 80911b418f51dfc903dd74e769f6b2120b3fdee3 /internal/server/handlers/readcommand.go | |
| parent | b2cb4ca0563cc73af20460fe3b319263a96a6989 (diff) | |
Fix channelless mode for DTail operations
- Exclude TailClient operations from channelless processing to ensure proper real-time file monitoring
- Add comprehensive MapReduce detection for both cat and tail commands with MAPREDUCE patterns and noop regex
- Add IsNoop() method to Regex type for proper noop regex detection in CSV logformat operations
- Update build instructions and testing guidance in CLAUDE.md
All integration tests now pass with channelless mode enabled.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/server/handlers/readcommand.go')
| -rw-r--r-- | internal/server/handlers/readcommand.go | 23 |
1 files changed, 21 insertions, 2 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 17054df..89bb757 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -48,13 +48,19 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext, } // Check if channelless mode is enabled - useChannelless := os.Getenv("DTAIL_USE_CHANNELLESS") == "yes" + // Note: MapReduce operations require the full channel-based aggregation infrastructure + // Note: Tail operations require continuous monitoring and real-time streaming + isMapReduceCmd := r.mode == omode.MapClient || r.isMapReduceCommand(re) + isTailCmd := r.mode == omode.TailClient + useChannelless := os.Getenv("DTAIL_USE_CHANNELLESS") == "yes" && !isMapReduceCmd && !isTailCmd if useChannelless { - dlog.Server.Debug("Using channelless processing mode") + dlog.Server.Debug("Using channelless processing mode for mode:", r.mode) r.startChannelless(ctx, ltx, args, re, retries) return } + + dlog.Server.Debug("Using channel-based processing mode for mode:", r.mode) // In serverless mode, can also read data from pipe // e.g.: grep foo bar.log | dmap 'from STATS select ...' @@ -335,6 +341,19 @@ func (r *readCommand) readChannellessStdin(ctx context.Context, ltx lcontext.LCo } } +// isMapReduceCommand checks if this is a command that's part of a MapReduce operation +func (r *readCommand) isMapReduceCommand(re regex.Regex) bool { + // Only cat and tail commands can be part of MapReduce operations + if r.mode != omode.CatClient && r.mode != omode.TailClient { + return false + } + + // Check if the regex contains MAPREDUCE pattern OR if it's a noop regex + // (noop regex is used for CSV logformat in MapReduce operations) + pattern := re.String() + return strings.Contains(pattern, "MAPREDUCE:") || re.IsNoop() +} + // createChannellessProcessor creates the appropriate processor based on command mode func (r *readCommand) createChannellessProcessor(re regex.Regex, ltx lcontext.LContext) fs.LineProcessor { hostname := r.server.hostname // Use server hostname |
