summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r--internal/server/handlers/basehandler.go10
-rw-r--r--internal/server/handlers/readcommand.go52
-rw-r--r--internal/server/handlers/serverhandler.go10
3 files changed, 64 insertions, 8 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index a82c91a..bfc7ec2 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -385,7 +385,15 @@ func (h *baseHandler) flush() {
}
func (h *baseHandler) shutdown() {
- dlog.Server.Debug(h.user, "shutdown()")
+ // Log current state at shutdown
+ activeCommands := atomic.LoadInt32(&h.activeCommands)
+ dlog.Server.Info(h.user, "shutdown() called", "activeCommands", activeCommands, "turboMode", h.turboMode)
+
+ // In turbo mode, ensure all data is flushed before shutdown
+ if h.turboMode {
+ h.flushTurboData()
+ }
+
h.flush()
go func() {
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 91bd7c3..2920044 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -6,6 +6,7 @@ import (
"path/filepath"
"strings"
"sync"
+ "sync/atomic"
"time"
"github.com/mimecast/dtail/internal/config"
@@ -103,12 +104,19 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext,
func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
paths []string, glob string, re regex.Regex, retryInterval time.Duration) {
+ dlog.Server.Info(r.server.user, "Processing files", "count", len(paths), "glob", glob)
+
+ // Track pending files for this batch
+ atomic.AddInt32(&r.server.pendingFiles, int32(len(paths)))
+
var wg sync.WaitGroup
wg.Add(len(paths))
for _, path := range paths {
go r.readFileIfPermissions(ctx, ltx, &wg, path, glob, re)
}
wg.Wait()
+
+ dlog.Server.Info(r.server.user, "All files processed", "count", len(paths))
// In turbo mode, signal EOF after all files are processed
// This is crucial for proper shutdown in server mode
@@ -116,6 +124,11 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
if turboBoostEnabled && r.server.aggregate == nil &&
(r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) {
if r.server.IsTurboMode() && r.server.turboEOF != nil {
+ dlog.Server.Debug(r.server.user, "Turbo mode: flushing data before EOF signal")
+
+ // Ensure all turbo data is flushed before signaling EOF
+ r.server.flushTurboData()
+
// Signal EOF by closing the channel, but only if it hasn't been closed yet
select {
case <-r.server.turboEOF:
@@ -123,8 +136,19 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
default:
close(r.server.turboEOF)
}
- // Wait longer to ensure all data is transmitted for large batches
- time.Sleep(500 * time.Millisecond)
+
+ // Wait to ensure all data is transmitted
+ // This is especially important when files are queued due to concurrency limits
+ waitTime := 500 * time.Millisecond
+ if len(paths) > 10 {
+ // For many files, wait proportionally longer
+ waitTime = time.Duration(len(paths)*10) * time.Millisecond
+ if waitTime > 2*time.Second {
+ waitTime = 2 * time.Second
+ }
+ }
+ dlog.Server.Debug(r.server.user, "Waiting for data transmission", "duration", waitTime)
+ time.Sleep(waitTime)
}
}
@@ -137,6 +161,18 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
wg *sync.WaitGroup, path, glob string, re regex.Regex) {
defer wg.Done()
+ defer func() {
+ // Decrement pending files counter when this file is done
+ remaining := atomic.AddInt32(&r.server.pendingFiles, -1)
+ dlog.Server.Debug(r.server.user, "File processing complete", "path", path, "remainingPending", remaining)
+
+ // Check if we should trigger shutdown now
+ if remaining == 0 && atomic.LoadInt32(&r.server.activeCommands) == 0 {
+ dlog.Server.Debug(r.server.user, "No active commands and no pending files, triggering shutdown")
+ r.server.shutdown()
+ }
+ }()
+
globID := r.makeGlobID(path, glob)
if !r.server.user.HasFilePermission(path, "readfiles") {
dlog.Server.Error(r.server.user, "No permission to read file", path, globID)
@@ -176,14 +212,17 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
select {
case limiter <- struct{}{}:
+ dlog.Server.Debug(r.server.user, "Got limiter slot immediately", "path", path)
case <-ctx.Done():
+ dlog.Server.Debug(r.server.user, "Context cancelled while waiting for limiter", "path", path)
return
default:
- dlog.Server.Info("Server limit hit, queueing file", len(limiter), path)
+ dlog.Server.Info(r.server.user, "Server limit hit, queueing file", "limiterLen", len(limiter), "path", path, "maxConcurrent", cap(limiter))
select {
case limiter <- struct{}{}:
- dlog.Server.Info("Server limit OK now, processing file", len(limiter), path)
+ dlog.Server.Info(r.server.user, "Server limit OK now, processing file", "limiterLen", len(limiter), "path", path)
case <-ctx.Done():
+ dlog.Server.Debug(r.server.user, "Context cancelled while queued for limiter", "path", path)
return
}
}
@@ -294,14 +333,15 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L
}
// Create a direct writer based on the mode
- // In serverless mode, write directly to stdout
- // In server mode, use the turbo channel
+ // Each file gets its own writer instance to avoid race conditions
+ // when multiple files are processed concurrently
var writer TurboWriter
if r.server.serverless {
// In serverless mode, write directly to stdout
writer = NewDirectTurboWriter(os.Stdout, r.server.hostname, r.server.plain, r.server.serverless)
} else {
// In server mode, use the network writer with turbo channels
+ // Create a new instance for each file to ensure thread safety
writer = &TurboNetworkWriter{
handler: &r.server.baseHandler,
hostname: r.server.hostname,
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 69bebc4..9163447 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -3,6 +3,7 @@ package handlers
import (
"context"
"strings"
+ "sync/atomic"
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/config"
@@ -21,6 +22,8 @@ type ServerHandler struct {
catLimiter chan struct{}
tailLimiter chan struct{}
regex string
+ // Track pending files waiting for limiter slots
+ pendingFiles int32
}
// NewServerHandler returns the server handler.
@@ -60,7 +63,12 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, ltx lcontext.LCon
dlog.Server.Debug(h.user, "Handling user command", argc, args)
h.incrementActiveCommands()
commandFinished := func() {
- if h.decrementActiveCommands() == 0 {
+ activeCommands := h.decrementActiveCommands()
+ pendingFiles := atomic.LoadInt32(&h.pendingFiles)
+ dlog.Server.Debug(h.user, "Command finished", "activeCommands", activeCommands, "pendingFiles", pendingFiles)
+
+ // Only shutdown if no active commands AND no pending files
+ if activeCommands == 0 && pendingFiles == 0 {
h.shutdown()
}
}