From 9310b54d439d4a1a8d4d337987aa63884df0af76 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 20 May 2026 11:38:19 +0300 Subject: feat: add syscall aggregate sampling infrastructure (task 17) --- internal/eventloop_runtime.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) (limited to 'internal/eventloop_runtime.go') diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go index 35714a1..f80f271 100644 --- a/internal/eventloop_runtime.go +++ b/internal/eventloop_runtime.go @@ -14,6 +14,8 @@ import ( func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) { defer close(e.done) defer e.shutdownCommResolver() + stopAggregateLoop := e.startAggregateDrainLoop(ctx) + defer stopAggregateLoop() if e.cfg.pprofEnable { fmt.Println("Profiling, press Ctrl+C to stop") @@ -39,6 +41,39 @@ func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) { } } +func (e *eventLoop) startAggregateDrainLoop(ctx context.Context) func() { + if e.aggregateSrc == nil || e.aggregateSink == nil { + return func() {} + } + + done := make(chan struct{}) + stop := make(chan struct{}) + go func() { + defer close(done) + ticker := time.NewTicker(e.cfg.aggregateDrainEvery) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-stop: + return + case <-ticker.C: + rows, err := e.aggregateSrc.Drain() + if err != nil { + e.notifyWarning(fmt.Sprintf("syscall aggregate drain failed: %v", err)) + continue + } + e.aggregateSink.IngestSyscallAggregates(rows) + } + } + }() + return func() { + close(stop) + <-done + } +} + func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) { pairs := make(chan *event.Pair, 1) -- cgit v1.2.3