summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-05-20 14:13:21 +0300
committerPaul Buetow <paul@buetow.org>2026-05-20 14:13:21 +0300
commit13d2c3ec8deb759308a8f0a28374470ca9bb1e9d (patch)
tree20f1d9ac5540d63a415223c7c5b4bb6f1e8ecec8
parent9310b54d439d4a1a8d4d337987aa63884df0af76 (diff)
fix(task-17): prevent aggregate double-count and flush on shutdown
-rw-r--r--integrationtests/sampling_test.go7
-rw-r--r--internal/eventloop.go1
-rw-r--r--internal/eventloop_aggregate_test.go91
-rw-r--r--internal/eventloop_runtime.go75
-rw-r--r--internal/ior.go13
-rw-r--r--internal/syscall_aggregate_consumer.go10
-rw-r--r--internal/syscall_aggregate_consumer_test.go18
7 files changed, 188 insertions, 27 deletions
diff --git a/integrationtests/sampling_test.go b/integrationtests/sampling_test.go
index 7bfbc75..34c1fdb 100644
--- a/integrationtests/sampling_test.go
+++ b/integrationtests/sampling_test.go
@@ -20,4 +20,11 @@ func TestPerSyscallSamplingAggregateOnlySuppressesRingbufEvents(t *testing.T) {
Comm: "ioworkload",
},
})
+ AssertEventsPresent(t, result, []ExpectedEvent{
+ {
+ Tracepoint: "enter_close",
+ Comm: "ioworkload",
+ MinCount: 1,
+ },
+ })
}
diff --git a/internal/eventloop.go b/internal/eventloop.go
index 62ffba6..71c7982 100644
--- a/internal/eventloop.go
+++ b/internal/eventloop.go
@@ -45,6 +45,7 @@ type eventLoopConfig struct {
fdTracker *fdTracker
commResolver *commResolver
aggregateDrainEvery time.Duration
+ aggregateOnlyTraceIDs map[types.TraceId]struct{}
}
type rawEventHandler func(raw []byte, ch chan<- *event.Pair)
diff --git a/internal/eventloop_aggregate_test.go b/internal/eventloop_aggregate_test.go
index 215c604..cfa9bb9 100644
--- a/internal/eventloop_aggregate_test.go
+++ b/internal/eventloop_aggregate_test.go
@@ -6,6 +6,7 @@ import (
"testing"
"time"
+ "ior/internal/globalfilter"
"ior/internal/statsengine"
"ior/internal/types"
)
@@ -37,7 +38,70 @@ func (s *aggregateSinkStub) IngestSyscallAggregates(rows []statsengine.SyscallAg
s.rows = append(s.rows, rows...)
}
-func TestStartAggregateDrainLoopIngestsRows(t *testing.T) {
+func TestFilterAggregateRowsForIngestAggregateOnlyTraceIDs(t *testing.T) {
+ el := &eventLoop{
+ cfg: eventLoopConfig{
+ aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
+ types.SYS_ENTER_FUTEX: {},
+ },
+ },
+ }
+ el.SetFilter(globalfilter.Filter{})
+
+ in := []statsengine.SyscallAggregate{
+ {TraceID: types.SYS_ENTER_FUTEX, Count: 2},
+ {TraceID: types.SYS_ENTER_CLOCK_GETTIME, Count: 5},
+ }
+ got := el.filterAggregateRowsForIngest(in)
+ if len(got) != 1 {
+ t.Fatalf("filtered rows len = %d, want 1", len(got))
+ }
+ if got[0].TraceID != types.SYS_ENTER_FUTEX {
+ t.Fatalf("filtered trace id = %v, want %v", got[0].TraceID, types.SYS_ENTER_FUTEX)
+ }
+}
+
+func TestFilterAggregateRowsForIngestGatesWhenUnsupportedFilterActive(t *testing.T) {
+ el := &eventLoop{
+ cfg: eventLoopConfig{
+ aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
+ types.SYS_ENTER_FUTEX: {},
+ },
+ },
+ }
+ el.SetFilter(globalfilter.Filter{
+ Comm: &globalfilter.StringFilter{Pattern: "ioworkload"},
+ })
+
+ got := el.filterAggregateRowsForIngest([]statsengine.SyscallAggregate{
+ {TraceID: types.SYS_ENTER_FUTEX, Count: 2},
+ })
+ if len(got) != 0 {
+ t.Fatalf("expected no rows when comm filter is active, got %+v", got)
+ }
+}
+
+func TestFilterAggregateRowsForIngestAllowsKernelEqPIDFilter(t *testing.T) {
+ el := &eventLoop{
+ cfg: eventLoopConfig{
+ aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
+ types.SYS_ENTER_FUTEX: {},
+ },
+ },
+ }
+ el.SetFilter(globalfilter.Filter{
+ PID: globalfilter.NewEqFilter(42),
+ })
+
+ got := el.filterAggregateRowsForIngest([]statsengine.SyscallAggregate{
+ {TraceID: types.SYS_ENTER_FUTEX, Count: 2},
+ })
+ if len(got) != 1 {
+ t.Fatalf("expected row with PID eq filter, got %+v", got)
+ }
+}
+
+func TestStartAggregateDrainLoopFinalFlushesOnStop(t *testing.T) {
src := &aggregateSourceStub{
rows: [][]statsengine.SyscallAggregate{
{{TraceID: types.SYS_ENTER_FUTEX, Count: 2}},
@@ -45,32 +109,29 @@ func TestStartAggregateDrainLoopIngestsRows(t *testing.T) {
}
sink := &aggregateSinkStub{}
el := &eventLoop{
- cfg: eventLoopConfig{aggregateDrainEvery: 2 * time.Millisecond},
+ cfg: eventLoopConfig{
+ aggregateDrainEvery: 5 * time.Second,
+ aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
+ types.SYS_ENTER_FUTEX: {},
+ },
+ },
aggregateSrc: src,
aggregateSink: sink,
}
+ el.SetFilter(globalfilter.Filter{})
ctx, cancel := context.WithCancel(context.Background())
stop := el.startAggregateDrainLoop(ctx)
- deadline := time.Now().Add(100 * time.Millisecond)
- for time.Now().Before(deadline) {
- sink.mu.Lock()
- done := len(sink.rows) > 0
- sink.mu.Unlock()
- if done {
- break
- }
- time.Sleep(2 * time.Millisecond)
- }
+ time.Sleep(2 * time.Millisecond)
cancel()
stop()
sink.mu.Lock()
defer sink.mu.Unlock()
- if len(sink.rows) == 0 {
- t.Fatal("expected drained aggregate rows")
+ if len(sink.rows) != 1 {
+ t.Fatalf("final flush rows len = %d, want 1", len(sink.rows))
}
if sink.rows[0].TraceID != types.SYS_ENTER_FUTEX || sink.rows[0].Count != 2 {
- t.Fatalf("drained row = %+v, want futex count=2", sink.rows[0])
+ t.Fatalf("final flush row = %+v, want futex count=2", sink.rows[0])
}
}
diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go
index f80f271..697de07 100644
--- a/internal/eventloop_runtime.go
+++ b/internal/eventloop_runtime.go
@@ -4,10 +4,13 @@ import (
"context"
"fmt"
"runtime/debug"
+ "strings"
"time"
"ior/internal/event"
"ior/internal/file"
+ "ior/internal/globalfilter"
+ "ior/internal/statsengine"
"ior/internal/types"
)
@@ -59,19 +62,79 @@ func (e *eventLoop) startAggregateDrainLoop(ctx context.Context) func() {
case <-stop:
return
case <-ticker.C:
- rows, err := e.aggregateSrc.Drain()
- if err != nil {
- e.notifyWarning(fmt.Sprintf("syscall aggregate drain failed: %v", err))
- continue
- }
- e.aggregateSink.IngestSyscallAggregates(rows)
+ e.drainAggregatesOnce()
}
}
}()
return func() {
close(stop)
<-done
+ e.drainAggregatesOnce()
+ }
+}
+
+func (e *eventLoop) drainAggregatesOnce() {
+ rows, err := e.aggregateSrc.Drain()
+ if err != nil {
+ e.notifyWarning(fmt.Sprintf("syscall aggregate drain failed: %v", err))
+ return
+ }
+ rows = e.filterAggregateRowsForIngest(rows)
+ if len(rows) == 0 {
+ return
+ }
+ e.aggregateSink.IngestSyscallAggregates(rows)
+}
+
+func (e *eventLoop) filterAggregateRowsForIngest(rows []statsengine.SyscallAggregate) []statsengine.SyscallAggregate {
+ if len(rows) == 0 {
+ return nil
+ }
+ if !aggregateIngestAllowedForFilter(e.Filter()) {
+ return nil
+ }
+ if len(e.cfg.aggregateOnlyTraceIDs) == 0 {
+ return nil
+ }
+
+ filtered := make([]statsengine.SyscallAggregate, 0, len(rows))
+ for _, row := range rows {
+ if _, ok := e.cfg.aggregateOnlyTraceIDs[row.TraceID]; ok {
+ filtered = append(filtered, row)
+ }
+ }
+ return filtered
+}
+
+func aggregateIngestAllowedForFilter(filter globalfilter.Filter) bool {
+ if filter.ErrorsOnly {
+ return false
+ }
+ if hasPattern(filter.Syscall) || hasPattern(filter.Comm) || hasPattern(filter.File) {
+ return false
+ }
+ if filter.FD != nil || filter.LatencyNs != nil || filter.GapNs != nil || filter.Bytes != nil || filter.RetVal != nil {
+ return false
}
+ if !isKernelFilterNumericEq(filter.PID) {
+ return false
+ }
+ if !isKernelFilterNumericEq(filter.TID) {
+ return false
+ }
+ return true
+}
+
+func isKernelFilterNumericEq(filter *globalfilter.NumericFilter) bool {
+ if filter == nil {
+ return true
+ }
+ _, ok := filter.EqValue()
+ return ok
+}
+
+func hasPattern(filter *globalfilter.StringFilter) bool {
+ return filter != nil && strings.TrimSpace(filter.Pattern) != ""
}
func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) {
diff --git a/internal/ior.go b/internal/ior.go
index 3afb09a..61be397 100644
--- a/internal/ior.go
+++ b/internal/ior.go
@@ -379,12 +379,13 @@ func newEventLoopConfig(cfg flags.Config) eventLoopConfig {
fields := make([]string, len(cfg.CollapsedFields))
copy(fields, cfg.CollapsedFields)
return eventLoopConfig{
- pidFilter: cfg.PidFilter,
- filter: traceFilterFromConfig(cfg),
- collapsedFields: fields,
- countField: cfg.CountField,
- pprofEnable: cfg.PprofEnable,
- plainMode: cfg.PlainMode,
+ pidFilter: cfg.PidFilter,
+ filter: traceFilterFromConfig(cfg),
+ collapsedFields: fields,
+ countField: cfg.CountField,
+ pprofEnable: cfg.PprofEnable,
+ plainMode: cfg.PlainMode,
+ aggregateOnlyTraceIDs: buildAggregateOnlyTraceIDs(cfg),
}
}
diff --git a/internal/syscall_aggregate_consumer.go b/internal/syscall_aggregate_consumer.go
index 108bbeb..dadd38e 100644
--- a/internal/syscall_aggregate_consumer.go
+++ b/internal/syscall_aggregate_consumer.go
@@ -127,3 +127,13 @@ func buildSyscallSamplingRates(cfg flags.Config) map[types.TraceId]uint32 {
}
return rates
}
+
+func buildAggregateOnlyTraceIDs(cfg flags.Config) map[types.TraceId]struct{} {
+ ids := make(map[types.TraceId]struct{})
+ for traceID, rate := range buildSyscallSamplingRates(cfg) {
+ if rate == 0 {
+ ids[traceID] = struct{}{}
+ }
+ }
+ return ids
+}
diff --git a/internal/syscall_aggregate_consumer_test.go b/internal/syscall_aggregate_consumer_test.go
index 1f6e856..362dfba 100644
--- a/internal/syscall_aggregate_consumer_test.go
+++ b/internal/syscall_aggregate_consumer_test.go
@@ -23,6 +23,24 @@ func TestBuildSyscallSamplingRatesFamilyAndSyscallOverride(t *testing.T) {
}
}
+func TestBuildAggregateOnlyTraceIDs(t *testing.T) {
+ cfg := flags.NewFlags()
+ cfg.SyscallFamilySamplingRates[types.FamilyTime] = 10
+ cfg.SyscallSamplingRates["futex"] = 0
+ cfg.SyscallSamplingRates["clock_gettime"] = 0
+
+ ids := buildAggregateOnlyTraceIDs(cfg)
+ if _, ok := ids[types.SYS_ENTER_FUTEX]; !ok {
+ t.Fatal("expected futex in aggregate-only set")
+ }
+ if _, ok := ids[types.SYS_ENTER_CLOCK_GETTIME]; !ok {
+ t.Fatal("expected clock_gettime in aggregate-only set")
+ }
+ if _, ok := ids[types.SYS_ENTER_NANOSLEEP]; ok {
+ t.Fatal("did not expect nanosleep in aggregate-only set")
+ }
+}
+
func TestDecodeRawSyscallAggregate(t *testing.T) {
want := rawSyscallAggregate{
Count: 7,