summaryrefslogtreecommitdiff
path: root/internal/server/handlers/readcommand_server.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-02 10:29:24 +0200
committerPaul Buetow <paul@buetow.org>2026-03-02 10:29:24 +0200
commit29e50d7b6ebb9e6c59d079ef5b7551b1acd950fb (patch)
tree147ae88ee00c6b170d1f28a55c89fb4c92fc440f /internal/server/handlers/readcommand_server.go
parent8c08e4e60219782e50c3a5f20a051e706196f48c (diff)
config: make server timing and buffer knobs configurable
Diffstat (limited to 'internal/server/handlers/readcommand_server.go')
-rw-r--r--internal/server/handlers/readcommand_server.go70
1 files changed, 70 insertions, 0 deletions
diff --git a/internal/server/handlers/readcommand_server.go b/internal/server/handlers/readcommand_server.go
index 5160c5c..46f61a4 100644
--- a/internal/server/handlers/readcommand_server.go
+++ b/internal/server/handlers/readcommand_server.go
@@ -2,6 +2,7 @@ package handlers
import (
"sync/atomic"
+ "time"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/mapr/server"
@@ -32,6 +33,13 @@ type readCommandServer interface {
FlushTurboData()
SignalTurboEOF()
GetTurboChannel() chan []byte
+ ReadGlobRetryInterval() time.Duration
+ ReadRetryInterval() time.Duration
+ AggregateLinesChannelBufferSize() int
+ TurboDataTransmissionDelay() time.Duration
+ TurboEOFWaitDuration(fileCount int) time.Duration
+ ShutdownTurboSerializeWait() time.Duration
+ ShutdownIdleRecheckWait() time.Duration
}
var _ readCommandServer = (*ServerHandler)(nil)
@@ -117,3 +125,65 @@ func (h *ServerHandler) TriggerShutdown() {
func (h *ServerHandler) FlushTurboData() {
h.flushTurboData()
}
+
+func durationFromMilliseconds(value int, fallback time.Duration) time.Duration {
+ if value <= 0 {
+ return fallback
+ }
+ return time.Duration(value) * time.Millisecond
+}
+
+func positiveIntOrDefault(value int, fallback int) int {
+ if value <= 0 {
+ return fallback
+ }
+ return value
+}
+
+func (h *ServerHandler) ReadGlobRetryInterval() time.Duration {
+ return durationFromMilliseconds(h.serverCfg.ReadGlobRetryIntervalMs, 5*time.Second)
+}
+
+func (h *ServerHandler) ReadRetryInterval() time.Duration {
+ return durationFromMilliseconds(h.serverCfg.ReadRetryIntervalMs, 2*time.Second)
+}
+
+func (h *ServerHandler) AggregateLinesChannelBufferSize() int {
+ return positiveIntOrDefault(h.serverCfg.ReadAggregateLineBufferSize, 10000)
+}
+
+func (h *ServerHandler) TurboDataTransmissionDelay() time.Duration {
+ return durationFromMilliseconds(h.serverCfg.TurboTransmissionDelayMs, 50*time.Millisecond)
+}
+
+func (h *ServerHandler) TurboEOFWaitDuration(fileCount int) time.Duration {
+ baseWait := durationFromMilliseconds(h.serverCfg.TurboEOFWaitBaseMs, 500*time.Millisecond)
+ if fileCount <= 10 {
+ return baseWait
+ }
+
+ perFileWait := durationFromMilliseconds(h.serverCfg.TurboEOFWaitPerFileMs, 10*time.Millisecond)
+ maxWait := durationFromMilliseconds(h.serverCfg.TurboEOFWaitMaxMs, 2*time.Second)
+ wait := time.Duration(fileCount) * perFileWait
+ if wait > maxWait {
+ return maxWait
+ }
+ return wait
+}
+
+func (h *ServerHandler) ShutdownTurboSerializeWait() time.Duration {
+ return durationFromMilliseconds(h.serverCfg.ShutdownTurboSerializeWaitMs, 500*time.Millisecond)
+}
+
+func (h *ServerHandler) ShutdownIdleRecheckWait() time.Duration {
+ return durationFromMilliseconds(h.serverCfg.ShutdownIdleRecheckWaitMs, 10*time.Millisecond)
+}
+
+func (h *ServerHandler) turboManagerConfig() turboManagerConfig {
+ return turboManagerConfig{
+ channelBufferSize: positiveIntOrDefault(h.serverCfg.TurboChannelBufferSize, defaultTurboChannelBufferSize),
+ flushTimeout: durationFromMilliseconds(h.serverCfg.TurboFlushTimeoutMs, defaultTurboFlushTimeout),
+ flushPollInterval: durationFromMilliseconds(h.serverCfg.TurboFlushPollIntervalMs, defaultTurboFlushPollInterval),
+ readRetryInterval: durationFromMilliseconds(h.serverCfg.TurboReadRetryIntervalMs, defaultTurboReadRetryInterval),
+ }
+}