diff options
Diffstat (limited to 'internal/server/handlers/shutdown_coordinator.go')
| -rw-r--r-- | internal/server/handlers/shutdown_coordinator.go | 30 |
1 files changed, 14 insertions, 16 deletions
diff --git a/internal/server/handlers/shutdown_coordinator.go b/internal/server/handlers/shutdown_coordinator.go index 54df09a..6d3eda7 100644 --- a/internal/server/handlers/shutdown_coordinator.go +++ b/internal/server/handlers/shutdown_coordinator.go @@ -2,25 +2,24 @@ package handlers import ( "context" - "sync/atomic" "time" "github.com/mimecast/dtail/internal/io/dlog" ) type shutdownCoordinator struct { - server *ServerHandler + server readCommandServer } -func newShutdownCoordinator(server *ServerHandler) *shutdownCoordinator { +func newShutdownCoordinator(server readCommandServer) *shutdownCoordinator { return &shutdownCoordinator{server: server} } func (c *shutdownCoordinator) onFileProcessed(path string) { - remaining := atomic.AddInt32(&c.server.pendingFiles, -1) - dlog.Server.Debug(c.server.user, "File processing complete", "path", path, "remainingPending", remaining) + remaining, activeCommands := c.server.CompletePendingFile() + dlog.Server.Debug(c.server.LogContext(), "File processing complete", "path", path, "remainingPending", remaining) - if remaining != 0 || atomic.LoadInt32(&c.server.activeCommands) != 0 { + if remaining != 0 || activeCommands != 0 { return } @@ -29,26 +28,25 @@ func (c *shutdownCoordinator) onFileProcessed(path string) { func (c *shutdownCoordinator) finalizeWhenIdle() { // If we have a turbo aggregate, trigger final serialization. - if c.server.turboAggregate != nil { - dlog.Server.Info(c.server.user, "Triggering final turbo aggregate serialization") - c.server.turboAggregate.Serialize(context.Background()) + if turboAggregate := c.server.TurboAggregate(); turboAggregate != nil { + dlog.Server.Info(c.server.LogContext(), "Triggering final turbo aggregate serialization") + turboAggregate.Serialize(context.Background()) // In serverless mode, serialization is synchronous, so no wait needed. - if !c.server.serverless { + if !c.server.Serverless() { time.Sleep(500 * time.Millisecond) } } // Double-check that we really have no pending work before shutdown. - if !c.server.serverless { + if !c.server.Serverless() { time.Sleep(10 * time.Millisecond) } - finalPending := atomic.LoadInt32(&c.server.pendingFiles) - finalActive := atomic.LoadInt32(&c.server.activeCommands) + finalPending, finalActive := c.server.PendingAndActive() if finalPending == 0 && finalActive == 0 { - dlog.Server.Debug(c.server.user, "No active commands and no pending files after double-check, triggering shutdown") - c.server.shutdown() + dlog.Server.Debug(c.server.LogContext(), "No active commands and no pending files after double-check, triggering shutdown") + c.server.TriggerShutdown() return } - dlog.Server.Debug(c.server.user, "Shutdown check cancelled", "finalPending", finalPending, "finalActive", finalActive) + dlog.Server.Debug(c.server.LogContext(), "Shutdown check cancelled", "finalPending", finalPending, "finalActive", finalActive) } |
