summaryrefslogtreecommitdiff
path: root/internal/eventloop_aggregate_test.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-05-20 14:13:21 +0300
committerPaul Buetow <paul@buetow.org>2026-05-20 14:13:21 +0300
commit13d2c3ec8deb759308a8f0a28374470ca9bb1e9d (patch)
tree20f1d9ac5540d63a415223c7c5b4bb6f1e8ecec8 /internal/eventloop_aggregate_test.go
parent9310b54d439d4a1a8d4d337987aa63884df0af76 (diff)
fix(task-17): prevent aggregate double-count and flush on shutdown
Diffstat (limited to 'internal/eventloop_aggregate_test.go')
-rw-r--r--internal/eventloop_aggregate_test.go91
1 files changed, 76 insertions, 15 deletions
diff --git a/internal/eventloop_aggregate_test.go b/internal/eventloop_aggregate_test.go
index 215c604..cfa9bb9 100644
--- a/internal/eventloop_aggregate_test.go
+++ b/internal/eventloop_aggregate_test.go
@@ -6,6 +6,7 @@ import (
"testing"
"time"
+ "ior/internal/globalfilter"
"ior/internal/statsengine"
"ior/internal/types"
)
@@ -37,7 +38,70 @@ func (s *aggregateSinkStub) IngestSyscallAggregates(rows []statsengine.SyscallAg
s.rows = append(s.rows, rows...)
}
-func TestStartAggregateDrainLoopIngestsRows(t *testing.T) {
+func TestFilterAggregateRowsForIngestAggregateOnlyTraceIDs(t *testing.T) {
+ el := &eventLoop{
+ cfg: eventLoopConfig{
+ aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
+ types.SYS_ENTER_FUTEX: {},
+ },
+ },
+ }
+ el.SetFilter(globalfilter.Filter{})
+
+ in := []statsengine.SyscallAggregate{
+ {TraceID: types.SYS_ENTER_FUTEX, Count: 2},
+ {TraceID: types.SYS_ENTER_CLOCK_GETTIME, Count: 5},
+ }
+ got := el.filterAggregateRowsForIngest(in)
+ if len(got) != 1 {
+ t.Fatalf("filtered rows len = %d, want 1", len(got))
+ }
+ if got[0].TraceID != types.SYS_ENTER_FUTEX {
+ t.Fatalf("filtered trace id = %v, want %v", got[0].TraceID, types.SYS_ENTER_FUTEX)
+ }
+}
+
+func TestFilterAggregateRowsForIngestGatesWhenUnsupportedFilterActive(t *testing.T) {
+ el := &eventLoop{
+ cfg: eventLoopConfig{
+ aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
+ types.SYS_ENTER_FUTEX: {},
+ },
+ },
+ }
+ el.SetFilter(globalfilter.Filter{
+ Comm: &globalfilter.StringFilter{Pattern: "ioworkload"},
+ })
+
+ got := el.filterAggregateRowsForIngest([]statsengine.SyscallAggregate{
+ {TraceID: types.SYS_ENTER_FUTEX, Count: 2},
+ })
+ if len(got) != 0 {
+ t.Fatalf("expected no rows when comm filter is active, got %+v", got)
+ }
+}
+
+func TestFilterAggregateRowsForIngestAllowsKernelEqPIDFilter(t *testing.T) {
+ el := &eventLoop{
+ cfg: eventLoopConfig{
+ aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
+ types.SYS_ENTER_FUTEX: {},
+ },
+ },
+ }
+ el.SetFilter(globalfilter.Filter{
+ PID: globalfilter.NewEqFilter(42),
+ })
+
+ got := el.filterAggregateRowsForIngest([]statsengine.SyscallAggregate{
+ {TraceID: types.SYS_ENTER_FUTEX, Count: 2},
+ })
+ if len(got) != 1 {
+ t.Fatalf("expected row with PID eq filter, got %+v", got)
+ }
+}
+
+func TestStartAggregateDrainLoopFinalFlushesOnStop(t *testing.T) {
src := &aggregateSourceStub{
rows: [][]statsengine.SyscallAggregate{
{{TraceID: types.SYS_ENTER_FUTEX, Count: 2}},
@@ -45,32 +109,29 @@ func TestStartAggregateDrainLoopIngestsRows(t *testing.T) {
}
sink := &aggregateSinkStub{}
el := &eventLoop{
- cfg: eventLoopConfig{aggregateDrainEvery: 2 * time.Millisecond},
+ cfg: eventLoopConfig{
+ aggregateDrainEvery: 5 * time.Second,
+ aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
+ types.SYS_ENTER_FUTEX: {},
+ },
+ },
aggregateSrc: src,
aggregateSink: sink,
}
+ el.SetFilter(globalfilter.Filter{})
ctx, cancel := context.WithCancel(context.Background())
stop := el.startAggregateDrainLoop(ctx)
- deadline := time.Now().Add(100 * time.Millisecond)
- for time.Now().Before(deadline) {
- sink.mu.Lock()
- done := len(sink.rows) > 0
- sink.mu.Unlock()
- if done {
- break
- }
- time.Sleep(2 * time.Millisecond)
- }
+ time.Sleep(2 * time.Millisecond)
cancel()
stop()
sink.mu.Lock()
defer sink.mu.Unlock()
- if len(sink.rows) == 0 {
- t.Fatal("expected drained aggregate rows")
+ if len(sink.rows) != 1 {
+ t.Fatalf("final flush rows len = %d, want 1", len(sink.rows))
}
if sink.rows[0].TraceID != types.SYS_ENTER_FUTEX || sink.rows[0].Count != 2 {
- t.Fatalf("drained row = %+v, want futex count=2", sink.rows[0])
+ t.Fatalf("final flush row = %+v, want futex count=2", sink.rows[0])
}
}