diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-02 09:24:02 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-02 09:24:02 +0200 |
| commit | 0558e4259cddfab8b750e396218b7816feb4a62b (patch) | |
| tree | 283f236faf1353d89de30714c8c31a3faca21dcc /internal/server | |
| parent | e411633018ff2f9ad01037cc14b946f6885e5c0c (diff) | |
refactor(handlers): extract shutdown coordination from read command
Task: 45cfde84-3b56-4821-bc84-b8e9a90d2ca4
Diffstat (limited to 'internal/server')
| -rw-r--r-- | internal/server/handlers/readcommand.go | 45 | ||||
| -rw-r--r-- | internal/server/handlers/shutdown_coordinator.go | 54 |
2 files changed, 61 insertions, 38 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index f5fbfb9..86ae708 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -20,16 +20,18 @@ import ( ) type readCommand struct { - server *ServerHandler - mode omode.Mode + server *ServerHandler + mode omode.Mode + shutdownCoordinator *shutdownCoordinator } type readStrategy func(context.Context, lcontext.LContext, fs.FileReader, regex.Regex) error func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand { return &readCommand{ - server: server, - mode: mode, + server: server, + mode: mode, + shutdownCoordinator: newShutdownCoordinator(server), } } @@ -163,40 +165,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC defer wg.Done() defer func() { - // Decrement pending files counter when this file is done - remaining := atomic.AddInt32(&r.server.pendingFiles, -1) - dlog.Server.Debug(r.server.user, "File processing complete", "path", path, "remainingPending", remaining) - - // Check if we should trigger shutdown now - // Only shutdown if no files are pending AND no commands are active - if remaining == 0 && atomic.LoadInt32(&r.server.activeCommands) == 0 { - // If we have a turbo aggregate, trigger final serialization - if r.server.turboAggregate != nil { - dlog.Server.Info(r.server.user, "Triggering final turbo aggregate serialization") - r.server.turboAggregate.Serialize(context.Background()) - // Give more time for serialization to complete - // This is critical when processing many files concurrently - // In serverless mode, serialization is synchronous, so no wait needed - if !r.server.serverless { - time.Sleep(500 * time.Millisecond) - } - } - - // Double-check that we really have no pending work - // In turbo mode, there might be a race condition - // In serverless mode, no need for this delay - if !r.server.serverless { - time.Sleep(10 * time.Millisecond) - } - finalPending := atomic.LoadInt32(&r.server.pendingFiles) - finalActive := atomic.LoadInt32(&r.server.activeCommands) - if finalPending == 0 && finalActive == 0 { - dlog.Server.Debug(r.server.user, "No active commands and no pending files after double-check, triggering shutdown") - r.server.shutdown() - } else { - dlog.Server.Debug(r.server.user, "Shutdown check cancelled", "finalPending", finalPending, "finalActive", finalActive) - } - } + r.shutdownCoordinator.onFileProcessed(path) }() globID := r.makeGlobID(path, glob) diff --git a/internal/server/handlers/shutdown_coordinator.go b/internal/server/handlers/shutdown_coordinator.go new file mode 100644 index 0000000..54df09a --- /dev/null +++ b/internal/server/handlers/shutdown_coordinator.go @@ -0,0 +1,54 @@ +package handlers + +import ( + "context" + "sync/atomic" + "time" + + "github.com/mimecast/dtail/internal/io/dlog" +) + +type shutdownCoordinator struct { + server *ServerHandler +} + +func newShutdownCoordinator(server *ServerHandler) *shutdownCoordinator { + return &shutdownCoordinator{server: server} +} + +func (c *shutdownCoordinator) onFileProcessed(path string) { + remaining := atomic.AddInt32(&c.server.pendingFiles, -1) + dlog.Server.Debug(c.server.user, "File processing complete", "path", path, "remainingPending", remaining) + + if remaining != 0 || atomic.LoadInt32(&c.server.activeCommands) != 0 { + return + } + + c.finalizeWhenIdle() +} + +func (c *shutdownCoordinator) finalizeWhenIdle() { + // If we have a turbo aggregate, trigger final serialization. + if c.server.turboAggregate != nil { + dlog.Server.Info(c.server.user, "Triggering final turbo aggregate serialization") + c.server.turboAggregate.Serialize(context.Background()) + // In serverless mode, serialization is synchronous, so no wait needed. + if !c.server.serverless { + time.Sleep(500 * time.Millisecond) + } + } + + // Double-check that we really have no pending work before shutdown. + if !c.server.serverless { + time.Sleep(10 * time.Millisecond) + } + finalPending := atomic.LoadInt32(&c.server.pendingFiles) + finalActive := atomic.LoadInt32(&c.server.activeCommands) + if finalPending == 0 && finalActive == 0 { + dlog.Server.Debug(c.server.user, "No active commands and no pending files after double-check, triggering shutdown") + c.server.shutdown() + return + } + + dlog.Server.Debug(c.server.user, "Shutdown check cancelled", "finalPending", finalPending, "finalActive", finalActive) +} |
