diff options
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/server/handlers/basehandler.go | 10 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 52 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 10 |
3 files changed, 64 insertions, 8 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index a82c91a..bfc7ec2 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -385,7 +385,15 @@ func (h *baseHandler) flush() { } func (h *baseHandler) shutdown() { - dlog.Server.Debug(h.user, "shutdown()") + // Log current state at shutdown + activeCommands := atomic.LoadInt32(&h.activeCommands) + dlog.Server.Info(h.user, "shutdown() called", "activeCommands", activeCommands, "turboMode", h.turboMode) + + // In turbo mode, ensure all data is flushed before shutdown + if h.turboMode { + h.flushTurboData() + } + h.flush() go func() { diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 91bd7c3..2920044 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -6,6 +6,7 @@ import ( "path/filepath" "strings" "sync" + "sync/atomic" "time" "github.com/mimecast/dtail/internal/config" @@ -103,12 +104,19 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext, func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, paths []string, glob string, re regex.Regex, retryInterval time.Duration) { + dlog.Server.Info(r.server.user, "Processing files", "count", len(paths), "glob", glob) + + // Track pending files for this batch + atomic.AddInt32(&r.server.pendingFiles, int32(len(paths))) + var wg sync.WaitGroup wg.Add(len(paths)) for _, path := range paths { go r.readFileIfPermissions(ctx, ltx, &wg, path, glob, re) } wg.Wait() + + dlog.Server.Info(r.server.user, "All files processed", "count", len(paths)) // In turbo mode, signal EOF after all files are processed // This is crucial for proper shutdown in server mode @@ -116,6 +124,11 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, if turboBoostEnabled && r.server.aggregate == nil && (r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) { if r.server.IsTurboMode() && r.server.turboEOF != nil { + dlog.Server.Debug(r.server.user, "Turbo mode: flushing data before EOF signal") + + // Ensure all turbo data is flushed before signaling EOF + r.server.flushTurboData() + // Signal EOF by closing the channel, but only if it hasn't been closed yet select { case <-r.server.turboEOF: @@ -123,8 +136,19 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, default: close(r.server.turboEOF) } - // Wait longer to ensure all data is transmitted for large batches - time.Sleep(500 * time.Millisecond) + + // Wait to ensure all data is transmitted + // This is especially important when files are queued due to concurrency limits + waitTime := 500 * time.Millisecond + if len(paths) > 10 { + // For many files, wait proportionally longer + waitTime = time.Duration(len(paths)*10) * time.Millisecond + if waitTime > 2*time.Second { + waitTime = 2 * time.Second + } + } + dlog.Server.Debug(r.server.user, "Waiting for data transmission", "duration", waitTime) + time.Sleep(waitTime) } } @@ -137,6 +161,18 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC wg *sync.WaitGroup, path, glob string, re regex.Regex) { defer wg.Done() + defer func() { + // Decrement pending files counter when this file is done + remaining := atomic.AddInt32(&r.server.pendingFiles, -1) + dlog.Server.Debug(r.server.user, "File processing complete", "path", path, "remainingPending", remaining) + + // Check if we should trigger shutdown now + if remaining == 0 && atomic.LoadInt32(&r.server.activeCommands) == 0 { + dlog.Server.Debug(r.server.user, "No active commands and no pending files, triggering shutdown") + r.server.shutdown() + } + }() + globID := r.makeGlobID(path, glob) if !r.server.user.HasFilePermission(path, "readfiles") { dlog.Server.Error(r.server.user, "No permission to read file", path, globID) @@ -176,14 +212,17 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, select { case limiter <- struct{}{}: + dlog.Server.Debug(r.server.user, "Got limiter slot immediately", "path", path) case <-ctx.Done(): + dlog.Server.Debug(r.server.user, "Context cancelled while waiting for limiter", "path", path) return default: - dlog.Server.Info("Server limit hit, queueing file", len(limiter), path) + dlog.Server.Info(r.server.user, "Server limit hit, queueing file", "limiterLen", len(limiter), "path", path, "maxConcurrent", cap(limiter)) select { case limiter <- struct{}{}: - dlog.Server.Info("Server limit OK now, processing file", len(limiter), path) + dlog.Server.Info(r.server.user, "Server limit OK now, processing file", "limiterLen", len(limiter), "path", path) case <-ctx.Done(): + dlog.Server.Debug(r.server.user, "Context cancelled while queued for limiter", "path", path) return } } @@ -294,14 +333,15 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L } // Create a direct writer based on the mode - // In serverless mode, write directly to stdout - // In server mode, use the turbo channel + // Each file gets its own writer instance to avoid race conditions + // when multiple files are processed concurrently var writer TurboWriter if r.server.serverless { // In serverless mode, write directly to stdout writer = NewDirectTurboWriter(os.Stdout, r.server.hostname, r.server.plain, r.server.serverless) } else { // In server mode, use the network writer with turbo channels + // Create a new instance for each file to ensure thread safety writer = &TurboNetworkWriter{ handler: &r.server.baseHandler, hostname: r.server.hostname, diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 69bebc4..9163447 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -3,6 +3,7 @@ package handlers import ( "context" "strings" + "sync/atomic" "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/config" @@ -21,6 +22,8 @@ type ServerHandler struct { catLimiter chan struct{} tailLimiter chan struct{} regex string + // Track pending files waiting for limiter slots + pendingFiles int32 } // NewServerHandler returns the server handler. @@ -60,7 +63,12 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, ltx lcontext.LCon dlog.Server.Debug(h.user, "Handling user command", argc, args) h.incrementActiveCommands() commandFinished := func() { - if h.decrementActiveCommands() == 0 { + activeCommands := h.decrementActiveCommands() + pendingFiles := atomic.LoadInt32(&h.pendingFiles) + dlog.Server.Debug(h.user, "Command finished", "activeCommands", activeCommands, "pendingFiles", pendingFiles) + + // Only shutdown if no active commands AND no pending files + if activeCommands == 0 && pendingFiles == 0 { h.shutdown() } } |
