diff options
| author | Paul Buetow <paul@buetow.org> | 2026-05-20 11:38:19 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-05-20 11:38:19 +0300 |
| commit | 9310b54d439d4a1a8d4d337987aa63884df0af76 (patch) | |
| tree | c6fb38085891a04ce81672f977af316a2e96b2fd /internal/eventloop.go | |
| parent | 5fd613562e2aa2ab3aac3349f44db88330046c1c (diff) | |
feat: add syscall aggregate sampling infrastructure (task 17)
Diffstat (limited to 'internal/eventloop.go')
| -rw-r--r-- | internal/eventloop.go | 32 |
1 files changed, 24 insertions, 8 deletions
diff --git a/internal/eventloop.go b/internal/eventloop.go index 0fe327c..62ffba6 100644 --- a/internal/eventloop.go +++ b/internal/eventloop.go @@ -8,6 +8,7 @@ import ( "ior/internal/event" "ior/internal/file" "ior/internal/globalfilter" + "ior/internal/statsengine" "ior/internal/types" ) @@ -20,8 +21,17 @@ const ( defaultMaxPendingHandleEntries = 8192 defaultMaxProcFdCacheSize = 8192 cacheTrimDivisor = 4 + defaultAggregateDrainEvery = time.Second ) +type syscallAggregateSource interface { + Drain() ([]statsengine.SyscallAggregate, error) +} + +type syscallAggregateSink interface { + IngestSyscallAggregates([]statsengine.SyscallAggregate) +} + type eventLoopConfig struct { pidFilter int filter globalfilter.Filter @@ -34,6 +44,7 @@ type eventLoopConfig struct { synchronousRawProcessing bool fdTracker *fdTracker commResolver *commResolver + aggregateDrainEvery time.Duration } type rawEventHandler func(raw []byte, ch chan<- *event.Pair) @@ -43,14 +54,16 @@ type eventLoop struct { // the TUI can swap filters in place via SetFilter without tearing down // and reattaching the BPF probes (the previous behavior caused a multi- // second 'Attaching tracepoints' overlay every time the filter changed). - filterPtr atomic.Pointer[globalfilter.Filter] - pairs pairTracker // enter/exit pairing state and inter-syscall duration tracking - pendingHandles *pendingHandleTracker // TID → pathname from name_to_handle_at, for open_by_handle_at correlation - fdTracker *fdTracker // fd table and procfs resolution cache - commResolver *commResolver - outputFormatter // pair-emission and warning-notification callbacks (embedded collaborator) - rawHandlers map[types.EventType]rawEventHandler - cfg eventLoopConfig + filterPtr atomic.Pointer[globalfilter.Filter] + pairs pairTracker // enter/exit pairing state and inter-syscall duration tracking + pendingHandles *pendingHandleTracker // TID → pathname from name_to_handle_at, for open_by_handle_at correlation + fdTracker *fdTracker // fd table and procfs resolution cache + commResolver *commResolver + outputFormatter // pair-emission and warning-notification callbacks (embedded collaborator) + rawHandlers map[types.EventType]rawEventHandler + cfg eventLoopConfig + aggregateSink syscallAggregateSink + aggregateSrc syscallAggregateSource // Statistics numTracepoints uint @@ -100,6 +113,9 @@ func newEventLoop(cfg eventLoopConfig) (*eventLoop, error) { cfg: cfg, done: make(chan struct{}), } + if el.cfg.aggregateDrainEvery <= 0 { + el.cfg.aggregateDrainEvery = defaultAggregateDrainEvery + } el.SetFilter(cfg.filter) el.initRawHandlers() el.configureOutputCallback() |
