diff options
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/eventloop_aggregate_test.go | 297 |
1 files changed, 297 insertions, 0 deletions
diff --git a/internal/eventloop_aggregate_test.go b/internal/eventloop_aggregate_test.go
index 78cf2a6..af0fac2 100644
--- a/internal/eventloop_aggregate_test.go
+++ b/internal/eventloop_aggregate_test.go
@@ -201,3 +201,300 @@ func TestStartAggregateDrainLoopFinalFlushesOnStop(t *testing.T) {
 		t.Fatalf("final flush row = %+v, want futex count=2", sink.rows[0])
 	}
 }
+
+// TestAggregateEndToEndDrainIntoStatsEngine verifies that aggregate rows flow
+// from the source stub through the drainer, past filter gating, into a real
+// statsengine.Engine and appear in the resulting snapshot. This covers the
+// full ingestion path that individual unit tests cannot: decode -> filter ->
+// IngestSyscallAggregates -> Snapshot.
+func TestAggregateEndToEndDrainIntoStatsEngine(t *testing.T) {
+	engine := statsengine.NewEngine(statsengine.DefaultTopN)
+	src := &aggregateSourceStub{
+		rows: [][]statsengine.SyscallAggregate{{
+			{
+				TraceID:        types.SYS_ENTER_FUTEX,
+				Count:          5,
+				Errors:         1,
+				TotalLatencyNs: 500,
+				MinLatencyNs:   50,
+				MaxLatencyNs:   200,
+				LatencyHistogramNs: [8]uint64{
+					0, 2, 3, 0, 0, 0, 0, 0, // bucket counts sum to 5, matching Count
+				},
+			},
+		}},
+	}
+
+	el := &eventLoop{
+		cfg: eventLoopConfig{
+			aggregateDrainEvery: 5 * time.Second, // long interval; the loop is stopped right after it starts
+			aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
+				types.SYS_ENTER_FUTEX: {},
+			},
+		},
+		aggregateSrc:  src,
+		aggregateSink: engine, // a real engine, not a recording stub
+	}
+	el.SetFilter(globalfilter.Filter{}) // zero-value filter
+
+	// Run the drain loop and immediately stop so the final flush fires.
+	ctx, cancel := context.WithCancel(context.Background())
+	stop := el.startAggregateDrainLoop(ctx)
+	cancel()
+	stop() // presumably blocks until the loop has exited and flushed — TODO confirm
+
+	snap, err := engine.Snapshot()
+	if err != nil {
+		t.Fatalf("snapshot error: %v", err)
+	}
+	if snap.TotalSyscalls != 5 {
+		t.Fatalf("TotalSyscalls = %d, want 5", snap.TotalSyscalls)
+	}
+	if snap.TotalErrors != 1 {
+		t.Fatalf("TotalErrors = %d, want 1", snap.TotalErrors)
+	}
+
+	// Verify the per-syscall row for futex exists with correct counts.
+	futexRow := findSyscallSnapshot(t, snap.Syscalls(), types.SYS_ENTER_FUTEX)
+	if futexRow.Count != 5 {
+		t.Fatalf("futex Count = %d, want 5", futexRow.Count)
+	}
+	if futexRow.Errors != 1 {
+		t.Fatalf("futex Errors = %d, want 1", futexRow.Errors)
+	}
+	if futexRow.TotalLatencyNs != 500 {
+		t.Fatalf("futex TotalLatencyNs = %d, want 500", futexRow.TotalLatencyNs)
+	}
+	if futexRow.LatencyMinNs != 50 {
+		t.Fatalf("futex LatencyMinNs = %d, want 50", futexRow.LatencyMinNs)
+	}
+	if futexRow.LatencyMaxNs != 200 {
+		t.Fatalf("futex LatencyMaxNs = %d, want 200", futexRow.LatencyMaxNs)
+	}
+
+	// Verify the latency histogram received the bucket counts.
+	if snap.LatencyHistogram.Total != 5 { // 0+2+3 from LatencyHistogramNs above
+		t.Fatalf("LatencyHistogram.Total = %d, want 5", snap.LatencyHistogram.Total)
+	}
+}
+
+// TestAggregateEndToEndMultipleDrainTicksAccumulate verifies that successive
+// drain ticks from separate source batches accumulate into the statsengine
+// correctly, ensuring the periodic drain loop handles multiple rounds.
+func TestAggregateEndToEndMultipleDrainTicksAccumulate(t *testing.T) {
+	engine := statsengine.NewEngine(statsengine.DefaultTopN)
+	src := &aggregateSourceStub{
+		rows: [][]statsengine.SyscallAggregate{
+			{{TraceID: types.SYS_ENTER_FUTEX, Count: 3, TotalLatencyNs: 300, MinLatencyNs: 100, MaxLatencyNs: 100}}, // first batch
+			{{TraceID: types.SYS_ENTER_FUTEX, Count: 7, TotalLatencyNs: 700, MinLatencyNs: 50, MaxLatencyNs: 200}}, // second batch
+		},
+	}
+
+	// Use a short drain interval so the ticker fires for both batches.
+	el := &eventLoop{
+		cfg: eventLoopConfig{
+			aggregateDrainEvery: time.Millisecond, // 1ms, against the 10ms sleep below
+			aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
+				types.SYS_ENTER_FUTEX: {},
+			},
+		},
+		aggregateSrc:  src,
+		aggregateSink: engine,
+	}
+	el.SetFilter(globalfilter.Filter{}) // zero-value filter
+
+	ctx, cancel := context.WithCancel(context.Background())
+	stop := el.startAggregateDrainLoop(ctx)
+	// Allow time for both tick rounds. NOTE(review): sleep-based wait; may flake on a loaded machine.
+	time.Sleep(10 * time.Millisecond)
+	cancel()
+	stop()
+
+	snap, err := engine.Snapshot()
+	if err != nil {
+		t.Fatalf("snapshot error: %v", err)
+	}
+	if snap.TotalSyscalls != 10 { // 3 from the first batch + 7 from the second
+		t.Fatalf("TotalSyscalls = %d, want 10 (3+7)", snap.TotalSyscalls)
+	}
+
+	futexRow := findSyscallSnapshot(t, snap.Syscalls(), types.SYS_ENTER_FUTEX)
+	if futexRow.Count != 10 {
+		t.Fatalf("futex Count = %d, want 10", futexRow.Count)
+	}
+	// Min should reflect the lowest across both batches (50).
+	if futexRow.LatencyMinNs != 50 {
+		t.Fatalf("futex LatencyMinNs = %d, want 50", futexRow.LatencyMinNs)
+	}
+	// Max should reflect the highest across both batches (200).
+	if futexRow.LatencyMaxNs != 200 {
+		t.Fatalf("futex LatencyMaxNs = %d, want 200", futexRow.LatencyMaxNs)
+	}
+}
+
+// TestAggregateEndToEndNonDesignatedSyscallsFiltered verifies that only
+// aggregate-only designated trace IDs reach the statsengine. A source emitting
+// rows for both futex (designated) and clock_gettime (not designated) should
+// only produce a futex row in the snapshot.
+func TestAggregateEndToEndNonDesignatedSyscallsFiltered(t *testing.T) {
+	engine := statsengine.NewEngine(statsengine.DefaultTopN)
+	src := &aggregateSourceStub{
+		rows: [][]statsengine.SyscallAggregate{{
+			{TraceID: types.SYS_ENTER_FUTEX, Count: 4, TotalLatencyNs: 400, MinLatencyNs: 100, MaxLatencyNs: 100},
+			{TraceID: types.SYS_ENTER_CLOCK_GETTIME, Count: 8, TotalLatencyNs: 800, MinLatencyNs: 100, MaxLatencyNs: 100}, // not in aggregateOnlyTraceIDs below
+		}},
+	}
+
+	el := &eventLoop{
+		cfg: eventLoopConfig{
+			aggregateDrainEvery: 5 * time.Second, // long interval; the loop is stopped right after it starts
+			// Only futex is aggregate-only; clock_gettime should be dropped.
+			aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
+				types.SYS_ENTER_FUTEX: {},
+			},
+		},
+		aggregateSrc:  src,
+		aggregateSink: engine,
+	}
+	el.SetFilter(globalfilter.Filter{}) // zero-value filter
+
+	ctx, cancel := context.WithCancel(context.Background())
+	stop := el.startAggregateDrainLoop(ctx)
+	cancel() // stop right away; no sleep, so this test does not wait for a tick
+	stop()
+
+	snap, err := engine.Snapshot()
+	if err != nil {
+		t.Fatalf("snapshot error: %v", err)
+	}
+	if snap.TotalSyscalls != 4 { // the 8 clock_gettime calls must not be counted
+		t.Fatalf("TotalSyscalls = %d, want 4 (only futex)", snap.TotalSyscalls)
+	}
+
+	// Futex should be present.
+	futexRow := findSyscallSnapshot(t, snap.Syscalls(), types.SYS_ENTER_FUTEX)
+	if futexRow.Count != 4 {
+		t.Fatalf("futex Count = %d, want 4", futexRow.Count)
+	}
+
+	// clock_gettime should be absent.
+	for _, row := range snap.Syscalls() {
+		if row.TraceID == types.SYS_ENTER_CLOCK_GETTIME {
+			t.Fatalf("clock_gettime should not appear in snapshot, got %+v", row)
+		}
+	}
+}
+
+// TestAggregateEndToEndFilterGateBlocksIngestion verifies that when an
+// unsupported filter (e.g. comm filter) is active, aggregate rows never
+// reach the statsengine even though the source emits data.
+func TestAggregateEndToEndFilterGateBlocksIngestion(t *testing.T) {
+	engine := statsengine.NewEngine(statsengine.DefaultTopN)
+	src := &aggregateSourceStub{
+		rows: [][]statsengine.SyscallAggregate{{
+			{TraceID: types.SYS_ENTER_FUTEX, Count: 10, TotalLatencyNs: 1000, MinLatencyNs: 100, MaxLatencyNs: 100}, // designated ID, so only the filter can block it
+		}},
+	}
+
+	el := &eventLoop{
+		cfg: eventLoopConfig{
+			aggregateDrainEvery: 5 * time.Second, // long interval; the loop is stopped right after it starts
+			aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
+				types.SYS_ENTER_FUTEX: {},
+			},
+		},
+		aggregateSrc:  src,
+		aggregateSink: engine,
+	}
+	// Set a comm filter that blocks aggregate ingestion.
+	el.SetFilter(globalfilter.Filter{
+		Comm: &globalfilter.StringFilter{Pattern: "myapp"},
+	})
+
+	ctx, cancel := context.WithCancel(context.Background())
+	stop := el.startAggregateDrainLoop(ctx)
+	cancel()
+	stop()
+
+	snap, err := engine.Snapshot()
+	if err != nil {
+		t.Fatalf("snapshot error: %v", err)
+	}
+	if snap.TotalSyscalls != 0 { // none of the 10 emitted calls may be ingested
+		t.Fatalf("TotalSyscalls = %d, want 0 (comm filter should block aggregates)", snap.TotalSyscalls)
+	}
+	if len(snap.Syscalls()) != 0 {
+		t.Fatalf("expected no syscall rows with comm filter active, got %d", len(snap.Syscalls()))
+	}
+}
+
+// TestAggregateEndToEndFamilyRowPopulated verifies that aggregate-only
+// ingestion also populates the per-family snapshot rows, not just per-syscall.
+func TestAggregateEndToEndFamilyRowPopulated(t *testing.T) {
+	engine := statsengine.NewEngine(statsengine.DefaultTopN)
+	src := &aggregateSourceStub{
+		rows: [][]statsengine.SyscallAggregate{{
+			{
+				TraceID:        types.SYS_ENTER_FUTEX,
+				Count:          6,
+				TotalLatencyNs: 600,
+				MinLatencyNs:   50,
+				MaxLatencyNs:   200,
+			},
+		}},
+	}
+
+	el := &eventLoop{
+		cfg: eventLoopConfig{
+			aggregateDrainEvery: 5 * time.Second, // long interval; the loop is stopped right after it starts
+			aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
+				types.SYS_ENTER_FUTEX: {},
+			},
+		},
+		aggregateSrc:  src,
+		aggregateSink: engine,
+	}
+	el.SetFilter(globalfilter.Filter{}) // zero-value filter
+
+	ctx, cancel := context.WithCancel(context.Background())
+	stop := el.startAggregateDrainLoop(ctx)
+	cancel()
+	stop()
+
+	snap, err := engine.Snapshot()
+	if err != nil {
+		t.Fatalf("snapshot error: %v", err)
+	}
+
+	// Futex belongs to a syscall family; verify the family row is populated.
+	futexFamily := types.SYS_ENTER_FUTEX.Family()
+	var found bool // set once a row for futexFamily is seen
+	for _, fam := range snap.Families() {
+		if fam.Family == futexFamily {
+			found = true
+			if fam.Count != 6 {
+				t.Fatalf("family %s Count = %d, want 6", futexFamily, fam.Count)
+			}
+			if fam.TotalLatencyNs != 600 {
+				t.Fatalf("family %s TotalLatencyNs = %d, want 600", futexFamily, fam.TotalLatencyNs)
+			}
+			break // only the first matching family row is checked
+		}
+	}
+	if !found {
+		t.Fatalf("expected family row for %s, got families: %+v", futexFamily, snap.Families())
+	}
+}
+
+// findSyscallSnapshot returns the first row in rows whose TraceID equals id,
+// failing the test via t.Fatalf when no row matches.
+func findSyscallSnapshot(t *testing.T, rows []statsengine.SyscallSnapshot, id types.TraceId) statsengine.SyscallSnapshot {
+	t.Helper() // report failures at the calling test's line
+	for _, row := range rows {
+		if row.TraceID == id {
+			return row
+		}
+	}
+	t.Fatalf("no syscall snapshot row for trace ID %v", id)
+	return statsengine.SyscallSnapshot{} // not reached: Fatalf stops the test; the return satisfies the compiler
+}
