diff options
Diffstat (limited to 'internal/eventloop_runtime.go')
| -rw-r--r-- | internal/eventloop_runtime.go | 75 |
1 files changed, 69 insertions, 6 deletions
diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go index f80f271..697de07 100644 --- a/internal/eventloop_runtime.go +++ b/internal/eventloop_runtime.go @@ -4,10 +4,13 @@ import ( "context" "fmt" "runtime/debug" + "strings" "time" "ior/internal/event" "ior/internal/file" + "ior/internal/globalfilter" + "ior/internal/statsengine" "ior/internal/types" ) @@ -59,19 +62,79 @@ func (e *eventLoop) startAggregateDrainLoop(ctx context.Context) func() { case <-stop: return case <-ticker.C: - rows, err := e.aggregateSrc.Drain() - if err != nil { - e.notifyWarning(fmt.Sprintf("syscall aggregate drain failed: %v", err)) - continue - } - e.aggregateSink.IngestSyscallAggregates(rows) + e.drainAggregatesOnce() } } }() return func() { close(stop) <-done + e.drainAggregatesOnce() + } +} + +func (e *eventLoop) drainAggregatesOnce() { + rows, err := e.aggregateSrc.Drain() + if err != nil { + e.notifyWarning(fmt.Sprintf("syscall aggregate drain failed: %v", err)) + return + } + rows = e.filterAggregateRowsForIngest(rows) + if len(rows) == 0 { + return + } + e.aggregateSink.IngestSyscallAggregates(rows) +} + +func (e *eventLoop) filterAggregateRowsForIngest(rows []statsengine.SyscallAggregate) []statsengine.SyscallAggregate { + if len(rows) == 0 { + return nil + } + if !aggregateIngestAllowedForFilter(e.Filter()) { + return nil + } + if len(e.cfg.aggregateOnlyTraceIDs) == 0 { + return nil + } + + filtered := make([]statsengine.SyscallAggregate, 0, len(rows)) + for _, row := range rows { + if _, ok := e.cfg.aggregateOnlyTraceIDs[row.TraceID]; ok { + filtered = append(filtered, row) + } + } + return filtered +} + +func aggregateIngestAllowedForFilter(filter globalfilter.Filter) bool { + if filter.ErrorsOnly { + return false + } + if hasPattern(filter.Syscall) || hasPattern(filter.Comm) || hasPattern(filter.File) { + return false + } + if filter.FD != nil || filter.LatencyNs != nil || filter.GapNs != nil || filter.Bytes != nil || filter.RetVal != nil { + return false } + if !isKernelFilterNumericEq(filter.PID) { + return false + } + if !isKernelFilterNumericEq(filter.TID) { + return false + } + return true +} + +func isKernelFilterNumericEq(filter *globalfilter.NumericFilter) bool { + if filter == nil { + return true + } + _, ok := filter.EqValue() + return ok +} + +func hasPattern(filter *globalfilter.StringFilter) bool { + return filter != nil && strings.TrimSpace(filter.Pattern) != "" } func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) { |
