diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-08 09:34:32 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-08 09:34:32 +0200 |
| commit | f7f98ccaffc1be88db6f9814fb3c88b5f0a6ea34 (patch) | |
| tree | b16c9be77e4d4f9e2fedba83b356e6abf21b6d0e /internal/mapr | |
| parent | 7179dba1f70f7fbdc8b89bf709bc2d5b643fe692 (diff) | |
task: replace looped time.After with tickers (task 378)
Diffstat (limited to 'internal/mapr')
| -rw-r--r-- | internal/mapr/server/aggregate.go | 19 |
1 files changed, 12 insertions, 7 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 353cda5..98fe817 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -106,9 +106,16 @@ func (a *Aggregate) Start(ctx context.Context, maprMessages chan<- string) { } func (a *Aggregate) aggregateTimer(ctx context.Context) { + interval := a.query.Interval + if interval <= 0 { + interval = time.Second + } + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { select { - case <-time.After(a.query.Interval): + case <-ticker.C: a.Serialize(ctx) case <-ctx.Done(): return @@ -139,12 +146,12 @@ func (a *Aggregate) nextLine() (l *line.Line, ok bool, noMoreChannels bool) { case newLinesCh := <-a.NextLinesCh: oldLinesCh := a.linesCh a.linesCh = newLinesCh - + // Ensure the old channel is fully drained before recycling to prevent data mixing go func(oldCh chan *line.Line) { // First, drain any remaining lines from the old channel drained := 0 - drainLoop: + drainLoop: for { select { case l, ok := <-oldCh: @@ -161,11 +168,11 @@ func (a *Aggregate) nextLine() (l *line.Line, ok bool, noMoreChannels bool) { break drainLoop } } - + if drained > 0 { dlog.Server.Debug("Drained", drained, "lines from recycled channel") } - + // Now safely recycle the drained channel timer := time.NewTimer(5 * time.Second) defer timer.Stop() @@ -342,5 +349,3 @@ func (a *Aggregate) Serialize(ctx context.Context) { case <-ctx.Done(): } } - - |
