diff options
Diffstat (limited to 'internal/eventloop.go')
| -rw-r--r-- | internal/eventloop.go | 220 |
1 files changed, 187 insertions, 33 deletions
diff --git a/internal/eventloop.go b/internal/eventloop.go index 6eba594..27b48e7 100644 --- a/internal/eventloop.go +++ b/internal/eventloop.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "reflect" + "sort" "sync" "syscall" "time" @@ -23,6 +24,9 @@ const sysEnterNameToHandleAtName = "name_to_handle_at" const ( defaultCommLookupWorkers = 4 defaultCommLookupQueueSize = 512 + defaultMaxPendingEnterEvs = 16384 + defaultMaxProcFdCacheSize = 8192 + cacheTrimDivisor = 4 ) type eventLoopConfig struct { @@ -241,18 +245,23 @@ type rawEventHandler func(raw []byte, ch chan<- *event.Pair) type tracepointExitHandler func(ep *event.Pair) bool type eventLoop struct { - filter globalfilter.Filter - enterEvs map[uint32]*event.Pair // Temp. store of sys_enter tracepoints per Tid. - pendingHandles map[uint32]string // map of TID to pathname from name_to_handle_at - fdTracker *fdTracker - procFdCache map[uint64]*file.FdFile // Cache procfs-resolved metadata for unknown fds. - commResolver *commResolver - prevPairTimes map[uint32]uint64 // Previous event's time (to calculate time differences between two events) - rawHandlers map[types.EventType]rawEventHandler - exitHandlers map[reflect.Type]tracepointExitHandler - printCb func(ep *event.Pair) // Callback to print the event - warningCb func(message string) // Optional callback for non-fatal event processing warnings - cfg eventLoopConfig + filter globalfilter.Filter + enterEvs map[uint32]*event.Pair // Temp. store of sys_enter tracepoints per Tid. + enterEvAges map[uint32]uint64 + pendingHandles map[uint32]string // map of TID to pathname from name_to_handle_at + fdTracker *fdTracker + procFdCache map[uint64]*file.FdFile // Cache procfs-resolved metadata for unknown fds. + procFdCacheAges map[uint64]uint64 + commResolver *commResolver + prevPairTimes map[uint32]uint64 // Previous event's time (to calculate time differences between two events) + rawHandlers map[types.EventType]rawEventHandler + exitHandlers map[reflect.Type]tracepointExitHandler + printCb func(ep *event.Pair) // Callback to print the event + warningCb func(message string) // Optional callback for non-fatal event processing warnings + cfg eventLoopConfig + cacheAge uint64 + maxPendingEnterEvs int + maxProcFdCacheSize int // Statistics numTracepoints uint @@ -271,18 +280,20 @@ func newEventLoop(cfg eventLoopConfig) (*eventLoop, error) { } el := &eventLoop{ - filter: cfg.filter.Clone(), - enterEvs: make(map[uint32]*event.Pair), - pendingHandles: make(map[uint32]string), - fdTracker: fdState, - procFdCache: make(map[uint64]*file.FdFile), - commResolver: commState, - prevPairTimes: make(map[uint32]uint64), - rawHandlers: make(map[types.EventType]rawEventHandler), - exitHandlers: make(map[reflect.Type]tracepointExitHandler), - printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() }, - cfg: cfg, - done: make(chan struct{}), + filter: cfg.filter.Clone(), + enterEvs: make(map[uint32]*event.Pair), + enterEvAges: make(map[uint32]uint64), + pendingHandles: make(map[uint32]string), + fdTracker: fdState, + procFdCache: make(map[uint64]*file.FdFile), + procFdCacheAges: make(map[uint64]uint64), + commResolver: commState, + prevPairTimes: make(map[uint32]uint64), + rawHandlers: make(map[types.EventType]rawEventHandler), + exitHandlers: make(map[reflect.Type]tracepointExitHandler), + printCb: func(ep *event.Pair) { fmt.Println(ep); ep.Recycle() }, + cfg: cfg, + done: make(chan struct{}), } el.initRawHandlers() el.initExitHandlers() @@ -582,17 +593,17 @@ func (e *eventLoop) tracepointEntered(enterEv event.Event) { // Schedule comm lookup as early as possible to reduce races for short-lived processes. e.queueCommLookup(tid) if !e.filter.UsesCommFilter() { - e.enterEvs[tid] = event.NewPair(enterEv) + e.setEnterEvent(enterEv) return } switch enterEv.(type) { case *types.OpenEvent: - e.enterEvs[tid] = event.NewPair(enterEv) + e.setEnterEvent(enterEv) default: // Only, when we have a comm name if _, ok := e.cachedComm(tid); ok { - e.enterEvs[tid] = event.NewPair(enterEv) + e.setEnterEvent(enterEv) } else { e.notifyWarning(fmt.Sprintf("No comm name for %v process probably already vanished?", enterEv)) } @@ -600,12 +611,11 @@ func (e *eventLoop) tracepointEntered(enterEv event.Event) { } func (e *eventLoop) tracepointExited(exitEv event.Event, ch chan<- *event.Pair) { - ep, ok := e.enterEvs[exitEv.GetTid()] + ep, ok := e.consumeEnterEvent(exitEv.GetTid()) if !ok { exitEv.Recycle() return } - delete(e.enterEvs, exitEv.GetTid()) ep.ExitEv = exitEv e.numSyscalls++ @@ -1006,16 +1016,23 @@ func (e *eventLoop) resolveFdFile(fd int32, pid uint32) file.File { } func (e *eventLoop) cachedProcFdFile(fd int32, pid uint32) (*file.FdFile, bool) { - cache, ok := e.procFdCacheState()[procFdCacheKey(pid, fd)] + key := procFdCacheKey(pid, fd) + cache, ok := e.procFdCacheState()[key] + if ok { + e.procFdCacheAgeState()[key] = e.nextCacheAge() + } return cache, ok } func (e *eventLoop) setProcFdCache(fd int32, pid uint32, resolved *file.FdFile) { - e.procFdCacheState()[procFdCacheKey(pid, fd)] = resolved + key := procFdCacheKey(pid, fd) + e.procFdCacheState()[key] = resolved + e.procFdCacheAgeState()[key] = e.nextCacheAge() + e.pruneProcFdCache() } func (e *eventLoop) deleteProcFdCache(fd int32, pid uint32) { - delete(e.procFdCacheState(), procFdCacheKey(pid, fd)) + e.deleteProcFdCacheKey(procFdCacheKey(pid, fd)) } func (e *eventLoop) deleteProcFdCacheFrom(first int32, pid uint32) { @@ -1024,7 +1041,7 @@ func (e *eventLoop) deleteProcFdCacheFrom(first int32, pid uint32) { cachePid := uint32(key >> 32) cacheFd := int32(uint32(key)) if cachePid == pid && cacheFd >= first { - delete(cache, key) + e.deleteProcFdCacheKey(key) } } } @@ -1036,6 +1053,143 @@ func (e *eventLoop) procFdCacheState() map[uint64]*file.FdFile { return e.procFdCache } +func (e *eventLoop) procFdCacheAgeState() map[uint64]uint64 { + if e.procFdCacheAges == nil { + e.procFdCacheAges = make(map[uint64]uint64) + } + return e.procFdCacheAges +} + +func (e *eventLoop) enterEventAgeState() map[uint32]uint64 { + if e.enterEvAges == nil { + e.enterEvAges = make(map[uint32]uint64) + } + return e.enterEvAges +} + +func (e *eventLoop) enterEventState() map[uint32]*event.Pair { + if e.enterEvs == nil { + e.enterEvs = make(map[uint32]*event.Pair) + } + return e.enterEvs +} + +func (e *eventLoop) setEnterEvent(enterEv event.Event) { + tid := enterEv.GetTid() + pair := event.NewPair(enterEv) + if prev, ok := e.enterEventState()[tid]; ok && prev != nil { + prev.Recycle() + } + e.enterEventState()[tid] = pair + e.enterEventAgeState()[tid] = e.nextCacheAge() + e.prunePendingEnterEvents() +} + +func (e *eventLoop) consumeEnterEvent(tid uint32) (*event.Pair, bool) { + pair, ok := e.enterEventState()[tid] + if !ok { + return nil, false + } + delete(e.enterEventState(), tid) + delete(e.enterEventAgeState(), tid) + return pair, true +} + +func (e *eventLoop) prunePendingEnterEvents() { + state := e.enterEventState() + limit := e.pendingEnterLimit() + if len(state) <= limit { + return + } + trimOldestPendingPairs(state, e.enterEventAgeState(), trimTarget(limit)) +} + +func trimOldestPendingPairs(state map[uint32]*event.Pair, ages map[uint32]uint64, targetSize int) { + excess := len(state) - targetSize + if excess <= 0 { + return + } + type pendingPairAge struct { + tid uint32 + age uint64 + } + oldest := make([]pendingPairAge, 0, len(state)) + for tid := range state { + age := ages[tid] + oldest = append(oldest, pendingPairAge{tid: tid, age: age}) + } + sort.Slice(oldest, func(i, j int) bool { return oldest[i].age < oldest[j].age }) + for _, entry := range oldest[:excess] { + if pair, ok := state[entry.tid]; ok && pair != nil { + pair.Recycle() + } + delete(state, entry.tid) + delete(ages, entry.tid) + } +} + +func (e *eventLoop) pruneProcFdCache() { + state := e.procFdCacheState() + limit := e.procFdCacheLimit() + if len(state) <= limit { + return + } + trimOldestProcFdEntries(state, e.procFdCacheAgeState(), trimTarget(limit)) +} + +func trimOldestProcFdEntries(state map[uint64]*file.FdFile, ages map[uint64]uint64, targetSize int) { + excess := len(state) - targetSize + if excess <= 0 { + return + } + type procFdAge struct { + key uint64 + age uint64 + } + oldest := make([]procFdAge, 0, len(state)) + for key := range state { + age := ages[key] + oldest = append(oldest, procFdAge{key: key, age: age}) + } + sort.Slice(oldest, func(i, j int) bool { return oldest[i].age < oldest[j].age }) + for _, entry := range oldest[:excess] { + delete(state, entry.key) + delete(ages, entry.key) + } +} + +func (e *eventLoop) deleteProcFdCacheKey(key uint64) { + delete(e.procFdCacheState(), key) + delete(e.procFdCacheAgeState(), key) +} + +func (e *eventLoop) nextCacheAge() uint64 { + e.cacheAge++ + return e.cacheAge +} + +func (e *eventLoop) pendingEnterLimit() int { + if e.maxPendingEnterEvs > 0 { + return e.maxPendingEnterEvs + } + return defaultMaxPendingEnterEvs +} + +func (e *eventLoop) procFdCacheLimit() int { + if e.maxProcFdCacheSize > 0 { + return e.maxProcFdCacheSize + } + return defaultMaxProcFdCacheSize +} + +func trimTarget(limit int) int { + target := limit - limit/cacheTrimDivisor + if target < 1 { + return 1 + } + return target +} + func procFdCacheKey(pid uint32, fd int32) uint64 { return uint64(pid)<<32 | uint64(uint32(fd)) } |
