summaryrefslogtreecommitdiff
path: root/internal/eventloop.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-01 23:50:20 +0200
committerPaul Buetow <paul@buetow.org>2026-03-01 23:50:20 +0200
commit261aefd34bde83d75fa19aedb9ec99193c1c7fbb (patch)
treebdd6cb375078267e0f5ae8a6c7bcd01966aba5b4 /internal/eventloop.go
parent225cc311074f35e09aa3e70407d4e9f009f435a1 (diff)
Extract fd and comm state helpers from eventLoop
Diffstat (limited to 'internal/eventloop.go')
-rw-r--r--internal/eventloop.go294
1 files changed, 191 insertions, 103 deletions
diff --git a/internal/eventloop.go b/internal/eventloop.go
index 6975df3..536fe0a 100644
--- a/internal/eventloop.go
+++ b/internal/eventloop.go
@@ -34,24 +34,149 @@ type eventLoopConfig struct {
plainMode bool
}
+type fdTracker struct {
+ files map[int32]file.File
+}
+
+func newFDTracker(files map[int32]file.File) *fdTracker {
+ if files == nil {
+ files = make(map[int32]file.File)
+ }
+ return &fdTracker{files: files}
+}
+
+func (t *fdTracker) get(fd int32) (file.File, bool) {
+ f, ok := t.files[fd]
+ return f, ok
+}
+
+func (t *fdTracker) set(fd int32, f file.File) {
+ t.files[fd] = f
+}
+
+func (t *fdTracker) delete(fd int32) {
+ delete(t.files, fd)
+}
+
+func (t *fdTracker) closeRangeFrom(first int32) {
+ for fd := range t.files {
+ if fd >= first {
+ delete(t.files, fd)
+ }
+ }
+}
+
+type commResolver struct {
+ comms map[uint32]string
+
+ mu sync.RWMutex
+ pending map[uint32]struct{}
+}
+
+func newCommResolver(comms map[uint32]string) *commResolver {
+ if comms == nil {
+ comms = make(map[uint32]string)
+ }
+ return &commResolver{
+ comms: comms,
+ pending: make(map[uint32]struct{}),
+ }
+}
+
+func (r *commResolver) seedTrackedPidComm(pidFilter int) {
+ candidates := []uint32{uint32(os.Getpid()), uint32(os.Getppid())}
+ if pidFilter > 0 {
+ candidates = append(candidates, uint32(pidFilter))
+ }
+
+ seen := make(map[uint32]struct{}, len(candidates))
+ for _, tid := range candidates {
+ if tid == 0 {
+ continue
+ }
+ if _, ok := seen[tid]; ok {
+ continue
+ }
+ seen[tid] = struct{}{}
+ if comm := resolveCommFromProc(tid); comm != "" {
+ r.setCached(tid, comm)
+ continue
+ }
+ r.queueLookup(tid)
+ }
+}
+
+func (r *commResolver) comm(tid uint32) string {
+ if comm, ok := r.cached(tid); ok {
+ return comm
+ }
+ r.queueLookup(tid)
+ return ""
+}
+
+func (r *commResolver) cached(tid uint32) (string, bool) {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+ comm, ok := r.comms[tid]
+ return comm, ok
+}
+
+func (r *commResolver) setCached(tid uint32, comm string) {
+ if comm == "" {
+ return
+ }
+ r.mu.Lock()
+ r.comms[tid] = comm
+ r.mu.Unlock()
+}
+
+func (r *commResolver) queueLookup(tid uint32) {
+ if tid == 0 {
+ return
+ }
+ r.mu.Lock()
+ if _, ok := r.comms[tid]; ok {
+ r.mu.Unlock()
+ return
+ }
+ if r.pending == nil {
+ r.pending = make(map[uint32]struct{})
+ }
+ if _, ok := r.pending[tid]; ok {
+ r.mu.Unlock()
+ return
+ }
+ r.pending[tid] = struct{}{}
+ r.mu.Unlock()
+
+ go func() {
+ comm := resolveCommFromProc(tid)
+ r.mu.Lock()
+ delete(r.pending, tid)
+ if comm != "" {
+ r.comms[tid] = comm
+ }
+ r.mu.Unlock()
+ }()
+}
+
type rawEventHandler func(raw []byte, ch chan<- *event.Pair)
type eventLoop struct {
filter *eventFilter
enterEvs map[uint32]*event.Pair // Temp. store of sys_enter tracepoints per Tid.
pendingHandles map[uint32]string // map of TID to pathname from name_to_handle_at
- files map[int32]file.File // Track all open files by file descriptor..
- comms map[uint32]string // Program or thread name of the current Tid.
- commsMu sync.RWMutex
- // pendingCommLookups deduplicates async /proc comm lookups by TID.
- pendingCommLookups map[uint32]struct{}
- prevPairTimes map[uint32]uint64 // Previous event's time (to calculate time differences between two events)
- rawHandlers map[EventType]rawEventHandler
- flamegraph flamegraph.IorDataCollector // Storing all paths in a map structure for analysis
- liveTrie *flamegraph.LiveTrie
- printCb func(ep *event.Pair) // Callback to print the event
- warningCb func(message string) // Optional callback for non-fatal event processing warnings
- cfg eventLoopConfig
+ files map[int32]file.File // Track all open files by file descriptor.
+ fdTracker *fdTracker
+ comms map[uint32]string // Program or thread name of the current Tid.
+ commResolver *commResolver
+ prevPairTimes map[uint32]uint64 // Previous event's time (to calculate time differences between two events)
+ rawHandlers map[EventType]rawEventHandler
+ flamegraph flamegraph.IorDataCollector // Storing all paths in a map structure for analysis
+ liveTrie *flamegraph.LiveTrie
+ printCb func(ep *event.Pair) // Callback to print the event
+ warningCb func(message string) // Optional callback for non-fatal event processing warnings
+ cfg eventLoopConfig
// Statistics
numTracepoints uint
@@ -63,19 +188,23 @@ type eventLoop struct {
}
func newEventLoop(cfg eventLoopConfig) *eventLoop {
+ filesByFD := make(map[int32]file.File)
+ commsByTID := make(map[uint32]string)
+
el := &eventLoop{
- filter: newEventFilter(cfg.commFilter, cfg.pathFilter),
- enterEvs: make(map[uint32]*event.Pair),
- pendingHandles: make(map[uint32]string),
- files: make(map[int32]file.File),
- comms: make(map[uint32]string),
- pendingCommLookups: make(map[uint32]struct{}),
- prevPairTimes: make(map[uint32]uint64),
- rawHandlers: make(map[EventType]rawEventHandler),
- printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() },
- flamegraph: flamegraph.New(cfg.flamegraphName),
- cfg: cfg,
- done: make(chan struct{}),
+ filter: newEventFilter(cfg.commFilter, cfg.pathFilter),
+ enterEvs: make(map[uint32]*event.Pair),
+ pendingHandles: make(map[uint32]string),
+ files: filesByFD,
+ fdTracker: newFDTracker(filesByFD),
+ comms: commsByTID,
+ commResolver: newCommResolver(commsByTID),
+ prevPairTimes: make(map[uint32]uint64),
+ rawHandlers: make(map[EventType]rawEventHandler),
+ printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() },
+ flamegraph: flamegraph.New(cfg.flamegraphName),
+ cfg: cfg,
+ done: make(chan struct{}),
}
el.initRawHandlers()
if cfg.liveFlamegraph {
@@ -86,26 +215,27 @@ func newEventLoop(cfg eventLoopConfig) *eventLoop {
}
func (e *eventLoop) seedTrackedPidComm() {
- candidates := []uint32{uint32(os.Getpid()), uint32(os.Getppid())}
- if pid := e.cfg.pidFilter; pid > 0 {
- candidates = append(candidates, uint32(pid))
+ e.commState().seedTrackedPidComm(e.cfg.pidFilter)
+}
+
+func (e *eventLoop) fdState() *fdTracker {
+ if e.files == nil {
+ e.files = make(map[int32]file.File)
+ }
+ if e.fdTracker == nil {
+ e.fdTracker = newFDTracker(e.files)
}
+ return e.fdTracker
+}
- seen := make(map[uint32]struct{}, len(candidates))
- for _, tid := range candidates {
- if tid == 0 {
- continue
- }
- if _, ok := seen[tid]; ok {
- continue
- }
- seen[tid] = struct{}{}
- if comm := resolveCommFromProc(tid); comm != "" {
- e.setCachedComm(tid, comm)
- continue
- }
- e.queueCommLookup(tid)
+func (e *eventLoop) commState() *commResolver {
+ if e.comms == nil {
+ e.comms = make(map[uint32]string)
+ }
+ if e.commResolver == nil {
+ e.commResolver = newCommResolver(e.comms)
}
+ return e.commResolver
}
func (e *eventLoop) stats() string {
@@ -355,7 +485,7 @@ func (e *eventLoop) handleOpenExit(ep *event.Pair, openEv *OpenEvent) bool {
ep.Comm = comm
if fd := int32(retEvent.Ret); fd >= 0 {
fdFile := file.NewFd(fd, types.StringValue(openEv.Filename[:]), openEv.Flags)
- e.files[fd] = fdFile
+ e.fdState().set(fd, fdFile)
ep.File = fdFile
} else {
// Keep path information for failed opens so error scenarios remain observable.
@@ -392,7 +522,7 @@ func (e *eventLoop) handlePathExit(ep *event.Pair, pathEv *PathEvent) bool {
if fd := int32(retEvent.Ret); fd >= 0 {
fdFile := file.NewFd(fd, types.StringValue(pathEv.Pathname[:]),
syscall.O_CREAT|syscall.O_WRONLY|syscall.O_TRUNC)
- e.files[fd] = fdFile
+ e.fdState().set(fd, fdFile)
ep.File = fdFile
}
} else {
@@ -404,10 +534,10 @@ func (e *eventLoop) handlePathExit(ep *event.Pair, pathEv *PathEvent) bool {
func (e *eventLoop) handleFdExit(ep *event.Pair, fdEv *FdEvent) bool {
fd := fdEv.Fd
- if fdFile, ok := e.files[fd]; ok {
+ if fdFile, ok := e.fdState().get(fd); ok {
ep.File = fdFile
if ep.Is(SYS_ENTER_CLOSE) {
- delete(e.files, fd)
+ e.fdState().delete(fd)
}
} else {
ep.File = file.NewFdWithPid(fd, fdEv.Pid)
@@ -417,11 +547,7 @@ func (e *eventLoop) handleFdExit(ep *event.Pair, fdEv *FdEvent) bool {
// argument, so we approximate by closing all tracked fds >= first.
retEv, ok := ep.ExitEv.(*types.RetEvent)
if ok && retEv.Ret == 0 {
- for fdToClose := range e.files {
- if fdToClose >= fd {
- delete(e.files, fdToClose)
- }
- }
+ e.fdState().closeRangeFrom(fd)
}
}
ep.Comm = e.comm(fdEv.GetTid())
@@ -443,7 +569,7 @@ func (e *eventLoop) handleFdExit(ep *event.Pair, fdEv *FdEvent) bool {
}
// Duplicating fd
if newFd := int32(retEvent.Ret); newFd != -1 {
- e.files[newFd] = fdFile.Dup(newFd)
+ e.fdState().set(newFd, fdFile.Dup(newFd))
}
}
if ep.Is(SYS_ENTER_PIDFD_GETFD) {
@@ -454,7 +580,7 @@ func (e *eventLoop) handleFdExit(ep *event.Pair, fdEv *FdEvent) bool {
}
if newFd := int32(retEv.Ret); newFd >= 0 {
transferredFile := file.NewFdWithPid(newFd, fdEv.Pid)
- e.files[newFd] = transferredFile
+ e.fdState().set(newFd, transferredFile)
ep.File = transferredFile
}
}
@@ -466,7 +592,7 @@ func (e *eventLoop) handleFdExit(ep *event.Pair, fdEv *FdEvent) bool {
func (e *eventLoop) handleDup3Exit(ep *event.Pair, dup3Ev *Dup3Event) bool {
fd := int32(dup3Ev.Fd)
- if fdFile, ok := e.files[fd]; ok {
+ if fdFile, ok := e.fdState().get(fd); ok {
ep.File = fdFile
} else {
ep.File = file.NewFdWithPid(fd, dup3Ev.Pid)
@@ -490,7 +616,7 @@ func (e *eventLoop) handleDup3Exit(ep *event.Pair, dup3Ev *Dup3Event) bool {
if newFd := int32(retEvent.Ret); newFd != -1 {
duppedFdFile := fdFile.Dup(newFd)
duppedFdFile.AddFlags(dup3Ev.Flags & syscall.O_CLOEXEC)
- e.files[newFd] = duppedFdFile
+ e.fdState().set(newFd, duppedFdFile)
}
return true
}
@@ -512,14 +638,14 @@ func (e *eventLoop) handleOpenByHandleAtExit(ep *event.Pair, openByHandleEv *Ope
if pathname, ok := e.pendingHandles[tid]; ok {
delete(e.pendingHandles, tid)
fdFile := file.NewFd(fd, pathname, openByHandleEv.Flags)
- e.files[fd] = fdFile
+ e.fdState().set(fd, fdFile)
ep.File = fdFile
} else {
fdFile := file.NewFdWithPid(fd, openByHandleEv.Pid)
if fdFile.Flags() == file.Flags(-1) {
fdFile.SetFlags(openByHandleEv.Flags)
}
- e.files[fd] = fdFile
+ e.fdState().set(fd, fdFile)
ep.File = fdFile
}
ep.Comm = e.comm(tid)
@@ -535,7 +661,7 @@ func (e *eventLoop) handleNullExit(ep *event.Pair, nullEv *NullEvent) bool {
}
if fd := int32(retEvent.Ret); fd >= 0 {
fdFile := file.NewFdWithPid(fd, nullEv.Pid)
- e.files[fd] = fdFile
+ e.fdState().set(fd, fdFile)
ep.File = fdFile
}
}
@@ -562,7 +688,7 @@ func (e *eventLoop) handleNullExit(ep *event.Pair, nullEv *NullEvent) bool {
func (e *eventLoop) handleFcntlExit(ep *event.Pair, fcntlEv *FcntlEvent) bool {
ep.Comm = e.comm(fcntlEv.GetTid())
fd := int32(fcntlEv.Fd)
- if fdFile, ok := e.files[fd]; ok {
+ if fdFile, ok := e.fdState().get(fd); ok {
ep.File = fdFile
} else {
ep.File = file.NewFdWithPid(fd, fcntlEv.Pid)
@@ -594,15 +720,15 @@ func (e *eventLoop) handleFcntlExit(ep *event.Pair, fcntlEv *FcntlEvent) bool {
const canChange = syscall.O_APPEND | syscall.O_ASYNC | syscall.O_DIRECT | syscall.O_NOATIME | syscall.O_NONBLOCK
fdFile.SetFlags(int32(fcntlEv.Arg) & int32(canChange))
ep.File = fdFile
- e.files[fd] = fdFile
+ e.fdState().set(fd, fdFile)
case syscall.F_DUPFD:
newFd := int32(retEvent.Ret)
- e.files[newFd] = fdFile.Dup(newFd)
+ e.fdState().set(newFd, fdFile.Dup(newFd))
case syscall.F_DUPFD_CLOEXEC:
newFd := int32(retEvent.Ret)
duppedFdFile := fdFile.Dup(newFd)
duppedFdFile.AddFlags(syscall.O_CLOEXEC)
- e.files[newFd] = duppedFdFile
+ e.fdState().set(newFd, duppedFdFile)
}
return true
}
@@ -620,57 +746,19 @@ func (e *eventLoop) notifyWarning(message string) {
}
func (e *eventLoop) comm(tid uint32) string {
- if comm, ok := e.cachedComm(tid); ok {
- return comm
- }
- e.queueCommLookup(tid)
- return ""
+ return e.commState().comm(tid)
}
func (e *eventLoop) cachedComm(tid uint32) (string, bool) {
- e.commsMu.RLock()
- defer e.commsMu.RUnlock()
- comm, ok := e.comms[tid]
- return comm, ok
+ return e.commState().cached(tid)
}
func (e *eventLoop) setCachedComm(tid uint32, comm string) {
- if comm == "" {
- return
- }
- e.commsMu.Lock()
- e.comms[tid] = comm
- e.commsMu.Unlock()
+ e.commState().setCached(tid, comm)
}
func (e *eventLoop) queueCommLookup(tid uint32) {
- if tid == 0 {
- return
- }
- e.commsMu.Lock()
- if _, ok := e.comms[tid]; ok {
- e.commsMu.Unlock()
- return
- }
- if e.pendingCommLookups == nil {
- e.pendingCommLookups = make(map[uint32]struct{})
- }
- if _, ok := e.pendingCommLookups[tid]; ok {
- e.commsMu.Unlock()
- return
- }
- e.pendingCommLookups[tid] = struct{}{}
- e.commsMu.Unlock()
-
- go func() {
- comm := resolveCommFromProc(tid)
- e.commsMu.Lock()
- delete(e.pendingCommLookups, tid)
- if comm != "" {
- e.comms[tid] = comm
- }
- e.commsMu.Unlock()
- }()
+ e.commState().queueLookup(tid)
}
func resolveCommFromProc(tid uint32) string {