summaryrefslogtreecommitdiff
path: root/internal/eventloop_aggregate_test.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-05-23 18:44:28 +0300
committerPaul Buetow <paul@buetow.org>2026-05-23 18:44:28 +0300
commitb138aed54ff23c3b5557b1336835c9e37137b020 (patch)
treec16aa5d5acace9ffe1e6f68570ddbadc531bc217 /internal/eventloop_aggregate_test.go
parentd300847a794fd9cc823747320d53422880072b3b (diff)
8c extract aggregate draining and filter gating from eventLoop
Move aggregate drain scheduling, filter compatibility policy, trace-ID allowlisting, and warning construction into a dedicated aggregateDrainer type. eventLoop now only creates and ticks it. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Diffstat (limited to 'internal/eventloop_aggregate_test.go')
-rw-r--r--internal/eventloop_aggregate_test.go142
1 files changed, 99 insertions, 43 deletions
diff --git a/internal/eventloop_aggregate_test.go b/internal/eventloop_aggregate_test.go
index 582b598..78cf2a6 100644
--- a/internal/eventloop_aggregate_test.go
+++ b/internal/eventloop_aggregate_test.go
@@ -2,6 +2,7 @@ package internal
import (
"context"
+ "errors"
"sync"
"testing"
"time"
@@ -14,11 +15,15 @@ import (
type aggregateSourceStub struct {
mu sync.Mutex
rows [][]statsengine.SyscallAggregate
+ err error
}
func (s *aggregateSourceStub) Drain() ([]statsengine.SyscallAggregate, error) {
s.mu.Lock()
defer s.mu.Unlock()
+ if s.err != nil {
+ return nil, s.err
+ }
if len(s.rows) == 0 {
return nil, nil
}
@@ -38,50 +43,79 @@ func (s *aggregateSinkStub) IngestSyscallAggregates(rows []statsengine.SyscallAg
s.rows = append(s.rows, rows...)
}
-func TestFilterAggregateRowsForIngestAggregateOnlyTraceIDs(t *testing.T) {
- el := &eventLoop{
- cfg: eventLoopConfig{
- aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
- types.SYS_ENTER_FUTEX: {},
- },
+func TestAggregateDrainerTickFiltersAggregateOnlyTraceIDs(t *testing.T) {
+ drainer := newAggregateDrainer(
+ &aggregateSourceStub{
+ rows: [][]statsengine.SyscallAggregate{{
+ {TraceID: types.SYS_ENTER_FUTEX, Count: 2},
+ {TraceID: types.SYS_ENTER_CLOCK_GETTIME, Count: 5},
+ }},
},
- }
- el.SetFilter(globalfilter.Filter{})
+ map[types.TraceId]struct{}{
+ types.SYS_ENTER_FUTEX: {},
+ },
+ func() globalfilter.Filter { return globalfilter.Filter{} },
+ )
- in := []statsengine.SyscallAggregate{
- {TraceID: types.SYS_ENTER_FUTEX, Count: 2},
- {TraceID: types.SYS_ENTER_CLOCK_GETTIME, Count: 5},
+ got := drainer.Tick()
+ if got.warning != "" {
+ t.Fatalf("warning = %q, want empty", got.warning)
}
- got := el.filterAggregateRowsForIngest(in)
- if len(got) != 1 {
- t.Fatalf("filtered rows len = %d, want 1", len(got))
+ if len(got.rows) != 1 {
+ t.Fatalf("filtered rows len = %d, want 1", len(got.rows))
}
- if got[0].TraceID != types.SYS_ENTER_FUTEX {
- t.Fatalf("filtered trace id = %v, want %v", got[0].TraceID, types.SYS_ENTER_FUTEX)
+ if got.rows[0].TraceID != types.SYS_ENTER_FUTEX {
+ t.Fatalf("filtered trace id = %v, want %v", got.rows[0].TraceID, types.SYS_ENTER_FUTEX)
}
}
-func TestFilterAggregateRowsForIngestGatesWhenUnsupportedFilterActive(t *testing.T) {
- el := &eventLoop{
- cfg: eventLoopConfig{
- aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
- types.SYS_ENTER_FUTEX: {},
- },
+func TestAggregateDrainerTickGatesWhenUnsupportedFilterActive(t *testing.T) {
+ drainer := newAggregateDrainer(
+ &aggregateSourceStub{
+ rows: [][]statsengine.SyscallAggregate{{
+ {TraceID: types.SYS_ENTER_FUTEX, Count: 2},
+ }},
+ },
+ map[types.TraceId]struct{}{
+ types.SYS_ENTER_FUTEX: {},
+ },
+ func() globalfilter.Filter {
+ return globalfilter.Filter{
+ Comm: &globalfilter.StringFilter{Pattern: "ioworkload"},
+ }
},
+ )
+
+ got := drainer.Tick()
+ if got.warning != "" {
+ t.Fatalf("warning = %q, want empty", got.warning)
+ }
+ if len(got.rows) != 0 {
+ t.Fatalf("expected no rows when comm filter is active, got %+v", got.rows)
}
- el.SetFilter(globalfilter.Filter{
- Comm: &globalfilter.StringFilter{Pattern: "ioworkload"},
- })
+}
- got := el.filterAggregateRowsForIngest([]statsengine.SyscallAggregate{
- {TraceID: types.SYS_ENTER_FUTEX, Count: 2},
- })
- if len(got) != 0 {
- t.Fatalf("expected no rows when comm filter is active, got %+v", got)
+func TestAggregateDrainerTickRejectsRowsWithoutAggregateOnlyTraceIDs(t *testing.T) {
+ drainer := newAggregateDrainer(
+ &aggregateSourceStub{
+ rows: [][]statsengine.SyscallAggregate{{
+ {TraceID: types.SYS_ENTER_FUTEX, Count: 2},
+ }},
+ },
+ nil,
+ func() globalfilter.Filter { return globalfilter.Filter{} },
+ )
+
+ got := drainer.Tick()
+ if got.warning != "" {
+ t.Fatalf("warning = %q, want empty", got.warning)
+ }
+ if len(got.rows) != 0 {
+ t.Fatalf("expected no rows without aggregate-only trace IDs, got %+v", got.rows)
}
}
-func TestFilterAggregateRowsForIngestRejectsPIDAndTIDFilters(t *testing.T) {
+func TestAggregateDrainerTickRejectsPIDAndTIDFilters(t *testing.T) {
tests := []struct {
name string
filter globalfilter.Filter
@@ -92,25 +126,47 @@ func TestFilterAggregateRowsForIngestRejectsPIDAndTIDFilters(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- el := &eventLoop{
- cfg: eventLoopConfig{
- aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
- types.SYS_ENTER_FUTEX: {},
- },
+ drainer := newAggregateDrainer(
+ &aggregateSourceStub{
+ rows: [][]statsengine.SyscallAggregate{{
+ {TraceID: types.SYS_ENTER_FUTEX, Count: 2},
+ }},
},
- }
- el.SetFilter(tt.filter)
+ map[types.TraceId]struct{}{
+ types.SYS_ENTER_FUTEX: {},
+ },
+ func() globalfilter.Filter { return tt.filter },
+ )
- got := el.filterAggregateRowsForIngest([]statsengine.SyscallAggregate{
- {TraceID: types.SYS_ENTER_FUTEX, Count: 2},
- })
- if len(got) != 0 {
- t.Fatalf("expected no aggregate rows with %s filter, got %+v", tt.name, got)
+ got := drainer.Tick()
+ if got.warning != "" {
+ t.Fatalf("warning = %q, want empty", got.warning)
+ }
+ if len(got.rows) != 0 {
+ t.Fatalf("expected no aggregate rows with %s filter, got %+v", tt.name, got.rows)
}
})
}
}
+func TestAggregateDrainerTickReturnsDrainWarning(t *testing.T) {
+ drainer := newAggregateDrainer(
+ &aggregateSourceStub{err: errors.New("boom")},
+ map[types.TraceId]struct{}{
+ types.SYS_ENTER_FUTEX: {},
+ },
+ func() globalfilter.Filter { return globalfilter.Filter{} },
+ )
+
+ got := drainer.Tick()
+ if len(got.rows) != 0 {
+ t.Fatalf("rows len = %d, want 0", len(got.rows))
+ }
+ if got.warning != "syscall aggregate drain failed: boom" {
+ t.Fatalf("warning = %q, want drain failure", got.warning)
+ }
+}
+
func TestStartAggregateDrainLoopFinalFlushesOnStop(t *testing.T) {
src := &aggregateSourceStub{
rows: [][]statsengine.SyscallAggregate{