summaryrefslogtreecommitdiff
path: root/internal/eventloop_runtime.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/eventloop_runtime.go')
-rw-r--r--internal/eventloop_runtime.go43
1 files changed, 25 insertions, 18 deletions
diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go
index 12d9f12..7f540ec 100644
--- a/internal/eventloop_runtime.go
+++ b/internal/eventloop_runtime.go
@@ -25,6 +25,7 @@ func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) {
if e.printCb == nil {
e.printCb = func(ep *event.Pair) { ep.Recycle() }
}
+ e.initRawHandlers()
if e.cfg.synchronousRawProcessing {
e.runSynchronously(ctx, rawCh)
return
@@ -48,20 +49,24 @@ func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) {
continue
}
e.processRawEvent(raw, pairs)
- for {
- select {
- case ep := <-pairs:
- e.printCb(ep)
- e.numSyscallsAfterFilter++
- default:
- goto nextRaw
- }
- }
+ e.drainPairs(pairs)
case <-ctx.Done():
fmt.Println("Stopping event loop")
return
}
- nextRaw:
+ }
+}
+
+// drainPairs consumes all immediately available pairs from the buffered channel.
+func (e *eventLoop) drainPairs(pairs <-chan *event.Pair) {
+ for {
+ select {
+ case ep := <-pairs:
+ e.printCb(ep)
+ e.numSyscallsAfterFilter++
+ default:
+ return
+ }
}
}
@@ -92,8 +97,10 @@ func (e *eventLoop) events(ctx context.Context, rawCh <-chan []byte) <-chan *eve
}
func (e *eventLoop) processRawEvent(raw []byte, ch chan<- *event.Pair) {
+ if len(raw) == 0 {
+ return
+ }
e.numTracepoints++
- e.initRawHandlers()
evType := types.EventType(raw[0])
handler, ok := e.rawHandlers[evType]
if !ok {
@@ -217,17 +224,17 @@ func (e *eventLoop) tracepointEntered(enterEv event.Event) {
// Schedule comm lookup as early as possible to reduce races for short-lived processes.
e.queueCommLookup(tid)
if !e.filter.UsesCommFilter() {
- e.setEnterEvent(enterEv)
+ e.pairs.set(enterEv)
return
}
switch enterEv.(type) {
case *types.OpenEvent:
- e.setEnterEvent(enterEv)
+ e.pairs.set(enterEv)
default:
// Only, when we have a comm name
if _, ok := e.cachedComm(tid); ok {
- e.setEnterEvent(enterEv)
+ e.pairs.set(enterEv)
} else {
e.notifyWarning(fmt.Sprintf("No comm name for %v process probably already vanished?", enterEv))
}
@@ -235,7 +242,7 @@ func (e *eventLoop) tracepointEntered(enterEv event.Event) {
}
func (e *eventLoop) tracepointExited(exitEv event.Event, ch chan<- *event.Pair) {
- ep, ok := e.consumeEnterEvent(exitEv.GetTid())
+ ep, ok := e.pairs.consume(exitEv.GetTid())
if !ok {
exitEv.Recycle()
return
@@ -255,9 +262,9 @@ func (e *eventLoop) tracepointExited(exitEv event.Event, ch chan<- *event.Pair)
if !e.handleTracepointExit(ep) {
return
}
- prevPairTime, _ := e.prevPairTimes[ep.EnterEv.GetTid()]
- ep.CalculateDurations(prevPairTime)
- e.prevPairTimes[ep.EnterEv.GetTid()] = ep.ExitEv.GetTime()
+ tid := ep.EnterEv.GetTid()
+ ep.CalculateDurations(e.pairs.prevTime(tid))
+ e.pairs.setPrevTime(tid, ep.ExitEv.GetTime())
e.freezePairForEmission(ep)
ch <- ep
}