diff options
| author | Paul Buetow <paul@buetow.org> | 2026-05-19 09:47:09 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-05-19 09:47:09 +0300 |
| commit | 5bd44dcb1e588fd5df8c02aec58353f7aa8f7d13 (patch) | |
| tree | d4059fce1d43a1f0f0815d753b627802e7599b13 | |
| parent | d699ef44a9ded5d419a470a4f6715ecff7f0fcd1 (diff) | |
t6 stabilize family recording integration
| -rw-r--r-- | cmd/ioworkload/main.go | 20 | ||||
| -rw-r--r-- | cmd/ioworkload/scenario_family.go | 23 | ||||
| -rw-r--r-- | integrationtests/family_test.go | 8 | ||||
| -rw-r--r-- | integrationtests/harness.go | 9 | ||||
| -rw-r--r-- | internal/flags/flags.go | 2 | ||||
| -rw-r--r-- | internal/ior_mode_registry.go | 2 | ||||
| -rw-r--r-- | internal/ior_mode_test.go | 17 | ||||
| -rw-r--r-- | internal/ior_parquet_sink.go | 7 |
8 files changed, 67 insertions, 21 deletions
diff --git a/cmd/ioworkload/main.go b/cmd/ioworkload/main.go index 0276a9c..57424e4 100644 --- a/cmd/ioworkload/main.go +++ b/cmd/ioworkload/main.go @@ -8,13 +8,17 @@ import ( "fmt" "os" "slices" + "strconv" "time" ) // Give ior enough time to attach tracepoints before scenarios emit syscalls. // Under slower CI or locally saturated systems, 5s can still miss first-call // events for single-shot scenarios. Use a slightly larger delay for stability. -const startupDelay = 8 * time.Second +const ( + defaultStartupDelay = 8 * time.Second + startupDelayEnv = "IOR_WORKLOAD_STARTUP_DELAY_MS" +) func main() { scenario := flag.String("scenario", "", "I/O scenario to execute") @@ -40,10 +44,22 @@ func main() { } fmt.Println(os.Getpid()) - time.Sleep(startupDelay) + time.Sleep(configuredStartupDelay()) if err := run(); err != nil { fmt.Fprintf(os.Stderr, "scenario %s failed: %v\n", *scenario, err) os.Exit(1) } } + +func configuredStartupDelay() time.Duration { + raw := os.Getenv(startupDelayEnv) + if raw == "" { + return defaultStartupDelay + } + ms, err := strconv.Atoi(raw) + if err != nil || ms < 0 { + return defaultStartupDelay + } + return time.Duration(ms) * time.Millisecond +} diff --git a/cmd/ioworkload/scenario_family.go b/cmd/ioworkload/scenario_family.go index c550b4b..21d58e8 100644 --- a/cmd/ioworkload/scenario_family.go +++ b/cmd/ioworkload/scenario_family.go @@ -4,11 +4,27 @@ import ( "fmt" "path/filepath" "syscall" + "time" ) +const familyMixedEmitFor = 3 * time.Second + // familyMixed emits representative syscalls from multiple broad families so // integration tests can verify family tagging and aggregation from a real trace. func familyMixed() error { + deadline := time.Now().Add(familyMixedEmitFor) + for { + if err := familyMixedOnce(); err != nil { + return err + } + if time.Now().After(deadline) { + return nil + } + time.Sleep(25 * time.Millisecond) + } +} + +func familyMixedOnce() error { if err := familyMixedFS(); err != nil { return err } @@ -21,10 +37,7 @@ func familyMixed() error { if err := familyMixedNetwork(); err != nil { return err } - if err := familyMixedProcessSchedTime(); err != nil { - return err - } - return nil + return familyMixedProcessSchedTime() } func familyMixedFS() error { @@ -83,7 +96,7 @@ func familyMixedProcessSchedTime() error { if _, _, errno := syscall.RawSyscall(syscall.SYS_SCHED_YIELD, 0, 0, 0); errno != 0 { return fmt.Errorf("sched_yield: %w", errno) } - if err := syscall.Nanosleep(&syscall.Timespec{Nsec: 1000}, nil); err != nil { + if err := syscall.Nanosleep(&syscall.Timespec{Nsec: 1000}, nil); err != nil && err != syscall.EINTR { return fmt.Errorf("nanosleep: %w", err) } return nil diff --git a/integrationtests/family_test.go b/integrationtests/family_test.go index ad35b46..7558bfa 100644 --- a/integrationtests/family_test.go +++ b/integrationtests/family_test.go @@ -10,9 +10,15 @@ import ( parquetgo "github.com/parquet-go/parquet-go" ) +const ( + familyParquetDuration = 6 + familyWorkloadStartupEnv = "IOR_WORKLOAD_STARTUP_DELAY_MS=1000" +) + func TestFamilyParquetRecordingAndAggregation(t *testing.T) { h := newTestHarness(t) - path, pid, err := h.RunParquet("family-mixed", defaultDuration) + h.WorkloadEnv = []string{familyWorkloadStartupEnv} + path, pid, err := h.RunParquet("family-mixed", familyParquetDuration) if err != nil { t.Fatalf("run family-mixed parquet scenario: %v", err) } diff --git a/integrationtests/harness.go b/integrationtests/harness.go index afde05e..b69e5d2 100644 --- a/integrationtests/harness.go +++ b/integrationtests/harness.go @@ -25,6 +25,7 @@ type TestHarness struct { WorkloadBinary string // path to built ioworkload binary BpfObject string // optional path to external BPF object override OutputDir string // temp dir for .ior.zst output + WorkloadEnv []string } // Run executes a single integration test scenario. It starts the ioworkload @@ -79,7 +80,7 @@ func (h *TestHarness) RunParquet(scenario string, duration int) (string, int, er return "", 0, err } - iorCmd, err := h.startIorParquet(parquetPath, duration) + iorCmd, err := h.startIorParquet(workloadPID, parquetPath, duration) if err != nil { workloadCmd.Process.Kill() workloadCmd.Wait() @@ -99,6 +100,9 @@ func (h *TestHarness) RunParquet(scenario string, duration int) (string, int, er func (h *TestHarness) startWorkload(scenario string) (*exec.Cmd, int, error) { cmd := exec.Command(h.WorkloadBinary, "--scenario="+scenario) cmd.Stderr = os.Stderr + if len(h.WorkloadEnv) > 0 { + cmd.Env = append(os.Environ(), h.WorkloadEnv...) + } stdout, err := cmd.StdoutPipe() if err != nil { @@ -157,8 +161,9 @@ func (h *TestHarness) startIor(pid int, scenario string, duration int, extraArgs return h.startIorArgs(args) } -func (h *TestHarness) startIorParquet(parquetPath string, duration int) (*exec.Cmd, error) { +func (h *TestHarness) startIorParquet(pid int, parquetPath string, duration int) (*exec.Cmd, error) { args := []string{ + "-pid", strconv.Itoa(pid), "-parquet", parquetPath, "-duration", strconv.Itoa(duration), } diff --git a/internal/flags/flags.go b/internal/flags/flags.go index e7171ce..285569c 100644 --- a/internal/flags/flags.go +++ b/internal/flags/flags.go @@ -170,7 +170,7 @@ func registerFlags(fs *flag.FlagSet, cfg *Config) (tpsAttach, tpsExclude, fields fs.BoolVar(&cfg.PlainMode, "plain", false, "Enable plain CSV output mode (disable TUI)") fs.BoolVar(&cfg.FlamegraphOutput, "flamegraph", false, "Write aggregated .ior.zst output for trace/integration workflows") - fs.StringVar(&cfg.ParquetPath, "parquet", cfg.ParquetPath, "Write all traced syscall rows directly to a parquet file in headless mode (skip the TUI; incompatible with -plain, -flamegraph, --testflames, --testliveflames, and content filters)") + fs.StringVar(&cfg.ParquetPath, "parquet", cfg.ParquetPath, "Write traced syscall rows directly to a parquet file in headless mode (skip the TUI; compatible with -pid; incompatible with -plain, -flamegraph, --testflames, --testliveflames, and other content filters)") fs.StringVar(&cfg.OutputName, "name", cfg.OutputName, "Base name for .ior.zst trace output files") fs.BoolVar(&cfg.TestFlames, "testflames", false, "Run TUI with static synthetic flamegraph data for keyboard-navigation testing") fs.BoolVar(&cfg.TestLiveFlames, "testliveflames", false, "Run TUI with continuously-updating synthetic flamegraph data for live keyboard-navigation testing") diff --git a/internal/ior_mode_registry.go b/internal/ior_mode_registry.go index e262784..b0ef59f 100644 --- a/internal/ior_mode_registry.go +++ b/internal/ior_mode_registry.go @@ -214,7 +214,7 @@ func (h *headlessParquetModeHandler) validate(cfg flags.Config) error { return errors.New("-parquet and -flamegraph are mutually exclusive") } if hasHeadlessParquetContentFilters(cfg) { - return errors.New("-parquet cannot be combined with content filters (-comm, -path, -pid, -tid)") + return errors.New("-parquet cannot be combined with content filters (-comm, -path, -tid)") } return nil } diff --git a/internal/ior_mode_test.go b/internal/ior_mode_test.go index 526b9af..8a152cc 100644 --- a/internal/ior_mode_test.go +++ b/internal/ior_mode_test.go @@ -446,11 +446,18 @@ func TestValidateRunConfigRejectsParquetWithContentFilters(t *testing.T) { if err == nil { t.Fatalf("expected error for -parquet with content filters") } - if err.Error() != "-parquet cannot be combined with content filters (-comm, -path, -pid, -tid)" { + if err.Error() != "-parquet cannot be combined with content filters (-comm, -path, -tid)" { t.Fatalf("unexpected error: %v", err) } } +func TestValidateRunConfigAllowsParquetWithPIDFilter(t *testing.T) { + cfg := flags.Config{ParquetPath: "trace.parquet", PidFilter: 42} + if err := validateRunConfig(cfg); err != nil { + t.Fatalf("expected -parquet with -pid to be accepted, got %v", err) + } +} + func TestValidateRunConfigRejectsParquetWithGlobalFilter(t *testing.T) { cfg := flags.Config{ ParquetPath: "trace.parquet", @@ -462,7 +469,7 @@ func TestValidateRunConfigRejectsParquetWithGlobalFilter(t *testing.T) { if err == nil { t.Fatalf("expected error for -parquet with global filter") } - if err.Error() != "-parquet cannot be combined with content filters (-comm, -path, -pid, -tid)" { + if err.Error() != "-parquet cannot be combined with content filters (-comm, -path, -tid)" { t.Fatalf("unexpected error: %v", err) } } @@ -664,7 +671,7 @@ func TestTuiTraceStarterFromRunTraceRespectsCancel(t *testing.T) { } } -func TestHeadlessParquetTraceConfigClearsContentFilters(t *testing.T) { +func TestHeadlessParquetTraceConfigPreservesPIDAndClearsContentFilters(t *testing.T) { cfg := flags.Config{ ParquetPath: "trace.parquet", PidFilter: 1234, @@ -677,8 +684,8 @@ func TestHeadlessParquetTraceConfigClearsContentFilters(t *testing.T) { } got := headlessParquetTraceConfig(cfg) - if got.PidFilter != -1 || got.TidFilter != -1 { - t.Fatalf("pid/tid filters = %d/%d, want -1/-1", got.PidFilter, got.TidFilter) + if got.PidFilter != 1234 || got.TidFilter != -1 { + t.Fatalf("pid/tid filters = %d/%d, want 1234/-1", got.PidFilter, got.TidFilter) } if got.CommFilter != "" || got.PathFilter != "" { t.Fatalf("comm/path filters = %q/%q, want empty", got.CommFilter, got.PathFilter) diff --git a/internal/ior_parquet_sink.go b/internal/ior_parquet_sink.go index 6869e1e..f52dffa 100644 --- a/internal/ior_parquet_sink.go +++ b/internal/ior_parquet_sink.go @@ -67,24 +67,23 @@ func isHeadlessParquetMode(cfg flags.Config) bool { } // hasHeadlessParquetContentFilters reports whether cfg carries filters that are -// incompatible with headless Parquet mode (all events must be recorded unfiltered). +// incompatible with headless Parquet mode. PID filtering is still allowed so +// focused headless recordings can avoid tracing unrelated system activity. func hasHeadlessParquetContentFilters(cfg flags.Config) bool { return cfg.CommFilter != "" || cfg.PathFilter != "" || - cfg.PidFilter > 0 || cfg.TidFilter > 0 || cfg.GlobalFilter.IsActive() } // headlessParquetTraceConfig strips TUI-only flags from cfg so that the -// headless Parquet run records a clean, unfiltered event stream. +// headless Parquet run records a clean event stream, optionally scoped by PID. func headlessParquetTraceConfig(cfg flags.Config) flags.Config { out := cfg out.PlainMode = false out.FlamegraphOutput = false out.CommFilter = "" out.PathFilter = "" - out.PidFilter = -1 out.TidFilter = -1 out.GlobalFilter = globalfilter.Filter{} return out |
