diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-01 23:50:20 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-01 23:50:20 +0200 |
| commit | 261aefd34bde83d75fa19aedb9ec99193c1c7fbb (patch) | |
| tree | bdd6cb375078267e0f5ae8a6c7bcd01966aba5b4 /internal/eventloop.go | |
| parent | 225cc311074f35e09aa3e70407d4e9f009f435a1 (diff) | |
Extract fd and comm state helpers from eventLoop
Diffstat (limited to 'internal/eventloop.go')
| -rw-r--r-- | internal/eventloop.go | 294 |
1 files changed, 191 insertions, 103 deletions
diff --git a/internal/eventloop.go b/internal/eventloop.go index 6975df3..536fe0a 100644 --- a/internal/eventloop.go +++ b/internal/eventloop.go @@ -34,24 +34,149 @@ type eventLoopConfig struct { plainMode bool } +type fdTracker struct { + files map[int32]file.File +} + +func newFDTracker(files map[int32]file.File) *fdTracker { + if files == nil { + files = make(map[int32]file.File) + } + return &fdTracker{files: files} +} + +func (t *fdTracker) get(fd int32) (file.File, bool) { + f, ok := t.files[fd] + return f, ok +} + +func (t *fdTracker) set(fd int32, f file.File) { + t.files[fd] = f +} + +func (t *fdTracker) delete(fd int32) { + delete(t.files, fd) +} + +func (t *fdTracker) closeRangeFrom(first int32) { + for fd := range t.files { + if fd >= first { + delete(t.files, fd) + } + } +} + +type commResolver struct { + comms map[uint32]string + + mu sync.RWMutex + pending map[uint32]struct{} +} + +func newCommResolver(comms map[uint32]string) *commResolver { + if comms == nil { + comms = make(map[uint32]string) + } + return &commResolver{ + comms: comms, + pending: make(map[uint32]struct{}), + } +} + +func (r *commResolver) seedTrackedPidComm(pidFilter int) { + candidates := []uint32{uint32(os.Getpid()), uint32(os.Getppid())} + if pidFilter > 0 { + candidates = append(candidates, uint32(pidFilter)) + } + + seen := make(map[uint32]struct{}, len(candidates)) + for _, tid := range candidates { + if tid == 0 { + continue + } + if _, ok := seen[tid]; ok { + continue + } + seen[tid] = struct{}{} + if comm := resolveCommFromProc(tid); comm != "" { + r.setCached(tid, comm) + continue + } + r.queueLookup(tid) + } +} + +func (r *commResolver) comm(tid uint32) string { + if comm, ok := r.cached(tid); ok { + return comm + } + r.queueLookup(tid) + return "" +} + +func (r *commResolver) cached(tid uint32) (string, bool) { + r.mu.RLock() + defer r.mu.RUnlock() + comm, ok := r.comms[tid] + return comm, ok +} + +func (r *commResolver) setCached(tid uint32, comm string) { + if comm == "" { + return + } + r.mu.Lock() + r.comms[tid] = comm + r.mu.Unlock() +} + +func (r *commResolver) queueLookup(tid uint32) { + if tid == 0 { + return + } + r.mu.Lock() + if _, ok := r.comms[tid]; ok { + r.mu.Unlock() + return + } + if r.pending == nil { + r.pending = make(map[uint32]struct{}) + } + if _, ok := r.pending[tid]; ok { + r.mu.Unlock() + return + } + r.pending[tid] = struct{}{} + r.mu.Unlock() + + go func() { + comm := resolveCommFromProc(tid) + r.mu.Lock() + delete(r.pending, tid) + if comm != "" { + r.comms[tid] = comm + } + r.mu.Unlock() + }() +} + type rawEventHandler func(raw []byte, ch chan<- *event.Pair) type eventLoop struct { filter *eventFilter enterEvs map[uint32]*event.Pair // Temp. store of sys_enter tracepoints per Tid. pendingHandles map[uint32]string // map of TID to pathname from name_to_handle_at - files map[int32]file.File // Track all open files by file descriptor.. - comms map[uint32]string // Program or thread name of the current Tid. - commsMu sync.RWMutex - // pendingCommLookups deduplicates async /proc comm lookups by TID. - pendingCommLookups map[uint32]struct{} - prevPairTimes map[uint32]uint64 // Previous event's time (to calculate time differences between two events) - rawHandlers map[EventType]rawEventHandler - flamegraph flamegraph.IorDataCollector // Storing all paths in a map structure for analysis - liveTrie *flamegraph.LiveTrie - printCb func(ep *event.Pair) // Callback to print the event - warningCb func(message string) // Optional callback for non-fatal event processing warnings - cfg eventLoopConfig + files map[int32]file.File // Track all open files by file descriptor. + fdTracker *fdTracker + comms map[uint32]string // Program or thread name of the current Tid. + commResolver *commResolver + prevPairTimes map[uint32]uint64 // Previous event's time (to calculate time differences between two events) + rawHandlers map[EventType]rawEventHandler + flamegraph flamegraph.IorDataCollector // Storing all paths in a map structure for analysis + liveTrie *flamegraph.LiveTrie + printCb func(ep *event.Pair) // Callback to print the event + warningCb func(message string) // Optional callback for non-fatal event processing warnings + cfg eventLoopConfig // Statistics numTracepoints uint @@ -63,19 +188,23 @@ type eventLoop struct { } func newEventLoop(cfg eventLoopConfig) *eventLoop { + filesByFD := make(map[int32]file.File) + commsByTID := make(map[uint32]string) + el := &eventLoop{ - filter: newEventFilter(cfg.commFilter, cfg.pathFilter), - enterEvs: make(map[uint32]*event.Pair), - pendingHandles: make(map[uint32]string), - files: make(map[int32]file.File), - comms: make(map[uint32]string), - pendingCommLookups: make(map[uint32]struct{}), - prevPairTimes: make(map[uint32]uint64), - rawHandlers: make(map[EventType]rawEventHandler), - printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() }, - flamegraph: flamegraph.New(cfg.flamegraphName), - cfg: cfg, - done: make(chan struct{}), + filter: newEventFilter(cfg.commFilter, cfg.pathFilter), + enterEvs: make(map[uint32]*event.Pair), + pendingHandles: make(map[uint32]string), + files: filesByFD, + fdTracker: newFDTracker(filesByFD), + comms: commsByTID, + commResolver: newCommResolver(commsByTID), + prevPairTimes: make(map[uint32]uint64), + rawHandlers: make(map[EventType]rawEventHandler), + printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() }, + flamegraph: flamegraph.New(cfg.flamegraphName), + cfg: cfg, + done: make(chan struct{}), } el.initRawHandlers() if cfg.liveFlamegraph { @@ -86,26 +215,27 @@ func newEventLoop(cfg eventLoopConfig) *eventLoop { } func (e *eventLoop) seedTrackedPidComm() { - candidates := []uint32{uint32(os.Getpid()), uint32(os.Getppid())} - if pid := e.cfg.pidFilter; pid > 0 { - candidates = append(candidates, uint32(pid)) + e.commState().seedTrackedPidComm(e.cfg.pidFilter) +} + +func (e *eventLoop) fdState() *fdTracker { + if e.files == nil { + e.files = make(map[int32]file.File) + } + if e.fdTracker == nil { + e.fdTracker = newFDTracker(e.files) } + return e.fdTracker +} - seen := make(map[uint32]struct{}, len(candidates)) - for _, tid := range candidates { - if tid == 0 { - continue - } - if _, ok := seen[tid]; ok { - continue - } - seen[tid] = struct{}{} - if comm := resolveCommFromProc(tid); comm != "" { - e.setCachedComm(tid, comm) - continue - } - e.queueCommLookup(tid) +func (e *eventLoop) commState() *commResolver { + if e.comms == nil { + e.comms = make(map[uint32]string) + } + if e.commResolver == nil { + e.commResolver = newCommResolver(e.comms) } + return e.commResolver } func (e *eventLoop) stats() string { @@ -355,7 +485,7 @@ func (e *eventLoop) handleOpenExit(ep *event.Pair, openEv *OpenEvent) bool { ep.Comm = comm if fd := int32(retEvent.Ret); fd >= 0 { fdFile := file.NewFd(fd, types.StringValue(openEv.Filename[:]), openEv.Flags) - e.files[fd] = fdFile + e.fdState().set(fd, fdFile) ep.File = fdFile } else { // Keep path information for failed opens so error scenarios remain observable. @@ -392,7 +522,7 @@ func (e *eventLoop) handlePathExit(ep *event.Pair, pathEv *PathEvent) bool { if fd := int32(retEvent.Ret); fd >= 0 { fdFile := file.NewFd(fd, types.StringValue(pathEv.Pathname[:]), syscall.O_CREAT|syscall.O_WRONLY|syscall.O_TRUNC) - e.files[fd] = fdFile + e.fdState().set(fd, fdFile) ep.File = fdFile } } else { @@ -404,10 +534,10 @@ func (e *eventLoop) handlePathExit(ep *event.Pair, pathEv *PathEvent) bool { func (e *eventLoop) handleFdExit(ep *event.Pair, fdEv *FdEvent) bool { fd := fdEv.Fd - if fdFile, ok := e.files[fd]; ok { + if fdFile, ok := e.fdState().get(fd); ok { ep.File = fdFile if ep.Is(SYS_ENTER_CLOSE) { - delete(e.files, fd) + e.fdState().delete(fd) } } else { ep.File = file.NewFdWithPid(fd, fdEv.Pid) @@ -417,11 +547,7 @@ func (e *eventLoop) handleFdExit(ep *event.Pair, fdEv *FdEvent) bool { // argument, so we approximate by closing all tracked fds >= first. retEv, ok := ep.ExitEv.(*types.RetEvent) if ok && retEv.Ret == 0 { - for fdToClose := range e.files { - if fdToClose >= fd { - delete(e.files, fdToClose) - } - } + e.fdState().closeRangeFrom(fd) } } ep.Comm = e.comm(fdEv.GetTid()) @@ -443,7 +569,7 @@ func (e *eventLoop) handleFdExit(ep *event.Pair, fdEv *FdEvent) bool { } // Duplicating fd if newFd := int32(retEvent.Ret); newFd != -1 { - e.files[newFd] = fdFile.Dup(newFd) + e.fdState().set(newFd, fdFile.Dup(newFd)) } } if ep.Is(SYS_ENTER_PIDFD_GETFD) { @@ -454,7 +580,7 @@ func (e *eventLoop) handleFdExit(ep *event.Pair, fdEv *FdEvent) bool { } if newFd := int32(retEv.Ret); newFd >= 0 { transferredFile := file.NewFdWithPid(newFd, fdEv.Pid) - e.files[newFd] = transferredFile + e.fdState().set(newFd, transferredFile) ep.File = transferredFile } } @@ -466,7 +592,7 @@ func (e *eventLoop) handleFdExit(ep *event.Pair, fdEv *FdEvent) bool { func (e *eventLoop) handleDup3Exit(ep *event.Pair, dup3Ev *Dup3Event) bool { fd := int32(dup3Ev.Fd) - if fdFile, ok := e.files[fd]; ok { + if fdFile, ok := e.fdState().get(fd); ok { ep.File = fdFile } else { ep.File = file.NewFdWithPid(fd, dup3Ev.Pid) @@ -490,7 +616,7 @@ func (e *eventLoop) handleDup3Exit(ep *event.Pair, dup3Ev *Dup3Event) bool { if newFd := int32(retEvent.Ret); newFd != -1 { duppedFdFile := fdFile.Dup(newFd) duppedFdFile.AddFlags(dup3Ev.Flags & syscall.O_CLOEXEC) - e.files[newFd] = duppedFdFile + e.fdState().set(newFd, duppedFdFile) } return true } @@ -512,14 +638,14 @@ func (e *eventLoop) handleOpenByHandleAtExit(ep *event.Pair, openByHandleEv *Ope if pathname, ok := e.pendingHandles[tid]; ok { delete(e.pendingHandles, tid) fdFile := file.NewFd(fd, pathname, openByHandleEv.Flags) - e.files[fd] = fdFile + e.fdState().set(fd, fdFile) ep.File = fdFile } else { fdFile := file.NewFdWithPid(fd, openByHandleEv.Pid) if fdFile.Flags() == file.Flags(-1) { fdFile.SetFlags(openByHandleEv.Flags) } - e.files[fd] = fdFile + e.fdState().set(fd, fdFile) ep.File = fdFile } ep.Comm = e.comm(tid) @@ -535,7 +661,7 @@ func (e *eventLoop) handleNullExit(ep *event.Pair, nullEv *NullEvent) bool { } if fd := int32(retEvent.Ret); fd >= 0 { fdFile := file.NewFdWithPid(fd, nullEv.Pid) - e.files[fd] = fdFile + e.fdState().set(fd, fdFile) ep.File = fdFile } } @@ -562,7 +688,7 @@ func (e *eventLoop) handleNullExit(ep *event.Pair, nullEv *NullEvent) bool { func (e *eventLoop) handleFcntlExit(ep *event.Pair, fcntlEv *FcntlEvent) bool { ep.Comm = e.comm(fcntlEv.GetTid()) fd := int32(fcntlEv.Fd) - if fdFile, ok := e.files[fd]; ok { + if fdFile, ok := e.fdState().get(fd); ok { ep.File = fdFile } else { ep.File = file.NewFdWithPid(fd, fcntlEv.Pid) @@ -594,15 +720,15 @@ func (e *eventLoop) handleFcntlExit(ep *event.Pair, fcntlEv *FcntlEvent) bool { const canChange = syscall.O_APPEND | syscall.O_ASYNC | syscall.O_DIRECT | syscall.O_NOATIME | syscall.O_NONBLOCK fdFile.SetFlags(int32(fcntlEv.Arg) & int32(canChange)) ep.File = fdFile - e.files[fd] = fdFile + e.fdState().set(fd, fdFile) case syscall.F_DUPFD: newFd := int32(retEvent.Ret) - e.files[newFd] = fdFile.Dup(newFd) + e.fdState().set(newFd, fdFile.Dup(newFd)) case syscall.F_DUPFD_CLOEXEC: newFd := int32(retEvent.Ret) duppedFdFile := fdFile.Dup(newFd) duppedFdFile.AddFlags(syscall.O_CLOEXEC) - e.files[newFd] = duppedFdFile + e.fdState().set(newFd, duppedFdFile) } return true } @@ -620,57 +746,19 @@ func (e *eventLoop) notifyWarning(message string) { } func (e *eventLoop) comm(tid uint32) string { - if comm, ok := e.cachedComm(tid); ok { - return comm - } - e.queueCommLookup(tid) - return "" + return e.commState().comm(tid) } func (e *eventLoop) cachedComm(tid uint32) (string, bool) { - e.commsMu.RLock() - defer e.commsMu.RUnlock() - comm, ok := e.comms[tid] - return comm, ok + return e.commState().cached(tid) } func (e *eventLoop) setCachedComm(tid uint32, comm string) { - if comm == "" { - return - } - e.commsMu.Lock() - e.comms[tid] = comm - e.commsMu.Unlock() + e.commState().setCached(tid, comm) } func (e *eventLoop) queueCommLookup(tid uint32) { - if tid == 0 { - return - } - e.commsMu.Lock() - if _, ok := e.comms[tid]; ok { - e.commsMu.Unlock() - return - } - if e.pendingCommLookups == nil { - e.pendingCommLookups = make(map[uint32]struct{}) - } - if _, ok := e.pendingCommLookups[tid]; ok { - e.commsMu.Unlock() - return - } - e.pendingCommLookups[tid] = struct{}{} - e.commsMu.Unlock() - - go func() { - comm := resolveCommFromProc(tid) - e.commsMu.Lock() - delete(e.pendingCommLookups, tid) - if comm != "" { - e.comms[tid] = comm - } - e.commsMu.Unlock() - }() + e.commState().queueLookup(tid) } func resolveCommFromProc(tid uint32) string { |
