diff options
| -rw-r--r-- | integrationtests/sampling_test.go | 7 | ||||
| -rw-r--r-- | internal/eventloop.go | 1 | ||||
| -rw-r--r-- | internal/eventloop_aggregate_test.go | 91 | ||||
| -rw-r--r-- | internal/eventloop_runtime.go | 75 | ||||
| -rw-r--r-- | internal/ior.go | 13 | ||||
| -rw-r--r-- | internal/syscall_aggregate_consumer.go | 10 | ||||
| -rw-r--r-- | internal/syscall_aggregate_consumer_test.go | 18 |
7 files changed, 188 insertions, 27 deletions
diff --git a/integrationtests/sampling_test.go b/integrationtests/sampling_test.go index 7bfbc75..34c1fdb 100644 --- a/integrationtests/sampling_test.go +++ b/integrationtests/sampling_test.go @@ -20,4 +20,11 @@ func TestPerSyscallSamplingAggregateOnlySuppressesRingbufEvents(t *testing.T) { Comm: "ioworkload", }, }) + AssertEventsPresent(t, result, []ExpectedEvent{ + { + Tracepoint: "enter_close", + Comm: "ioworkload", + MinCount: 1, + }, + }) } diff --git a/internal/eventloop.go b/internal/eventloop.go index 62ffba6..71c7982 100644 --- a/internal/eventloop.go +++ b/internal/eventloop.go @@ -45,6 +45,7 @@ type eventLoopConfig struct { fdTracker *fdTracker commResolver *commResolver aggregateDrainEvery time.Duration + aggregateOnlyTraceIDs map[types.TraceId]struct{} } type rawEventHandler func(raw []byte, ch chan<- *event.Pair) diff --git a/internal/eventloop_aggregate_test.go b/internal/eventloop_aggregate_test.go index 215c604..cfa9bb9 100644 --- a/internal/eventloop_aggregate_test.go +++ b/internal/eventloop_aggregate_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "ior/internal/globalfilter" "ior/internal/statsengine" "ior/internal/types" ) @@ -37,7 +38,70 @@ func (s *aggregateSinkStub) IngestSyscallAggregates(rows []statsengine.SyscallAg s.rows = append(s.rows, rows...) } -func TestStartAggregateDrainLoopIngestsRows(t *testing.T) { +func TestFilterAggregateRowsForIngestAggregateOnlyTraceIDs(t *testing.T) { + el := &eventLoop{ + cfg: eventLoopConfig{ + aggregateOnlyTraceIDs: map[types.TraceId]struct{}{ + types.SYS_ENTER_FUTEX: {}, + }, + }, + } + el.SetFilter(globalfilter.Filter{}) + + in := []statsengine.SyscallAggregate{ + {TraceID: types.SYS_ENTER_FUTEX, Count: 2}, + {TraceID: types.SYS_ENTER_CLOCK_GETTIME, Count: 5}, + } + got := el.filterAggregateRowsForIngest(in) + if len(got) != 1 { + t.Fatalf("filtered rows len = %d, want 1", len(got)) + } + if got[0].TraceID != types.SYS_ENTER_FUTEX { + t.Fatalf("filtered trace id = %v, want %v", got[0].TraceID, types.SYS_ENTER_FUTEX) + } +} + +func TestFilterAggregateRowsForIngestGatesWhenUnsupportedFilterActive(t *testing.T) { + el := &eventLoop{ + cfg: eventLoopConfig{ + aggregateOnlyTraceIDs: map[types.TraceId]struct{}{ + types.SYS_ENTER_FUTEX: {}, + }, + }, + } + el.SetFilter(globalfilter.Filter{ + Comm: &globalfilter.StringFilter{Pattern: "ioworkload"}, + }) + + got := el.filterAggregateRowsForIngest([]statsengine.SyscallAggregate{ + {TraceID: types.SYS_ENTER_FUTEX, Count: 2}, + }) + if len(got) != 0 { + t.Fatalf("expected no rows when comm filter is active, got %+v", got) + } +} + +func TestFilterAggregateRowsForIngestAllowsKernelEqPIDFilter(t *testing.T) { + el := &eventLoop{ + cfg: eventLoopConfig{ + aggregateOnlyTraceIDs: map[types.TraceId]struct{}{ + types.SYS_ENTER_FUTEX: {}, + }, + }, + } + el.SetFilter(globalfilter.Filter{ + PID: globalfilter.NewEqFilter(42), + }) + + got := el.filterAggregateRowsForIngest([]statsengine.SyscallAggregate{ + {TraceID: types.SYS_ENTER_FUTEX, Count: 2}, + }) + if len(got) != 1 { + t.Fatalf("expected row with PID eq filter, got %+v", got) + } +} + +func TestStartAggregateDrainLoopFinalFlushesOnStop(t *testing.T) { src := &aggregateSourceStub{ rows: [][]statsengine.SyscallAggregate{ {{TraceID: types.SYS_ENTER_FUTEX, Count: 2}}, @@ -45,32 +109,29 @@ func TestStartAggregateDrainLoopIngestsRows(t *testing.T) { } sink := &aggregateSinkStub{} el := &eventLoop{ - cfg: eventLoopConfig{aggregateDrainEvery: 2 * time.Millisecond}, + cfg: eventLoopConfig{ + aggregateDrainEvery: 5 * time.Second, + aggregateOnlyTraceIDs: map[types.TraceId]struct{}{ + types.SYS_ENTER_FUTEX: {}, + }, + }, aggregateSrc: src, aggregateSink: sink, } + el.SetFilter(globalfilter.Filter{}) ctx, cancel := context.WithCancel(context.Background()) stop := el.startAggregateDrainLoop(ctx) - deadline := time.Now().Add(100 * time.Millisecond) - for time.Now().Before(deadline) { - sink.mu.Lock() - done := len(sink.rows) > 0 - sink.mu.Unlock() - if done { - break - } - time.Sleep(2 * time.Millisecond) - } + time.Sleep(2 * time.Millisecond) cancel() stop() sink.mu.Lock() defer sink.mu.Unlock() - if len(sink.rows) == 0 { - t.Fatal("expected drained aggregate rows") + if len(sink.rows) != 1 { + t.Fatalf("final flush rows len = %d, want 1", len(sink.rows)) } if sink.rows[0].TraceID != types.SYS_ENTER_FUTEX || sink.rows[0].Count != 2 { - t.Fatalf("drained row = %+v, want futex count=2", sink.rows[0]) + t.Fatalf("final flush row = %+v, want futex count=2", sink.rows[0]) } } diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go index f80f271..697de07 100644 --- a/internal/eventloop_runtime.go +++ b/internal/eventloop_runtime.go @@ -4,10 +4,13 @@ import ( "context" "fmt" "runtime/debug" + "strings" "time" "ior/internal/event" "ior/internal/file" + "ior/internal/globalfilter" + "ior/internal/statsengine" "ior/internal/types" ) @@ -59,19 +62,79 @@ func (e *eventLoop) startAggregateDrainLoop(ctx context.Context) func() { case <-stop: return case <-ticker.C: - rows, err := e.aggregateSrc.Drain() - if err != nil { - e.notifyWarning(fmt.Sprintf("syscall aggregate drain failed: %v", err)) - continue - } - e.aggregateSink.IngestSyscallAggregates(rows) + e.drainAggregatesOnce() } } }() return func() { close(stop) <-done + e.drainAggregatesOnce() + } +} + +func (e *eventLoop) drainAggregatesOnce() { + rows, err := e.aggregateSrc.Drain() + if err != nil { + e.notifyWarning(fmt.Sprintf("syscall aggregate drain failed: %v", err)) + return + } + rows = e.filterAggregateRowsForIngest(rows) + if len(rows) == 0 { + return + } + e.aggregateSink.IngestSyscallAggregates(rows) +} + +func (e *eventLoop) filterAggregateRowsForIngest(rows []statsengine.SyscallAggregate) []statsengine.SyscallAggregate { + if len(rows) == 0 { + return nil + } + if !aggregateIngestAllowedForFilter(e.Filter()) { + return nil + } + if len(e.cfg.aggregateOnlyTraceIDs) == 0 { + return nil + } + + filtered := make([]statsengine.SyscallAggregate, 0, len(rows)) + for _, row := range rows { + if _, ok := e.cfg.aggregateOnlyTraceIDs[row.TraceID]; ok { + filtered = append(filtered, row) + } + } + return filtered +} + +func aggregateIngestAllowedForFilter(filter globalfilter.Filter) bool { + if filter.ErrorsOnly { + return false + } + if hasPattern(filter.Syscall) || hasPattern(filter.Comm) || hasPattern(filter.File) { + return false + } + if filter.FD != nil || filter.LatencyNs != nil || filter.GapNs != nil || filter.Bytes != nil || filter.RetVal != nil { + return false } + if !isKernelFilterNumericEq(filter.PID) { + return false + } + if !isKernelFilterNumericEq(filter.TID) { + return false + } + return true +} + +func isKernelFilterNumericEq(filter *globalfilter.NumericFilter) bool { + if filter == nil { + return true + } + _, ok := filter.EqValue() + return ok +} + +func hasPattern(filter *globalfilter.StringFilter) bool { + return filter != nil && strings.TrimSpace(filter.Pattern) != "" } func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) { diff --git a/internal/ior.go b/internal/ior.go index 3afb09a..61be397 100644 --- a/internal/ior.go +++ b/internal/ior.go @@ -379,12 +379,13 @@ func newEventLoopConfig(cfg flags.Config) eventLoopConfig { fields := make([]string, len(cfg.CollapsedFields)) copy(fields, cfg.CollapsedFields) return eventLoopConfig{ - pidFilter: cfg.PidFilter, - filter: traceFilterFromConfig(cfg), - collapsedFields: fields, - countField: cfg.CountField, - pprofEnable: cfg.PprofEnable, - plainMode: cfg.PlainMode, + pidFilter: cfg.PidFilter, + filter: traceFilterFromConfig(cfg), + collapsedFields: fields, + countField: cfg.CountField, + pprofEnable: cfg.PprofEnable, + plainMode: cfg.PlainMode, + aggregateOnlyTraceIDs: buildAggregateOnlyTraceIDs(cfg), } } diff --git a/internal/syscall_aggregate_consumer.go b/internal/syscall_aggregate_consumer.go index 108bbeb..dadd38e 100644 --- a/internal/syscall_aggregate_consumer.go +++ b/internal/syscall_aggregate_consumer.go @@ -127,3 +127,13 @@ func buildSyscallSamplingRates(cfg flags.Config) map[types.TraceId]uint32 { } return rates } + +func buildAggregateOnlyTraceIDs(cfg flags.Config) map[types.TraceId]struct{} { + ids := make(map[types.TraceId]struct{}) + for traceID, rate := range buildSyscallSamplingRates(cfg) { + if rate == 0 { + ids[traceID] = struct{}{} + } + } + return ids +} diff --git a/internal/syscall_aggregate_consumer_test.go b/internal/syscall_aggregate_consumer_test.go index 1f6e856..362dfba 100644 --- a/internal/syscall_aggregate_consumer_test.go +++ b/internal/syscall_aggregate_consumer_test.go @@ -23,6 +23,24 @@ func TestBuildSyscallSamplingRatesFamilyAndSyscallOverride(t *testing.T) { } } +func TestBuildAggregateOnlyTraceIDs(t *testing.T) { + cfg := flags.NewFlags() + cfg.SyscallFamilySamplingRates[types.FamilyTime] = 10 + cfg.SyscallSamplingRates["futex"] = 0 + cfg.SyscallSamplingRates["clock_gettime"] = 0 + + ids := buildAggregateOnlyTraceIDs(cfg) + if _, ok := ids[types.SYS_ENTER_FUTEX]; !ok { + t.Fatal("expected futex in aggregate-only set") + } + if _, ok := ids[types.SYS_ENTER_CLOCK_GETTIME]; !ok { + t.Fatal("expected clock_gettime in aggregate-only set") + } + if _, ok := ids[types.SYS_ENTER_NANOSLEEP]; ok { + t.Fatal("did not expect nanosleep in aggregate-only set") + } +} + func TestDecodeRawSyscallAggregate(t *testing.T) { want := rawSyscallAggregate{ Count: 7, |
