diff options
| author | Paul Buetow <paul@buetow.org> | 2026-05-19 09:36:51 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-05-19 09:36:51 +0300 |
| commit | d699ef44a9ded5d419a470a4f6715ecff7f0fcd1 (patch) | |
| tree | 4a56611a392c7eb17a0a2e4f243b0342f8c3ec6c /integrationtests | |
| parent | 65599ad9b87b1c61cb6d8232200da88952370e96 (diff) | |
t6 address family recording review
Diffstat (limited to 'integrationtests')
| -rw-r--r-- | integrationtests/family_test.go | 115 | ||||
| -rw-r--r-- | integrationtests/harness.go | 38 |
2 files changed, 153 insertions, 0 deletions
diff --git a/integrationtests/family_test.go b/integrationtests/family_test.go new file mode 100644 index 0000000..ad35b46 --- /dev/null +++ b/integrationtests/family_test.go @@ -0,0 +1,115 @@ +package integrationtests + +import ( + "io" + "os" + "testing" + + iorparquet "ior/internal/parquet" + + parquetgo "github.com/parquet-go/parquet-go" +) + +func TestFamilyParquetRecordingAndAggregation(t *testing.T) { + h := newTestHarness(t) + path, pid, err := h.RunParquet("family-mixed", defaultDuration) + if err != nil { + t.Fatalf("run family-mixed parquet scenario: %v", err) + } + + rows := filterRecordsByPID(readParquetRecords(t, path), uint32(pid)) + if len(rows) == 0 { + t.Fatalf("expected parquet rows for workload PID %d", pid) + } + + expectedSyscallFamilies := map[string]string{ + "openat": "FS", + "write": "FS", + "mmap": "Memory", + "munmap": "Memory", + "pipe2": "IPC", + "socketpair": "Network", + "getpid": "Process", + "sched_yield": "Sched", + } + seenSyscalls := make(map[string]bool, len(expectedSyscallFamilies)) + for _, row := range rows { + if row.Family == "" { + t.Fatalf("record has empty family tag: %+v", row) + } + wantFamily, ok := expectedSyscallFamilies[row.Syscall] + if !ok { + continue + } + if row.Family != wantFamily { + t.Fatalf("%s family = %q, want %q in row %+v", row.Syscall, row.Family, wantFamily, row) + } + seenSyscalls[row.Syscall] = true + } + for syscall := range expectedSyscallFamilies { + if !seenSyscalls[syscall] { + t.Fatalf("expected traced syscall %q in parquet rows", syscall) + } + } + + families := aggregateRecordedFamilies(rows) + var total uint64 + for _, count := range families { + total += count + } + if total != uint64(len(rows)) { + t.Fatalf("aggregated family total = %d, want row count %d", total, len(rows)) + } + for _, family := range []string{"FS", "Memory", "IPC", "Network", "Process", "Sched", "Time"} { + if families[family] == 0 { + t.Fatalf("expected family %q in aggregate counts, got %+v", family, families) + } + } +} + +func filterRecordsByPID(rows []iorparquet.Record, pid uint32) []iorparquet.Record { + filtered := make([]iorparquet.Record, 0, len(rows)) + for _, row := range rows { + if row.PID == pid { + filtered = append(filtered, row) + } + } + return filtered +} + +func readParquetRecords(t *testing.T, path string) []iorparquet.Record { + t.Helper() + + f, err := os.Open(path) + if err != nil { + t.Fatalf("open parquet %q: %v", path, err) + } + defer f.Close() + + reader := parquetgo.NewGenericReader[iorparquet.Record](f) + defer reader.Close() + + var rows []iorparquet.Record + buf := make([]iorparquet.Record, 16) + for { + n, err := reader.Read(buf) + if n > 0 { + rows = append(rows, buf[:n]...) + } + if err == nil { + continue + } + if err == io.EOF { + return rows + } + t.Fatalf("read parquet rows: %v", err) + } +} + +func aggregateRecordedFamilies(rows []iorparquet.Record) map[string]uint64 { + families := make(map[string]uint64) + for _, row := range rows { + families[row.Family]++ + } + return families +} diff --git a/integrationtests/harness.go b/integrationtests/harness.go index e3ee900..afde05e 100644 --- a/integrationtests/harness.go +++ b/integrationtests/harness.go @@ -70,6 +70,32 @@ func (h *TestHarness) RunWithIorArgs(scenario string, duration int, extraIorArgs return result, workloadPID, nil } +// RunParquet executes a scenario in headless Parquet mode and returns the +// recorded Parquet path. +func (h *TestHarness) RunParquet(scenario string, duration int) (string, int, error) { + parquetPath := filepath.Join(h.OutputDir, scenario+".parquet") + workloadCmd, workloadPID, err := h.startWorkload(scenario) + if err != nil { + return "", 0, err + } + + iorCmd, err := h.startIorParquet(parquetPath, duration) + if err != nil { + workloadCmd.Process.Kill() + workloadCmd.Wait() + return "", workloadPID, err + } + + workloadErr, iorErr := waitBoth(workloadCmd, iorCmd, duration, iorShutdownGrace) + if iorErr != nil { + return "", workloadPID, fmt.Errorf("ior: %w", iorErr) + } + if workloadErr != nil { + return "", workloadPID, fmt.Errorf("workload: %w", workloadErr) + } + return parquetPath, workloadPID, nil +} + func (h *TestHarness) startWorkload(scenario string) (*exec.Cmd, int, error) { cmd := exec.Command(h.WorkloadBinary, "--scenario="+scenario) cmd.Stderr = os.Stderr @@ -128,6 +154,18 @@ func (h *TestHarness) startIor(pid int, scenario string, duration int, extraArgs "-duration", strconv.Itoa(duration), } args = append(args, extraArgs...) + return h.startIorArgs(args) +} + +func (h *TestHarness) startIorParquet(parquetPath string, duration int) (*exec.Cmd, error) { + args := []string{ + "-parquet", parquetPath, + "-duration", strconv.Itoa(duration), + } + return h.startIorArgs(args) +} + +func (h *TestHarness) startIorArgs(args []string) (*exec.Cmd, error) { cmd := exec.Command(h.IorBinary, args...) cmd.Dir = h.OutputDir cmd.Stdout = os.Stdout |
