diff options
| author | Paul Buetow <paul@buetow.org> | 2025-06-30 23:58:30 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-06-30 23:58:30 +0300 |
| commit | a3f6bb625aad2cd4a0c86af44feaa22aa401331f (patch) | |
| tree | 79878744944bebf5a4346ca0bff338be83a20f23 | |
| parent | 7a917e6e81bf8e956eff2a4a54e9300ab2747949 (diff) | |
feat: track pending files to prevent premature server shutdown
- Add pendingFiles counter to ServerHandler to track files waiting for limiter slots
- Only shutdown when both activeCommands and pendingFiles are zero
- Increment pendingFiles when starting to process a batch of files
- Decrement as each file completes processing
- Add comprehensive logging for debugging shutdown issues
- Flush turbo data before signaling EOF to ensure all data is transmitted
This fixes the issue where the server would shutdown while files were still
queued in the catLimiter, causing incomplete processing when MaxConcurrentCats
is lower than the number of files being processed.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
| -rw-r--r-- | internal/server/handlers/basehandler.go | 10 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 52 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 10 |
3 files changed, 64 insertions, 8 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index a82c91a..bfc7ec2 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -385,7 +385,15 @@ func (h *baseHandler) flush() { } func (h *baseHandler) shutdown() { - dlog.Server.Debug(h.user, "shutdown()") + // Log current state at shutdown + activeCommands := atomic.LoadInt32(&h.activeCommands) + dlog.Server.Info(h.user, "shutdown() called", "activeCommands", activeCommands, "turboMode", h.turboMode) + + // In turbo mode, ensure all data is flushed before shutdown + if h.turboMode { + h.flushTurboData() + } + h.flush() go func() { diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 91bd7c3..2920044 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -6,6 +6,7 @@ import ( "path/filepath" "strings" "sync" + "sync/atomic" "time" "github.com/mimecast/dtail/internal/config" @@ -103,12 +104,19 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext, func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, paths []string, glob string, re regex.Regex, retryInterval time.Duration) { + dlog.Server.Info(r.server.user, "Processing files", "count", len(paths), "glob", glob) + + // Track pending files for this batch + atomic.AddInt32(&r.server.pendingFiles, int32(len(paths))) + var wg sync.WaitGroup wg.Add(len(paths)) for _, path := range paths { go r.readFileIfPermissions(ctx, ltx, &wg, path, glob, re) } wg.Wait() + + dlog.Server.Info(r.server.user, "All files processed", "count", len(paths)) // In turbo mode, signal EOF after all files are processed // This is crucial for proper shutdown in server mode @@ -116,6 +124,11 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, if turboBoostEnabled && r.server.aggregate == nil && (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) { if r.server.IsTurboMode() && r.server.turboEOF != nil { + dlog.Server.Debug(r.server.user, "Turbo mode: flushing data before EOF signal") + + // Ensure all turbo data is flushed before signaling EOF + r.server.flushTurboData() + // Signal EOF by closing the channel, but only if it hasn't been closed yet select { case <-r.server.turboEOF: @@ -123,8 +136,19 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, default: close(r.server.turboEOF) } - // Wait longer to ensure all data is transmitted for large batches - time.Sleep(500 * time.Millisecond) + + // Wait to ensure all data is transmitted + // This is especially important when files are queued due to concurrency limits + waitTime := 500 * time.Millisecond + if len(paths) > 10 { + // For many files, wait proportionally longer + waitTime = time.Duration(len(paths)*10) * time.Millisecond + if waitTime > 2*time.Second { + waitTime = 2 * time.Second + } + } + dlog.Server.Debug(r.server.user, "Waiting for data transmission", "duration", waitTime) + time.Sleep(waitTime) } } @@ -137,6 +161,18 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC wg *sync.WaitGroup, path, glob string, re regex.Regex) { defer wg.Done() + defer func() { + // Decrement pending files counter when this file is done + remaining := atomic.AddInt32(&r.server.pendingFiles, -1) + dlog.Server.Debug(r.server.user, "File processing complete", "path", path, "remainingPending", remaining) + + // Check if we should trigger shutdown now + if remaining == 0 && atomic.LoadInt32(&r.server.activeCommands) == 0 { + dlog.Server.Debug(r.server.user, "No active commands and no pending files, triggering shutdown") + r.server.shutdown() + } + }() + globID := r.makeGlobID(path, glob) if !r.server.user.HasFilePermission(path, "readfiles") { dlog.Server.Error(r.server.user, "No permission to read file", path, globID) @@ -176,14 +212,17 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, select { case limiter <- struct{}{}: + dlog.Server.Debug(r.server.user, "Got limiter slot immediately", "path", path) case <-ctx.Done(): + dlog.Server.Debug(r.server.user, "Context cancelled while waiting for limiter", "path", path) return default: - dlog.Server.Info("Server limit hit, queueing file", len(limiter), path) + dlog.Server.Info(r.server.user, "Server limit hit, queueing file", "limiterLen", len(limiter), "path", path, "maxConcurrent", cap(limiter)) select { case limiter <- struct{}{}: - dlog.Server.Info("Server limit OK now, processing file", len(limiter), path) + dlog.Server.Info(r.server.user, "Server limit OK now, processing file", "limiterLen", len(limiter), "path", path) case <-ctx.Done(): + dlog.Server.Debug(r.server.user, "Context cancelled while queued for limiter", "path", path) return } } @@ -294,14 +333,15 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L } // Create a direct writer based on the mode - // In serverless mode, write directly to stdout - // In server mode, use the turbo channel + // Each file gets its own writer instance to avoid race conditions + // when multiple files are processed concurrently var writer TurboWriter if r.server.serverless { // In serverless mode, write directly to stdout writer = NewDirectTurboWriter(os.Stdout, r.server.hostname, r.server.plain, r.server.serverless) } else { // In server mode, use the network writer with turbo channels + // Create a new instance for each file to ensure thread safety writer = &TurboNetworkWriter{ handler: &r.server.baseHandler, hostname: r.server.hostname, diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 69bebc4..9163447 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -3,6 +3,7 @@ package handlers import ( "context" "strings" + "sync/atomic" "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/config" @@ -21,6 +22,8 @@ type ServerHandler struct { catLimiter chan struct{} tailLimiter chan struct{} regex string + // Track pending files waiting for limiter slots + pendingFiles int32 } // NewServerHandler returns the server handler. @@ -60,7 +63,12 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, ltx lcontext.LCon dlog.Server.Debug(h.user, "Handling user command", argc, args) h.incrementActiveCommands() commandFinished := func() { - if h.decrementActiveCommands() == 0 { + activeCommands := h.decrementActiveCommands() + pendingFiles := atomic.LoadInt32(&h.pendingFiles) + dlog.Server.Debug(h.user, "Command finished", "activeCommands", activeCommands, "pendingFiles", pendingFiles) + + // Only shutdown if no active commands AND no pending files + if activeCommands == 0 && pendingFiles == 0 { h.shutdown() } } |
