summaryrefslogtreecommitdiff
path: root/internal/eventloop_runtime.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/eventloop_runtime.go')
-rw-r--r--internal/eventloop_runtime.go35
1 files changed, 35 insertions, 0 deletions
diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go
index 35714a1..f80f271 100644
--- a/internal/eventloop_runtime.go
+++ b/internal/eventloop_runtime.go
@@ -14,6 +14,8 @@ import (
func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) {
defer close(e.done)
defer e.shutdownCommResolver()
+ stopAggregateLoop := e.startAggregateDrainLoop(ctx)
+ defer stopAggregateLoop()
if e.cfg.pprofEnable {
fmt.Println("Profiling, press Ctrl+C to stop")
@@ -39,6 +41,39 @@ func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) {
}
}
+func (e *eventLoop) startAggregateDrainLoop(ctx context.Context) func() {
+ if e.aggregateSrc == nil || e.aggregateSink == nil {
+ return func() {}
+ }
+
+ done := make(chan struct{})
+ stop := make(chan struct{})
+ go func() {
+ defer close(done)
+ ticker := time.NewTicker(e.cfg.aggregateDrainEvery)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-stop:
+ return
+ case <-ticker.C:
+ rows, err := e.aggregateSrc.Drain()
+ if err != nil {
+ e.notifyWarning(fmt.Sprintf("syscall aggregate drain failed: %v", err))
+ continue
+ }
+ e.aggregateSink.IngestSyscallAggregates(rows)
+ }
+ }
+ }()
+ return func() {
+ close(stop)
+ <-done
+ }
+}
+
func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) {
pairs := make(chan *event.Pair, 1)