summaryrefslogtreecommitdiff
path: root/internal/server/handlers/readcommand.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-02 10:29:24 +0200
committerPaul Buetow <paul@buetow.org>2026-03-02 10:29:24 +0200
commit29e50d7b6ebb9e6c59d079ef5b7551b1acd950fb (patch)
tree147ae88ee00c6b170d1f28a55c89fb4c92fc440f /internal/server/handlers/readcommand.go
parent8c08e4e60219782e50c3a5f20a051e706196f48c (diff)
config: make server timing and buffer knobs configurable
Diffstat (limited to 'internal/server/handlers/readcommand.go')
-rw-r--r--internal/server/handlers/readcommand.go17
1 files changed, 5 insertions, 12 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index dd49f5d..ad2b87f 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -78,7 +78,7 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext,
func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext,
glob string, re regex.Regex, retries int) {
- retryInterval := time.Second * 5
+ retryInterval := r.server.ReadGlobRetryInterval()
glob = filepath.Clean(glob)
for retryCount := 0; retryCount < retries; retryCount++ {
@@ -146,14 +146,7 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext,
// This is especially important when files are queued due to concurrency limits
// In serverless mode, data is written directly to stdout, so no wait is needed
if !r.server.Serverless() {
- waitTime := 500 * time.Millisecond
- if len(paths) > 10 {
- // For many files, wait proportionally longer
- waitTime = time.Duration(len(paths)*10) * time.Millisecond
- if waitTime > 2*time.Second {
- waitTime = 2 * time.Second
- }
- }
+ waitTime := r.server.TurboEOFWaitDuration(len(paths))
dlog.Server.Debug(r.server.LogContext(), "Waiting for data transmission", "duration", waitTime)
time.Sleep(waitTime)
}
@@ -276,7 +269,7 @@ func (r *readCommand) executeReadLoop(ctx context.Context, ltx lcontext.LContext
}
}
- time.Sleep(time.Second * 2)
+ time.Sleep(r.server.ReadRetryInterval())
dlog.Server.Info(path, globID, "Reading file again")
}
}
@@ -288,7 +281,7 @@ func (r *readCommand) readViaChannels() readStrategy {
if r.server.HasRegularAggregate() {
// For MapReduce operations, create a new channel that goes only to the aggregate.
- linesCh = make(chan *line.Line, 10000)
+ linesCh = make(chan *line.Line, r.server.AggregateLinesChannelBufferSize())
r.server.RegisterAggregateLines(linesCh)
closeLines = true
} else {
@@ -332,7 +325,7 @@ func (r *readCommand) readViaTurboProcessor(path, globID string, writer TurboWri
// Skip this delay in serverless mode since data is written directly to stdout
if !r.server.Serverless() {
dlog.Server.Trace(r.server.LogContext(), path, globID, "readWithTurboProcessor -> waiting for data transmission")
- time.Sleep(50 * time.Millisecond)
+ time.Sleep(r.server.TurboDataTransmissionDelay())
}
return startErr