summaryrefslogtreecommitdiff
path: root/internal/server
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-02 09:24:02 +0200
committerPaul Buetow <paul@buetow.org>2026-03-02 09:24:02 +0200
commit0558e4259cddfab8b750e396218b7816feb4a62b (patch)
tree283f236faf1353d89de30714c8c31a3faca21dcc /internal/server
parente411633018ff2f9ad01037cc14b946f6885e5c0c (diff)
refactor(handlers): extract shutdown coordination from read command
Task: 45cfde84-3b56-4821-bc84-b8e9a90d2ca4
Diffstat (limited to 'internal/server')
-rw-r--r--internal/server/handlers/readcommand.go45
-rw-r--r--internal/server/handlers/shutdown_coordinator.go54
2 files changed, 61 insertions, 38 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index f5fbfb9..86ae708 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -20,16 +20,18 @@ import (
)
type readCommand struct {
- server *ServerHandler
- mode omode.Mode
+ server *ServerHandler
+ mode omode.Mode
+ shutdownCoordinator *shutdownCoordinator
}
type readStrategy func(context.Context, lcontext.LContext, fs.FileReader, regex.Regex) error
func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand {
return &readCommand{
- server: server,
- mode: mode,
+ server: server,
+ mode: mode,
+ shutdownCoordinator: newShutdownCoordinator(server),
}
}
@@ -163,40 +165,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
defer wg.Done()
defer func() {
- // Decrement pending files counter when this file is done
- remaining := atomic.AddInt32(&r.server.pendingFiles, -1)
- dlog.Server.Debug(r.server.user, "File processing complete", "path", path, "remainingPending", remaining)
-
- // Check if we should trigger shutdown now
- // Only shutdown if no files are pending AND no commands are active
- if remaining == 0 && atomic.LoadInt32(&r.server.activeCommands) == 0 {
- // If we have a turbo aggregate, trigger final serialization
- if r.server.turboAggregate != nil {
- dlog.Server.Info(r.server.user, "Triggering final turbo aggregate serialization")
- r.server.turboAggregate.Serialize(context.Background())
- // Give more time for serialization to complete
- // This is critical when processing many files concurrently
- // In serverless mode, serialization is synchronous, so no wait needed
- if !r.server.serverless {
- time.Sleep(500 * time.Millisecond)
- }
- }
-
- // Double-check that we really have no pending work
- // In turbo mode, there might be a race condition
- // In serverless mode, no need for this delay
- if !r.server.serverless {
- time.Sleep(10 * time.Millisecond)
- }
- finalPending := atomic.LoadInt32(&r.server.pendingFiles)
- finalActive := atomic.LoadInt32(&r.server.activeCommands)
- if finalPending == 0 && finalActive == 0 {
- dlog.Server.Debug(r.server.user, "No active commands and no pending files after double-check, triggering shutdown")
- r.server.shutdown()
- } else {
- dlog.Server.Debug(r.server.user, "Shutdown check cancelled", "finalPending", finalPending, "finalActive", finalActive)
- }
- }
+ r.shutdownCoordinator.onFileProcessed(path)
}()
globID := r.makeGlobID(path, glob)
diff --git a/internal/server/handlers/shutdown_coordinator.go b/internal/server/handlers/shutdown_coordinator.go
new file mode 100644
index 0000000..54df09a
--- /dev/null
+++ b/internal/server/handlers/shutdown_coordinator.go
@@ -0,0 +1,54 @@
+package handlers
+
+import (
+ "context"
+ "sync/atomic"
+ "time"
+
+ "github.com/mimecast/dtail/internal/io/dlog"
+)
+
+type shutdownCoordinator struct {
+ server *ServerHandler
+}
+
+func newShutdownCoordinator(server *ServerHandler) *shutdownCoordinator {
+ return &shutdownCoordinator{server: server}
+}
+
+func (c *shutdownCoordinator) onFileProcessed(path string) {
+ remaining := atomic.AddInt32(&c.server.pendingFiles, -1)
+ dlog.Server.Debug(c.server.user, "File processing complete", "path", path, "remainingPending", remaining)
+
+ if remaining != 0 || atomic.LoadInt32(&c.server.activeCommands) != 0 {
+ return
+ }
+
+ c.finalizeWhenIdle()
+}
+
+func (c *shutdownCoordinator) finalizeWhenIdle() {
+ // If we have a turbo aggregate, trigger final serialization.
+ if c.server.turboAggregate != nil {
+ dlog.Server.Info(c.server.user, "Triggering final turbo aggregate serialization")
+ c.server.turboAggregate.Serialize(context.Background())
+ // In serverless mode, serialization is synchronous, so no wait needed.
+ if !c.server.serverless {
+ time.Sleep(500 * time.Millisecond)
+ }
+ }
+
+ // Double-check that we really have no pending work before shutdown.
+ if !c.server.serverless {
+ time.Sleep(10 * time.Millisecond)
+ }
+ finalPending := atomic.LoadInt32(&c.server.pendingFiles)
+ finalActive := atomic.LoadInt32(&c.server.activeCommands)
+ if finalPending == 0 && finalActive == 0 {
+ dlog.Server.Debug(c.server.user, "No active commands and no pending files after double-check, triggering shutdown")
+ c.server.shutdown()
+ return
+ }
+
+ dlog.Server.Debug(c.server.user, "Shutdown check cancelled", "finalPending", finalPending, "finalActive", finalActive)
+}