summaryrefslogtreecommitdiff
path: root/internal/eventloop.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/eventloop.go')
-rw-r--r--internal/eventloop.go32
1 files changed, 24 insertions, 8 deletions
diff --git a/internal/eventloop.go b/internal/eventloop.go
index 0fe327c..62ffba6 100644
--- a/internal/eventloop.go
+++ b/internal/eventloop.go
@@ -8,6 +8,7 @@ import (
"ior/internal/event"
"ior/internal/file"
"ior/internal/globalfilter"
+ "ior/internal/statsengine"
"ior/internal/types"
)
@@ -20,8 +21,17 @@ const (
defaultMaxPendingHandleEntries = 8192
defaultMaxProcFdCacheSize = 8192
cacheTrimDivisor = 4
+ defaultAggregateDrainEvery = time.Second
)
+type syscallAggregateSource interface {
+ Drain() ([]statsengine.SyscallAggregate, error)
+}
+
+type syscallAggregateSink interface {
+ IngestSyscallAggregates([]statsengine.SyscallAggregate)
+}
+
type eventLoopConfig struct {
pidFilter int
filter globalfilter.Filter
@@ -34,6 +44,7 @@ type eventLoopConfig struct {
synchronousRawProcessing bool
fdTracker *fdTracker
commResolver *commResolver
+ aggregateDrainEvery time.Duration
}
type rawEventHandler func(raw []byte, ch chan<- *event.Pair)
@@ -43,14 +54,16 @@ type eventLoop struct {
// the TUI can swap filters in place via SetFilter without tearing down
// and reattaching the BPF probes (the previous behavior caused a multi-
// second 'Attaching tracepoints' overlay every time the filter changed).
- filterPtr atomic.Pointer[globalfilter.Filter]
- pairs pairTracker // enter/exit pairing state and inter-syscall duration tracking
- pendingHandles *pendingHandleTracker // TID → pathname from name_to_handle_at, for open_by_handle_at correlation
- fdTracker *fdTracker // fd table and procfs resolution cache
- commResolver *commResolver
- outputFormatter // pair-emission and warning-notification callbacks (embedded collaborator)
- rawHandlers map[types.EventType]rawEventHandler
- cfg eventLoopConfig
+ filterPtr atomic.Pointer[globalfilter.Filter]
+ pairs pairTracker // enter/exit pairing state and inter-syscall duration tracking
+ pendingHandles *pendingHandleTracker // TID → pathname from name_to_handle_at, for open_by_handle_at correlation
+ fdTracker *fdTracker // fd table and procfs resolution cache
+ commResolver *commResolver
+ outputFormatter // pair-emission and warning-notification callbacks (embedded collaborator)
+ rawHandlers map[types.EventType]rawEventHandler
+ cfg eventLoopConfig
+ aggregateSink syscallAggregateSink
+ aggregateSrc syscallAggregateSource
// Statistics
numTracepoints uint
@@ -100,6 +113,9 @@ func newEventLoop(cfg eventLoopConfig) (*eventLoop, error) {
cfg: cfg,
done: make(chan struct{}),
}
+ if el.cfg.aggregateDrainEvery <= 0 {
+ el.cfg.aggregateDrainEvery = defaultAggregateDrainEvery
+ }
el.SetFilter(cfg.filter)
el.initRawHandlers()
el.configureOutputCallback()