diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-02 10:29:24 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-02 10:29:24 +0200 |
| commit | 29e50d7b6ebb9e6c59d079ef5b7551b1acd950fb (patch) | |
| tree | 147ae88ee00c6b170d1f28a55c89fb4c92fc440f /internal/server/handlers/readcommand_server.go | |
| parent | 8c08e4e60219782e50c3a5f20a051e706196f48c (diff) | |
config: make server timing and buffer knobs configurable
Diffstat (limited to 'internal/server/handlers/readcommand_server.go')
| -rw-r--r-- | internal/server/handlers/readcommand_server.go | 70 |
1 files changed, 70 insertions, 0 deletions
diff --git a/internal/server/handlers/readcommand_server.go b/internal/server/handlers/readcommand_server.go index 5160c5c..46f61a4 100644 --- a/internal/server/handlers/readcommand_server.go +++ b/internal/server/handlers/readcommand_server.go @@ -2,6 +2,7 @@ package handlers import ( "sync/atomic" + "time" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/mapr/server" @@ -32,6 +33,13 @@ type readCommandServer interface { FlushTurboData() SignalTurboEOF() GetTurboChannel() chan []byte + ReadGlobRetryInterval() time.Duration + ReadRetryInterval() time.Duration + AggregateLinesChannelBufferSize() int + TurboDataTransmissionDelay() time.Duration + TurboEOFWaitDuration(fileCount int) time.Duration + ShutdownTurboSerializeWait() time.Duration + ShutdownIdleRecheckWait() time.Duration } var _ readCommandServer = (*ServerHandler)(nil) @@ -117,3 +125,65 @@ func (h *ServerHandler) TriggerShutdown() { func (h *ServerHandler) FlushTurboData() { h.flushTurboData() } + +func durationFromMilliseconds(value int, fallback time.Duration) time.Duration { + if value <= 0 { + return fallback + } + return time.Duration(value) * time.Millisecond +} + +func positiveIntOrDefault(value int, fallback int) int { + if value <= 0 { + return fallback + } + return value +} + +func (h *ServerHandler) ReadGlobRetryInterval() time.Duration { + return durationFromMilliseconds(h.serverCfg.ReadGlobRetryIntervalMs, 5*time.Second) +} + +func (h *ServerHandler) ReadRetryInterval() time.Duration { + return durationFromMilliseconds(h.serverCfg.ReadRetryIntervalMs, 2*time.Second) +} + +func (h *ServerHandler) AggregateLinesChannelBufferSize() int { + return positiveIntOrDefault(h.serverCfg.ReadAggregateLineBufferSize, 10000) +} + +func (h *ServerHandler) TurboDataTransmissionDelay() time.Duration { + return durationFromMilliseconds(h.serverCfg.TurboTransmissionDelayMs, 50*time.Millisecond) +} + +func (h *ServerHandler) TurboEOFWaitDuration(fileCount int) time.Duration { + baseWait := durationFromMilliseconds(h.serverCfg.TurboEOFWaitBaseMs, 500*time.Millisecond) + if fileCount <= 10 { + return baseWait + } + + perFileWait := durationFromMilliseconds(h.serverCfg.TurboEOFWaitPerFileMs, 10*time.Millisecond) + maxWait := durationFromMilliseconds(h.serverCfg.TurboEOFWaitMaxMs, 2*time.Second) + wait := time.Duration(fileCount) * perFileWait + if wait > maxWait { + return maxWait + } + return wait +} + +func (h *ServerHandler) ShutdownTurboSerializeWait() time.Duration { + return durationFromMilliseconds(h.serverCfg.ShutdownTurboSerializeWaitMs, 500*time.Millisecond) +} + +func (h *ServerHandler) ShutdownIdleRecheckWait() time.Duration { + return durationFromMilliseconds(h.serverCfg.ShutdownIdleRecheckWaitMs, 10*time.Millisecond) +} + +func (h *ServerHandler) turboManagerConfig() turboManagerConfig { + return turboManagerConfig{ + channelBufferSize: positiveIntOrDefault(h.serverCfg.TurboChannelBufferSize, defaultTurboChannelBufferSize), + flushTimeout: durationFromMilliseconds(h.serverCfg.TurboFlushTimeoutMs, defaultTurboFlushTimeout), + flushPollInterval: durationFromMilliseconds(h.serverCfg.TurboFlushPollIntervalMs, defaultTurboFlushPollInterval), + readRetryInterval: durationFromMilliseconds(h.serverCfg.TurboReadRetryIntervalMs, defaultTurboReadRetryInterval), + } +} |
