summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-02 10:22:11 +0200
committerPaul Buetow <paul@buetow.org>2026-03-02 10:22:11 +0200
commit8c08e4e60219782e50c3a5f20a051e706196f48c (patch)
treeccc953967f594eb423c4567579ba5f1eb63d1b45
parent3389e64c2fc2d7bdafb8d1d48118bdaae94a8190 (diff)
refactor: add readcommand facade for server dependencies
-rw-r--r--internal/server/handlers/readcommand.go133
-rw-r--r--internal/server/handlers/readcommand_server.go119
-rw-r--r--internal/server/handlers/shutdown_coordinator.go30
3 files changed, 199 insertions, 83 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 9ad7a6b..dd49f5d 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -7,7 +7,6 @@ import (
"path/filepath"
"strings"
"sync"
- "sync/atomic"
"time"
"github.com/mimecast/dtail/internal/io/dlog"
@@ -20,7 +19,7 @@ import (
)
type readCommand struct {
- server *ServerHandler
+ server readCommandServer
mode omode.Mode
shutdownCoordinator *shutdownCoordinator
}
@@ -33,7 +32,7 @@ type turboReadProcessor interface {
Close() error
}
-func newReadCommand(server *ServerHandler, mode omode.Mode) *readCommand {
+func newReadCommand(server readCommandServer, mode omode.Mode) *readCommand {
return &readCommand{
server: server,
mode: mode,
@@ -48,14 +47,14 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
if argc >= 4 {
deserializedRegex, err := regex.Deserialize(strings.Join(args[2:], " "))
if err != nil {
- r.server.sendln(r.server.serverMessages, dlog.Server.Error(r.server.user,
+ r.server.SendServerMessage(dlog.Server.Error(r.server.LogContext(),
"Unable to parse command", err))
return
}
re = deserializedRegex
}
if argc < 3 {
- r.server.sendln(r.server.serverMessages, dlog.Server.Warn(r.server.user,
+ r.server.SendServerMessage(dlog.Server.Warn(r.server.LogContext(),
"Unable to parse command", args, argc))
return
}
@@ -85,14 +84,14 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext,
for retryCount := 0; retryCount < retries; retryCount++ {
paths, err := filepath.Glob(glob)
if err != nil {
- dlog.Server.Warn(r.server.user, glob, err)
+ dlog.Server.Warn(r.server.LogContext(), glob, err)
time.Sleep(retryInterval)
continue
}
if numPaths := len(paths); numPaths == 0 {
- dlog.Server.Error(r.server.user, "No such file(s) to read", glob)
- r.server.sendln(r.server.serverMessages, dlog.Server.Warn(r.server.user,
+ dlog.Server.Error(r.server.LogContext(), "No such file(s) to read", glob)
+ r.server.SendServerMessage(dlog.Server.Warn(r.server.LogContext(),
"Unable to read file(s), check server logs"))
select {
case <-ctx.Done():
@@ -107,7 +106,7 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext,
return
}
- r.server.sendln(r.server.serverMessages, dlog.Server.Warn(r.server.user,
+ r.server.SendServerMessage(dlog.Server.Warn(r.server.LogContext(),
"Giving up to read file(s)"))
return
}
@@ -115,11 +114,11 @@ func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext,
func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
paths []string, glob string, re regex.Regex, retryInterval time.Duration) {
- dlog.Server.Info(r.server.user, "Processing files", "count", len(paths), "glob", glob)
+ dlog.Server.Info(r.server.LogContext(), "Processing files", "count", len(paths), "glob", glob)
// Track pending files for this batch
- atomic.AddInt32(&r.server.pendingFiles, int32(len(paths)))
- dlog.Server.Info(r.server.user, "Added pending files", "count", len(paths), "totalPending", atomic.LoadInt32(&r.server.pendingFiles))
+ totalPending := r.server.AddPendingFiles(int32(len(paths)))
+ dlog.Server.Info(r.server.LogContext(), "Added pending files", "count", len(paths), "totalPending", totalPending)
var wg sync.WaitGroup
wg.Add(len(paths))
@@ -128,17 +127,17 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
}
wg.Wait()
- dlog.Server.Info(r.server.user, "All files processed", "count", len(paths))
+ dlog.Server.Info(r.server.LogContext(), "All files processed", "count", len(paths))
// In turbo mode, signal EOF after all files are processed
// This is crucial for proper shutdown in server mode
- if !r.server.serverCfg.TurboBoostDisable && r.server.aggregate == nil &&
+ if !r.server.TurboBoostDisabled() && !r.server.HasRegularAggregate() &&
(r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) {
if r.server.IsTurboMode() && r.server.HasTurboEOF() {
- dlog.Server.Debug(r.server.user, "Turbo mode: flushing data before EOF signal")
+ dlog.Server.Debug(r.server.LogContext(), "Turbo mode: flushing data before EOF signal")
// Ensure all turbo data is flushed before signaling EOF
- r.server.flushTurboData()
+ r.server.FlushTurboData()
// Signal EOF by closing the channel, but only once.
r.server.SignalTurboEOF()
@@ -146,7 +145,7 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
// Wait to ensure all data is transmitted
// This is especially important when files are queued due to concurrency limits
// In serverless mode, data is written directly to stdout, so no wait is needed
- if !r.server.serverless {
+ if !r.server.Serverless() {
waitTime := 500 * time.Millisecond
if len(paths) > 10 {
// For many files, wait proportionally longer
@@ -155,7 +154,7 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
waitTime = 2 * time.Second
}
}
- dlog.Server.Debug(r.server.user, "Waiting for data transmission", "duration", waitTime)
+ dlog.Server.Debug(r.server.LogContext(), "Waiting for data transmission", "duration", waitTime)
time.Sleep(waitTime)
}
}
@@ -175,9 +174,9 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
}()
globID := r.makeGlobID(path, glob)
- if !r.server.user.HasFilePermission(path, "readfiles") {
- dlog.Server.Error(r.server.user, "No permission to read file", path, globID)
- r.server.sendln(r.server.serverMessages, dlog.Server.Warn(r.server.user,
+ if !r.server.CanReadFile(path) {
+ dlog.Server.Error(r.server.LogContext(), "No permission to read file", path, globID)
+ r.server.SendServerMessage(dlog.Server.Warn(r.server.LogContext(),
"Unable to read file(s), check server logs"))
return
}
@@ -187,7 +186,7 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC
func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
path, globID string, re regex.Regex) {
- dlog.Server.Info(r.server.user, "Start reading", path, globID)
+ dlog.Server.Info(r.server.LogContext(), "Start reading", path, globID)
r.logRegexMode(re)
var reader fs.FileReader
@@ -195,15 +194,15 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
switch r.mode {
case omode.GrepClient, omode.CatClient:
- catFile := fs.NewCatFile(path, globID, r.server.serverMessages)
+ catFile := fs.NewCatFile(path, globID, r.server.ServerMessagesChannel())
reader = &catFile
- limiter = r.server.catLimiter
+ limiter = r.server.CatLimiter()
case omode.TailClient:
fallthrough
default:
- tailFile := fs.NewTailFile(path, globID, r.server.serverMessages)
+ tailFile := fs.NewTailFile(path, globID, r.server.ServerMessagesChannel())
reader = &tailFile
- limiter = r.server.tailLimiter
+ limiter = r.server.TailLimiter()
}
defer func() {
@@ -215,17 +214,17 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
select {
case limiter <- struct{}{}:
- dlog.Server.Debug(r.server.user, "Got limiter slot immediately", "path", path)
+ dlog.Server.Debug(r.server.LogContext(), "Got limiter slot immediately", "path", path)
case <-ctx.Done():
- dlog.Server.Debug(r.server.user, "Context cancelled while waiting for limiter", "path", path)
+ dlog.Server.Debug(r.server.LogContext(), "Context cancelled while waiting for limiter", "path", path)
return
default:
- dlog.Server.Info(r.server.user, "Server limit hit, queueing file", "limiterLen", len(limiter), "path", path, "maxConcurrent", cap(limiter))
+ dlog.Server.Info(r.server.LogContext(), "Server limit hit, queueing file", "limiterLen", len(limiter), "path", path, "maxConcurrent", cap(limiter))
select {
case limiter <- struct{}{}:
- dlog.Server.Info(r.server.user, "Server limit OK now, processing file", "limiterLen", len(limiter), "path", path)
+ dlog.Server.Info(r.server.LogContext(), "Server limit OK now, processing file", "limiterLen", len(limiter), "path", path)
case <-ctx.Done():
- dlog.Server.Debug(r.server.user, "Context cancelled while queued for limiter", "path", path)
+ dlog.Server.Debug(r.server.LogContext(), "Context cancelled while queued for limiter", "path", path)
return
}
}
@@ -233,14 +232,14 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
// Check if we should use the turbo boost optimizations
// Enable turbo boost for cat/grep/tail modes, and now also for MapReduce operations
// MapReduce now has a turbo mode implementation that bypasses channels
- dlog.Server.Debug(r.server.user, "Checking turbo mode", "turboBoostDisable", r.server.serverCfg.TurboBoostDisable,
- "mode", r.mode, "hasTurboAggregate", r.server.turboAggregate != nil, "hasAggregate", r.server.aggregate != nil)
+ dlog.Server.Debug(r.server.LogContext(), "Checking turbo mode", "turboBoostDisable", r.server.TurboBoostDisabled(),
+ "mode", r.mode, "hasTurboAggregate", r.server.TurboAggregate() != nil, "hasAggregate", r.server.HasRegularAggregate())
// Only use turbo mode if:
// 1. Turbo boost is NOT disabled (it's enabled by default) AND
// 2. We have a turbo aggregate OR (we're in cat/grep/tail mode AND we don't have a regular aggregate)
- if !r.server.serverCfg.TurboBoostDisable &&
- (r.server.turboAggregate != nil || ((r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) && r.server.aggregate == nil)) {
- dlog.Server.Info(r.server.user, "Using turbo mode for reading", path, "mode", r.mode, "hasTurboAggregate", r.server.turboAggregate != nil)
+ if !r.server.TurboBoostDisabled() &&
+ (r.server.TurboAggregate() != nil || ((r.mode == omode.CatClient || r.mode == omode.GrepClient || r.mode == omode.TailClient) && !r.server.HasRegularAggregate())) {
+ dlog.Server.Info(r.server.LogContext(), "Using turbo mode for reading", path, "mode", r.mode, "hasTurboAggregate", r.server.TurboAggregate() != nil)
r.readWithTurboProcessor(ctx, ltx, path, globID, re, reader)
return
}
@@ -251,7 +250,7 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.LContext,
path, globID string, re regex.Regex, reader fs.FileReader) {
- dlog.Server.Info(r.server.user, "Using turbo channel-less implementation", path, globID)
+ dlog.Server.Info(r.server.LogContext(), "Using turbo channel-less implementation", path, globID)
r.logRegexMode(re)
r.ensureTurboModeEnabled()
@@ -265,7 +264,7 @@ func (r *readCommand) executeReadLoop(ctx context.Context, ltx lcontext.LContext
for {
if err := strategy(ctx, ltx, reader, re); err != nil {
- dlog.Server.Error(r.server.user, path, globID, err)
+ dlog.Server.Error(r.server.LogContext(), path, globID, err)
}
select {
@@ -284,20 +283,21 @@ func (r *readCommand) executeReadLoop(ctx context.Context, ltx lcontext.LContext
func (r *readCommand) readViaChannels() readStrategy {
return func(ctx context.Context, ltx lcontext.LContext, reader fs.FileReader, re regex.Regex) error {
- aggregate := r.server.aggregate
var linesCh chan *line.Line
+ closeLines := false
- if aggregate != nil {
+ if r.server.HasRegularAggregate() {
// For MapReduce operations, create a new channel that goes only to the aggregate.
linesCh = make(chan *line.Line, 10000)
- aggregate.NextLinesCh <- linesCh
+ r.server.RegisterAggregateLines(linesCh)
+ closeLines = true
} else {
// For non-MapReduce operations, use the server's shared lines channel.
- linesCh = r.server.lines
+ linesCh = r.server.SharedLinesChannel()
}
err := reader.Start(ctx, ltx, linesCh, re)
- if aggregate != nil {
+ if closeLines {
// Closing the aggregate line channel triggers flush.
close(linesCh)
}
@@ -308,30 +308,30 @@ func (r *readCommand) readViaChannels() readStrategy {
func (r *readCommand) readViaTurboProcessor(path, globID string, writer TurboWriter) readStrategy {
return func(ctx context.Context, ltx lcontext.LContext, reader fs.FileReader, re regex.Regex) error {
- dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> starting read loop iteration")
+ dlog.Server.Trace(r.server.LogContext(), path, globID, "readWithTurboProcessor -> starting read loop iteration")
processor := r.makeTurboProcessor(path, globID, writer)
- dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> about to start")
+ dlog.Server.Trace(r.server.LogContext(), path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> about to start")
startErr := reader.StartWithProcessorOptimized(ctx, ltx, processor, re)
- dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> completed")
+ dlog.Server.Trace(r.server.LogContext(), path, globID, "readWithTurboProcessor -> reader.StartWithPocessorOptimized -> completed")
// Ensure we flush and close the processor before retry checks.
- dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> flushing processor")
+ dlog.Server.Trace(r.server.LogContext(), path, globID, "readWithTurboProcessor -> flushing processor")
if flushErr := processor.Flush(); flushErr != nil {
- dlog.Server.Error(r.server.user, path, globID, "flush error", flushErr)
+ dlog.Server.Error(r.server.LogContext(), path, globID, "flush error", flushErr)
}
- dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> closing processor")
+ dlog.Server.Trace(r.server.LogContext(), path, globID, "readWithTurboProcessor -> closing processor")
if closeErr := processor.Close(); closeErr != nil {
- dlog.Server.Error(r.server.user, path, globID, "close error", closeErr)
+ dlog.Server.Error(r.server.LogContext(), path, globID, "close error", closeErr)
}
- dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> processor closed")
+ dlog.Server.Trace(r.server.LogContext(), path, globID, "readWithTurboProcessor -> processor closed")
// Give time for data to be transmitted.
// This is crucial for integration tests to ensure all data is sent
// Skip this delay in serverless mode since data is written directly to stdout
- if !r.server.serverless {
- dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> waiting for data transmission")
+ if !r.server.Serverless() {
+ dlog.Server.Trace(r.server.LogContext(), path, globID, "readWithTurboProcessor -> waiting for data transmission")
time.Sleep(50 * time.Millisecond)
}
@@ -347,23 +347,23 @@ func (r *readCommand) ensureTurboModeEnabled() {
func (r *readCommand) makeTurboWriter() TurboWriter {
// Create a writer instance per file to keep concurrent processing isolated.
- if r.server.serverless {
- return NewDirectTurboWriter(os.Stdout, r.server.hostname, r.server.plain, r.server.serverless)
+ if r.server.Serverless() {
+ return NewDirectTurboWriter(os.Stdout, r.server.Hostname(), r.server.PlainOutput(), r.server.Serverless())
}
return &TurboNetworkWriter{
turboLines: r.server.GetTurboChannel(),
- serverMessages: r.server.serverMessages,
- hostname: r.server.hostname,
- plain: r.server.plain,
- serverless: r.server.serverless,
+ serverMessages: r.server.ServerMessagesChannel(),
+ hostname: r.server.Hostname(),
+ plain: r.server.PlainOutput(),
+ serverless: r.server.Serverless(),
}
}
func (r *readCommand) makeTurboProcessor(path, globID string, writer TurboWriter) turboReadProcessor {
- if r.server.turboAggregate != nil {
- dlog.Server.Info(r.server.user, "Using turbo aggregate processor for MapReduce", path, globID)
- return server.NewTurboAggregateProcessor(r.server.turboAggregate, globID)
+ if aggregate := r.server.TurboAggregate(); aggregate != nil {
+ dlog.Server.Info(r.server.LogContext(), "Using turbo aggregate processor for MapReduce", path, globID)
+ return server.NewTurboAggregateProcessor(aggregate, globID)
}
return NewDirectLineProcessor(writer, globID)
@@ -374,9 +374,9 @@ func (r *readCommand) logRegexMode(re regex.Regex) {
return
}
if re.IsLiteral() {
- dlog.Server.Info(r.server.user, "Using optimized literal string matching for pattern:", re.Pattern())
+ dlog.Server.Info(r.server.LogContext(), "Using optimized literal string matching for pattern:", re.Pattern())
} else {
- dlog.Server.Info(r.server.user, "Using regex matching for pattern:", re.Pattern())
+ dlog.Server.Info(r.server.LogContext(), "Using regex matching for pattern:", re.Pattern())
}
}
@@ -397,13 +397,12 @@ func (r *readCommand) makeGlobID(path, glob string) string {
return pathParts[len(pathParts)-1]
}
- r.server.sendln(r.server.serverMessages,
- dlog.Server.Warn("Empty file path given?", path, glob))
+ r.server.SendServerMessage(dlog.Server.Warn("Empty file path given?", path, glob))
return ""
}
func (r *readCommand) isInputFromPipe() bool {
- if !r.server.serverless {
+ if !r.server.Serverless() {
// Can read from pipe only in serverless mode.
return false
}
diff --git a/internal/server/handlers/readcommand_server.go b/internal/server/handlers/readcommand_server.go
new file mode 100644
index 0000000..5160c5c
--- /dev/null
+++ b/internal/server/handlers/readcommand_server.go
@@ -0,0 +1,119 @@
+package handlers
+
+import (
+ "sync/atomic"
+
+ "github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/mapr/server"
+)
+
+type readCommandServer interface {
+ LogContext() interface{}
+ SendServerMessage(message string)
+ CanReadFile(path string) bool
+ ServerMessagesChannel() chan string
+ CatLimiter() chan struct{}
+ TailLimiter() chan struct{}
+ Hostname() string
+ PlainOutput() bool
+ Serverless() bool
+ TurboBoostDisabled() bool
+ HasRegularAggregate() bool
+ RegisterAggregateLines(lines chan *line.Line)
+ SharedLinesChannel() chan *line.Line
+ TurboAggregate() *server.TurboAggregate
+ AddPendingFiles(delta int32) int32
+ CompletePendingFile() (remaining int32, activeCommands int32)
+ PendingAndActive() (pending int32, activeCommands int32)
+ TriggerShutdown()
+ IsTurboMode() bool
+ EnableTurboMode()
+ HasTurboEOF() bool
+ FlushTurboData()
+ SignalTurboEOF()
+ GetTurboChannel() chan []byte
+}
+
+var _ readCommandServer = (*ServerHandler)(nil)
+
+func (h *ServerHandler) LogContext() interface{} {
+ return h.user
+}
+
+func (h *ServerHandler) SendServerMessage(message string) {
+ h.sendln(h.serverMessages, message)
+}
+
+func (h *ServerHandler) CanReadFile(path string) bool {
+ return h.user.HasFilePermission(path, "readfiles")
+}
+
+func (h *ServerHandler) ServerMessagesChannel() chan string {
+ return h.serverMessages
+}
+
+func (h *ServerHandler) CatLimiter() chan struct{} {
+ return h.catLimiter
+}
+
+func (h *ServerHandler) TailLimiter() chan struct{} {
+ return h.tailLimiter
+}
+
+func (h *ServerHandler) Hostname() string {
+ return h.hostname
+}
+
+func (h *ServerHandler) PlainOutput() bool {
+ return h.plain
+}
+
+func (h *ServerHandler) Serverless() bool {
+ return h.serverless
+}
+
+func (h *ServerHandler) TurboBoostDisabled() bool {
+ return h.serverCfg.TurboBoostDisable
+}
+
+func (h *ServerHandler) HasRegularAggregate() bool {
+ return h.aggregate != nil
+}
+
+func (h *ServerHandler) RegisterAggregateLines(lines chan *line.Line) {
+ if h.aggregate != nil {
+ h.aggregate.NextLinesCh <- lines
+ }
+}
+
+func (h *ServerHandler) SharedLinesChannel() chan *line.Line {
+ return h.lines
+}
+
+func (h *ServerHandler) TurboAggregate() *server.TurboAggregate {
+ return h.turboAggregate
+}
+
+func (h *ServerHandler) AddPendingFiles(delta int32) int32 {
+ return atomic.AddInt32(&h.pendingFiles, delta)
+}
+
+func (h *ServerHandler) CompletePendingFile() (remaining int32, activeCommands int32) {
+ remaining = atomic.AddInt32(&h.pendingFiles, -1)
+ activeCommands = atomic.LoadInt32(&h.activeCommands)
+ return remaining, activeCommands
+}
+
+func (h *ServerHandler) PendingAndActive() (pending int32, activeCommands int32) {
+ pending = atomic.LoadInt32(&h.pendingFiles)
+ activeCommands = atomic.LoadInt32(&h.activeCommands)
+ return pending, activeCommands
+}
+
+func (h *ServerHandler) TriggerShutdown() {
+ h.shutdown()
+}
+
+func (h *ServerHandler) FlushTurboData() {
+ h.flushTurboData()
+}
diff --git a/internal/server/handlers/shutdown_coordinator.go b/internal/server/handlers/shutdown_coordinator.go
index 54df09a..6d3eda7 100644
--- a/internal/server/handlers/shutdown_coordinator.go
+++ b/internal/server/handlers/shutdown_coordinator.go
@@ -2,25 +2,24 @@ package handlers
import (
"context"
- "sync/atomic"
"time"
"github.com/mimecast/dtail/internal/io/dlog"
)
type shutdownCoordinator struct {
- server *ServerHandler
+ server readCommandServer
}
-func newShutdownCoordinator(server *ServerHandler) *shutdownCoordinator {
+func newShutdownCoordinator(server readCommandServer) *shutdownCoordinator {
return &shutdownCoordinator{server: server}
}
func (c *shutdownCoordinator) onFileProcessed(path string) {
- remaining := atomic.AddInt32(&c.server.pendingFiles, -1)
- dlog.Server.Debug(c.server.user, "File processing complete", "path", path, "remainingPending", remaining)
+ remaining, activeCommands := c.server.CompletePendingFile()
+ dlog.Server.Debug(c.server.LogContext(), "File processing complete", "path", path, "remainingPending", remaining)
- if remaining != 0 || atomic.LoadInt32(&c.server.activeCommands) != 0 {
+ if remaining != 0 || activeCommands != 0 {
return
}
@@ -29,26 +28,25 @@ func (c *shutdownCoordinator) onFileProcessed(path string) {
func (c *shutdownCoordinator) finalizeWhenIdle() {
// If we have a turbo aggregate, trigger final serialization.
- if c.server.turboAggregate != nil {
- dlog.Server.Info(c.server.user, "Triggering final turbo aggregate serialization")
- c.server.turboAggregate.Serialize(context.Background())
+ if turboAggregate := c.server.TurboAggregate(); turboAggregate != nil {
+ dlog.Server.Info(c.server.LogContext(), "Triggering final turbo aggregate serialization")
+ turboAggregate.Serialize(context.Background())
// In serverless mode, serialization is synchronous, so no wait needed.
- if !c.server.serverless {
+ if !c.server.Serverless() {
time.Sleep(500 * time.Millisecond)
}
}
// Double-check that we really have no pending work before shutdown.
- if !c.server.serverless {
+ if !c.server.Serverless() {
time.Sleep(10 * time.Millisecond)
}
- finalPending := atomic.LoadInt32(&c.server.pendingFiles)
- finalActive := atomic.LoadInt32(&c.server.activeCommands)
+ finalPending, finalActive := c.server.PendingAndActive()
if finalPending == 0 && finalActive == 0 {
- dlog.Server.Debug(c.server.user, "No active commands and no pending files after double-check, triggering shutdown")
- c.server.shutdown()
+ dlog.Server.Debug(c.server.LogContext(), "No active commands and no pending files after double-check, triggering shutdown")
+ c.server.TriggerShutdown()
return
}
- dlog.Server.Debug(c.server.user, "Shutdown check cancelled", "finalPending", finalPending, "finalActive", finalActive)
+ dlog.Server.Debug(c.server.LogContext(), "Shutdown check cancelled", "finalPending", finalPending, "finalActive", finalActive)
}