From 017494938f061fd1276f2a54b1df0e7002655e9f Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Thu, 10 Apr 2025 21:28:45 +0300 Subject: can serialize and deserialize to/from gob --- internal/eventloop.go | 2 +- internal/flags/flags.go | 29 ++++++++++ internal/flamegraph/counter.go | 20 +++---- internal/flamegraph/flamegraph.go | 60 --------------------- internal/flamegraph/iordata.go | 95 ++++++++++++++++++++++----------- internal/flamegraph/iordata_test.go | 48 ++++++++--------- internal/flamegraph/iordatacollector.go | 60 +++++++++++++++++++++ internal/flamegraph/worker.go | 2 +- internal/ior.go | 11 ++++ 9 files changed, 199 insertions(+), 128 deletions(-) delete mode 100644 internal/flamegraph/flamegraph.go create mode 100644 internal/flamegraph/iordatacollector.go (limited to 'internal') diff --git a/internal/eventloop.go b/internal/eventloop.go index 217d880..5ff2b45 100644 --- a/internal/eventloop.go +++ b/internal/eventloop.go @@ -25,7 +25,7 @@ type eventLoop struct { files map[int32]file.File // Track all open files by file descriptor.. comms map[uint32]string // Program or thread name of the current Tid. prevPairTimes map[uint32]uint64 // Previous event's time (to calculate time differences between two events) - flamegraph flamegraph.Flamegraph // Storing all paths in a map structure for analysis + flamegraph flamegraph.IorDataCollector // Storing all paths in a map structure for analysis // Statistics numTracepoints uint diff --git a/internal/flags/flags.go b/internal/flags/flags.go index 91bceb3..890c18f 100644 --- a/internal/flags/flags.go +++ b/internal/flags/flags.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "regexp" + "slices" "strings" "sync" @@ -14,6 +15,14 @@ import ( var singleton Flags var once sync.Once +var validCollapsedFields = []string{ + "path", + "comm", + "tracepoint", + "pid", "tid", + "count", +} + func Get() Flags { return singleton } @@ -34,6 +43,10 @@ type Flags struct { // Flamegraph flags FlamegraphEnable bool FlamegraphName string + + // To convert ior data into collapsed format + IorDataFile string + CollapsedFields []string } func Parse() { @@ -58,10 +71,26 @@ func parse() { flag.BoolVar(&singleton.FlamegraphEnable, "flamegraph", false, "Enable flamegraph builder") flag.StringVar(&singleton.FlamegraphName, "name", "foo", "Name of the flamegraph data output") + + flag.StringVar(&singleton.IorDataFile, "ior", "", "IOR data file to convert into collapsed format") + fields := flag.String("fields", "", "Comma separated list of fields to collapse") flag.Parse() singleton.TracepointsToAttach = extractTracepointFlags(*tracepointsToAttach) singleton.TracepointsToExclude = extractTracepointFlags(*tracepointsToExclude) + + if *fields == "" { + singleton.CollapsedFields = []string{"path", "tracepoint", "count"} + } else { + singleton.CollapsedFields = strings.Split(*fields, ",") + } + + for _, field := range singleton.CollapsedFields { + if !slices.Contains(validCollapsedFields, field) { + fmt.Println("Invalid field for collapse:", field) + os.Exit(2) + } + } } func extractTracepointFlags(tracepoints string) (regexes []*regexp.Regexp) { diff --git a/internal/flamegraph/counter.go b/internal/flamegraph/counter.go index bc30df0..96cfe06 100644 --- a/internal/flamegraph/counter.go +++ b/internal/flamegraph/counter.go @@ -1,16 +1,16 @@ package flamegraph -type counter struct { - count uint64 - duration uint64 - durationToPrev uint64 - bytes uint64 // TODO: implement +type Counter struct { + Count uint64 + Duration uint64 + DurationToPrev uint64 + Bytes uint64 // TODO: implement } -func (c counter) add(other counter) counter { - c.count += other.count - c.duration += other.duration - c.durationToPrev += other.durationToPrev - c.bytes += other.bytes +func (c Counter) add(other Counter) Counter { + c.Count += other.Count + c.Duration += other.Duration + c.DurationToPrev += other.DurationToPrev + c.Bytes += other.Bytes return c } diff --git a/internal/flamegraph/flamegraph.go b/internal/flamegraph/flamegraph.go deleted file mode 100644 index 9772f0c..0000000 --- a/internal/flamegraph/flamegraph.go +++ /dev/null @@ -1,60 +0,0 @@ -package flamegraph - -import ( - "context" - "fmt" - "ior/internal/event" - "ior/internal/flags" - "log" - "runtime" - "sync" -) - -// TODO: Add Command in path! Make it configurable? comm/syscall/path, or path/syscall/comm, etc... -// TODO: Idea, show time spent between the syscalls (off syscalls) as well, but in a different color -type Flamegraph struct { - flags flags.Flags - Ch chan *event.Pair - Done chan struct{} - workers []worker -} - -func New() Flamegraph { - f := Flamegraph{ - Ch: make(chan *event.Pair, 4096), - Done: make(chan struct{}), - } - numWorkers := runtime.NumCPU() / 4 - if numWorkers == 0 { - numWorkers = 1 - } - for range numWorkers { - f.workers = append(f.workers, newWorker()) - } - return f -} - -func (f Flamegraph) Start(ctx context.Context) { - go func() { - defer close(f.Done) - var wg sync.WaitGroup - wg.Add(len(f.workers)) - - for i, worker := range f.workers { - fmt.Println("Starting flamegraph worker", i) - go worker.run(ctx, &wg, f.Ch) - } - wg.Wait() - - iod := f.workers[0].iod - if len(f.workers) > 1 { - for i, w := range f.workers[1:] { - iod = iod.merge(w.iod) - fmt.Println("Worker", i+1, "merged") - } - } - if err := iod.commit(); err != nil { - log.Fatal(err) - } - }() -} diff --git a/internal/flamegraph/iordata.go b/internal/flamegraph/iordata.go index 2ba1e9e..44c4174 100644 --- a/internal/flamegraph/iordata.go +++ b/internal/flamegraph/iordata.go @@ -1,37 +1,35 @@ package flamegraph import ( + "bytes" + "encoding/gob" "fmt" + "io" "ior/internal/event" "ior/internal/file" "ior/internal/flags" "ior/internal/types" "iter" - "log" "os" "strings" "time" + // Is there a zstd library part of Go 1.25 "github.com/DataDog/zstd" ) -const recordSeparator = " ␞ " - type pathType = string type traceIdType = types.TraceId type commType = string type pidType = uint32 type tidType = uint32 type flagsType = file.Flags -type pathMap map[pathType]map[traceIdType]map[commType]map[pidType]map[tidType]map[flagsType]counter +type pathMap map[pathType]map[traceIdType]map[commType]map[pidType]map[tidType]map[flagsType]Counter type iorData struct { - paths pathMap + paths pathMap // Make sure this field is accessible from outside } -// TODO: Name flag for iorData (outfile format: hostname-name-timestamp.ior.zst) -// TODO: Output path for iorData flag -// TODO: Add helper to convert .ior data file to collapsed format func newIorData() iorData { return iorData{paths: make(pathMap)} } @@ -42,38 +40,38 @@ func cloneString(s string) string { return string([]byte(s)) } -func (iod iorData) add(ev *event.Pair) { - cnt := counter{count: 1, duration: ev.Duration, durationToPrev: ev.DurationToPrev} - iod.addPath(ev.FileName(), ev.EnterEv.GetTraceId(), string(ev.Comm), ev.EnterEv.GetPid(), +func (iod iorData) addEventPair(ev *event.Pair) { + cnt := Counter{Count: 1, Duration: ev.Duration, DurationToPrev: ev.DurationToPrev} + iod.add(ev.FileName(), ev.EnterEv.GetTraceId(), strings.TrimSpace(ev.Comm), ev.EnterEv.GetPid(), ev.EnterEv.GetTid(), ev.Flags(), cnt) } -func (iod iorData) addPath(path pathType, traceId traceIdType, comm commType, - pid pidType, tid tidType, flags flagsType, addCnt counter) { +func (iod iorData) add(path pathType, traceId traceIdType, comm commType, + pid pidType, tid tidType, flags flagsType, addCnt Counter) { pathMap, ok := iod.paths[path] if !ok { - pathMap = make(map[traceIdType]map[commType]map[pidType]map[tidType]map[flagsType]counter) + pathMap = make(map[traceIdType]map[commType]map[pidType]map[tidType]map[flagsType]Counter) iod.paths[path] = pathMap } traceIdMap, ok := iod.paths[path][traceId] if !ok { - traceIdMap = make(map[commType]map[pidType]map[tidType]map[flagsType]counter) + traceIdMap = make(map[commType]map[pidType]map[tidType]map[flagsType]Counter) iod.paths[path][traceId] = traceIdMap } commMap, ok := iod.paths[path][traceId][comm] if !ok { - commMap = make(map[pidType]map[tidType]map[flagsType]counter) + commMap = make(map[pidType]map[tidType]map[flagsType]Counter) iod.paths[path][traceId][comm] = commMap } pidMap, ok := iod.paths[path][traceId][comm][pid] if !ok { - pidMap = make(map[tidType]map[flagsType]counter) + pidMap = make(map[tidType]map[flagsType]Counter) iod.paths[path][traceId][comm][pid] = pidMap } tidMap, ok := iod.paths[path][traceId][comm][pid][tid] if !ok { - tidMap = make(map[flagsType]counter) + tidMap = make(map[flagsType]Counter) iod.paths[path][traceId][comm][pid][tid] = tidMap } cnt, ok := iod.paths[path][traceId][comm][pid][tid][flags] @@ -87,26 +85,26 @@ func (iod iorData) addPath(path pathType, traceId traceIdType, comm commType, func (iod iorData) merge(other iorData) iorData { for path, traceIdMap := range other.paths { if _, ok := iod.paths[path]; !ok { - iod.paths[path] = make(map[traceIdType]map[commType]map[pidType]map[tidType]map[flagsType]counter) + iod.paths[path] = make(map[traceIdType]map[commType]map[pidType]map[tidType]map[flagsType]Counter) } for traceId, commMap := range traceIdMap { if _, ok := iod.paths[path][traceId]; !ok { - iod.paths[path][traceId] = make(map[commType]map[pidType]map[tidType]map[flagsType]counter) + iod.paths[path][traceId] = make(map[commType]map[pidType]map[tidType]map[flagsType]Counter) } for comm, pidMap := range commMap { if _, ok := iod.paths[path][traceId][comm]; !ok { - iod.paths[path][traceId][comm] = make(map[pidType]map[tidType]map[flagsType]counter) + iod.paths[path][traceId][comm] = make(map[pidType]map[tidType]map[flagsType]Counter) } for pid, tidMap := range pidMap { if _, ok := iod.paths[path][traceId][comm][pid]; !ok { - iod.paths[path][traceId][comm][pid] = make(map[tidType]map[flagsType]counter) + iod.paths[path][traceId][comm][pid] = make(map[tidType]map[flagsType]Counter) } for tid, flagsMap := range tidMap { if _, ok := iod.paths[path][traceId][comm][pid][tid]; !ok { - iod.paths[path][traceId][comm][pid][tid] = make(map[flagsType]counter) + iod.paths[path][traceId][comm][pid][tid] = make(map[flagsType]Counter) } for flags, cnt := range flagsMap { - iod.addPath(path, traceId, comm, pid, tid, flags, cnt) + iod.add(path, traceId, comm, pid, tid, flags, cnt) } } } @@ -116,7 +114,7 @@ func (iod iorData) merge(other iorData) iorData { return iod } -func (iod iorData) commit() error { +func (iod iorData) serializeToFile() error { hostname, err := os.Hostname() if err != nil { panic(err) @@ -124,7 +122,7 @@ func (iod iorData) commit() error { filename := fmt.Sprintf("%s-%s-%s.ior.zst", hostname, flags.Get().FlamegraphName, time.Now().Format("2006-01-02_15:04:05")) - log.Println("Writing", filename) + fmt.Println("Writing", filename) tmpFilename := fmt.Sprintf("%s.tmp", filename) file, err := os.Create(tmpFilename) @@ -136,15 +134,36 @@ func (iod iorData) commit() error { encoder := zstd.NewWriter(file) defer encoder.Close() - for line := range iod.lines() { - if _, err := encoder.Write([]byte(line + "\n")); err != nil { - return err - } + bytes, err := iod.serialize() + if err != nil { + return err + } + + if _, err := encoder.Write(bytes); err != nil { + return err } return os.Rename(tmpFilename, filename) } +func (iod iorData) loadFromFile(filename string) error { + file, err := os.Open(filename) + if err != nil { + return err + } + defer file.Close() + + decoder := zstd.NewReader(file) + defer decoder.Close() + + var buffer bytes.Buffer + if _, err = io.Copy(&buffer, decoder); err != nil { + return err + } + + return iod.deserialize(&buffer) +} + func (iod iorData) lines() iter.Seq[string] { return func(yield func(string) bool) { for path, traceIdMap := range iod.paths { @@ -160,9 +179,9 @@ func (iod iorData) lines() iter.Seq[string] { fmt.Sprint(pid), fmt.Sprint(tid), flags.String(), - fmt.Sprintf("%d %d %d %d", cnt.count, cnt.duration, cnt.durationToPrev, cnt.bytes), + fmt.Sprintf("%d %d %d %d", cnt.Count, cnt.Duration, cnt.DurationToPrev, cnt.Bytes), }, - recordSeparator) + " --- ") if !yield(joinedStr) { // Stop iteration if yield returns false return @@ -175,3 +194,15 @@ func (iod iorData) lines() iter.Seq[string] { } } } + +func (iod iorData) serialize() ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err := enc.Encode(iod.paths) + return buf.Bytes(), err +} + +func (iod *iorData) deserialize(buf *bytes.Buffer) error { + dec := gob.NewDecoder(buf) + return dec.Decode(&iod.paths) +} diff --git a/internal/flamegraph/iordata_test.go b/internal/flamegraph/iordata_test.go index ff17ca3..7d8c16b 100644 --- a/internal/flamegraph/iordata_test.go +++ b/internal/flamegraph/iordata_test.go @@ -14,16 +14,16 @@ func TestAddPath(t *testing.T) { pid := pidType(1234) tid := tidType(5678) flags := flagsType(syscall.O_RDONLY) - cnt1 := counter{count: 1, duration: 1000, durationToPrev: 100} + cnt1 := Counter{Count: 1, Duration: 1000, DurationToPrev: 100} - iod.addPath(path, traceId, comm, pid, tid, flags, cnt1) + iod.add(path, traceId, comm, pid, tid, flags, cnt1) if iod.paths[path][traceId][comm][pid][tid][flags] != cnt1 { t.Errorf("Expected counter %v, got %v", cnt1, iod.paths[path][traceId][comm][pid][tid][flags]) } - cnt2 := counter{count: 2, duration: 2000, durationToPrev: 200} + cnt2 := Counter{Count: 2, Duration: 2000, DurationToPrev: 200} - iod.addPath(path, traceId, comm, pid, tid, flags, cnt2) + iod.add(path, traceId, comm, pid, tid, flags, cnt2) resultCnt := cnt1.add(cnt2) if iod.paths[path][traceId][comm][pid][tid][flags] != resultCnt { @@ -38,28 +38,28 @@ func TestMerge(t *testing.T) { // Initialize two iorData instances with sample data iod1 := iorData{paths: pathMap{ - "path1": {traceId: {"comm1": {100: {1000: {rdwrFlag: counter{ - count: 10, - duration: 1000, - durationToPrev: 100, + "path1": {traceId: {"comm1": {100: {1000: {rdwrFlag: Counter{ + Count: 10, + Duration: 1000, + DurationToPrev: 100, }}}}}}}} iod2 := iorData{paths: pathMap{ - "path1": {traceId: {"comm1": {100: {1000: {roFlag: counter{ - count: 20, - duration: 2000, - durationToPrev: 200, + "path1": {traceId: {"comm1": {100: {1000: {roFlag: Counter{ + Count: 20, + Duration: 2000, + DurationToPrev: 200, }}}}}}}} iod3 := iorData{paths: pathMap{ - "path2": {traceId: {"comm2": {101: {1000: {roFlag: counter{ - count: 20, - duration: 2000, - durationToPrev: 200, + "path2": {traceId: {"comm2": {101: {1000: {roFlag: Counter{ + Count: 20, + Duration: 2000, + DurationToPrev: 200, }}}}}}}} iod4 := iorData{paths: pathMap{ - "path2": {traceId: {"comm2": {101: {1000: {roFlag: counter{ - count: 40, - duration: 4000, - durationToPrev: 400, + "path2": {traceId: {"comm2": {101: {1000: {roFlag: Counter{ + Count: 40, + Duration: 4000, + DurationToPrev: 400, }}}}}}}} // Merge iod2 into iod1 @@ -72,11 +72,11 @@ func TestMerge(t *testing.T) { if len(merged.paths) != 2 { t.Errorf("Expected 2 paths, got %d", len(merged.paths)) } - if merged.paths["path1"][traceId]["comm1"][100][1000][roFlag].count != 10 { - t.Errorf("Expected counter 10, got %d", merged.paths["path1"][1]["comm1"][100][1000][rdwrFlag].count) + if merged.paths["path1"][traceId]["comm1"][100][1000][roFlag].Count != 10 { + t.Errorf("Expected counter 10, got %d", merged.paths["path1"][1]["comm1"][100][1000][rdwrFlag].Count) } - if merged.paths["path2"][traceId]["comm2"][101][1000][roFlag].count != 60 { - t.Errorf("Expected counter 60, got %d", merged.paths["path2"][1]["comm2"][101][1000][roFlag].count) + if merged.paths["path2"][traceId]["comm2"][101][1000][roFlag].Count != 60 { + t.Errorf("Expected counter 60, got %d", merged.paths["path2"][1]["comm2"][101][1000][roFlag].Count) } }) diff --git a/internal/flamegraph/iordatacollector.go b/internal/flamegraph/iordatacollector.go new file mode 100644 index 0000000..6a97379 --- /dev/null +++ b/internal/flamegraph/iordatacollector.go @@ -0,0 +1,60 @@ +package flamegraph + +import ( + "context" + "fmt" + "ior/internal/event" + "ior/internal/flags" + "os" + "runtime" + "sync" +) + +// TODO: Idea, show time spent between the syscalls (off syscalls) as well, but in a different color +type IorDataCollector struct { + flags flags.Flags + Ch chan *event.Pair + Done chan struct{} + workers []worker +} + +func New() IorDataCollector { + f := IorDataCollector{ + Ch: make(chan *event.Pair, 4096), + Done: make(chan struct{}), + } + numWorkers := runtime.NumCPU() / 4 + if numWorkers == 0 { + numWorkers = 1 + } + for range numWorkers { + f.workers = append(f.workers, newWorker()) + } + return f +} + +func (f IorDataCollector) Start(ctx context.Context) { + go func() { + defer close(f.Done) + var wg sync.WaitGroup + wg.Add(len(f.workers)) + + for i, worker := range f.workers { + fmt.Println("Starting flamegraph worker", i) + go worker.run(ctx, &wg, f.Ch) + } + wg.Wait() + + iod := f.workers[0].iod + if len(f.workers) > 1 { + for i, w := range f.workers[1:] { + iod = iod.merge(w.iod) + fmt.Println("Worker", i+1, "merged") + } + } + if err := iod.serializeToFile(); err != nil { + fmt.Println(err) + os.Exit(2) + } + }() +} diff --git a/internal/flamegraph/worker.go b/internal/flamegraph/worker.go index 40f6d3f..534bfc3 100644 --- a/internal/flamegraph/worker.go +++ b/internal/flamegraph/worker.go @@ -22,7 +22,7 @@ func (w worker) run(ctx context.Context, wg *sync.WaitGroup, ch <-chan *event.Pa for { select { case ev := <-ch: - w.iod.add(ev) + w.iod.addEventPair(ev) ev.Recycle() default: diff --git a/internal/ior.go b/internal/ior.go index c2f2f02..2aff207 100644 --- a/internal/ior.go +++ b/internal/ior.go @@ -12,6 +12,7 @@ import ( "time" "ior/internal/flags" + "ior/internal/flamegraph" "ior/internal/tracepoints" bpf "github.com/aquasecurity/libbpfgo" @@ -44,6 +45,16 @@ func attachTracepoints(bpfModule *bpf.Module) error { } func Run() error { + iorFile := flags.Get().IorDataFile + + if iorFile != "" { + return flamegraph.NewCollapsed(iorFile, flags.Get().CollapsedFields).Generate(iorFile) + } + + return runTrace() +} + +func runTrace() error { bpfModule, err := bpf.NewModuleFromFile("ior.bpf.o") if err != nil { return err -- cgit v1.2.3