summaryrefslogtreecommitdiff
path: root/internal/server/handlers/shutdown_coordinator.go
blob: 973cc61c08dc23c0a2aea163a537a75ba6f63c0b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package handlers

import (
	"context"
	"time"

	"github.com/mimecast/dtail/internal/io/dlog"
)

type shutdownCoordinator struct {
	server readCommandServer
}

func newShutdownCoordinator(server readCommandServer) *shutdownCoordinator {
	return &shutdownCoordinator{server: server}
}

func (c *shutdownCoordinator) onFileProcessed(path string) {
	remaining, activeCommands := c.server.CompletePendingFile()
	dlog.Server.Debug(c.server.LogContext(), "File processing complete", "path", path, "remainingPending", remaining)

	if remaining != 0 || activeCommands != 0 {
		return
	}

	c.finalizeWhenIdle()
}

func (c *shutdownCoordinator) finalizeWhenIdle() {
	// If we have a turbo aggregate, trigger final serialization.
	if turboAggregate := c.server.TurboAggregate(); turboAggregate != nil {
		dlog.Server.Info(c.server.LogContext(), "Triggering final turbo aggregate serialization")
		turboAggregate.Serialize(context.Background())
		// In serverless mode, serialization is synchronous, so no wait needed.
		if !c.server.Serverless() {
			time.Sleep(c.server.ShutdownTurboSerializeWait())
		}
	}

	// Double-check that we really have no pending work before shutdown.
	if !c.server.Serverless() {
		time.Sleep(c.server.ShutdownIdleRecheckWait())
	}
	finalPending, finalActive := c.server.PendingAndActive()
	if finalPending == 0 && finalActive == 0 {
		dlog.Server.Debug(c.server.LogContext(), "No active commands and no pending files after double-check, triggering shutdown")
		c.server.TriggerShutdown()
		return
	}

	dlog.Server.Debug(c.server.LogContext(), "Shutdown check cancelled", "finalPending", finalPending, "finalActive", finalActive)
}