diff options
| author | Paul Buetow <paul@buetow.org> | 2026-05-23 18:44:28 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-05-23 18:44:28 +0300 |
| commit | b138aed54ff23c3b5557b1336835c9e37137b020 (patch) | |
| tree | c16aa5d5acace9ffe1e6f68570ddbadc531bc217 /internal/eventloop_aggregate_test.go | |
| parent | d300847a794fd9cc823747320d53422880072b3b (diff) | |
8c extract aggregate draining and filter gating from eventLoop
Move aggregate drain scheduling, filter compatibility policy,
trace-ID allowlisting, and warning construction into a dedicated
aggregateDrainer type. eventLoop now only creates and ticks it.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Diffstat (limited to 'internal/eventloop_aggregate_test.go')
| -rw-r--r-- | internal/eventloop_aggregate_test.go | 142 |
1 files changed, 99 insertions, 43 deletions
diff --git a/internal/eventloop_aggregate_test.go b/internal/eventloop_aggregate_test.go index 582b598..78cf2a6 100644 --- a/internal/eventloop_aggregate_test.go +++ b/internal/eventloop_aggregate_test.go @@ -2,6 +2,7 @@ package internal import ( "context" + "errors" "sync" "testing" "time" @@ -14,11 +15,15 @@ import ( type aggregateSourceStub struct { mu sync.Mutex rows [][]statsengine.SyscallAggregate + err error } func (s *aggregateSourceStub) Drain() ([]statsengine.SyscallAggregate, error) { s.mu.Lock() defer s.mu.Unlock() + if s.err != nil { + return nil, s.err + } if len(s.rows) == 0 { return nil, nil } @@ -38,50 +43,79 @@ func (s *aggregateSinkStub) IngestSyscallAggregates(rows []statsengine.SyscallAg s.rows = append(s.rows, rows...) } -func TestFilterAggregateRowsForIngestAggregateOnlyTraceIDs(t *testing.T) { - el := &eventLoop{ - cfg: eventLoopConfig{ - aggregateOnlyTraceIDs: map[types.TraceId]struct{}{ - types.SYS_ENTER_FUTEX: {}, - }, +func TestAggregateDrainerTickFiltersAggregateOnlyTraceIDs(t *testing.T) { + drainer := newAggregateDrainer( + &aggregateSourceStub{ + rows: [][]statsengine.SyscallAggregate{{ + {TraceID: types.SYS_ENTER_FUTEX, Count: 2}, + {TraceID: types.SYS_ENTER_CLOCK_GETTIME, Count: 5}, + }}, }, - } - el.SetFilter(globalfilter.Filter{}) + map[types.TraceId]struct{}{ + types.SYS_ENTER_FUTEX: {}, + }, + func() globalfilter.Filter { return globalfilter.Filter{} }, + ) - in := []statsengine.SyscallAggregate{ - {TraceID: types.SYS_ENTER_FUTEX, Count: 2}, - {TraceID: types.SYS_ENTER_CLOCK_GETTIME, Count: 5}, + got := drainer.Tick() + if got.warning != "" { + t.Fatalf("warning = %q, want empty", got.warning) } - got := el.filterAggregateRowsForIngest(in) - if len(got) != 1 { - t.Fatalf("filtered rows len = %d, want 1", len(got)) + if len(got.rows) != 1 { + t.Fatalf("filtered rows len = %d, want 1", len(got.rows)) } - if got[0].TraceID != types.SYS_ENTER_FUTEX { - t.Fatalf("filtered trace id = %v, want %v", got[0].TraceID, types.SYS_ENTER_FUTEX) + if got.rows[0].TraceID != types.SYS_ENTER_FUTEX { + t.Fatalf("filtered trace id = %v, want %v", got.rows[0].TraceID, types.SYS_ENTER_FUTEX) } } -func TestFilterAggregateRowsForIngestGatesWhenUnsupportedFilterActive(t *testing.T) { - el := &eventLoop{ - cfg: eventLoopConfig{ - aggregateOnlyTraceIDs: map[types.TraceId]struct{}{ - types.SYS_ENTER_FUTEX: {}, - }, +func TestAggregateDrainerTickGatesWhenUnsupportedFilterActive(t *testing.T) { + drainer := newAggregateDrainer( + &aggregateSourceStub{ + rows: [][]statsengine.SyscallAggregate{{ + {TraceID: types.SYS_ENTER_FUTEX, Count: 2}, + }}, + }, + map[types.TraceId]struct{}{ + types.SYS_ENTER_FUTEX: {}, + }, + func() globalfilter.Filter { + return globalfilter.Filter{ + Comm: &globalfilter.StringFilter{Pattern: "ioworkload"}, + } }, + ) + + got := drainer.Tick() + if got.warning != "" { + t.Fatalf("warning = %q, want empty", got.warning) + } + if len(got.rows) != 0 { + t.Fatalf("expected no rows when comm filter is active, got %+v", got.rows) } - el.SetFilter(globalfilter.Filter{ - Comm: &globalfilter.StringFilter{Pattern: "ioworkload"}, - }) +} - got := el.filterAggregateRowsForIngest([]statsengine.SyscallAggregate{ - {TraceID: types.SYS_ENTER_FUTEX, Count: 2}, - }) - if len(got) != 0 { - t.Fatalf("expected no rows when comm filter is active, got %+v", got) +func TestAggregateDrainerTickRejectsRowsWithoutAggregateOnlyTraceIDs(t *testing.T) { + drainer := newAggregateDrainer( + &aggregateSourceStub{ + rows: [][]statsengine.SyscallAggregate{{ + {TraceID: types.SYS_ENTER_FUTEX, Count: 2}, + }}, + }, + nil, + func() globalfilter.Filter { return globalfilter.Filter{} }, + ) + + got := drainer.Tick() + if got.warning != "" { + t.Fatalf("warning = %q, want empty", got.warning) + } + if len(got.rows) != 0 { + t.Fatalf("expected no rows without aggregate-only trace IDs, got %+v", got.rows) } } -func TestFilterAggregateRowsForIngestRejectsPIDAndTIDFilters(t *testing.T) { +func TestAggregateDrainerTickRejectsPIDAndTIDFilters(t *testing.T) { tests := []struct { name string filter globalfilter.Filter @@ -92,25 +126,47 @@ func TestFilterAggregateRowsForIngestRejectsPIDAndTIDFilters(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - el := &eventLoop{ - cfg: eventLoopConfig{ - aggregateOnlyTraceIDs: map[types.TraceId]struct{}{ - types.SYS_ENTER_FUTEX: {}, - }, + drainer := newAggregateDrainer( + &aggregateSourceStub{ + rows: [][]statsengine.SyscallAggregate{{ + {TraceID: types.SYS_ENTER_FUTEX, Count: 2}, + }}, }, - } - el.SetFilter(tt.filter) + map[types.TraceId]struct{}{ + types.SYS_ENTER_FUTEX: {}, + }, + func() globalfilter.Filter { return tt.filter }, + ) - got := el.filterAggregateRowsForIngest([]statsengine.SyscallAggregate{ - {TraceID: types.SYS_ENTER_FUTEX, Count: 2}, - }) - if len(got) != 0 { - t.Fatalf("expected no aggregate rows with %s filter, got %+v", tt.name, got) + got := drainer.Tick() + if got.warning != "" { + t.Fatalf("warning = %q, want empty", got.warning) + } + if len(got.rows) != 0 { + t.Fatalf("expected no aggregate rows with %s filter, got %+v", tt.name, got.rows) } }) } } +func TestAggregateDrainerTickReturnsDrainWarning(t *testing.T) { + drainer := newAggregateDrainer( + &aggregateSourceStub{err: errors.New("boom")}, + map[types.TraceId]struct{}{ + types.SYS_ENTER_FUTEX: {}, + }, + func() globalfilter.Filter { return globalfilter.Filter{} }, + ) + + got := drainer.Tick() + if len(got.rows) != 0 { + t.Fatalf("rows len = %d, want 0", len(got.rows)) + } + if got.warning != "syscall aggregate drain failed: boom" { + t.Fatalf("warning = %q, want drain failure", got.warning) + } +} + func TestStartAggregateDrainLoopFinalFlushesOnStop(t *testing.T) { src := &aggregateSourceStub{ rows: [][]statsengine.SyscallAggregate{ |
