From 8a34be1f4fffca90d74e2092c7bc5a6af02392c4 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 29 Mar 2025 11:25:27 +0200 Subject: fix --- internal/event/event.go | 19 +++------- internal/eventloop.go | 40 ++++++++++----------- internal/flags/flags.go | 44 +++++++++++++++-------- internal/flamegraph/collapsed.go | 25 ++++++------- internal/flamegraph/counter.go | 14 ++++++++ internal/flamegraph/flamegraph.go | 25 ++++++++----- internal/flamegraph/iordata.go | 75 ++++++++++++++++++++++++++++----------- internal/flamegraph/worker.go | 71 ++++++++++++++++++++++++------------ internal/ior.go | 2 +- 9 files changed, 200 insertions(+), 115 deletions(-) create mode 100644 internal/flamegraph/counter.go diff --git a/internal/event/event.go b/internal/event/event.go index 8e5c1db..dfff270 100644 --- a/internal/event/event.go +++ b/internal/event/event.go @@ -30,7 +30,6 @@ type Pair struct { Duration uint64 // To calculate the time difference from the previoud event. - PrevPair *Pair durationToPrev uint64 } @@ -40,11 +39,10 @@ func NewPair(enterEv Event) *Pair { return e } -func (e *Pair) CalculateDurations() { +func (e *Pair) CalculateDurations(prevPairTime uint64) { e.Duration = e.ExitEv.GetTime() - e.EnterEv.GetTime() - - if e.PrevPair != nil { - e.durationToPrev = e.EnterEv.GetTime() - e.PrevPair.ExitEv.GetTime() + if prevPairTime > 0 { + e.durationToPrev = e.EnterEv.GetTime() - prevPairTime } } @@ -92,15 +90,6 @@ func (e *Pair) Dump() string { func (e *Pair) Recycle() { e.EnterEv.Recycle() e.ExitEv.Recycle() - e.PrevPair = nil + e.durationToPrev = 0 poolOfEventPairs.Put(e) } - -// Only recycle the previous event, as the current event is the previous event of the next event! -// And the previous event is required for calculation of durationToPrev! -func (e *Pair) RecyclePrev() { - if e.PrevPair == nil { - return - } - e.PrevPair.Recycle() -} diff --git a/internal/eventloop.go b/internal/eventloop.go index b2392ab..4e9a3cb 100644 --- a/internal/eventloop.go +++ b/internal/eventloop.go @@ -20,13 +20,13 @@ import ( // TOOD: read and write syscalls: can also collect amount of bytes! type eventLoop struct { - flags flags.Flags - filter *eventFilter - enterEvs map[uint32]*event.Pair // Temp. store of sys_enter tracepoints per Tid. - files map[int32]file.File // Track all open files by file descriptor.. - comms map[uint32]string // Program or thread name of the current Tid. - prevPairs map[uint32]*event.Pair // Previous event (to calculate time differences between two events) - flamegraph flamegraph.Flamegraph // Storing all paths in a map structure for analysis + flags flags.Flags + filter *eventFilter + enterEvs map[uint32]*event.Pair // Temp. store of sys_enter tracepoints per Tid. + files map[int32]file.File // Track all open files by file descriptor.. + comms map[uint32]string // Program or thread name of the current Tid. + prevPairTimes map[uint32]uint64 // Previous event's time (to calculate time differences between two events) + flamegraph flamegraph.Flamegraph // Storing all paths in a map structure for analysis // Statistics numTracepoints uint @@ -39,14 +39,14 @@ type eventLoop struct { func newEventLoop(flags flags.Flags) *eventLoop { return &eventLoop{ - flags: flags, - filter: newEventFilter(flags), - enterEvs: make(map[uint32]*event.Pair), - files: make(map[int32]file.File), - comms: make(map[uint32]string), - prevPairs: make(map[uint32]*event.Pair), - flamegraph: flamegraph.New(), - done: make(chan struct{}), + flags: flags, + filter: newEventFilter(flags), + enterEvs: make(map[uint32]*event.Pair), + files: make(map[int32]file.File), + comms: make(map[uint32]string), + prevPairTimes: make(map[uint32]uint64), + flamegraph: flamegraph.New(flags), + done: make(chan struct{}), } } @@ -90,10 +90,10 @@ func (e *eventLoop) run(ctx context.Context, rawCh <-chan []byte) { case e.flags.FlamegraphEnable: e.flamegraph.Ch <- ev case e.flags.PprofEnable: - ev.RecyclePrev() + ev.Recycle() default: fmt.Println(ev.String()) - ev.RecyclePrev() + ev.Recycle() } e.numSyscallsAfterFilter++ } @@ -325,9 +325,9 @@ func (e *eventLoop) syscallExit(exitEv event.Event, ch chan<- *event.Pair) { // TODO: fallocate // TODO: https://man7.org/linux/man-pages/man2/io_uring_enter.2.html (already captured but without FDs) - ev.PrevPair, _ = e.prevPairs[ev.EnterEv.GetTid()] - ev.CalculateDurations() - e.prevPairs[ev.EnterEv.GetTid()] = ev + prevPairTime, _ := e.prevPairTimes[ev.EnterEv.GetTid()] + ev.CalculateDurations(prevPairTime) + e.prevPairTimes[ev.EnterEv.GetTid()] = ev.ExitEv.GetTime() ch <- ev } diff --git a/internal/flags/flags.go b/internal/flags/flags.go index 7a77ae7..a85c838 100644 --- a/internal/flags/flags.go +++ b/internal/flags/flags.go @@ -11,16 +11,21 @@ import ( ) type Flags struct { - PidFilter int - TidFilter int - EventMapSize int - CommFilter string - PathFilter string - PprofEnable bool - FlamegraphEnable bool - Duration int + PidFilter int + TidFilter int + EventMapSize int + CommFilter string + PathFilter string + PprofEnable bool + Duration int + + // Tracepints flags TracepointsToAttach []*regexp.Regexp TracepointsToExclude []*regexp.Regexp + + // Flamegraph flags + FlamegraphEnable bool + FlamegraphName string // If set, enables new style iorData output, TODO: remove comment once old style collapsed format is retired } func New() (flags Flags) { @@ -33,20 +38,26 @@ func New() (flags Flags) { flag.StringVar(&flags.PathFilter, "path", "", "Path to filter for") flag.BoolVar(&flags.PprofEnable, "pprof", false, "Enable profiling") - flag.BoolVar(&flags.FlamegraphEnable, "flamegraph", false, "Enable flamegraph builder") tracepointsToAttach := flag.String("tps", "", "Comma separated list regexes for tracepoints to load") tracepointsToExclude := flag.String("tpsExclude", "", "Comma separated list regexes for tracepoints to exclude") + + flag.BoolVar(&flags.FlamegraphEnable, "flamegraph", false, "Enable flamegraph builder") + flag.StringVar(&flags.FlamegraphName, "name", "", "Name of the flamegraph data output") + flag.Parse() - flags.TracepointsToAttach = extractTracepointFlags(tracepointsToAttach) - flags.TracepointsToExclude = extractTracepointFlags(tracepointsToExclude) + flags.TracepointsToAttach = extractTracepointFlags(*tracepointsToAttach) + flags.TracepointsToExclude = extractTracepointFlags(*tracepointsToExclude) return flags } -func extractTracepointFlags(tracepoints *string) (regexes []*regexp.Regexp) { - for _, name := range strings.Split(*tracepoints, ",") { +func extractTracepointFlags(tracepoints string) (regexes []*regexp.Regexp) { + if len(tracepoints) == 0 { + return regexes + } + for _, name := range strings.Split(tracepoints, ",") { re, err := regexp.Compile(name) if err != nil { fmt.Println("Unable to compile regex", name, ": ", err) @@ -57,20 +68,25 @@ func extractTracepointFlags(tracepoints *string) (regexes []*regexp.Regexp) { return regexes } -func (flags Flags) AttachTracepoint(tracepointName string) bool { +func (flags Flags) ShouldIAttachTracepoint(tracepointName string) bool { for _, re := range flags.TracepointsToExclude { if re.MatchString(tracepointName) { + fmt.Println("Not attaching", tracepointName, "as excluded") return false } } if len(flags.TracepointsToAttach) == 0 { + fmt.Println("Attaching", tracepointName, "as none are explicitly incluced") return true } for _, re := range flags.TracepointsToAttach { if re.MatchString(tracepointName) { + fmt.Println("Attaching", tracepointName, "as included") return true } } + + fmt.Println("Not attaching", tracepointName, "as not includedd") return false } diff --git a/internal/flamegraph/collapsed.go b/internal/flamegraph/collapsed.go index 57e5ac0..60c0b7f 100644 --- a/internal/flamegraph/collapsed.go +++ b/internal/flamegraph/collapsed.go @@ -8,28 +8,23 @@ import ( "sync" ) -type counter struct { +type collapsedCounter struct { count uint64 duration uint64 } -func (c *counter) add(other counter) { +func (c *collapsedCounter) add(other collapsedCounter) { c.count += other.count c.duration += other.duration } -// TODO: make it generic, generate multiple trace points -// path, traceid (syscall name), comm, pid, tid -// traceid, path is by default set in this order -// store an intermediate format which then can be converted to the others... -// e.g. path ¶ traceid ¶ comm ¶ pid ¶ tid ¶ flags ¶ counter -// counter can also have bytes (for reads and writes) -type collapsed map[string]map[types.TraceId]counter +// TODO: Clean up all code commented with COLLAPSED once collapsed here retired. +type collapsed map[string]map[types.TraceId]collapsedCounter func (c collapsed) merge(other collapsed) (merged int) { for k, v := range other { if _, ok := c[k]; !ok { - c[k] = make(map[types.TraceId]counter) + c[k] = make(map[types.TraceId]collapsedCounter) } for traceId, cnt := range v { if existingCnt, ok := c[k][traceId]; ok { @@ -48,23 +43,23 @@ func (c collapsed) dump() { var wg sync.WaitGroup wg.Add(4) - go c.dumpBy(&wg, "ior-by-path-count-flamegraph.collapsed", true, func(cnt counter) uint64 { + go c.dumpBy(&wg, "ior-by-path-count-flamegraph.collapsed", true, func(cnt collapsedCounter) uint64 { return cnt.count }) - go c.dumpBy(&wg, "ior-by-path-duration-flamegraph.collapsed", true, func(cnt counter) uint64 { + go c.dumpBy(&wg, "ior-by-path-duration-flamegraph.collapsed", true, func(cnt collapsedCounter) uint64 { return cnt.duration }) - go c.dumpBy(&wg, "ior-by-syscall-count-flamegraph.collapsed", false, func(cnt counter) uint64 { + go c.dumpBy(&wg, "ior-by-syscall-count-flamegraph.collapsed", false, func(cnt collapsedCounter) uint64 { return cnt.count }) - go c.dumpBy(&wg, "ior-by-syscall-duration-flamegraph.collapsed", false, func(cnt counter) uint64 { + go c.dumpBy(&wg, "ior-by-syscall-duration-flamegraph.collapsed", false, func(cnt collapsedCounter) uint64 { return cnt.duration }) wg.Wait() } -func (c collapsed) dumpBy(wg *sync.WaitGroup, outfile string, syscallAtTop bool, by func(counter) uint64) { +func (c collapsed) dumpBy(wg *sync.WaitGroup, outfile string, syscallAtTop bool, by func(collapsedCounter) uint64) { defer wg.Done() defer fmt.Println("Dumping done") diff --git a/internal/flamegraph/counter.go b/internal/flamegraph/counter.go new file mode 100644 index 0000000..4a63f50 --- /dev/null +++ b/internal/flamegraph/counter.go @@ -0,0 +1,14 @@ +package flamegraph + +type counter struct { + count uint64 + duration uint64 + durationToPrev uint64 +} + +func (c counter) add(other counter) counter { + c.count += other.count + c.duration += other.duration + c.durationToPrev += other.durationToPrev + return c +} diff --git a/internal/flamegraph/flamegraph.go b/internal/flamegraph/flamegraph.go index d5d7e96..9e1e14b 100644 --- a/internal/flamegraph/flamegraph.go +++ b/internal/flamegraph/flamegraph.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "ior/internal/event" + "ior/internal/flags" "runtime" "sync" ) @@ -11,12 +12,13 @@ import ( // TODO: Add Command in path! Make it configurable? comm/syscall/path, or path/syscall/comm, etc... // TODO: Idea, show time spent between the syscalls (off syscalls) as well, but in a different color type Flamegraph struct { + flags flags.Flags Ch chan *event.Pair Done chan struct{} workers []worker } -func New() Flamegraph { +func New(flags flags.Flags) Flamegraph { f := Flamegraph{ Ch: make(chan *event.Pair, 4096), Done: make(chan struct{}), @@ -39,17 +41,24 @@ func (f Flamegraph) Start(ctx context.Context) { for i, worker := range f.workers { fmt.Println("Starting flamegraph worker", i) - go worker.run(ctx, &wg, f.Ch) + if f.flags.FlamegraphName == "" { // Empty string means: old style collapsed + go worker.runCollapsed(ctx, &wg, f.Ch) + } else { + go worker.run(ctx, &wg, f.Ch) + } } wg.Wait() - collapsed := f.workers[0].collapsed - if len(f.workers) > 1 { - for i, c := range f.workers[1:] { - fmt.Println("Worker", i+1, "merged", collapsed.merge(c.collapsed), - "counters =>", len(collapsed), "total counters") + // COLLAPSED: Will be removed, once migrated to iorData + if f.flags.FlamegraphName == "" { // Empty string means: old style collapsed + collapsed := f.workers[0].collapsed + if len(f.workers) > 1 { + for i, c := range f.workers[1:] { + fmt.Println("Worker", i+1, "merged", collapsed.merge(c.collapsed), + "counters =>", len(collapsed), "total counters") + } } + collapsed.dump() } - collapsed.dump() }() } diff --git a/internal/flamegraph/iordata.go b/internal/flamegraph/iordata.go index 26e5749..80571d9 100644 --- a/internal/flamegraph/iordata.go +++ b/internal/flamegraph/iordata.go @@ -3,6 +3,7 @@ package flamegraph import ( "encoding/json" "fmt" + "ior/internal/event" "ior/internal/types" "os" "time" @@ -16,39 +17,73 @@ type traceIdType = types.TraceId type commType = string type pidType = uint32 type tidType = uint32 -type flagsType = int32 +type flagsType = string type pathMap map[pathType]map[traceIdType]map[commType]map[pidType]map[tidType]map[flagsType]counter type iorData struct{ paths pathMap } -# TODO: Flag to enable iorData -# TODO: Name flag for iorData -# TODO: Output path for iorData flag +// TODO: Flag to enable iorData +// TODO: Name flag for iorData (outfile format: hostname-name-timestamp.ior.zst) +// TODO: Output path for iorData flag +// TODO: Add helper to convert .ior data file to collapsed format func newIorData() iorData { return iorData{paths: make(pathMap)} } -func (id iorData) addPath(path pathType, traceId traceIdType, comm commType, pid pidType, tid tidType, flags flagsType, cnt counter) { - if _, ok := id.paths[path]; !ok { - id.paths[path] = make(map[traceIdType]map[commType]map[pidType]map[tidType]map[flagsType]counter) +func (iod iorData) add(ev *event.Pair) { + // type Pair struct { + // EnterEv, ExitEv Event + // File file.File + // Comm string + // Duration uint64 + + // // To calculate the time difference from the previoud event. + // PrevPair *Pair + // durationToPrev uint64 + // } + // TODO: Add duration to prev to counter + cnt := counter{ + count: 1, + duration: ev.Duration, + } + iod.addPath(ev.File.Name(), ev.EnterEv.GetTraceId(), ev.Comm, + ev.EnterEv.GetPid(), ev.EnterEv.GetTid(), ev.File.FlagsString(), cnt) +} + +func (iod iorData) addPath(path pathType, traceId traceIdType, comm commType, + pid pidType, tid tidType, flags flagsType, addCnt counter) { + + pathMap, ok := iod.paths[path] + if !ok { + pathMap = make(map[traceIdType]map[commType]map[pidType]map[tidType]map[flagsType]counter) + iod.paths[path] = pathMap } - if _, ok := id.paths[path][traceId]; !ok { - id.paths[path][traceId] = make(map[commType]map[pidType]map[tidType]map[flagsType]counter) + traceIdMap, ok := iod.paths[path][traceId] + if !ok { + traceIdMap = make(map[commType]map[pidType]map[tidType]map[flagsType]counter) + iod.paths[path][traceId] = traceIdMap } - if _, ok := id.paths[path][traceId][comm]; !ok { - id.paths[path][traceId][comm] = make(map[pidType]map[tidType]map[flagsType]counter) + commMap, ok := iod.paths[path][traceId][comm] + if !ok { + commMap = make(map[pidType]map[tidType]map[flagsType]counter) + iod.paths[path][traceId][comm] = commMap } - if _, ok := id.paths[path][traceId][comm][pid]; !ok { - id.paths[path][traceId][comm][pid] = make(map[tidType]map[flagsType]counter) + pidMap, ok := iod.paths[path][traceId][comm][pid] + if !ok { + pidMap = make(map[tidType]map[flagsType]counter) + iod.paths[path][traceId][comm][pid] = pidMap } - if _, ok := id.paths[path][traceId][comm][pid][tid]; !ok { - id.paths[path][traceId][comm][pid][tid] = make(map[flagsType]counter) + tidMap, ok := iod.paths[path][traceId][comm][pid][tid] + if !ok { + tidMap = make(map[flagsType]counter) + iod.paths[path][traceId][comm][pid][tid] = tidMap } - if _, ok := id.paths[path][traceId][comm][pid][tid][flags]; !ok { - id.paths[path][traceId][comm][pid][tid][flags] = cnt + cnt, ok := iod.paths[path][traceId][comm][pid][tid][flags] + if !ok { + iod.paths[path][traceId][comm][pid][tid][flags] = addCnt } else { - // iorData.paths[path][traceId][comm][pid][tid][flags] += cnt + iod.paths[path][traceId][comm][pid][tid][flags] = cnt.add(addCnt) } } -func (id iorData) commit() error { +func (iod iorData) commit() error { currentTime := time.Now().Format("2006-01-02_15:04:05") hostname, err := os.Hostname() if err != nil { @@ -62,5 +97,5 @@ func (id iorData) commit() error { defer file.Close() encoder := json.NewEncoder(file) - return encoder.Encode(id.paths) + return encoder.Encode(iod.paths) } diff --git a/internal/flamegraph/worker.go b/internal/flamegraph/worker.go index d10e0bf..6c2b9cd 100644 --- a/internal/flamegraph/worker.go +++ b/internal/flamegraph/worker.go @@ -10,42 +10,29 @@ import ( type worker struct { collapsed collapsed - id iorData + iod iorData done chan struct{} } func newWorker() worker { return worker{ - collapsed: make(collapsed), // TODO: Retire - id: newIorData(), // TODO: Implement fully + collapsed: make(collapsed), // COLLAPSED: Retire ocne newIorData implemented + iod: newIorData(), } } -// Run until ch is closed or has no more events and ctx is done. func (w worker) run(ctx context.Context, wg *sync.WaitGroup, ch <-chan *event.Pair) { defer wg.Done() for { select { case ev := <-ch: - var filePath string - if ev.File == nil { - filePath = "N:file" - } else { - filePath = ev.File.Name() - } - pathMap, ok := w.collapsed[filePath] - if !ok { - pathMap = make(map[types.TraceId]counter) - } - - traceId := ev.EnterEv.GetTraceId() - cnt := pathMap[traceId] - cnt.count++ - cnt.duration += ev.Duration - pathMap[traceId] = cnt - - w.collapsed[filePath] = pathMap + // var filePath string + // if ev.File == nil { + // filePath = "N:file" + // } else { + // filePath = ev.File.Name() + // } ev.Recycle() default: @@ -58,3 +45,43 @@ func (w worker) run(ctx context.Context, wg *sync.WaitGroup, ch <-chan *event.Pa } } } + +// TODO: Retire collapsed +func (w worker) runCollapsed(ctx context.Context, wg *sync.WaitGroup, ch <-chan *event.Pair) { + { + defer wg.Done() + + for { + select { + case ev := <-ch: + var filePath string + if ev.File == nil { + filePath = "N:file" + } else { + filePath = ev.File.Name() + } + pathMap, ok := w.collapsed[filePath] + if !ok { + pathMap = make(map[types.TraceId]collapsedCounter) + } + + traceId := ev.EnterEv.GetTraceId() + cnt := pathMap[traceId] + cnt.count++ + cnt.duration += ev.Duration + pathMap[traceId] = cnt + + w.collapsed[filePath] = pathMap + ev.Recycle() + + default: + select { + case <-ctx.Done(): + return + default: + time.Sleep(time.Millisecond * 10) + } + } + } + } +} diff --git a/internal/ior.go b/internal/ior.go index 68a6afb..7bf96bc 100644 --- a/internal/ior.go +++ b/internal/ior.go @@ -21,7 +21,7 @@ import ( // TODO: Integration tests, write C or Cgo code to simulate I/O? func attachTracepoints(flags flags.Flags, bpfModule *bpf.Module) error { for _, name := range tracepoints.List { - if !flags.AttachTracepoint(name) { + if !flags.ShouldIAttachTracepoint(name) { continue } fmt.Println("Attaching tracepoint", name) -- cgit v1.2.3