diff options
| author | Paul Buetow <paul@buetow.org> | 2026-05-23 18:44:28 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-05-23 18:44:28 +0300 |
| commit | b138aed54ff23c3b5557b1336835c9e37137b020 (patch) | |
| tree | c16aa5d5acace9ffe1e6f68570ddbadc531bc217 /internal/eventloop_runtime.go | |
| parent | d300847a794fd9cc823747320d53422880072b3b (diff) | |
8c extract aggregate draining and filter gating from eventLoop
Move aggregate drain scheduling, filter compatibility policy,
trace-ID allowlisting, and warning construction into a dedicated
aggregateDrainer type. eventLoop now only creates and ticks it.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Diffstat (limited to 'internal/eventloop_runtime.go')
| -rw-r--r-- | internal/eventloop_runtime.go | 82 |
1 files changed, 7 insertions, 75 deletions
diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go index 24214ca..c4fa6b0 100644 --- a/internal/eventloop_runtime.go +++ b/internal/eventloop_runtime.go @@ -4,13 +4,10 @@ import ( "context" "fmt" "runtime/debug" - "strings" "time" "ior/internal/event" "ior/internal/file" - "ior/internal/globalfilter" - "ior/internal/statsengine" "ior/internal/types" ) @@ -49,84 +46,19 @@ func (e *eventLoop) startAggregateDrainLoop(ctx context.Context) func() { return func() {} } - done := make(chan struct{}) - stop := make(chan struct{}) - go func() { - defer close(done) - ticker := time.NewTicker(e.cfg.aggregateDrainEvery) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-stop: - return - case <-ticker.C: - e.drainAggregatesOnce() - } - } - }() - return func() { - close(stop) - <-done - e.drainAggregatesOnce() - } + drainer := newAggregateDrainer(e.aggregateSrc, e.cfg.aggregateOnlyTraceIDs, e.Filter) + return drainer.Start(ctx, e.cfg.aggregateDrainEvery, e.handleAggregateDrainResult) } -func (e *eventLoop) drainAggregatesOnce() { - rows, err := e.aggregateSrc.Drain() - if err != nil { - e.notifyWarning(fmt.Sprintf("syscall aggregate drain failed: %v", err)) +func (e *eventLoop) handleAggregateDrainResult(result aggregateDrainResult) { + if result.warning != "" { + e.notifyWarning(result.warning) return } - rows = e.filterAggregateRowsForIngest(rows) - if len(rows) == 0 { + if len(result.rows) == 0 { return } - e.aggregateSink.IngestSyscallAggregates(rows) -} - -func (e *eventLoop) filterAggregateRowsForIngest(rows []statsengine.SyscallAggregate) []statsengine.SyscallAggregate { - if len(rows) == 0 { - return nil - } - if !aggregateIngestAllowedForFilter(e.Filter()) { - return nil - } - if len(e.cfg.aggregateOnlyTraceIDs) == 0 { - return nil - } - - filtered := make([]statsengine.SyscallAggregate, 0, len(rows)) - for _, row := range rows { - if _, ok := e.cfg.aggregateOnlyTraceIDs[row.TraceID]; ok { - filtered = append(filtered, row) - } - } - return filtered -} - -func aggregateIngestAllowedForFilter(filter globalfilter.Filter) bool { - if filter.ErrorsOnly { - return false - } - if hasPattern(filter.Syscall) || hasPattern(filter.Comm) || hasPattern(filter.File) { - return false - } - if filter.FD != nil || filter.LatencyNs != nil || filter.GapNs != nil || filter.Bytes != nil || filter.RetVal != nil { - return false - } - if filter.PID != nil { - return false - } - if filter.TID != nil { - return false - } - return true -} - -func hasPattern(filter *globalfilter.StringFilter) bool { - return filter != nil && strings.TrimSpace(filter.Pattern) != "" + e.aggregateSink.IngestSyscallAggregates(result.rows) } func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) { |
