diff options
Diffstat (limited to 'internal/eventloop_runtime.go')
| -rw-r--r-- | internal/eventloop_runtime.go | 271 |
1 files changed, 271 insertions, 0 deletions
diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go new file mode 100644 index 0000000..12d9f12 --- /dev/null +++ b/internal/eventloop_runtime.go @@ -0,0 +1,271 @@ +package internal + +import ( + "context" + "fmt" + "time" + + "ior/internal/event" + "ior/internal/file" + "ior/internal/types" +) + +func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) { + defer close(e.done) + defer e.shutdownCommResolver() + + if e.cfg.pprofEnable { + fmt.Println("Profiling, press Ctrl+C to stop") + } + if e.cfg.plainMode && !e.cfg.pprofEnable { + fmt.Println(event.EventStreamHeader) + } + + e.startTime = time.Now() + if e.printCb == nil { + e.printCb = func(ep *event.Pair) { ep.Recycle() } + } + if e.cfg.synchronousRawProcessing { + e.runSynchronously(ctx, rawCh) + return + } + for ep := range e.events(ctx, rawCh) { + e.printCb(ep) + e.numSyscallsAfterFilter++ + } +} + +func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) { + pairs := make(chan *event.Pair, 1) + + for { + select { + case raw, ok := <-rawCh: + if !ok { + return + } + if len(raw) == 0 { + continue + } + e.processRawEvent(raw, pairs) + for { + select { + case ep := <-pairs: + e.printCb(ep) + e.numSyscallsAfterFilter++ + default: + goto nextRaw + } + } + case <-ctx.Done(): + fmt.Println("Stopping event loop") + return + } + nextRaw: + } +} + +func (e *eventLoop) events(ctx context.Context, rawCh <-chan []byte) <-chan *event.Pair { + ch := make(chan *event.Pair) + + go func() { + defer close(ch) + + for { + select { + case raw, ok := <-rawCh: + if !ok { + return + } + if len(raw) == 0 { + continue + } + e.processRawEvent(raw, ch) + case <-ctx.Done(): + fmt.Println("Stopping event loop") + return + } + } + }() + + return ch +} + +func (e *eventLoop) processRawEvent(raw []byte, ch chan<- *event.Pair) { + e.numTracepoints++ + e.initRawHandlers() + evType := types.EventType(raw[0]) + handler, ok := e.rawHandlers[evType] + if !ok { + e.notifyWarning(fmt.Sprintf("Dropped unhandled raw event type %d", evType)) + return + } + handler(raw, ch) +} + +func (e *eventLoop) initRawHandlers() { + if e.rawHandlers == nil { + e.rawHandlers = make(map[types.EventType]rawEventHandler) + } + if len(e.rawHandlers) != 0 { + return + } + + e.rawHandlers[types.ENTER_OPEN_EVENT] = func(raw []byte, _ chan<- *event.Pair) { + openEv, ok := decodeRawEvent(e, types.ENTER_OPEN_EVENT, raw, types.NewOpenEventFast) + if !ok { + return + } + if e.filter.MatchOpenEvent(openEv) { + e.tracepointEntered(openEv) + } + } + e.rawHandlers[types.EXIT_OPEN_EVENT] = func(raw []byte, ch chan<- *event.Pair) { + retEv, ok := decodeRawEvent(e, types.EXIT_OPEN_EVENT, raw, types.NewRetEventFast) + if !ok { + return + } + e.tracepointExited(retEv, ch) + } + e.rawHandlers[types.ENTER_FD_EVENT] = func(raw []byte, _ chan<- *event.Pair) { + fdEv, ok := decodeRawEvent(e, types.ENTER_FD_EVENT, raw, types.NewFdEventFast) + if !ok { + return + } + e.tracepointEntered(fdEv) + } + e.rawHandlers[types.EXIT_FD_EVENT] = func(raw []byte, ch chan<- *event.Pair) { + fdEv, ok := decodeRawEvent(e, types.EXIT_FD_EVENT, raw, types.NewFdEventFast) + if !ok { + return + } + e.tracepointExited(fdEv, ch) + } + e.rawHandlers[types.ENTER_NULL_EVENT] = func(raw []byte, _ chan<- *event.Pair) { + nullEv, ok := decodeRawEvent(e, types.ENTER_NULL_EVENT, raw, types.NewNullEventFast) + if !ok { + return + } + e.tracepointEntered(nullEv) + } + e.rawHandlers[types.EXIT_NULL_EVENT] = func(raw []byte, ch chan<- *event.Pair) { + nullEv, ok := decodeRawEvent(e, types.EXIT_NULL_EVENT, raw, types.NewNullEventFast) + if !ok { + return + } + e.tracepointExited(nullEv, ch) + } + e.rawHandlers[types.EXIT_RET_EVENT] = func(raw []byte, ch chan<- *event.Pair) { + retEv, ok := decodeRawEvent(e, types.EXIT_RET_EVENT, raw, types.NewRetEventFast) + if !ok { + return + } + e.tracepointExited(retEv, ch) + } + e.rawHandlers[types.ENTER_NAME_EVENT] = func(raw []byte, _ chan<- *event.Pair) { + nameEv, ok := decodeRawEvent(e, types.ENTER_NAME_EVENT, raw, types.NewNameEventFast) + if !ok { + return + } + if e.filter.MatchNameEvent(nameEv) { + e.tracepointEntered(nameEv) + } + } + e.rawHandlers[types.ENTER_PATH_EVENT] = func(raw []byte, _ chan<- *event.Pair) { + pathEv, ok := decodeRawEvent(e, types.ENTER_PATH_EVENT, raw, types.NewPathEventFast) + if !ok { + return + } + if e.filter.MatchPathEvent(pathEv) { + e.tracepointEntered(pathEv) + } + } + e.rawHandlers[types.ENTER_FCNTL_EVENT] = func(raw []byte, _ chan<- *event.Pair) { + fcntlEv, ok := decodeRawEvent(e, types.ENTER_FCNTL_EVENT, raw, types.NewFcntlEventFast) + if !ok { + return + } + e.tracepointEntered(fcntlEv) + } + e.rawHandlers[types.ENTER_OPEN_BY_HANDLE_AT_EVENT] = func(raw []byte, _ chan<- *event.Pair) { + openByHandleEv, ok := decodeRawEvent(e, types.ENTER_OPEN_BY_HANDLE_AT_EVENT, raw, types.NewOpenByHandleAtEventFast) + if !ok { + return + } + e.tracepointEntered(openByHandleEv) + } + e.rawHandlers[types.ENTER_DUP3_EVENT] = func(raw []byte, _ chan<- *event.Pair) { + dup3Ev, ok := decodeRawEvent(e, types.ENTER_DUP3_EVENT, raw, types.NewDup3EventFast) + if !ok { + return + } + e.tracepointEntered(dup3Ev) + } +} + +func decodeRawEvent[T any](e *eventLoop, eventType types.EventType, raw []byte, decode func([]byte) *T) (*T, bool) { + decoded := decode(raw) + if decoded == nil { + e.dropMalformedRawEvent(eventType, raw) + return nil, false + } + return decoded, true +} + +func (e *eventLoop) tracepointEntered(enterEv event.Event) { + tid := enterEv.GetTid() + // Schedule comm lookup as early as possible to reduce races for short-lived processes. + e.queueCommLookup(tid) + if !e.filter.UsesCommFilter() { + e.setEnterEvent(enterEv) + return + } + + switch enterEv.(type) { + case *types.OpenEvent: + e.setEnterEvent(enterEv) + default: + // Only, when we have a comm name + if _, ok := e.cachedComm(tid); ok { + e.setEnterEvent(enterEv) + } else { + e.notifyWarning(fmt.Sprintf("No comm name for %v process probably already vanished?", enterEv)) + } + } +} + +func (e *eventLoop) tracepointExited(exitEv event.Event, ch chan<- *event.Pair) { + ep, ok := e.consumeEnterEvent(exitEv.GetTid()) + if !ok { + exitEv.Recycle() + return + } + ep.ExitEv = exitEv + e.numSyscalls++ + + // Expect ID one lower, otherwise, enter and exit tracepoints + // don't match up. E.g.: + // enterEv:SYS_ENTER_OPEN => exitEv:SYS_EXIT_OPEN + if ep.EnterEv.GetTraceId()-1 != ep.ExitEv.GetTraceId() { + e.numTracepointMismatches++ + e.notifyWarning("Dropped tracepoint pair with mismatched enter/exit IDs") + ep.Recycle() + return + } + if !e.handleTracepointExit(ep) { + return + } + prevPairTime, _ := e.prevPairTimes[ep.EnterEv.GetTid()] + ep.CalculateDurations(prevPairTime) + e.prevPairTimes[ep.EnterEv.GetTid()] = ep.ExitEv.GetTime() + e.freezePairForEmission(ep) + ch <- ep +} + +func (e *eventLoop) freezePairForEmission(ep *event.Pair) { + fdFile, ok := ep.File.(*file.FdFile) + if !ok { + return + } + ep.File = fdFile.Dup(fdFile.FD()) +} |
