diff options
Diffstat (limited to 'internal/mapr/server')
| -rw-r--r-- | internal/mapr/server/aggregate.go | 19 |
1 files changed, 12 insertions, 7 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 353cda5..98fe817 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -106,9 +106,16 @@ func (a *Aggregate) Start(ctx context.Context, maprMessages chan<- string) { } func (a *Aggregate) aggregateTimer(ctx context.Context) { + interval := a.query.Interval + if interval <= 0 { + interval = time.Second + } + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { select { - case <-time.After(a.query.Interval): + case <-ticker.C: a.Serialize(ctx) case <-ctx.Done(): return @@ -139,12 +146,12 @@ func (a *Aggregate) nextLine() (l *line.Line, ok bool, noMoreChannels bool) { case newLinesCh := <-a.NextLinesCh: oldLinesCh := a.linesCh a.linesCh = newLinesCh - + // Ensure the old channel is fully drained before recycling to prevent data mixing go func(oldCh chan *line.Line) { // First, drain any remaining lines from the old channel drained := 0 - drainLoop: + drainLoop: for { select { case l, ok := <-oldCh: @@ -161,11 +168,11 @@ func (a *Aggregate) nextLine() (l *line.Line, ok bool, noMoreChannels bool) { break drainLoop } } - + if drained > 0 { dlog.Server.Debug("Drained", drained, "lines from recycled channel") } - + // Now safely recycle the drained channel timer := time.NewTimer(5 * time.Second) defer timer.Stop() @@ -342,5 +349,3 @@ func (a *Aggregate) Serialize(ctx context.Context) { case <-ctx.Done(): } } - - |
