diff options
| author | Paul Buetow <paul@buetow.org> | 2026-05-20 14:13:21 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-05-20 14:13:21 +0300 |
| commit | 13d2c3ec8deb759308a8f0a28374470ca9bb1e9d (patch) | |
| tree | 20f1d9ac5540d63a415223c7c5b4bb6f1e8ecec8 /internal/eventloop_aggregate_test.go | |
| parent | 9310b54d439d4a1a8d4d337987aa63884df0af76 (diff) | |
fix(task-17): prevent aggregate double-count and flush on shutdown
Diffstat (limited to 'internal/eventloop_aggregate_test.go')
| -rw-r--r-- | internal/eventloop_aggregate_test.go | 91 |
1 files changed, 76 insertions, 15 deletions
diff --git a/internal/eventloop_aggregate_test.go b/internal/eventloop_aggregate_test.go index 215c604..cfa9bb9 100644 --- a/internal/eventloop_aggregate_test.go +++ b/internal/eventloop_aggregate_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "ior/internal/globalfilter" "ior/internal/statsengine" "ior/internal/types" ) @@ -37,7 +38,70 @@ func (s *aggregateSinkStub) IngestSyscallAggregates(rows []statsengine.SyscallAg s.rows = append(s.rows, rows...) } -func TestStartAggregateDrainLoopIngestsRows(t *testing.T) { +func TestFilterAggregateRowsForIngestAggregateOnlyTraceIDs(t *testing.T) { + el := &eventLoop{ + cfg: eventLoopConfig{ + aggregateOnlyTraceIDs: map[types.TraceId]struct{}{ + types.SYS_ENTER_FUTEX: {}, + }, + }, + } + el.SetFilter(globalfilter.Filter{}) + + in := []statsengine.SyscallAggregate{ + {TraceID: types.SYS_ENTER_FUTEX, Count: 2}, + {TraceID: types.SYS_ENTER_CLOCK_GETTIME, Count: 5}, + } + got := el.filterAggregateRowsForIngest(in) + if len(got) != 1 { + t.Fatalf("filtered rows len = %d, want 1", len(got)) + } + if got[0].TraceID != types.SYS_ENTER_FUTEX { + t.Fatalf("filtered trace id = %v, want %v", got[0].TraceID, types.SYS_ENTER_FUTEX) + } +} + +func TestFilterAggregateRowsForIngestGatesWhenUnsupportedFilterActive(t *testing.T) { + el := &eventLoop{ + cfg: eventLoopConfig{ + aggregateOnlyTraceIDs: map[types.TraceId]struct{}{ + types.SYS_ENTER_FUTEX: {}, + }, + }, + } + el.SetFilter(globalfilter.Filter{ + Comm: &globalfilter.StringFilter{Pattern: "ioworkload"}, + }) + + got := el.filterAggregateRowsForIngest([]statsengine.SyscallAggregate{ + {TraceID: types.SYS_ENTER_FUTEX, Count: 2}, + }) + if len(got) != 0 { + t.Fatalf("expected no rows when comm filter is active, got %+v", got) + } +} + +func TestFilterAggregateRowsForIngestAllowsKernelEqPIDFilter(t *testing.T) { + el := &eventLoop{ + cfg: eventLoopConfig{ + aggregateOnlyTraceIDs: map[types.TraceId]struct{}{ + types.SYS_ENTER_FUTEX: {}, + }, + }, + } + el.SetFilter(globalfilter.Filter{ + PID: globalfilter.NewEqFilter(42), + }) + + got := el.filterAggregateRowsForIngest([]statsengine.SyscallAggregate{ + {TraceID: types.SYS_ENTER_FUTEX, Count: 2}, + }) + if len(got) != 1 { + t.Fatalf("expected row with PID eq filter, got %+v", got) + } +} + +func TestStartAggregateDrainLoopFinalFlushesOnStop(t *testing.T) { src := &aggregateSourceStub{ rows: [][]statsengine.SyscallAggregate{ {{TraceID: types.SYS_ENTER_FUTEX, Count: 2}}, @@ -45,32 +109,29 @@ func TestStartAggregateDrainLoopIngestsRows(t *testing.T) { } sink := &aggregateSinkStub{} el := &eventLoop{ - cfg: eventLoopConfig{aggregateDrainEvery: 2 * time.Millisecond}, + cfg: eventLoopConfig{ + aggregateDrainEvery: 5 * time.Second, + aggregateOnlyTraceIDs: map[types.TraceId]struct{}{ + types.SYS_ENTER_FUTEX: {}, + }, + }, aggregateSrc: src, aggregateSink: sink, } + el.SetFilter(globalfilter.Filter{}) ctx, cancel := context.WithCancel(context.Background()) stop := el.startAggregateDrainLoop(ctx) - deadline := time.Now().Add(100 * time.Millisecond) - for time.Now().Before(deadline) { - sink.mu.Lock() - done := len(sink.rows) > 0 - sink.mu.Unlock() - if done { - break - } - time.Sleep(2 * time.Millisecond) - } + time.Sleep(2 * time.Millisecond) cancel() stop() sink.mu.Lock() defer sink.mu.Unlock() - if len(sink.rows) == 0 { - t.Fatal("expected drained aggregate rows") + if len(sink.rows) != 1 { + t.Fatalf("final flush rows len = %d, want 1", len(sink.rows)) } if sink.rows[0].TraceID != types.SYS_ENTER_FUTEX || sink.rows[0].Count != 2 { - t.Fatalf("drained row = %+v, want futex count=2", sink.rows[0]) + t.Fatalf("final flush row = %+v, want futex count=2", sink.rows[0]) } } |
