summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2025-06-30 23:58:30 +0300
committerPaul Buetow <paul@buetow.org>2025-06-30 23:58:30 +0300
commita3f6bb625aad2cd4a0c86af44feaa22aa401331f (patch)
tree79878744944bebf5a4346ca0bff338be83a20f23
parent7a917e6e81bf8e956eff2a4a54e9300ab2747949 (diff)
feat: track pending files to prevent premature server shutdown
- Add pendingFiles counter to ServerHandler to track files waiting for limiter slots - Only shutdown when both activeCommands and pendingFiles are zero - Increment pendingFiles when starting to process a batch of files - Decrement as each file completes processing - Add comprehensive logging for debugging shutdown issues - Flush turbo data before signaling EOF to ensure all data is transmitted This fixes the issue where the server would shutdown while files were still queued in the catLimiter, causing incomplete processing when MaxConcurrentCats is lower than the number of files being processed. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
-rw-r--r--internal/server/handlers/basehandler.go10
-rw-r--r--internal/server/handlers/readcommand.go52
-rw-r--r--internal/server/handlers/serverhandler.go10
3 files changed, 64 insertions, 8 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index a82c91a..bfc7ec2 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -385,7 +385,15 @@ func (h *baseHandler) flush() {
}
func (h *baseHandler) shutdown() {
- dlog.Server.Debug(h.user, "shutdown()")
+ // Log current state at shutdown
+ activeCommands := atomic.LoadInt32(&h.activeCommands)
+ dlog.Server.Info(h.user, "shutdown() called", "activeCommands", activeCommands, "turboMode", h.turboMode)
+
+ // In turbo mode, ensure all data is flushed before shutdown
+ if h.turboMode {
+ h.flushTurboData()
+ }
+
h.flush()
go func() {
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 91bd7c3..2920044 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -6,6 +6,7 @@ import (
"path/filepath"
"strings"
"sync"
+ "sync/atomic"
"time"
"github.com/mimecast/dtail/internal/config"
@@ -103,12 +104,19 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext,
func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
paths []string, glob string, re regex.Regex, retryInterval time.Duration) {
+ dlog.Server.Info(r.server.user, "Processing files", "count", len(paths), "glob", glob)
+
+ // Track pending files for this batch
+ atomic.AddInt32(&r.server.pendingFiles, int32(len(paths)))
+
var wg sync.WaitGroup
wg.Add(len(paths))
for _, path := range paths {
go r.readFileIfPermissions(ctx, ltx, &wg, path, glob, re)
}
wg.Wait()
+
+ dlog.Server.Info(r.server.user, "All files processed", "count", len(paths))
// In turbo mode, signal EOF after all files are processed
// This is crucial for proper shutdown in server mode
@@ -116,6 +124,11 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
if turboBoostEnabled && r.server.aggregate == nil &&
(r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) {
if r.server.IsTurboMode() && r.server.turboEOF != nil {
+ dlog.Server.Debug(r.server.user, "Turbo mode: flushing data before EOF signal")
+
+ // Ensure all turbo data is flushed before signaling EOF
+ r.server.flushTurboData()
+
// Signal EOF by closing the channel, but only if it hasn't been closed yet
select {
case <-r.server.turboEOF:
@@ -123,8 +136,19 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
default:
close(r.server.turboEOF)
}
- // Wait longer to ensure all data is transmitted for large batches
- time.Sleep(500 * time.Millisecond)
+
+ // Wait to ensure all data is transmitted
+ // This is especially important when files are queued due to concurrency limits
+ waitTime := 500 * time.Millisecond
+ if len(paths) > 10 {
+ // For many files, wait proportionally longer
+ waitTime = time.Duration(len(paths)*10) * time.Millisecond
+ if waitTime > 2*time.Second {
+ waitTime = 2 * time.Second
+ }
+ }
+ dlog.Server.Debug(r.server.user, "Waiting for data transmission", "duration", waitTime)
+ time.Sleep(waitTime)
}
}
@@ -137,6 +161,18 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
wg *sync.WaitGroup, path, glob string, re regex.Regex) {
defer wg.Done()
+ defer func() {
+ // Decrement pending files counter when this file is done
+ remaining := atomic.AddInt32(&r.server.pendingFiles, -1)
+ dlog.Server.Debug(r.server.user, "File processing complete", "path", path, "remainingPending", remaining)
+
+ // Check if we should trigger shutdown now
+ if remaining == 0 && atomic.LoadInt32(&r.server.activeCommands) == 0 {
+ dlog.Server.Debug(r.server.user, "No active commands and no pending files, triggering shutdown")
+ r.server.shutdown()
+ }
+ }()
+
globID := r.makeGlobID(path, glob)
if !r.server.user.HasFilePermission(path, "readfiles") {
dlog.Server.Error(r.server.user, "No permission to read file", path, globID)
@@ -176,14 +212,17 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
select {
case limiter <- struct{}{}:
+ dlog.Server.Debug(r.server.user, "Got limiter slot immediately", "path", path)
case <-ctx.Done():
+ dlog.Server.Debug(r.server.user, "Context cancelled while waiting for limiter", "path", path)
return
default:
- dlog.Server.Info("Server limit hit, queueing file", len(limiter), path)
+ dlog.Server.Info(r.server.user, "Server limit hit, queueing file", "limiterLen", len(limiter), "path", path, "maxConcurrent", cap(limiter))
select {
case limiter <- struct{}{}:
- dlog.Server.Info("Server limit OK now, processing file", len(limiter), path)
+ dlog.Server.Info(r.server.user, "Server limit OK now, processing file", "limiterLen", len(limiter), "path", path)
case <-ctx.Done():
+ dlog.Server.Debug(r.server.user, "Context cancelled while queued for limiter", "path", path)
return
}
}
@@ -294,14 +333,15 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L
}
// Create a direct writer based on the mode
- // In serverless mode, write directly to stdout
- // In server mode, use the turbo channel
+ // Each file gets its own writer instance to avoid race conditions
+ // when multiple files are processed concurrently
var writer TurboWriter
if r.server.serverless {
// In serverless mode, write directly to stdout
writer = NewDirectTurboWriter(os.Stdout, r.server.hostname, r.server.plain, r.server.serverless)
} else {
// In server mode, use the network writer with turbo channels
+ // Create a new instance for each file to ensure thread safety
writer = &TurboNetworkWriter{
handler: &r.server.baseHandler,
hostname: r.server.hostname,
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 69bebc4..9163447 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -3,6 +3,7 @@ package handlers
import (
"context"
"strings"
+ "sync/atomic"
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/config"
@@ -21,6 +22,8 @@ type ServerHandler struct {
catLimiter chan struct{}
tailLimiter chan struct{}
regex string
+ // Track pending files waiting for limiter slots
+ pendingFiles int32
}
// NewServerHandler returns the server handler.
@@ -60,7 +63,12 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, ltx lcontext.LCon
dlog.Server.Debug(h.user, "Handling user command", argc, args)
h.incrementActiveCommands()
commandFinished := func() {
- if h.decrementActiveCommands() == 0 {
+ activeCommands := h.decrementActiveCommands()
+ pendingFiles := atomic.LoadInt32(&h.pendingFiles)
+ dlog.Server.Debug(h.user, "Command finished", "activeCommands", activeCommands, "pendingFiles", pendingFiles)
+
+ // Only shutdown if no active commands AND no pending files
+ if activeCommands == 0 && pendingFiles == 0 {
h.shutdown()
}
}