summaryrefslogtreecommitdiff
path: root/internal/aggregate_drainer.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-05-23 18:44:28 +0300
committerPaul Buetow <paul@buetow.org>2026-05-23 18:44:28 +0300
commitb138aed54ff23c3b5557b1336835c9e37137b020 (patch)
treec16aa5d5acace9ffe1e6f68570ddbadc531bc217 /internal/aggregate_drainer.go
parentd300847a794fd9cc823747320d53422880072b3b (diff)
8c extract aggregate draining and filter gating from eventLoop
Move aggregate drain scheduling, filter compatibility policy, trace-ID allowlisting, and warning construction into a dedicated aggregateDrainer type. eventLoop now only creates and ticks it. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Diffstat (limited to 'internal/aggregate_drainer.go')
-rw-r--r--internal/aggregate_drainer.go129
1 files changed, 129 insertions, 0 deletions
diff --git a/internal/aggregate_drainer.go b/internal/aggregate_drainer.go
new file mode 100644
index 0000000..87c5e1a
--- /dev/null
+++ b/internal/aggregate_drainer.go
@@ -0,0 +1,129 @@
+package internal
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "time"
+
+ "ior/internal/globalfilter"
+ "ior/internal/statsengine"
+ "ior/internal/types"
+)
+
+type aggregateDrainResult struct {
+ rows []statsengine.SyscallAggregate
+ warning string
+}
+
+type aggregateDrainer struct {
+ source syscallAggregateSource
+ filter func() globalfilter.Filter
+ aggregateOnlyTraceIDs map[types.TraceId]struct{}
+}
+
+func newAggregateDrainer(
+ source syscallAggregateSource,
+ aggregateOnlyTraceIDs map[types.TraceId]struct{},
+ filter func() globalfilter.Filter,
+) *aggregateDrainer {
+ return &aggregateDrainer{
+ source: source,
+ filter: filter,
+ aggregateOnlyTraceIDs: aggregateOnlyTraceIDs,
+ }
+}
+
+func (d *aggregateDrainer) Tick() aggregateDrainResult {
+ if d == nil || d.source == nil {
+ return aggregateDrainResult{}
+ }
+ rows, err := d.source.Drain()
+ if err != nil {
+ return aggregateDrainResult{warning: fmt.Sprintf("syscall aggregate drain failed: %v", err)}
+ }
+ rows = d.filterRowsForIngest(rows)
+ if len(rows) == 0 {
+ return aggregateDrainResult{}
+ }
+ return aggregateDrainResult{rows: rows}
+}
+
+func (d *aggregateDrainer) Start(ctx context.Context, every time.Duration, handle func(aggregateDrainResult)) func() {
+ if d == nil || d.source == nil {
+ return func() {}
+ }
+
+ done := make(chan struct{})
+ stop := make(chan struct{})
+ go func() {
+ defer close(done)
+ ticker := time.NewTicker(every)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-stop:
+ return
+ case <-ticker.C:
+ handle(d.Tick())
+ }
+ }
+ }()
+ return func() {
+ close(stop)
+ <-done
+ handle(d.Tick())
+ }
+}
+
+func (d *aggregateDrainer) filterRowsForIngest(rows []statsengine.SyscallAggregate) []statsengine.SyscallAggregate {
+ if len(rows) == 0 {
+ return nil
+ }
+ if !aggregateIngestAllowedForFilter(d.currentFilter()) {
+ return nil
+ }
+ if len(d.aggregateOnlyTraceIDs) == 0 {
+ return nil
+ }
+
+ filtered := make([]statsengine.SyscallAggregate, 0, len(rows))
+ for _, row := range rows {
+ if _, ok := d.aggregateOnlyTraceIDs[row.TraceID]; ok {
+ filtered = append(filtered, row)
+ }
+ }
+ return filtered
+}
+
+func (d *aggregateDrainer) currentFilter() globalfilter.Filter {
+ if d == nil || d.filter == nil {
+ return globalfilter.Filter{}
+ }
+ return d.filter()
+}
+
+func aggregateIngestAllowedForFilter(filter globalfilter.Filter) bool {
+ if filter.ErrorsOnly {
+ return false
+ }
+ if hasPattern(filter.Syscall) || hasPattern(filter.Comm) || hasPattern(filter.File) {
+ return false
+ }
+ if filter.FD != nil || filter.LatencyNs != nil || filter.GapNs != nil || filter.Bytes != nil || filter.RetVal != nil {
+ return false
+ }
+ if filter.PID != nil {
+ return false
+ }
+ if filter.TID != nil {
+ return false
+ }
+ return true
+}
+
+func hasPattern(filter *globalfilter.StringFilter) bool {
+ return filter != nil && strings.TrimSpace(filter.Pattern) != ""
+}