summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-05-23 20:23:26 +0300
committerPaul Buetow <paul@buetow.org>2026-05-23 20:23:26 +0300
commitea68c8d4c9a1db379735c7c4f327fcbf8d84a571 (patch)
tree0e397ea88ce07435ce38976ffea74afdf849e03e
parent933d55996f826e1adfd839d3a24f97003f97f374 (diff)
7c add end-to-end tests for aggregate-only stats ingestion
Wire a real statsengine.Engine as the aggregate sink in five new tests to verify the full drain path (source -> drainer -> filter -> engine -> snapshot), closing the coverage gap where BPF aggregate decode, filter gating, or stats ingestion could break while the existing sampling test still passed. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
-rw-r--r--internal/eventloop_aggregate_test.go297
1 files changed, 297 insertions, 0 deletions
diff --git a/internal/eventloop_aggregate_test.go b/internal/eventloop_aggregate_test.go
index 78cf2a6..af0fac2 100644
--- a/internal/eventloop_aggregate_test.go
+++ b/internal/eventloop_aggregate_test.go
@@ -201,3 +201,300 @@ func TestStartAggregateDrainLoopFinalFlushesOnStop(t *testing.T) {
t.Fatalf("final flush row = %+v, want futex count=2", sink.rows[0])
}
}
+
+// TestAggregateEndToEndDrainIntoStatsEngine verifies that aggregate rows flow
+// from the source stub through the drainer, past filter gating, into a real
+// statsengine.Engine and appear in the resulting snapshot. This covers the
+// full ingestion path that individual unit tests cannot: decode -> filter ->
+// IngestSyscallAggregates -> Snapshot.
+func TestAggregateEndToEndDrainIntoStatsEngine(t *testing.T) {
+ engine := statsengine.NewEngine(statsengine.DefaultTopN)
+ src := &aggregateSourceStub{
+ rows: [][]statsengine.SyscallAggregate{{
+ {
+ TraceID: types.SYS_ENTER_FUTEX,
+ Count: 5,
+ Errors: 1,
+ TotalLatencyNs: 500,
+ MinLatencyNs: 50,
+ MaxLatencyNs: 200,
+ LatencyHistogramNs: [8]uint64{
+ 0, 2, 3, 0, 0, 0, 0, 0,
+ },
+ },
+ }},
+ }
+
+ el := &eventLoop{
+ cfg: eventLoopConfig{
+ aggregateDrainEvery: 5 * time.Second,
+ aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
+ types.SYS_ENTER_FUTEX: {},
+ },
+ },
+ aggregateSrc: src,
+ aggregateSink: engine,
+ }
+ el.SetFilter(globalfilter.Filter{})
+
+ // Run the drain loop and immediately stop so the final flush fires.
+ ctx, cancel := context.WithCancel(context.Background())
+ stop := el.startAggregateDrainLoop(ctx)
+ cancel()
+ stop()
+
+ snap, err := engine.Snapshot()
+ if err != nil {
+ t.Fatalf("snapshot error: %v", err)
+ }
+ if snap.TotalSyscalls != 5 {
+ t.Fatalf("TotalSyscalls = %d, want 5", snap.TotalSyscalls)
+ }
+ if snap.TotalErrors != 1 {
+ t.Fatalf("TotalErrors = %d, want 1", snap.TotalErrors)
+ }
+
+ // Verify the per-syscall row for futex exists with correct counts.
+ futexRow := findSyscallSnapshot(t, snap.Syscalls(), types.SYS_ENTER_FUTEX)
+ if futexRow.Count != 5 {
+ t.Fatalf("futex Count = %d, want 5", futexRow.Count)
+ }
+ if futexRow.Errors != 1 {
+ t.Fatalf("futex Errors = %d, want 1", futexRow.Errors)
+ }
+ if futexRow.TotalLatencyNs != 500 {
+ t.Fatalf("futex TotalLatencyNs = %d, want 500", futexRow.TotalLatencyNs)
+ }
+ if futexRow.LatencyMinNs != 50 {
+ t.Fatalf("futex LatencyMinNs = %d, want 50", futexRow.LatencyMinNs)
+ }
+ if futexRow.LatencyMaxNs != 200 {
+ t.Fatalf("futex LatencyMaxNs = %d, want 200", futexRow.LatencyMaxNs)
+ }
+
+ // Verify the latency histogram received the bucket counts.
+ if snap.LatencyHistogram.Total != 5 {
+ t.Fatalf("LatencyHistogram.Total = %d, want 5", snap.LatencyHistogram.Total)
+ }
+}
+
+// TestAggregateEndToEndMultipleDrainTicksAccumulate verifies that successive
+// drain ticks from separate source batches accumulate into the statsengine
+// correctly, ensuring the periodic drain loop handles multiple rounds.
+func TestAggregateEndToEndMultipleDrainTicksAccumulate(t *testing.T) {
+ engine := statsengine.NewEngine(statsengine.DefaultTopN)
+ src := &aggregateSourceStub{
+ rows: [][]statsengine.SyscallAggregate{
+ {{TraceID: types.SYS_ENTER_FUTEX, Count: 3, TotalLatencyNs: 300, MinLatencyNs: 100, MaxLatencyNs: 100}},
+ {{TraceID: types.SYS_ENTER_FUTEX, Count: 7, TotalLatencyNs: 700, MinLatencyNs: 50, MaxLatencyNs: 200}},
+ },
+ }
+
+ // Use a short drain interval so the ticker fires for both batches.
+ el := &eventLoop{
+ cfg: eventLoopConfig{
+ aggregateDrainEvery: time.Millisecond,
+ aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
+ types.SYS_ENTER_FUTEX: {},
+ },
+ },
+ aggregateSrc: src,
+ aggregateSink: engine,
+ }
+ el.SetFilter(globalfilter.Filter{})
+
+ ctx, cancel := context.WithCancel(context.Background())
+ stop := el.startAggregateDrainLoop(ctx)
+ // Allow enough time for both tick rounds to fire.
+ time.Sleep(10 * time.Millisecond)
+ cancel()
+ stop()
+
+ snap, err := engine.Snapshot()
+ if err != nil {
+ t.Fatalf("snapshot error: %v", err)
+ }
+ if snap.TotalSyscalls != 10 {
+ t.Fatalf("TotalSyscalls = %d, want 10 (3+7)", snap.TotalSyscalls)
+ }
+
+ futexRow := findSyscallSnapshot(t, snap.Syscalls(), types.SYS_ENTER_FUTEX)
+ if futexRow.Count != 10 {
+ t.Fatalf("futex Count = %d, want 10", futexRow.Count)
+ }
+ // Min should reflect the lowest across both batches (50).
+ if futexRow.LatencyMinNs != 50 {
+ t.Fatalf("futex LatencyMinNs = %d, want 50", futexRow.LatencyMinNs)
+ }
+ // Max should reflect the highest across both batches (200).
+ if futexRow.LatencyMaxNs != 200 {
+ t.Fatalf("futex LatencyMaxNs = %d, want 200", futexRow.LatencyMaxNs)
+ }
+}
+
+// TestAggregateEndToEndNonDesignatedSyscallsFiltered verifies that only
+// aggregate-only designated trace IDs reach the statsengine. A source emitting
+// rows for both futex (designated) and clock_gettime (not designated) should
+// only produce a futex row in the snapshot.
+func TestAggregateEndToEndNonDesignatedSyscallsFiltered(t *testing.T) {
+ engine := statsengine.NewEngine(statsengine.DefaultTopN)
+ src := &aggregateSourceStub{
+ rows: [][]statsengine.SyscallAggregate{{
+ {TraceID: types.SYS_ENTER_FUTEX, Count: 4, TotalLatencyNs: 400, MinLatencyNs: 100, MaxLatencyNs: 100},
+ {TraceID: types.SYS_ENTER_CLOCK_GETTIME, Count: 8, TotalLatencyNs: 800, MinLatencyNs: 100, MaxLatencyNs: 100},
+ }},
+ }
+
+ el := &eventLoop{
+ cfg: eventLoopConfig{
+ aggregateDrainEvery: 5 * time.Second,
+ // Only futex is aggregate-only; clock_gettime should be dropped.
+ aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
+ types.SYS_ENTER_FUTEX: {},
+ },
+ },
+ aggregateSrc: src,
+ aggregateSink: engine,
+ }
+ el.SetFilter(globalfilter.Filter{})
+
+ ctx, cancel := context.WithCancel(context.Background())
+ stop := el.startAggregateDrainLoop(ctx)
+ cancel()
+ stop()
+
+ snap, err := engine.Snapshot()
+ if err != nil {
+ t.Fatalf("snapshot error: %v", err)
+ }
+ if snap.TotalSyscalls != 4 {
+ t.Fatalf("TotalSyscalls = %d, want 4 (only futex)", snap.TotalSyscalls)
+ }
+
+ // Futex should be present.
+ futexRow := findSyscallSnapshot(t, snap.Syscalls(), types.SYS_ENTER_FUTEX)
+ if futexRow.Count != 4 {
+ t.Fatalf("futex Count = %d, want 4", futexRow.Count)
+ }
+
+ // clock_gettime should be absent.
+ for _, row := range snap.Syscalls() {
+ if row.TraceID == types.SYS_ENTER_CLOCK_GETTIME {
+ t.Fatalf("clock_gettime should not appear in snapshot, got %+v", row)
+ }
+ }
+}
+
+// TestAggregateEndToEndFilterGateBlocksIngestion verifies that when an
+// unsupported filter (e.g. comm filter) is active, aggregate rows never
+// reach the statsengine even though the source emits data.
+func TestAggregateEndToEndFilterGateBlocksIngestion(t *testing.T) {
+ engine := statsengine.NewEngine(statsengine.DefaultTopN)
+ src := &aggregateSourceStub{
+ rows: [][]statsengine.SyscallAggregate{{
+ {TraceID: types.SYS_ENTER_FUTEX, Count: 10, TotalLatencyNs: 1000, MinLatencyNs: 100, MaxLatencyNs: 100},
+ }},
+ }
+
+ el := &eventLoop{
+ cfg: eventLoopConfig{
+ aggregateDrainEvery: 5 * time.Second,
+ aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
+ types.SYS_ENTER_FUTEX: {},
+ },
+ },
+ aggregateSrc: src,
+ aggregateSink: engine,
+ }
+ // Set a comm filter that blocks aggregate ingestion.
+ el.SetFilter(globalfilter.Filter{
+ Comm: &globalfilter.StringFilter{Pattern: "myapp"},
+ })
+
+ ctx, cancel := context.WithCancel(context.Background())
+ stop := el.startAggregateDrainLoop(ctx)
+ cancel()
+ stop()
+
+ snap, err := engine.Snapshot()
+ if err != nil {
+ t.Fatalf("snapshot error: %v", err)
+ }
+ if snap.TotalSyscalls != 0 {
+ t.Fatalf("TotalSyscalls = %d, want 0 (comm filter should block aggregates)", snap.TotalSyscalls)
+ }
+ if len(snap.Syscalls()) != 0 {
+ t.Fatalf("expected no syscall rows with comm filter active, got %d", len(snap.Syscalls()))
+ }
+}
+
+// TestAggregateEndToEndFamilyRowPopulated verifies that aggregate-only
+// ingestion also populates the per-family snapshot rows, not just per-syscall.
+func TestAggregateEndToEndFamilyRowPopulated(t *testing.T) {
+ engine := statsengine.NewEngine(statsengine.DefaultTopN)
+ src := &aggregateSourceStub{
+ rows: [][]statsengine.SyscallAggregate{{
+ {
+ TraceID: types.SYS_ENTER_FUTEX,
+ Count: 6,
+ TotalLatencyNs: 600,
+ MinLatencyNs: 50,
+ MaxLatencyNs: 200,
+ },
+ }},
+ }
+
+ el := &eventLoop{
+ cfg: eventLoopConfig{
+ aggregateDrainEvery: 5 * time.Second,
+ aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
+ types.SYS_ENTER_FUTEX: {},
+ },
+ },
+ aggregateSrc: src,
+ aggregateSink: engine,
+ }
+ el.SetFilter(globalfilter.Filter{})
+
+ ctx, cancel := context.WithCancel(context.Background())
+ stop := el.startAggregateDrainLoop(ctx)
+ cancel()
+ stop()
+
+ snap, err := engine.Snapshot()
+ if err != nil {
+ t.Fatalf("snapshot error: %v", err)
+ }
+
+ // Futex belongs to a syscall family; verify the family row is populated.
+ futexFamily := types.SYS_ENTER_FUTEX.Family()
+ var found bool
+ for _, fam := range snap.Families() {
+ if fam.Family == futexFamily {
+ found = true
+ if fam.Count != 6 {
+ t.Fatalf("family %s Count = %d, want 6", futexFamily, fam.Count)
+ }
+ if fam.TotalLatencyNs != 600 {
+ t.Fatalf("family %s TotalLatencyNs = %d, want 600", futexFamily, fam.TotalLatencyNs)
+ }
+ break
+ }
+ }
+ if !found {
+ t.Fatalf("expected family row for %s, got families: %+v", futexFamily, snap.Families())
+ }
+}
+
+// findSyscallSnapshot locates a SyscallSnapshot by trace ID and fails the test
+// if no matching row exists.
+func findSyscallSnapshot(t *testing.T, rows []statsengine.SyscallSnapshot, id types.TraceId) statsengine.SyscallSnapshot {
+ t.Helper()
+ for _, row := range rows {
+ if row.TraceID == id {
+ return row
+ }
+ }
+ t.Fatalf("no syscall snapshot row for trace ID %v", id)
+ return statsengine.SyscallSnapshot{}
+}