summaryrefslogtreecommitdiff
path: root/internal/eventloop_runtime.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-05-20 14:13:21 +0300
committerPaul Buetow <paul@buetow.org>2026-05-20 14:13:21 +0300
commit13d2c3ec8deb759308a8f0a28374470ca9bb1e9d (patch)
tree20f1d9ac5540d63a415223c7c5b4bb6f1e8ecec8 /internal/eventloop_runtime.go
parent9310b54d439d4a1a8d4d337987aa63884df0af76 (diff)
fix(task-17): prevent aggregate double-count and flush on shutdown
Diffstat (limited to 'internal/eventloop_runtime.go')
-rw-r--r--internal/eventloop_runtime.go75
1 files changed, 69 insertions, 6 deletions
diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go
index f80f271..697de07 100644
--- a/internal/eventloop_runtime.go
+++ b/internal/eventloop_runtime.go
@@ -4,10 +4,13 @@ import (
"context"
"fmt"
"runtime/debug"
+ "strings"
"time"
"ior/internal/event"
"ior/internal/file"
+ "ior/internal/globalfilter"
+ "ior/internal/statsengine"
"ior/internal/types"
)
@@ -59,19 +62,79 @@ func (e *eventLoop) startAggregateDrainLoop(ctx context.Context) func() {
case <-stop:
return
case <-ticker.C:
- rows, err := e.aggregateSrc.Drain()
- if err != nil {
- e.notifyWarning(fmt.Sprintf("syscall aggregate drain failed: %v", err))
- continue
- }
- e.aggregateSink.IngestSyscallAggregates(rows)
+ e.drainAggregatesOnce()
}
}
}()
return func() {
close(stop)
<-done
+ e.drainAggregatesOnce()
+ }
+}
+
+func (e *eventLoop) drainAggregatesOnce() {
+ rows, err := e.aggregateSrc.Drain()
+ if err != nil {
+ e.notifyWarning(fmt.Sprintf("syscall aggregate drain failed: %v", err))
+ return
+ }
+ rows = e.filterAggregateRowsForIngest(rows)
+ if len(rows) == 0 {
+ return
+ }
+ e.aggregateSink.IngestSyscallAggregates(rows)
+}
+
+func (e *eventLoop) filterAggregateRowsForIngest(rows []statsengine.SyscallAggregate) []statsengine.SyscallAggregate {
+ if len(rows) == 0 {
+ return nil
+ }
+ if !aggregateIngestAllowedForFilter(e.Filter()) {
+ return nil
+ }
+ if len(e.cfg.aggregateOnlyTraceIDs) == 0 {
+ return nil
+ }
+
+ filtered := make([]statsengine.SyscallAggregate, 0, len(rows))
+ for _, row := range rows {
+ if _, ok := e.cfg.aggregateOnlyTraceIDs[row.TraceID]; ok {
+ filtered = append(filtered, row)
+ }
+ }
+ return filtered
+}
+
+func aggregateIngestAllowedForFilter(filter globalfilter.Filter) bool {
+ if filter.ErrorsOnly {
+ return false
+ }
+ if hasPattern(filter.Syscall) || hasPattern(filter.Comm) || hasPattern(filter.File) {
+ return false
+ }
+ if filter.FD != nil || filter.LatencyNs != nil || filter.GapNs != nil || filter.Bytes != nil || filter.RetVal != nil {
+ return false
}
+ if !isKernelFilterNumericEq(filter.PID) {
+ return false
+ }
+ if !isKernelFilterNumericEq(filter.TID) {
+ return false
+ }
+ return true
+}
+
+func isKernelFilterNumericEq(filter *globalfilter.NumericFilter) bool {
+ if filter == nil {
+ return true
+ }
+ _, ok := filter.EqValue()
+ return ok
+}
+
+func hasPattern(filter *globalfilter.StringFilter) bool {
+ return filter != nil && strings.TrimSpace(filter.Pattern) != ""
}
func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) {