diff options
Diffstat (limited to 'internal/eventloop_runtime.go')
| -rw-r--r-- | internal/eventloop_runtime.go | 82 |
1 files changed, 7 insertions, 75 deletions
diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go index 24214ca..c4fa6b0 100644 --- a/internal/eventloop_runtime.go +++ b/internal/eventloop_runtime.go @@ -4,13 +4,10 @@ import ( "context" "fmt" "runtime/debug" - "strings" "time" "ior/internal/event" "ior/internal/file" - "ior/internal/globalfilter" - "ior/internal/statsengine" "ior/internal/types" ) @@ -49,84 +46,19 @@ func (e *eventLoop) startAggregateDrainLoop(ctx context.Context) func() { return func() {} } - done := make(chan struct{}) - stop := make(chan struct{}) - go func() { - defer close(done) - ticker := time.NewTicker(e.cfg.aggregateDrainEvery) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-stop: - return - case <-ticker.C: - e.drainAggregatesOnce() - } - } - }() - return func() { - close(stop) - <-done - e.drainAggregatesOnce() - } + drainer := newAggregateDrainer(e.aggregateSrc, e.cfg.aggregateOnlyTraceIDs, e.Filter) + return drainer.Start(ctx, e.cfg.aggregateDrainEvery, e.handleAggregateDrainResult) } -func (e *eventLoop) drainAggregatesOnce() { - rows, err := e.aggregateSrc.Drain() - if err != nil { - e.notifyWarning(fmt.Sprintf("syscall aggregate drain failed: %v", err)) +func (e *eventLoop) handleAggregateDrainResult(result aggregateDrainResult) { + if result.warning != "" { + e.notifyWarning(result.warning) return } - rows = e.filterAggregateRowsForIngest(rows) - if len(rows) == 0 { + if len(result.rows) == 0 { return } - e.aggregateSink.IngestSyscallAggregates(rows) -} - -func (e *eventLoop) filterAggregateRowsForIngest(rows []statsengine.SyscallAggregate) []statsengine.SyscallAggregate { - if len(rows) == 0 { - return nil - } - if !aggregateIngestAllowedForFilter(e.Filter()) { - return nil - } - if len(e.cfg.aggregateOnlyTraceIDs) == 0 { - return nil - } - - filtered := make([]statsengine.SyscallAggregate, 0, len(rows)) - for _, row := range rows { - if _, ok := e.cfg.aggregateOnlyTraceIDs[row.TraceID]; ok { - filtered = append(filtered, row) - } - } - return filtered -} - -func aggregateIngestAllowedForFilter(filter globalfilter.Filter) bool { - if filter.ErrorsOnly { - return false - } - if hasPattern(filter.Syscall) || hasPattern(filter.Comm) || hasPattern(filter.File) { - return false - } - if filter.FD != nil || filter.LatencyNs != nil || filter.GapNs != nil || filter.Bytes != nil || filter.RetVal != nil { - return false - } - if filter.PID != nil { - return false - } - if filter.TID != nil { - return false - } - return true -} - -func hasPattern(filter *globalfilter.StringFilter) bool { - return filter != nil && strings.TrimSpace(filter.Pattern) != "" + e.aggregateSink.IngestSyscallAggregates(result.rows) } func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) { |
