diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-02 10:29:24 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-02 10:29:24 +0200 |
| commit | 29e50d7b6ebb9e6c59d079ef5b7551b1acd950fb (patch) | |
| tree | 147ae88ee00c6b170d1f28a55c89fb4c92fc440f /internal/server/handlers/readcommand.go | |
| parent | 8c08e4e60219782e50c3a5f20a051e706196f48c (diff) | |
config: make server timing and buffer knobs configurable
Diffstat (limited to 'internal/server/handlers/readcommand.go')
| -rw-r--r-- | internal/server/handlers/readcommand.go | 17 |
1 files changed, 5 insertions, 12 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index dd49f5d..ad2b87f 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -78,7 +78,7 @@ func (r *readCommand) Start(ctx context.Context, ltx lcontext.LContext, func (r *readCommand) readGlob(ctx context.Context, ltx lcontext.LContext, glob string, re regex.Regex, retries int) { - retryInterval := time.Second * 5 + retryInterval := r.server.ReadGlobRetryInterval() glob = filepath.Clean(glob) for retryCount := 0; retryCount < retries; retryCount++ { @@ -146,14 +146,7 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, // This is especially important when files are queued due to concurrency limits // In serverless mode, data is written directly to stdout, so no wait is needed if !r.server.Serverless() { - waitTime := 500 * time.Millisecond - if len(paths) > 10 { - // For many files, wait proportionally longer - waitTime = time.Duration(len(paths)*10) * time.Millisecond - if waitTime > 2*time.Second { - waitTime = 2 * time.Second - } - } + waitTime := r.server.TurboEOFWaitDuration(len(paths)) dlog.Server.Debug(r.server.LogContext(), "Waiting for data transmission", "duration", waitTime) time.Sleep(waitTime) } @@ -276,7 +269,7 @@ func (r *readCommand) executeReadLoop(ctx context.Context, ltx lcontext.LContext } } - time.Sleep(time.Second * 2) + time.Sleep(r.server.ReadRetryInterval()) dlog.Server.Info(path, globID, "Reading file again") } } @@ -288,7 +281,7 @@ func (r *readCommand) readViaChannels() readStrategy { if r.server.HasRegularAggregate() { // For MapReduce operations, create a new channel that goes only to the aggregate. - linesCh = make(chan *line.Line, 10000) + linesCh = make(chan *line.Line, r.server.AggregateLinesChannelBufferSize()) r.server.RegisterAggregateLines(linesCh) closeLines = true } else { @@ -332,7 +325,7 @@ func (r *readCommand) readViaTurboProcessor(path, globID string, writer TurboWri // Skip this delay in serverless mode since data is written directly to stdout if !r.server.Serverless() { dlog.Server.Trace(r.server.LogContext(), path, globID, "readWithTurboProcessor -> waiting for data transmission") - time.Sleep(50 * time.Millisecond) + time.Sleep(r.server.TurboDataTransmissionDelay()) } return startErr |
