diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-13 07:14:00 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-13 07:14:00 +0200 |
| commit | c9d22e32dc9d8d0447beb4ffa78f47a03d0cddc4 (patch) | |
| tree | 18db76552b60a215a5ec9ef95a692aafc6af6564 /internal/ior.go | |
| parent | 2e401326d7abf687f2f67537cfe1b7f93d548305 (diff) | |
feat: add headless parquet recording mode
Diffstat (limited to 'internal/ior.go')
| -rw-r--r-- | internal/ior.go | 169 |
1 files changed, 167 insertions, 2 deletions
diff --git a/internal/ior.go b/internal/ior.go index 12aab7c..1010445 100644 --- a/internal/ior.go +++ b/internal/ior.go @@ -11,6 +11,7 @@ import ( "runtime" "runtime/pprof" "runtime/trace" + "strings" "sync" "syscall" "time" @@ -23,6 +24,7 @@ import ( "ior/internal/parquet" "ior/internal/probemanager" "ior/internal/statsengine" + "ior/internal/streamrow" "ior/internal/tracepoints" "ior/internal/tui" "ior/internal/tui/eventstream" @@ -32,6 +34,7 @@ import ( var ( runTraceFn = runTrace + runParquetFn = runHeadlessParquet runTraceWithContextFn = runTraceWithContext runTUIFn = tui.RunWithTraceStarterConfig runTUITestFlamesFn = tui.RunTestFlamesWithTraceStarterConfig @@ -82,6 +85,9 @@ func dispatchRun(cfg flags.Config) error { if cfg.TestLiveFlames { return runTUITestLiveFlamesFn(cfg, tuiTestLiveFlamesStarter(cfg)) } + if isHeadlessParquetMode(cfg) { + return runParquetFn(cfg) + } if shouldRunTraceMode(cfg) { return runTraceFn(cfg) } @@ -89,6 +95,23 @@ func dispatchRun(cfg flags.Config) error { } func validateRunConfig(cfg flags.Config) error { + if isHeadlessParquetMode(cfg) { + if cfg.TestFlames { + return errors.New("--testflames cannot be combined with -parquet") + } + if cfg.TestLiveFlames { + return errors.New("--testliveflames cannot be combined with -parquet") + } + if cfg.PlainMode { + return errors.New("-parquet and -plain are mutually exclusive") + } + if cfg.FlamegraphOutput { + return errors.New("-parquet and -flamegraph are mutually exclusive") + } + if hasHeadlessParquetContentFilters(cfg) { + return errors.New("-parquet cannot be combined with content filters (-comm, -path, -pid, -tid)") + } + } if cfg.TestFlames && cfg.PlainMode { return errors.New("--testflames cannot be combined with -plain") } @@ -178,7 +201,7 @@ func runSyntheticLiveFlames(ctx context.Context, liveTrie *flamegraph.LiveTrie, } func shouldRunTraceMode(cfg flags.Config) bool { - return cfg.PlainMode || cfg.FlamegraphOutput + return cfg.PlainMode || cfg.FlamegraphOutput || isHeadlessParquetMode(cfg) } func tuiTraceStarterFromRunTrace( @@ -312,6 +335,69 @@ func runTrace(cfg flags.Config) error { return runTraceWithContext(context.Background(), cfg, nil, nil) } +func runHeadlessParquet(cfg flags.Config) error { + if getEUID() != 0 { + return errRootPrivilegesRequired + } + + cfg = headlessParquetTraceConfig(cfg) + logln := newLogger(true) + + bpfModule, mgr, releaseBindings, err := setupBPFModule(context.Background(), cfg) + if err != nil { + return err + } + defer bpfModule.Close() + defer mgr.Close() + defer releaseBindings() + + ch, err := setupEventChannel(bpfModule) + if err != nil { + return err + } + ctx, cancel, stopSignals := setupTraceContext(context.Background(), cfg, logln) + defer cancel() + defer stopSignals() + + profiling, err := setupProfiling(ctx, cfg, nil) + if err != nil { + return err + } + + el, err := newEventLoop(newEventLoopConfig(cfg)) + if err != nil { + return err + } + + recorder := parquet.NewRecorder(parquet.RecorderConfig{}) + if err := recorder.Start(cfg.ParquetPath, parquet.StartOptions{Metadata: parquetMetadata("headless")}); err != nil { + return err + } + + sink := newHeadlessParquetSink(recorder, cancel) + configureEventLoopOutput(el, mgr, sink.configure) + startTraceShutdownWatcher(ctx, true, el, profiling, logln) + + startTime := time.Now() + el.run(ctx, ch) + totalDuration := time.Since(startTime) + <-profiling.done + + stopErr := recorder.Stop() + if err := sink.err(); err != nil { + if stopErr != nil && !errors.Is(stopErr, err) { + return errors.Join(err, stopErr) + } + return err + } + if stopErr != nil { + return stopErr + } + + logln("Good bye... (unloading BPF tracepoints will take a few seconds...) after", totalDuration) + return nil +} + func newEventLoopConfig(cfg flags.Config) eventLoopConfig { fields := make([]string, len(cfg.CollapsedFields)) copy(fields, cfg.CollapsedFields) @@ -630,7 +716,86 @@ func signalTraceStarted(started chan<- struct{}) { } func shouldAutoStopByDuration(cfg flags.Config) bool { - return cfg.PlainMode || cfg.FlamegraphOutput + return cfg.PlainMode || cfg.FlamegraphOutput || isHeadlessParquetMode(cfg) +} + +func isHeadlessParquetMode(cfg flags.Config) bool { + return strings.TrimSpace(cfg.ParquetPath) != "" +} + +func hasHeadlessParquetContentFilters(cfg flags.Config) bool { + return cfg.CommFilter != "" || + cfg.PathFilter != "" || + cfg.PidFilter > 0 || + cfg.TidFilter > 0 || + cfg.GlobalFilter.IsActive() +} + +func headlessParquetTraceConfig(cfg flags.Config) flags.Config { + out := cfg + out.PlainMode = false + out.FlamegraphOutput = false + out.CommFilter = "" + out.PathFilter = "" + out.PidFilter = -1 + out.TidFilter = -1 + out.GlobalFilter = globalfilter.Filter{} + return out +} + +func parquetMetadata(mode string) parquet.FileMetadata { + meta := parquet.FileMetadata{ + StartedAtUnixNano: uint64(time.Now().UnixNano()), + Mode: mode, + IORVersion: flags.Version, + } + if hostname, err := os.Hostname(); err == nil { + meta.Hostname = hostname + } + return meta +} + +type headlessParquetSink struct { + recorder *parquet.Recorder + seq *streamrow.Sequencer + cancel context.CancelFunc + + mu sync.Mutex + recErr error +} + +func newHeadlessParquetSink(recorder *parquet.Recorder, cancel context.CancelFunc) *headlessParquetSink { + return &headlessParquetSink{ + recorder: recorder, + seq: streamrow.NewSequencer(0), + cancel: cancel, + } +} + +func (s *headlessParquetSink) configure(el *eventLoop) { + el.printCb = func(ep *event.Pair) { + row := streamrow.New(s.seq.Next(), ep) + if err := s.recorder.Record(row, 0); err != nil { + s.fail(err) + } + ep.Recycle() + } +} + +func (s *headlessParquetSink) fail(err error) { + s.mu.Lock() + defer s.mu.Unlock() + if s.recErr != nil { + return + } + s.recErr = err + s.cancel() +} + +func (s *headlessParquetSink) err() error { + s.mu.Lock() + defer s.mu.Unlock() + return s.recErr } func profilingFilesForMode(tuiMode bool) (cpuProfilePath, memProfilePath, execTracePath string, execTraceDuration time.Duration) { |
