diff options
Diffstat (limited to 'internal/eventloop_runtime.go')
| -rw-r--r-- | internal/eventloop_runtime.go | 43 |
1 files changed, 25 insertions, 18 deletions
diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go index 12d9f12..7f540ec 100644 --- a/internal/eventloop_runtime.go +++ b/internal/eventloop_runtime.go @@ -25,6 +25,7 @@ func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) { if e.printCb == nil { e.printCb = func(ep *event.Pair) { ep.Recycle() } } + e.initRawHandlers() if e.cfg.synchronousRawProcessing { e.runSynchronously(ctx, rawCh) return @@ -48,20 +49,24 @@ func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) { continue } e.processRawEvent(raw, pairs) - for { - select { - case ep := <-pairs: - e.printCb(ep) - e.numSyscallsAfterFilter++ - default: - goto nextRaw - } - } + e.drainPairs(pairs) case <-ctx.Done(): fmt.Println("Stopping event loop") return } - nextRaw: + } +} + +// drainPairs consumes all immediately available pairs from the buffered channel. +func (e *eventLoop) drainPairs(pairs <-chan *event.Pair) { + for { + select { + case ep := <-pairs: + e.printCb(ep) + e.numSyscallsAfterFilter++ + default: + return + } } } @@ -92,8 +97,10 @@ func (e *eventLoop) events(ctx context.Context, rawCh <-chan []byte) <-chan *eve } func (e *eventLoop) processRawEvent(raw []byte, ch chan<- *event.Pair) { + if len(raw) == 0 { + return + } e.numTracepoints++ - e.initRawHandlers() evType := types.EventType(raw[0]) handler, ok := e.rawHandlers[evType] if !ok { @@ -217,17 +224,17 @@ func (e *eventLoop) tracepointEntered(enterEv event.Event) { // Schedule comm lookup as early as possible to reduce races for short-lived processes. e.queueCommLookup(tid) if !e.filter.UsesCommFilter() { - e.setEnterEvent(enterEv) + e.pairs.set(enterEv) return } switch enterEv.(type) { case *types.OpenEvent: - e.setEnterEvent(enterEv) + e.pairs.set(enterEv) default: // Only, when we have a comm name if _, ok := e.cachedComm(tid); ok { - e.setEnterEvent(enterEv) + e.pairs.set(enterEv) } else { e.notifyWarning(fmt.Sprintf("No comm name for %v process probably already vanished?", enterEv)) } @@ -235,7 +242,7 @@ func (e *eventLoop) tracepointEntered(enterEv event.Event) { } func (e *eventLoop) tracepointExited(exitEv event.Event, ch chan<- *event.Pair) { - ep, ok := e.consumeEnterEvent(exitEv.GetTid()) + ep, ok := e.pairs.consume(exitEv.GetTid()) if !ok { exitEv.Recycle() return @@ -255,9 +262,9 @@ func (e *eventLoop) tracepointExited(exitEv event.Event, ch chan<- *event.Pair) if !e.handleTracepointExit(ep) { return } - prevPairTime, _ := e.prevPairTimes[ep.EnterEv.GetTid()] - ep.CalculateDurations(prevPairTime) - e.prevPairTimes[ep.EnterEv.GetTid()] = ep.ExitEv.GetTime() + tid := ep.EnterEv.GetTid() + ep.CalculateDurations(e.pairs.prevTime(tid)) + e.pairs.setPrevTime(tid, ep.ExitEv.GetTime()) e.freezePairForEmission(ep) ch <- ep } |
