summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-05-23 18:44:28 +0300
committerPaul Buetow <paul@buetow.org>2026-05-23 18:44:28 +0300
commitb138aed54ff23c3b5557b1336835c9e37137b020 (patch)
treec16aa5d5acace9ffe1e6f68570ddbadc531bc217
parentd300847a794fd9cc823747320d53422880072b3b (diff)
8c extract aggregate draining and filter gating from eventLoop
Move aggregate drain scheduling, filter compatibility policy, trace-ID allowlisting, and warning construction into a dedicated aggregateDrainer type. eventLoop now only creates and ticks it. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
-rw-r--r--internal/aggregate_drainer.go129
-rw-r--r--internal/eventloop_aggregate_test.go142
-rw-r--r--internal/eventloop_runtime.go82
3 files changed, 235 insertions, 118 deletions
diff --git a/internal/aggregate_drainer.go b/internal/aggregate_drainer.go
new file mode 100644
index 0000000..87c5e1a
--- /dev/null
+++ b/internal/aggregate_drainer.go
@@ -0,0 +1,129 @@
+package internal
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "time"
+
+ "ior/internal/globalfilter"
+ "ior/internal/statsengine"
+ "ior/internal/types"
+)
+
+type aggregateDrainResult struct {
+ rows []statsengine.SyscallAggregate
+ warning string
+}
+
+type aggregateDrainer struct {
+ source syscallAggregateSource
+ filter func() globalfilter.Filter
+ aggregateOnlyTraceIDs map[types.TraceId]struct{}
+}
+
+func newAggregateDrainer(
+ source syscallAggregateSource,
+ aggregateOnlyTraceIDs map[types.TraceId]struct{},
+ filter func() globalfilter.Filter,
+) *aggregateDrainer {
+ return &aggregateDrainer{
+ source: source,
+ filter: filter,
+ aggregateOnlyTraceIDs: aggregateOnlyTraceIDs,
+ }
+}
+
+func (d *aggregateDrainer) Tick() aggregateDrainResult {
+ if d == nil || d.source == nil {
+ return aggregateDrainResult{}
+ }
+ rows, err := d.source.Drain()
+ if err != nil {
+ return aggregateDrainResult{warning: fmt.Sprintf("syscall aggregate drain failed: %v", err)}
+ }
+ rows = d.filterRowsForIngest(rows)
+ if len(rows) == 0 {
+ return aggregateDrainResult{}
+ }
+ return aggregateDrainResult{rows: rows}
+}
+
+func (d *aggregateDrainer) Start(ctx context.Context, every time.Duration, handle func(aggregateDrainResult)) func() {
+ if d == nil || d.source == nil {
+ return func() {}
+ }
+
+ done := make(chan struct{})
+ stop := make(chan struct{})
+ go func() {
+ defer close(done)
+ ticker := time.NewTicker(every)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-stop:
+ return
+ case <-ticker.C:
+ handle(d.Tick())
+ }
+ }
+ }()
+ return func() {
+ close(stop)
+ <-done
+ handle(d.Tick())
+ }
+}
+
+func (d *aggregateDrainer) filterRowsForIngest(rows []statsengine.SyscallAggregate) []statsengine.SyscallAggregate {
+ if len(rows) == 0 {
+ return nil
+ }
+ if !aggregateIngestAllowedForFilter(d.currentFilter()) {
+ return nil
+ }
+ if len(d.aggregateOnlyTraceIDs) == 0 {
+ return nil
+ }
+
+ filtered := make([]statsengine.SyscallAggregate, 0, len(rows))
+ for _, row := range rows {
+ if _, ok := d.aggregateOnlyTraceIDs[row.TraceID]; ok {
+ filtered = append(filtered, row)
+ }
+ }
+ return filtered
+}
+
+func (d *aggregateDrainer) currentFilter() globalfilter.Filter {
+ if d == nil || d.filter == nil {
+ return globalfilter.Filter{}
+ }
+ return d.filter()
+}
+
+func aggregateIngestAllowedForFilter(filter globalfilter.Filter) bool {
+ if filter.ErrorsOnly {
+ return false
+ }
+ if hasPattern(filter.Syscall) || hasPattern(filter.Comm) || hasPattern(filter.File) {
+ return false
+ }
+ if filter.FD != nil || filter.LatencyNs != nil || filter.GapNs != nil || filter.Bytes != nil || filter.RetVal != nil {
+ return false
+ }
+ if filter.PID != nil {
+ return false
+ }
+ if filter.TID != nil {
+ return false
+ }
+ return true
+}
+
+func hasPattern(filter *globalfilter.StringFilter) bool {
+ return filter != nil && strings.TrimSpace(filter.Pattern) != ""
+}
diff --git a/internal/eventloop_aggregate_test.go b/internal/eventloop_aggregate_test.go
index 582b598..78cf2a6 100644
--- a/internal/eventloop_aggregate_test.go
+++ b/internal/eventloop_aggregate_test.go
@@ -2,6 +2,7 @@ package internal
import (
"context"
+ "errors"
"sync"
"testing"
"time"
@@ -14,11 +15,15 @@ import (
type aggregateSourceStub struct {
mu sync.Mutex
rows [][]statsengine.SyscallAggregate
+ err error
}
func (s *aggregateSourceStub) Drain() ([]statsengine.SyscallAggregate, error) {
s.mu.Lock()
defer s.mu.Unlock()
+ if s.err != nil {
+ return nil, s.err
+ }
if len(s.rows) == 0 {
return nil, nil
}
@@ -38,50 +43,79 @@ func (s *aggregateSinkStub) IngestSyscallAggregates(rows []statsengine.SyscallAg
s.rows = append(s.rows, rows...)
}
-func TestFilterAggregateRowsForIngestAggregateOnlyTraceIDs(t *testing.T) {
- el := &eventLoop{
- cfg: eventLoopConfig{
- aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
- types.SYS_ENTER_FUTEX: {},
- },
+func TestAggregateDrainerTickFiltersAggregateOnlyTraceIDs(t *testing.T) {
+ drainer := newAggregateDrainer(
+ &aggregateSourceStub{
+ rows: [][]statsengine.SyscallAggregate{{
+ {TraceID: types.SYS_ENTER_FUTEX, Count: 2},
+ {TraceID: types.SYS_ENTER_CLOCK_GETTIME, Count: 5},
+ }},
},
- }
- el.SetFilter(globalfilter.Filter{})
+ map[types.TraceId]struct{}{
+ types.SYS_ENTER_FUTEX: {},
+ },
+ func() globalfilter.Filter { return globalfilter.Filter{} },
+ )
- in := []statsengine.SyscallAggregate{
- {TraceID: types.SYS_ENTER_FUTEX, Count: 2},
- {TraceID: types.SYS_ENTER_CLOCK_GETTIME, Count: 5},
+ got := drainer.Tick()
+ if got.warning != "" {
+ t.Fatalf("warning = %q, want empty", got.warning)
}
- got := el.filterAggregateRowsForIngest(in)
- if len(got) != 1 {
- t.Fatalf("filtered rows len = %d, want 1", len(got))
+ if len(got.rows) != 1 {
+ t.Fatalf("filtered rows len = %d, want 1", len(got.rows))
}
- if got[0].TraceID != types.SYS_ENTER_FUTEX {
- t.Fatalf("filtered trace id = %v, want %v", got[0].TraceID, types.SYS_ENTER_FUTEX)
+ if got.rows[0].TraceID != types.SYS_ENTER_FUTEX {
+ t.Fatalf("filtered trace id = %v, want %v", got.rows[0].TraceID, types.SYS_ENTER_FUTEX)
}
}
-func TestFilterAggregateRowsForIngestGatesWhenUnsupportedFilterActive(t *testing.T) {
- el := &eventLoop{
- cfg: eventLoopConfig{
- aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
- types.SYS_ENTER_FUTEX: {},
- },
+func TestAggregateDrainerTickGatesWhenUnsupportedFilterActive(t *testing.T) {
+ drainer := newAggregateDrainer(
+ &aggregateSourceStub{
+ rows: [][]statsengine.SyscallAggregate{{
+ {TraceID: types.SYS_ENTER_FUTEX, Count: 2},
+ }},
+ },
+ map[types.TraceId]struct{}{
+ types.SYS_ENTER_FUTEX: {},
+ },
+ func() globalfilter.Filter {
+ return globalfilter.Filter{
+ Comm: &globalfilter.StringFilter{Pattern: "ioworkload"},
+ }
},
+ )
+
+ got := drainer.Tick()
+ if got.warning != "" {
+ t.Fatalf("warning = %q, want empty", got.warning)
+ }
+ if len(got.rows) != 0 {
+ t.Fatalf("expected no rows when comm filter is active, got %+v", got.rows)
}
- el.SetFilter(globalfilter.Filter{
- Comm: &globalfilter.StringFilter{Pattern: "ioworkload"},
- })
+}
- got := el.filterAggregateRowsForIngest([]statsengine.SyscallAggregate{
- {TraceID: types.SYS_ENTER_FUTEX, Count: 2},
- })
- if len(got) != 0 {
- t.Fatalf("expected no rows when comm filter is active, got %+v", got)
+func TestAggregateDrainerTickRejectsRowsWithoutAggregateOnlyTraceIDs(t *testing.T) {
+ drainer := newAggregateDrainer(
+ &aggregateSourceStub{
+ rows: [][]statsengine.SyscallAggregate{{
+ {TraceID: types.SYS_ENTER_FUTEX, Count: 2},
+ }},
+ },
+ nil,
+ func() globalfilter.Filter { return globalfilter.Filter{} },
+ )
+
+ got := drainer.Tick()
+ if got.warning != "" {
+ t.Fatalf("warning = %q, want empty", got.warning)
+ }
+ if len(got.rows) != 0 {
+ t.Fatalf("expected no rows without aggregate-only trace IDs, got %+v", got.rows)
}
}
-func TestFilterAggregateRowsForIngestRejectsPIDAndTIDFilters(t *testing.T) {
+func TestAggregateDrainerTickRejectsPIDAndTIDFilters(t *testing.T) {
tests := []struct {
name string
filter globalfilter.Filter
@@ -92,25 +126,47 @@ func TestFilterAggregateRowsForIngestRejectsPIDAndTIDFilters(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- el := &eventLoop{
- cfg: eventLoopConfig{
- aggregateOnlyTraceIDs: map[types.TraceId]struct{}{
- types.SYS_ENTER_FUTEX: {},
- },
+ drainer := newAggregateDrainer(
+ &aggregateSourceStub{
+ rows: [][]statsengine.SyscallAggregate{{
+ {TraceID: types.SYS_ENTER_FUTEX, Count: 2},
+ }},
},
- }
- el.SetFilter(tt.filter)
+ map[types.TraceId]struct{}{
+ types.SYS_ENTER_FUTEX: {},
+ },
+ func() globalfilter.Filter { return tt.filter },
+ )
- got := el.filterAggregateRowsForIngest([]statsengine.SyscallAggregate{
- {TraceID: types.SYS_ENTER_FUTEX, Count: 2},
- })
- if len(got) != 0 {
- t.Fatalf("expected no aggregate rows with %s filter, got %+v", tt.name, got)
+ got := drainer.Tick()
+ if got.warning != "" {
+ t.Fatalf("warning = %q, want empty", got.warning)
+ }
+ if len(got.rows) != 0 {
+ t.Fatalf("expected no aggregate rows with %s filter, got %+v", tt.name, got.rows)
}
})
}
}
+func TestAggregateDrainerTickReturnsDrainWarning(t *testing.T) {
+ drainer := newAggregateDrainer(
+ &aggregateSourceStub{err: errors.New("boom")},
+ map[types.TraceId]struct{}{
+ types.SYS_ENTER_FUTEX: {},
+ },
+ func() globalfilter.Filter { return globalfilter.Filter{} },
+ )
+
+ got := drainer.Tick()
+ if len(got.rows) != 0 {
+ t.Fatalf("rows len = %d, want 0", len(got.rows))
+ }
+ if got.warning != "syscall aggregate drain failed: boom" {
+ t.Fatalf("warning = %q, want drain failure", got.warning)
+ }
+}
+
func TestStartAggregateDrainLoopFinalFlushesOnStop(t *testing.T) {
src := &aggregateSourceStub{
rows: [][]statsengine.SyscallAggregate{
diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go
index 24214ca..c4fa6b0 100644
--- a/internal/eventloop_runtime.go
+++ b/internal/eventloop_runtime.go
@@ -4,13 +4,10 @@ import (
"context"
"fmt"
"runtime/debug"
- "strings"
"time"
"ior/internal/event"
"ior/internal/file"
- "ior/internal/globalfilter"
- "ior/internal/statsengine"
"ior/internal/types"
)
@@ -49,84 +46,19 @@ func (e *eventLoop) startAggregateDrainLoop(ctx context.Context) func() {
return func() {}
}
- done := make(chan struct{})
- stop := make(chan struct{})
- go func() {
- defer close(done)
- ticker := time.NewTicker(e.cfg.aggregateDrainEvery)
- defer ticker.Stop()
- for {
- select {
- case <-ctx.Done():
- return
- case <-stop:
- return
- case <-ticker.C:
- e.drainAggregatesOnce()
- }
- }
- }()
- return func() {
- close(stop)
- <-done
- e.drainAggregatesOnce()
- }
+ drainer := newAggregateDrainer(e.aggregateSrc, e.cfg.aggregateOnlyTraceIDs, e.Filter)
+ return drainer.Start(ctx, e.cfg.aggregateDrainEvery, e.handleAggregateDrainResult)
}
-func (e *eventLoop) drainAggregatesOnce() {
- rows, err := e.aggregateSrc.Drain()
- if err != nil {
- e.notifyWarning(fmt.Sprintf("syscall aggregate drain failed: %v", err))
+func (e *eventLoop) handleAggregateDrainResult(result aggregateDrainResult) {
+ if result.warning != "" {
+ e.notifyWarning(result.warning)
return
}
- rows = e.filterAggregateRowsForIngest(rows)
- if len(rows) == 0 {
+ if len(result.rows) == 0 {
return
}
- e.aggregateSink.IngestSyscallAggregates(rows)
-}
-
-func (e *eventLoop) filterAggregateRowsForIngest(rows []statsengine.SyscallAggregate) []statsengine.SyscallAggregate {
- if len(rows) == 0 {
- return nil
- }
- if !aggregateIngestAllowedForFilter(e.Filter()) {
- return nil
- }
- if len(e.cfg.aggregateOnlyTraceIDs) == 0 {
- return nil
- }
-
- filtered := make([]statsengine.SyscallAggregate, 0, len(rows))
- for _, row := range rows {
- if _, ok := e.cfg.aggregateOnlyTraceIDs[row.TraceID]; ok {
- filtered = append(filtered, row)
- }
- }
- return filtered
-}
-
-func aggregateIngestAllowedForFilter(filter globalfilter.Filter) bool {
- if filter.ErrorsOnly {
- return false
- }
- if hasPattern(filter.Syscall) || hasPattern(filter.Comm) || hasPattern(filter.File) {
- return false
- }
- if filter.FD != nil || filter.LatencyNs != nil || filter.GapNs != nil || filter.Bytes != nil || filter.RetVal != nil {
- return false
- }
- if filter.PID != nil {
- return false
- }
- if filter.TID != nil {
- return false
- }
- return true
-}
-
-func hasPattern(filter *globalfilter.StringFilter) bool {
- return filter != nil && strings.TrimSpace(filter.Pattern) != ""
+ e.aggregateSink.IngestSyscallAggregates(result.rows)
}
func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) {