1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
package handlers
import (
"context"
"time"
"github.com/mimecast/dtail/internal/io/dlog"
)
// shutdownCoordinator decides when the server may be shut down after file
// processing has drained. Its only field is the server it operates on; all
// counters and waits are obtained through that server.
type shutdownCoordinator struct {
// server is asked for the pending-file and active-command counts and
// receives the TriggerShutdown call once both are zero.
server readCommandServer
}
// newShutdownCoordinator returns a shutdownCoordinator bound to server.
func newShutdownCoordinator(server readCommandServer) *shutdownCoordinator {
	coordinator := new(shutdownCoordinator)
	coordinator.server = server
	return coordinator
}
// onFileProcessed marks one pending file as complete on the server and logs
// the number of files still pending. path is used for logging only.
//
// When the server reports zero pending files and zero active commands, the
// idle finalization sequence (finalizeWhenIdle) is started; otherwise the
// call returns without further action.
func (c *shutdownCoordinator) onFileProcessed(path string) {
	pendingLeft, commandsActive := c.server.CompletePendingFile()
	dlog.Server.Debug(c.server.LogContext(), "File processing complete", "path", path, "remainingPending", pendingLeft)

	// Only the completion that leaves both counters at zero goes on to the
	// shutdown check.
	if pendingLeft == 0 && commandsActive == 0 {
		c.finalizeWhenIdle()
	}
}
// finalizeWhenIdle runs the end-of-work sequence and triggers shutdown only
// if the server is still idle afterwards.
//
// Steps, in order:
//  1. If the server has a turbo aggregate, ask it to serialize and, unless
//     running serverless, sleep for ShutdownTurboSerializeWait.
//  2. Unless running serverless, sleep for ShutdownIdleRecheckWait.
//  3. Re-read the pending/active counters; trigger shutdown when both are
//     zero, otherwise log that the shutdown check was cancelled.
func (c *shutdownCoordinator) finalizeWhenIdle() {
	// Flush the turbo aggregate, if there is one, before deciding anything.
	aggregate := c.server.TurboAggregate()
	if aggregate != nil {
		dlog.Server.Info(c.server.LogContext(), "Triggering final turbo aggregate serialization")
		aggregate.Serialize(context.Background())

		// Serverless mode serializes synchronously, so the extra wait is
		// only needed otherwise.
		if !c.server.Serverless() {
			time.Sleep(c.server.ShutdownTurboSerializeWait())
		}
	}

	// Pause before the re-check so the counters are read again after a
	// delay (skipped in serverless mode).
	if !c.server.Serverless() {
		time.Sleep(c.server.ShutdownIdleRecheckWait())
	}

	pendingNow, activeNow := c.server.PendingAndActive()
	if pendingNow != 0 || activeNow != 0 {
		// Work showed up in the meantime; leave the server running.
		dlog.Server.Debug(c.server.LogContext(), "Shutdown check cancelled", "finalPending", pendingNow, "finalActive", activeNow)
		return
	}

	dlog.Server.Debug(c.server.LogContext(), "No active commands and no pending files after double-check, triggering shutdown")
	c.server.TriggerShutdown()
}
|