summaryrefslogtreecommitdiff
path: root/internal/mapr/server
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-08 09:34:32 +0200
committerPaul Buetow <paul@buetow.org>2026-03-08 09:34:32 +0200
commitf7f98ccaffc1be88db6f9814fb3c88b5f0a6ea34 (patch)
treeb16c9be77e4d4f9e2fedba83b356e6abf21b6d0e /internal/mapr/server
parent7179dba1f70f7fbdc8b89bf709bc2d5b643fe692 (diff)
task: replace looped time.After with tickers (task 378)
Diffstat (limited to 'internal/mapr/server')
-rw-r--r--internal/mapr/server/aggregate.go19
1 files changed, 12 insertions, 7 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 353cda5..98fe817 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -106,9 +106,16 @@ func (a *Aggregate) Start(ctx context.Context, maprMessages chan<- string) {
}
func (a *Aggregate) aggregateTimer(ctx context.Context) {
+ interval := a.query.Interval
+ if interval <= 0 {
+ interval = time.Second
+ }
+ ticker := time.NewTicker(interval)
+ defer ticker.Stop()
+
for {
select {
- case <-time.After(a.query.Interval):
+ case <-ticker.C:
a.Serialize(ctx)
case <-ctx.Done():
return
@@ -139,12 +146,12 @@ func (a *Aggregate) nextLine() (l *line.Line, ok bool, noMoreChannels bool) {
case newLinesCh := <-a.NextLinesCh:
oldLinesCh := a.linesCh
a.linesCh = newLinesCh
-
+
// Ensure the old channel is fully drained before recycling to prevent data mixing
go func(oldCh chan *line.Line) {
// First, drain any remaining lines from the old channel
drained := 0
- drainLoop:
+ drainLoop:
for {
select {
case l, ok := <-oldCh:
@@ -161,11 +168,11 @@ func (a *Aggregate) nextLine() (l *line.Line, ok bool, noMoreChannels bool) {
break drainLoop
}
}
-
+
if drained > 0 {
dlog.Server.Debug("Drained", drained, "lines from recycled channel")
}
-
+
// Now safely recycle the drained channel
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
@@ -342,5 +349,3 @@ func (a *Aggregate) Serialize(ctx context.Context) {
case <-ctx.Done():
}
}
-
-