diff options
| author | Paul Buetow <paul@buetow.org> | 2026-05-23 18:44:28 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-05-23 18:44:28 +0300 |
| commit | b138aed54ff23c3b5557b1336835c9e37137b020 (patch) | |
| tree | c16aa5d5acace9ffe1e6f68570ddbadc531bc217 | |
| parent | d300847a794fd9cc823747320d53422880072b3b (diff) | |
8c extract aggregate draining and filter gating from eventLoop
Move aggregate drain scheduling, filter compatibility policy,
trace-ID allowlisting, and warning construction into a dedicated
aggregateDrainer type. eventLoop now only creates and ticks it.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
| -rw-r--r-- | internal/aggregate_drainer.go | 129 | ||||
| -rw-r--r-- | internal/eventloop_aggregate_test.go | 142 | ||||
| -rw-r--r-- | internal/eventloop_runtime.go | 82 |
3 files changed, 235 insertions, 118 deletions
diff --git a/internal/aggregate_drainer.go b/internal/aggregate_drainer.go new file mode 100644 index 0000000..87c5e1a --- /dev/null +++ b/internal/aggregate_drainer.go @@ -0,0 +1,129 @@ +package internal + +import ( + "context" + "fmt" + "strings" + "time" + + "ior/internal/globalfilter" + "ior/internal/statsengine" + "ior/internal/types" +) + +type aggregateDrainResult struct { + rows []statsengine.SyscallAggregate + warning string +} + +type aggregateDrainer struct { + source syscallAggregateSource + filter func() globalfilter.Filter + aggregateOnlyTraceIDs map[types.TraceId]struct{} +} + +func newAggregateDrainer( + source syscallAggregateSource, + aggregateOnlyTraceIDs map[types.TraceId]struct{}, + filter func() globalfilter.Filter, +) *aggregateDrainer { + return &aggregateDrainer{ + source: source, + filter: filter, + aggregateOnlyTraceIDs: aggregateOnlyTraceIDs, + } +} + +func (d *aggregateDrainer) Tick() aggregateDrainResult { + if d == nil || d.source == nil { + return aggregateDrainResult{} + } + rows, err := d.source.Drain() + if err != nil { + return aggregateDrainResult{warning: fmt.Sprintf("syscall aggregate drain failed: %v", err)} + } + rows = d.filterRowsForIngest(rows) + if len(rows) == 0 { + return aggregateDrainResult{} + } + return aggregateDrainResult{rows: rows} +} + +func (d *aggregateDrainer) Start(ctx context.Context, every time.Duration, handle func(aggregateDrainResult)) func() { + if d == nil || d.source == nil { + return func() {} + } + + done := make(chan struct{}) + stop := make(chan struct{}) + go func() { + defer close(done) + ticker := time.NewTicker(every) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-stop: + return + case <-ticker.C: + handle(d.Tick()) + } + } + }() + return func() { + close(stop) + <-done + handle(d.Tick()) + } +} + +func (d *aggregateDrainer) filterRowsForIngest(rows []statsengine.SyscallAggregate) []statsengine.SyscallAggregate { + if len(rows) == 0 { + return nil + } + if !aggregateIngestAllowedForFilter(d.currentFilter()) { + return nil + } + if len(d.aggregateOnlyTraceIDs) == 0 { + return nil + } + + filtered := make([]statsengine.SyscallAggregate, 0, len(rows)) + for _, row := range rows { + if _, ok := d.aggregateOnlyTraceIDs[row.TraceID]; ok { + filtered = append(filtered, row) + } + } + return filtered +} + +func (d *aggregateDrainer) currentFilter() globalfilter.Filter { + if d == nil || d.filter == nil { + return globalfilter.Filter{} + } + return d.filter() +} + +func aggregateIngestAllowedForFilter(filter globalfilter.Filter) bool { + if filter.ErrorsOnly { + return false + } + if hasPattern(filter.Syscall) || hasPattern(filter.Comm) || hasPattern(filter.File) { + return false + } + if filter.FD != nil || filter.LatencyNs != nil || filter.GapNs != nil || filter.Bytes != nil || filter.RetVal != nil { + return false + } + if filter.PID != nil { + return false + } + if filter.TID != nil { + return false + } + return true +} + +func hasPattern(filter *globalfilter.StringFilter) bool { + return filter != nil && strings.TrimSpace(filter.Pattern) != "" +} diff --git a/internal/eventloop_aggregate_test.go b/internal/eventloop_aggregate_test.go index 582b598..78cf2a6 100644 --- a/internal/eventloop_aggregate_test.go +++ b/internal/eventloop_aggregate_test.go @@ -2,6 +2,7 @@ package internal import ( "context" + "errors" "sync" "testing" "time" @@ -14,11 +15,15 @@ import ( type aggregateSourceStub struct { mu sync.Mutex rows [][]statsengine.SyscallAggregate + err error } func (s *aggregateSourceStub) Drain() ([]statsengine.SyscallAggregate, error) { s.mu.Lock() defer s.mu.Unlock() + if s.err != nil { + return nil, s.err + } if len(s.rows) == 0 { return nil, nil } @@ -38,50 +43,79 @@ func (s *aggregateSinkStub) IngestSyscallAggregates(rows []statsengine.SyscallAg s.rows = append(s.rows, rows...) } -func TestFilterAggregateRowsForIngestAggregateOnlyTraceIDs(t *testing.T) { - el := &eventLoop{ - cfg: eventLoopConfig{ - aggregateOnlyTraceIDs: map[types.TraceId]struct{}{ - types.SYS_ENTER_FUTEX: {}, - }, +func TestAggregateDrainerTickFiltersAggregateOnlyTraceIDs(t *testing.T) { + drainer := newAggregateDrainer( + &aggregateSourceStub{ + rows: [][]statsengine.SyscallAggregate{{ + {TraceID: types.SYS_ENTER_FUTEX, Count: 2}, + {TraceID: types.SYS_ENTER_CLOCK_GETTIME, Count: 5}, + }}, }, - } - el.SetFilter(globalfilter.Filter{}) + map[types.TraceId]struct{}{ + types.SYS_ENTER_FUTEX: {}, + }, + func() globalfilter.Filter { return globalfilter.Filter{} }, + ) - in := []statsengine.SyscallAggregate{ - {TraceID: types.SYS_ENTER_FUTEX, Count: 2}, - {TraceID: types.SYS_ENTER_CLOCK_GETTIME, Count: 5}, + got := drainer.Tick() + if got.warning != "" { + t.Fatalf("warning = %q, want empty", got.warning) } - got := el.filterAggregateRowsForIngest(in) - if len(got) != 1 { - t.Fatalf("filtered rows len = %d, want 1", len(got)) + if len(got.rows) != 1 { + t.Fatalf("filtered rows len = %d, want 1", len(got.rows)) } - if got[0].TraceID != types.SYS_ENTER_FUTEX { - t.Fatalf("filtered trace id = %v, want %v", got[0].TraceID, types.SYS_ENTER_FUTEX) + if got.rows[0].TraceID != types.SYS_ENTER_FUTEX { + t.Fatalf("filtered trace id = %v, want %v", got.rows[0].TraceID, types.SYS_ENTER_FUTEX) } } -func TestFilterAggregateRowsForIngestGatesWhenUnsupportedFilterActive(t *testing.T) { - el := &eventLoop{ - cfg: eventLoopConfig{ - aggregateOnlyTraceIDs: map[types.TraceId]struct{}{ - types.SYS_ENTER_FUTEX: {}, - }, +func TestAggregateDrainerTickGatesWhenUnsupportedFilterActive(t *testing.T) { + drainer := newAggregateDrainer( + &aggregateSourceStub{ + rows: [][]statsengine.SyscallAggregate{{ + {TraceID: types.SYS_ENTER_FUTEX, Count: 2}, + }}, + }, + map[types.TraceId]struct{}{ + types.SYS_ENTER_FUTEX: {}, + }, + func() globalfilter.Filter { + return globalfilter.Filter{ + Comm: &globalfilter.StringFilter{Pattern: "ioworkload"}, + } }, + ) + + got := drainer.Tick() + if got.warning != "" { + t.Fatalf("warning = %q, want empty", got.warning) + } + if len(got.rows) != 0 { + t.Fatalf("expected no rows when comm filter is active, got %+v", got.rows) } - el.SetFilter(globalfilter.Filter{ - Comm: &globalfilter.StringFilter{Pattern: "ioworkload"}, - }) +} - got := el.filterAggregateRowsForIngest([]statsengine.SyscallAggregate{ - {TraceID: types.SYS_ENTER_FUTEX, Count: 2}, - }) - if len(got) != 0 { - t.Fatalf("expected no rows when comm filter is active, got %+v", got) +func TestAggregateDrainerTickRejectsRowsWithoutAggregateOnlyTraceIDs(t *testing.T) { + drainer := newAggregateDrainer( + &aggregateSourceStub{ + rows: [][]statsengine.SyscallAggregate{{ + {TraceID: types.SYS_ENTER_FUTEX, Count: 2}, + }}, + }, + nil, + func() globalfilter.Filter { return globalfilter.Filter{} }, + ) + + got := drainer.Tick() + if got.warning != "" { + t.Fatalf("warning = %q, want empty", got.warning) + } + if len(got.rows) != 0 { + t.Fatalf("expected no rows without aggregate-only trace IDs, got %+v", got.rows) } } -func TestFilterAggregateRowsForIngestRejectsPIDAndTIDFilters(t *testing.T) { +func TestAggregateDrainerTickRejectsPIDAndTIDFilters(t *testing.T) { tests := []struct { name string filter globalfilter.Filter @@ -92,25 +126,47 @@ func TestFilterAggregateRowsForIngestRejectsPIDAndTIDFilters(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - el := &eventLoop{ - cfg: eventLoopConfig{ - aggregateOnlyTraceIDs: map[types.TraceId]struct{}{ - types.SYS_ENTER_FUTEX: {}, - }, + drainer := newAggregateDrainer( + &aggregateSourceStub{ + rows: [][]statsengine.SyscallAggregate{{ + {TraceID: types.SYS_ENTER_FUTEX, Count: 2}, + }}, }, - } - el.SetFilter(tt.filter) + map[types.TraceId]struct{}{ + types.SYS_ENTER_FUTEX: {}, + }, + func() globalfilter.Filter { return tt.filter }, + ) - got := el.filterAggregateRowsForIngest([]statsengine.SyscallAggregate{ - {TraceID: types.SYS_ENTER_FUTEX, Count: 2}, - }) - if len(got) != 0 { - t.Fatalf("expected no aggregate rows with %s filter, got %+v", tt.name, got) + got := drainer.Tick() + if got.warning != "" { + t.Fatalf("warning = %q, want empty", got.warning) + } + if len(got.rows) != 0 { + t.Fatalf("expected no aggregate rows with %s filter, got %+v", tt.name, got.rows) } }) } } +func TestAggregateDrainerTickReturnsDrainWarning(t *testing.T) { + drainer := newAggregateDrainer( + &aggregateSourceStub{err: errors.New("boom")}, + map[types.TraceId]struct{}{ + types.SYS_ENTER_FUTEX: {}, + }, + func() globalfilter.Filter { return globalfilter.Filter{} }, + ) + + got := drainer.Tick() + if len(got.rows) != 0 { + t.Fatalf("rows len = %d, want 0", len(got.rows)) + } + if got.warning != "syscall aggregate drain failed: boom" { + t.Fatalf("warning = %q, want drain failure", got.warning) + } +} + func TestStartAggregateDrainLoopFinalFlushesOnStop(t *testing.T) { src := &aggregateSourceStub{ rows: [][]statsengine.SyscallAggregate{ diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go index 24214ca..c4fa6b0 100644 --- a/internal/eventloop_runtime.go +++ b/internal/eventloop_runtime.go @@ -4,13 +4,10 @@ import ( "context" "fmt" "runtime/debug" - "strings" "time" "ior/internal/event" "ior/internal/file" - "ior/internal/globalfilter" - "ior/internal/statsengine" "ior/internal/types" ) @@ -49,84 +46,19 @@ func (e *eventLoop) startAggregateDrainLoop(ctx context.Context) func() { return func() {} } - done := make(chan struct{}) - stop := make(chan struct{}) - go func() { - defer close(done) - ticker := time.NewTicker(e.cfg.aggregateDrainEvery) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-stop: - return - case <-ticker.C: - e.drainAggregatesOnce() - } - } - }() - return func() { - close(stop) - <-done - e.drainAggregatesOnce() - } + drainer := newAggregateDrainer(e.aggregateSrc, e.cfg.aggregateOnlyTraceIDs, e.Filter) + return drainer.Start(ctx, e.cfg.aggregateDrainEvery, e.handleAggregateDrainResult) } -func (e *eventLoop) drainAggregatesOnce() { - rows, err := e.aggregateSrc.Drain() - if err != nil { - e.notifyWarning(fmt.Sprintf("syscall aggregate drain failed: %v", err)) +func (e *eventLoop) handleAggregateDrainResult(result aggregateDrainResult) { + if result.warning != "" { + e.notifyWarning(result.warning) return } - rows = e.filterAggregateRowsForIngest(rows) - if len(rows) == 0 { + if len(result.rows) == 0 { return } - e.aggregateSink.IngestSyscallAggregates(rows) -} - -func (e *eventLoop) filterAggregateRowsForIngest(rows []statsengine.SyscallAggregate) []statsengine.SyscallAggregate { - if len(rows) == 0 { - return nil - } - if !aggregateIngestAllowedForFilter(e.Filter()) { - return nil - } - if len(e.cfg.aggregateOnlyTraceIDs) == 0 { - return nil - } - - filtered := make([]statsengine.SyscallAggregate, 0, len(rows)) - for _, row := range rows { - if _, ok := e.cfg.aggregateOnlyTraceIDs[row.TraceID]; ok { - filtered = append(filtered, row) - } - } - return filtered -} - -func aggregateIngestAllowedForFilter(filter globalfilter.Filter) bool { - if filter.ErrorsOnly { - return false - } - if hasPattern(filter.Syscall) || hasPattern(filter.Comm) || hasPattern(filter.File) { - return false - } - if filter.FD != nil || filter.LatencyNs != nil || filter.GapNs != nil || filter.Bytes != nil || filter.RetVal != nil { - return false - } - if filter.PID != nil { - return false - } - if filter.TID != nil { - return false - } - return true -} - -func hasPattern(filter *globalfilter.StringFilter) bool { - return filter != nil && strings.TrimSpace(filter.Pattern) != "" + e.aggregateSink.IngestSyscallAggregates(result.rows) } func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) { |
