summaryrefslogtreecommitdiff
path: root/internal/server/handlers/shutdown_coordinator.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/server/handlers/shutdown_coordinator.go')
-rw-r--r--internal/server/handlers/shutdown_coordinator.go30
1 files changed, 14 insertions, 16 deletions
diff --git a/internal/server/handlers/shutdown_coordinator.go b/internal/server/handlers/shutdown_coordinator.go
index 54df09a..6d3eda7 100644
--- a/internal/server/handlers/shutdown_coordinator.go
+++ b/internal/server/handlers/shutdown_coordinator.go
@@ -2,25 +2,24 @@ package handlers
import (
"context"
- "sync/atomic"
"time"
"github.com/mimecast/dtail/internal/io/dlog"
)
type shutdownCoordinator struct {
- server *ServerHandler
+ server readCommandServer
}
-func newShutdownCoordinator(server *ServerHandler) *shutdownCoordinator {
+func newShutdownCoordinator(server readCommandServer) *shutdownCoordinator {
return &shutdownCoordinator{server: server}
}
func (c *shutdownCoordinator) onFileProcessed(path string) {
- remaining := atomic.AddInt32(&c.server.pendingFiles, -1)
- dlog.Server.Debug(c.server.user, "File processing complete", "path", path, "remainingPending", remaining)
+ remaining, activeCommands := c.server.CompletePendingFile()
+ dlog.Server.Debug(c.server.LogContext(), "File processing complete", "path", path, "remainingPending", remaining)
- if remaining != 0 || atomic.LoadInt32(&c.server.activeCommands) != 0 {
+ if remaining != 0 || activeCommands != 0 {
return
}
@@ -29,26 +28,25 @@ func (c *shutdownCoordinator) onFileProcessed(path string) {
func (c *shutdownCoordinator) finalizeWhenIdle() {
// If we have a turbo aggregate, trigger final serialization.
- if c.server.turboAggregate != nil {
- dlog.Server.Info(c.server.user, "Triggering final turbo aggregate serialization")
- c.server.turboAggregate.Serialize(context.Background())
+ if turboAggregate := c.server.TurboAggregate(); turboAggregate != nil {
+ dlog.Server.Info(c.server.LogContext(), "Triggering final turbo aggregate serialization")
+ turboAggregate.Serialize(context.Background())
// In serverless mode, serialization is synchronous, so no wait needed.
- if !c.server.serverless {
+ if !c.server.Serverless() {
time.Sleep(500 * time.Millisecond)
}
}
// Double-check that we really have no pending work before shutdown.
- if !c.server.serverless {
+ if !c.server.Serverless() {
time.Sleep(10 * time.Millisecond)
}
- finalPending := atomic.LoadInt32(&c.server.pendingFiles)
- finalActive := atomic.LoadInt32(&c.server.activeCommands)
+ finalPending, finalActive := c.server.PendingAndActive()
if finalPending == 0 && finalActive == 0 {
- dlog.Server.Debug(c.server.user, "No active commands and no pending files after double-check, triggering shutdown")
- c.server.shutdown()
+ dlog.Server.Debug(c.server.LogContext(), "No active commands and no pending files after double-check, triggering shutdown")
+ c.server.TriggerShutdown()
return
}
- dlog.Server.Debug(c.server.user, "Shutdown check cancelled", "finalPending", finalPending, "finalActive", finalActive)
+ dlog.Server.Debug(c.server.LogContext(), "Shutdown check cancelled", "finalPending", finalPending, "finalActive", finalActive)
}