diff options
| -rw-r--r-- | internal/ior.go | 248 |
1 files changed, 156 insertions, 92 deletions
diff --git a/internal/ior.go b/internal/ior.go index 4d5aea1..7b411f2 100644 --- a/internal/ior.go +++ b/internal/ior.go @@ -301,53 +301,67 @@ func newEventLoopConfig(cfg flags.Flags) eventLoopConfig { } } -func runTraceWithContext(parentCtx context.Context, cfg flags.Flags, started chan<- struct{}, configure func(*eventLoop)) error { - if getEUID() != 0 { - return errRootPrivilegesRequired - } +type profilingControl struct { + done chan struct{} + enabled bool + cpuProfile *os.File + memProfile *os.File + stopExecTrace func() + stopOnce sync.Once +} - verbose := started == nil - logln := func(...any) {} - if verbose { - logln = func(args ...any) { _, _ = fmt.Println(args...) } +func newLogger(verbose bool) func(...any) { + if !verbose { + return func(...any) {} } + return func(args ...any) { _, _ = fmt.Println(args...) } +} + +func setupBPFModule(parentCtx context.Context, cfg flags.Flags) (*bpf.Module, *probemanager.Manager, func(), error) { + releaseBindings := func() {} bpfModule, err := bpf.NewModuleFromFile("ior.bpf.o") if err != nil { - return err + return nil, nil, releaseBindings, err } - defer bpfModule.Close() - if err := resizeBPFMaps(cfg, bpfModule); err != nil { - return err + bpfModule.Close() + return nil, nil, releaseBindings, err } - if err := setBPFGlobals(cfg, bpfModule); err != nil { - return err + bpfModule.Close() + return nil, nil, releaseBindings, err } - if err := bpfModule.BPFLoadObject(); err != nil { - return err + bpfModule.Close() + return nil, nil, releaseBindings, err } mgr := probemanager.NewManager(libbpfTracepointModule{module: bpfModule}) - defer mgr.Close() if err := mgr.AttachAll(cfg.ShouldIAttachTracepoint, tracepoints.List); err != nil { - return err + mgr.Close() + bpfModule.Close() + return nil, nil, releaseBindings, err } if bindings, ok := tui.RuntimeBindingsFromContext(parentCtx); ok { bindings.SetProbeManager(mgr) - defer bindings.SetProbeManager(nil) + releaseBindings = func() { bindings.SetProbeManager(nil) } } + return bpfModule, mgr, releaseBindings, nil +} - // 4096 channel size, minimises event drops +func setupEventChannel(bpfModule *bpf.Module) (chan []byte, error) { + // 4096 channel size minimizes event drops. ch := make(chan []byte, 4096) rb, err := bpfModule.InitRingBuf("event_map", ch) if err != nil { - return err + return nil, err } rb.Poll(300) + return ch, nil +} +func setupTraceContext(parentCtx context.Context, cfg flags.Flags, logln func(...any)) (context.Context, context.CancelFunc, func()) { ctx := parentCtx cancel := func() {} if shouldAutoStopByDuration(cfg) { @@ -358,88 +372,106 @@ func runTraceWithContext(parentCtx context.Context, cfg flags.Flags, started cha logln("Probing until stopped...") ctx, cancel = context.WithCancel(parentCtx) } - defer cancel() signalCh := make(chan os.Signal, 1) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) - defer signal.Stop(signalCh) - + stopSignals := func() { + signal.Stop(signalCh) + } go func() { select { case <-signalCh: logln("Received signal, shutting down...") cancel() case <-ctx.Done(): - return } }() + return ctx, cancel, stopSignals +} - pprofDone := make(chan struct{}) - var cpuProfile, memProfile, execTraceProfile *os.File - stopExecTrace := func() {} - if cfg.PprofEnable { - isTUIMode := started != nil - cpuProfilePath, memProfilePath, execTracePath, execTraceDuration := profilingFilesForMode(isTUIMode) - - if cpuProfile, err = os.Create(cpuProfilePath); err != nil { - return err - } - if memProfile, err = os.Create(memProfilePath); err != nil { - _ = cpuProfile.Close() - return err - } +func setupProfiling(ctx context.Context, cfg flags.Flags, started chan<- struct{}) (*profilingControl, error) { + control := &profilingControl{ + done: make(chan struct{}), + stopExecTrace: func() {}, + } + if !cfg.PprofEnable { + close(control.done) + return control, nil + } - if execTracePath != "" { - if execTraceProfile, err = os.Create(execTracePath); err != nil { - _ = cpuProfile.Close() - _ = memProfile.Close() - return err - } - if err := trace.Start(execTraceProfile); err != nil { - _ = cpuProfile.Close() - _ = memProfile.Close() - _ = execTraceProfile.Close() - return err - } + control.enabled = true + isTUIMode := started != nil + cpuProfilePath, memProfilePath, execTracePath, execTraceDuration := profilingFilesForMode(isTUIMode) - // TUI profiling workflow: - // go tool pprof -http=:8080 ior-tui-cpu.prof - // go tool trace ior-tui-trace.out - var stopOnce sync.Once - stopExecTrace = func() { - stopOnce.Do(func() { - trace.Stop() - _ = execTraceProfile.Close() - }) - } + cpuProfile, err := os.Create(cpuProfilePath) + if err != nil { + return nil, err + } + memProfile, err := os.Create(memProfilePath) + if err != nil { + _ = cpuProfile.Close() + return nil, err + } + control.cpuProfile = cpuProfile + control.memProfile = memProfile - go func() { - timer := time.NewTimer(execTraceDuration) - defer timer.Stop() - select { - case <-ctx.Done(): - case <-timer.C: - } - stopExecTrace() - }() + if execTracePath != "" { + execTraceProfile, err := os.Create(execTracePath) + if err != nil { + _ = cpuProfile.Close() + _ = memProfile.Close() + return nil, err } - - if err := pprof.StartCPUProfile(cpuProfile); err != nil { - stopExecTrace() + if err := trace.Start(execTraceProfile); err != nil { _ = cpuProfile.Close() _ = memProfile.Close() - return err + _ = execTraceProfile.Close() + return nil, err } - } else { - close(pprofDone) + var stopOnce sync.Once + control.stopExecTrace = func() { + stopOnce.Do(func() { + trace.Stop() + _ = execTraceProfile.Close() + }) + } + go func() { + timer := time.NewTimer(execTraceDuration) + defer timer.Stop() + select { + case <-ctx.Done(): + case <-timer.C: + } + control.stopExecTrace() + }() } - signalTraceStarted(started) - - el, err := newEventLoop(newEventLoopConfig(cfg)) - if err != nil { - return err + if err := pprof.StartCPUProfile(cpuProfile); err != nil { + control.stopExecTrace() + _ = cpuProfile.Close() + _ = memProfile.Close() + return nil, err } + return control, nil +} + +func (p *profilingControl) stop(logln func(...any)) { + p.stopOnce.Do(func() { + if !p.enabled { + return + } + logln("Stopping profiling and writing profile files") + pprof.StopCPUProfile() + runtime.GC() + _ = pprof.WriteHeapProfile(p.memProfile) + p.stopExecTrace() + _ = p.cpuProfile.Close() + _ = p.memProfile.Close() + close(p.done) + }) +} + +func configureEventLoopOutput(el *eventLoop, mgr *probemanager.Manager, configure func(*eventLoop)) { if configure != nil { configure(el) } @@ -453,28 +485,60 @@ func runTraceWithContext(parentCtx context.Context, cfg flags.Flags, started cha origPrintCb(ep) } } +} +func startTraceShutdownWatcher(ctx context.Context, verbose bool, el *eventLoop, profiling *profilingControl, logln func(...any)) { go func() { <-ctx.Done() if verbose { fmt.Println(el.stats()) } - if cfg.PprofEnable { - logln("Stopping profiling and writing profile files") - pprof.StopCPUProfile() - runtime.GC() - _ = pprof.WriteHeapProfile(memProfile) - stopExecTrace() - _ = cpuProfile.Close() - _ = memProfile.Close() - close(pprofDone) - } + profiling.stop(logln) }() +} + +func runTraceWithContext(parentCtx context.Context, cfg flags.Flags, started chan<- struct{}, configure func(*eventLoop)) error { + if getEUID() != 0 { + return errRootPrivilegesRequired + } + + verbose := started == nil + logln := newLogger(verbose) + + bpfModule, mgr, releaseBindings, err := setupBPFModule(parentCtx, cfg) + if err != nil { + return err + } + defer bpfModule.Close() + defer mgr.Close() + defer releaseBindings() + + ch, err := setupEventChannel(bpfModule) + if err != nil { + return err + } + ctx, cancel, stopSignals := setupTraceContext(parentCtx, cfg, logln) + defer cancel() + defer stopSignals() + + profiling, err := setupProfiling(ctx, cfg, started) + if err != nil { + return err + } + + signalTraceStarted(started) + + el, err := newEventLoop(newEventLoopConfig(cfg)) + if err != nil { + return err + } + configureEventLoopOutput(el, mgr, configure) + startTraceShutdownWatcher(ctx, verbose, el, profiling, logln) startTime := time.Now() el.run(ctx, ch) totalDuration := time.Since(startTime) - <-pprofDone + <-profiling.done logln("Good bye... (unloading BPF tracepoints will take a few seconds...) after", totalDuration) return nil } |
