From 8a34be1f4fffca90d74e2092c7bc5a6af02392c4 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 29 Mar 2025 11:25:27 +0200 Subject: fix --- internal/flamegraph/collapsed.go | 25 ++++++------- internal/flamegraph/counter.go | 14 ++++++++ internal/flamegraph/flamegraph.go | 25 ++++++++----- internal/flamegraph/iordata.go | 75 ++++++++++++++++++++++++++++----------- internal/flamegraph/worker.go | 71 ++++++++++++++++++++++++------------ 5 files changed, 145 insertions(+), 65 deletions(-) create mode 100644 internal/flamegraph/counter.go (limited to 'internal/flamegraph') diff --git a/internal/flamegraph/collapsed.go b/internal/flamegraph/collapsed.go index 57e5ac0..60c0b7f 100644 --- a/internal/flamegraph/collapsed.go +++ b/internal/flamegraph/collapsed.go @@ -8,28 +8,23 @@ import ( "sync" ) -type counter struct { +type collapsedCounter struct { count uint64 duration uint64 } -func (c *counter) add(other counter) { +func (c *collapsedCounter) add(other collapsedCounter) { c.count += other.count c.duration += other.duration } -// TODO: make it generic, generate multiple trace points -// path, traceid (syscall name), comm, pid, tid -// traceid, path is by default set in this order -// store an intermediate format which then can be converted to the others... -// e.g. path ¶ traceid ¶ comm ¶ pid ¶ tid ¶ flags ¶ counter -// counter can also have bytes (for reads and writes) -type collapsed map[string]map[types.TraceId]counter +// TODO: Clean up all code commented with COLLAPSED once collapsed here retired. +type collapsed map[string]map[types.TraceId]collapsedCounter func (c collapsed) merge(other collapsed) (merged int) { for k, v := range other { if _, ok := c[k]; !ok { - c[k] = make(map[types.TraceId]counter) + c[k] = make(map[types.TraceId]collapsedCounter) } for traceId, cnt := range v { if existingCnt, ok := c[k][traceId]; ok { @@ -48,23 +43,23 @@ func (c collapsed) dump() { var wg sync.WaitGroup wg.Add(4) - go c.dumpBy(&wg, "ior-by-path-count-flamegraph.collapsed", true, func(cnt counter) uint64 { + go c.dumpBy(&wg, "ior-by-path-count-flamegraph.collapsed", true, func(cnt collapsedCounter) uint64 { return cnt.count }) - go c.dumpBy(&wg, "ior-by-path-duration-flamegraph.collapsed", true, func(cnt counter) uint64 { + go c.dumpBy(&wg, "ior-by-path-duration-flamegraph.collapsed", true, func(cnt collapsedCounter) uint64 { return cnt.duration }) - go c.dumpBy(&wg, "ior-by-syscall-count-flamegraph.collapsed", false, func(cnt counter) uint64 { + go c.dumpBy(&wg, "ior-by-syscall-count-flamegraph.collapsed", false, func(cnt collapsedCounter) uint64 { return cnt.count }) - go c.dumpBy(&wg, "ior-by-syscall-duration-flamegraph.collapsed", false, func(cnt counter) uint64 { + go c.dumpBy(&wg, "ior-by-syscall-duration-flamegraph.collapsed", false, func(cnt collapsedCounter) uint64 { return cnt.duration }) wg.Wait() } -func (c collapsed) dumpBy(wg *sync.WaitGroup, outfile string, syscallAtTop bool, by func(counter) uint64) { +func (c collapsed) dumpBy(wg *sync.WaitGroup, outfile string, syscallAtTop bool, by func(collapsedCounter) uint64) { defer wg.Done() defer fmt.Println("Dumping done") diff --git a/internal/flamegraph/counter.go b/internal/flamegraph/counter.go new file mode 100644 index 0000000..4a63f50 --- /dev/null +++ b/internal/flamegraph/counter.go @@ -0,0 +1,14 @@ +package flamegraph + +type counter struct { + count uint64 + duration uint64 + durationToPrev uint64 +} + +func (c counter) add(other counter) counter { + c.count += other.count + c.duration += other.duration + c.durationToPrev += other.durationToPrev + return c +} diff --git a/internal/flamegraph/flamegraph.go b/internal/flamegraph/flamegraph.go index d5d7e96..9e1e14b 100644 --- a/internal/flamegraph/flamegraph.go +++ b/internal/flamegraph/flamegraph.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "ior/internal/event" + "ior/internal/flags" "runtime" "sync" ) @@ -11,12 +12,13 @@ import ( // TODO: Add Command in path! Make it configurable? comm/syscall/path, or path/syscall/comm, etc... // TODO: Idea, show time spent between the syscalls (off syscalls) as well, but in a different color type Flamegraph struct { + flags flags.Flags Ch chan *event.Pair Done chan struct{} workers []worker } -func New() Flamegraph { +func New(flags flags.Flags) Flamegraph { f := Flamegraph{ Ch: make(chan *event.Pair, 4096), Done: make(chan struct{}), @@ -39,17 +41,24 @@ func (f Flamegraph) Start(ctx context.Context) { for i, worker := range f.workers { fmt.Println("Starting flamegraph worker", i) - go worker.run(ctx, &wg, f.Ch) + if f.flags.FlamegraphName == "" { // Empty string means: old style collapsed + go worker.runCollapsed(ctx, &wg, f.Ch) + } else { + go worker.run(ctx, &wg, f.Ch) + } } wg.Wait() - collapsed := f.workers[0].collapsed - if len(f.workers) > 1 { - for i, c := range f.workers[1:] { - fmt.Println("Worker", i+1, "merged", collapsed.merge(c.collapsed), - "counters =>", len(collapsed), "total counters") + // COLLAPSED: Will be removed, once migrated to iorData + if f.flags.FlamegraphName == "" { // Empty string means: old style collapsed + collapsed := f.workers[0].collapsed + if len(f.workers) > 1 { + for i, c := range f.workers[1:] { + fmt.Println("Worker", i+1, "merged", collapsed.merge(c.collapsed), + "counters =>", len(collapsed), "total counters") + } } + collapsed.dump() } - collapsed.dump() }() } diff --git a/internal/flamegraph/iordata.go b/internal/flamegraph/iordata.go index 26e5749..80571d9 100644 --- a/internal/flamegraph/iordata.go +++ b/internal/flamegraph/iordata.go @@ -3,6 +3,7 @@ package flamegraph import ( "encoding/json" "fmt" + "ior/internal/event" "ior/internal/types" "os" "time" @@ -16,39 +17,73 @@ type traceIdType = types.TraceId type commType = string type pidType = uint32 type tidType = uint32 -type flagsType = int32 +type flagsType = string type pathMap map[pathType]map[traceIdType]map[commType]map[pidType]map[tidType]map[flagsType]counter type iorData struct{ paths pathMap } -# TODO: Flag to enable iorData -# TODO: Name flag for iorData -# TODO: Output path for iorData flag +// TODO: Flag to enable iorData +// TODO: Name flag for iorData (outfile format: hostname-name-timestamp.ior.zst) +// TODO: Output path for iorData flag +// TODO: Add helper to convert .ior data file to collapsed format func newIorData() iorData { return iorData{paths: make(pathMap)} } -func (id iorData) addPath(path pathType, traceId traceIdType, comm commType, pid pidType, tid tidType, flags flagsType, cnt counter) { - if _, ok := id.paths[path]; !ok { - id.paths[path] = make(map[traceIdType]map[commType]map[pidType]map[tidType]map[flagsType]counter) +func (iod iorData) add(ev *event.Pair) { + // type Pair struct { + // EnterEv, ExitEv Event + // File file.File + // Comm string + // Duration uint64 + + // // To calculate the time difference from the previoud event. + // PrevPair *Pair + // durationToPrev uint64 + // } + // TODO: Add duration to prev to counter + cnt := counter{ + count: 1, + duration: ev.Duration, + } + iod.addPath(ev.File.Name(), ev.EnterEv.GetTraceId(), ev.Comm, + ev.EnterEv.GetPid(), ev.EnterEv.GetTid(), ev.File.FlagsString(), cnt) +} + +func (iod iorData) addPath(path pathType, traceId traceIdType, comm commType, + pid pidType, tid tidType, flags flagsType, addCnt counter) { + + pathMap, ok := iod.paths[path] + if !ok { + pathMap = make(map[traceIdType]map[commType]map[pidType]map[tidType]map[flagsType]counter) + iod.paths[path] = pathMap } - if _, ok := id.paths[path][traceId]; !ok { - id.paths[path][traceId] = make(map[commType]map[pidType]map[tidType]map[flagsType]counter) + traceIdMap, ok := iod.paths[path][traceId] + if !ok { + traceIdMap = make(map[commType]map[pidType]map[tidType]map[flagsType]counter) + iod.paths[path][traceId] = traceIdMap } - if _, ok := id.paths[path][traceId][comm]; !ok { - id.paths[path][traceId][comm] = make(map[pidType]map[tidType]map[flagsType]counter) + commMap, ok := iod.paths[path][traceId][comm] + if !ok { + commMap = make(map[pidType]map[tidType]map[flagsType]counter) + iod.paths[path][traceId][comm] = commMap } - if _, ok := id.paths[path][traceId][comm][pid]; !ok { - id.paths[path][traceId][comm][pid] = make(map[tidType]map[flagsType]counter) + pidMap, ok := iod.paths[path][traceId][comm][pid] + if !ok { + pidMap = make(map[tidType]map[flagsType]counter) + iod.paths[path][traceId][comm][pid] = pidMap } - if _, ok := id.paths[path][traceId][comm][pid][tid]; !ok { - id.paths[path][traceId][comm][pid][tid] = make(map[flagsType]counter) + tidMap, ok := iod.paths[path][traceId][comm][pid][tid] + if !ok { + tidMap = make(map[flagsType]counter) + iod.paths[path][traceId][comm][pid][tid] = tidMap } - if _, ok := id.paths[path][traceId][comm][pid][tid][flags]; !ok { - id.paths[path][traceId][comm][pid][tid][flags] = cnt + cnt, ok := iod.paths[path][traceId][comm][pid][tid][flags] + if !ok { + iod.paths[path][traceId][comm][pid][tid][flags] = addCnt } else { - // iorData.paths[path][traceId][comm][pid][tid][flags] += cnt + iod.paths[path][traceId][comm][pid][tid][flags] = cnt.add(addCnt) } } -func (id iorData) commit() error { +func (iod iorData) commit() error { currentTime := time.Now().Format("2006-01-02_15:04:05") hostname, err := os.Hostname() if err != nil { @@ -62,5 +97,5 @@ func (id iorData) commit() error { defer file.Close() encoder := json.NewEncoder(file) - return encoder.Encode(id.paths) + return encoder.Encode(iod.paths) } diff --git a/internal/flamegraph/worker.go b/internal/flamegraph/worker.go index d10e0bf..6c2b9cd 100644 --- a/internal/flamegraph/worker.go +++ b/internal/flamegraph/worker.go @@ -10,42 +10,29 @@ import ( type worker struct { collapsed collapsed - id iorData + iod iorData done chan struct{} } func newWorker() worker { return worker{ - collapsed: make(collapsed), // TODO: Retire - id: newIorData(), // TODO: Implement fully + collapsed: make(collapsed), // COLLAPSED: Retire ocne newIorData implemented + iod: newIorData(), } } -// Run until ch is closed or has no more events and ctx is done. func (w worker) run(ctx context.Context, wg *sync.WaitGroup, ch <-chan *event.Pair) { defer wg.Done() for { select { case ev := <-ch: - var filePath string - if ev.File == nil { - filePath = "N:file" - } else { - filePath = ev.File.Name() - } - pathMap, ok := w.collapsed[filePath] - if !ok { - pathMap = make(map[types.TraceId]counter) - } - - traceId := ev.EnterEv.GetTraceId() - cnt := pathMap[traceId] - cnt.count++ - cnt.duration += ev.Duration - pathMap[traceId] = cnt - - w.collapsed[filePath] = pathMap + // var filePath string + // if ev.File == nil { + // filePath = "N:file" + // } else { + // filePath = ev.File.Name() + // } ev.Recycle() default: @@ -58,3 +45,43 @@ func (w worker) run(ctx context.Context, wg *sync.WaitGroup, ch <-chan *event.Pa } } } + +// TODO: Retire collapsed +func (w worker) runCollapsed(ctx context.Context, wg *sync.WaitGroup, ch <-chan *event.Pair) { + { + defer wg.Done() + + for { + select { + case ev := <-ch: + var filePath string + if ev.File == nil { + filePath = "N:file" + } else { + filePath = ev.File.Name() + } + pathMap, ok := w.collapsed[filePath] + if !ok { + pathMap = make(map[types.TraceId]collapsedCounter) + } + + traceId := ev.EnterEv.GetTraceId() + cnt := pathMap[traceId] + cnt.count++ + cnt.duration += ev.Duration + pathMap[traceId] = cnt + + w.collapsed[filePath] = pathMap + ev.Recycle() + + default: + select { + case <-ctx.Done(): + return + default: + time.Sleep(time.Millisecond * 10) + } + } + } + } +} -- cgit v1.2.3