diff options
Diffstat (limited to 'internal/eventloop_runtime.go')
| -rw-r--r-- | internal/eventloop_runtime.go | 35 |
1 files changed, 35 insertions, 0 deletions
diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go index 35714a1..f80f271 100644 --- a/internal/eventloop_runtime.go +++ b/internal/eventloop_runtime.go @@ -14,6 +14,8 @@ import ( func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) { defer close(e.done) defer e.shutdownCommResolver() + stopAggregateLoop := e.startAggregateDrainLoop(ctx) + defer stopAggregateLoop() if e.cfg.pprofEnable { fmt.Println("Profiling, press Ctrl+C to stop") @@ -39,6 +41,39 @@ func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) { } } +func (e *eventLoop) startAggregateDrainLoop(ctx context.Context) func() { + if e.aggregateSrc == nil || e.aggregateSink == nil { + return func() {} + } + + done := make(chan struct{}) + stop := make(chan struct{}) + go func() { + defer close(done) + ticker := time.NewTicker(e.cfg.aggregateDrainEvery) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-stop: + return + case <-ticker.C: + rows, err := e.aggregateSrc.Drain() + if err != nil { + e.notifyWarning(fmt.Sprintf("syscall aggregate drain failed: %v", err)) + continue + } + e.aggregateSink.IngestSyscallAggregates(rows) + } + } + }() + return func() { + close(stop) + <-done + } +} + func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) { pairs := make(chan *event.Pair, 1) |
