summaryrefslogtreecommitdiff
path: root/integrationtests
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-05-19 09:36:51 +0300
committerPaul Buetow <paul@buetow.org>2026-05-19 09:36:51 +0300
commitd699ef44a9ded5d419a470a4f6715ecff7f0fcd1 (patch)
tree4a56611a392c7eb17a0a2e4f243b0342f8c3ec6c /integrationtests
parent65599ad9b87b1c61cb6d8232200da88952370e96 (diff)
t6 address family recording review
Diffstat (limited to 'integrationtests')
-rw-r--r--integrationtests/family_test.go115
-rw-r--r--integrationtests/harness.go38
2 files changed, 153 insertions, 0 deletions
diff --git a/integrationtests/family_test.go b/integrationtests/family_test.go
new file mode 100644
index 0000000..ad35b46
--- /dev/null
+++ b/integrationtests/family_test.go
@@ -0,0 +1,115 @@
+package integrationtests
+
+import (
+ "io"
+ "os"
+ "testing"
+
+ iorparquet "ior/internal/parquet"
+
+ parquetgo "github.com/parquet-go/parquet-go"
+)
+
+func TestFamilyParquetRecordingAndAggregation(t *testing.T) {
+ h := newTestHarness(t)
+ path, pid, err := h.RunParquet("family-mixed", defaultDuration)
+ if err != nil {
+ t.Fatalf("run family-mixed parquet scenario: %v", err)
+ }
+
+ rows := filterRecordsByPID(readParquetRecords(t, path), uint32(pid))
+ if len(rows) == 0 {
+ t.Fatalf("expected parquet rows for workload PID %d", pid)
+ }
+
+ expectedSyscallFamilies := map[string]string{
+ "openat": "FS",
+ "write": "FS",
+ "mmap": "Memory",
+ "munmap": "Memory",
+ "pipe2": "IPC",
+ "socketpair": "Network",
+ "getpid": "Process",
+ "sched_yield": "Sched",
+ }
+ seenSyscalls := make(map[string]bool, len(expectedSyscallFamilies))
+ for _, row := range rows {
+ if row.Family == "" {
+ t.Fatalf("record has empty family tag: %+v", row)
+ }
+ wantFamily, ok := expectedSyscallFamilies[row.Syscall]
+ if !ok {
+ continue
+ }
+ if row.Family != wantFamily {
+ t.Fatalf("%s family = %q, want %q in row %+v", row.Syscall, row.Family, wantFamily, row)
+ }
+ seenSyscalls[row.Syscall] = true
+ }
+ for syscall := range expectedSyscallFamilies {
+ if !seenSyscalls[syscall] {
+ t.Fatalf("expected traced syscall %q in parquet rows", syscall)
+ }
+ }
+
+ families := aggregateRecordedFamilies(rows)
+ var total uint64
+ for _, count := range families {
+ total += count
+ }
+ if total != uint64(len(rows)) {
+ t.Fatalf("aggregated family total = %d, want row count %d", total, len(rows))
+ }
+ for _, family := range []string{"FS", "Memory", "IPC", "Network", "Process", "Sched", "Time"} {
+ if families[family] == 0 {
+ t.Fatalf("expected family %q in aggregate counts, got %+v", family, families)
+ }
+ }
+}
+
+func filterRecordsByPID(rows []iorparquet.Record, pid uint32) []iorparquet.Record {
+ filtered := make([]iorparquet.Record, 0, len(rows))
+ for _, row := range rows {
+ if row.PID == pid {
+ filtered = append(filtered, row)
+ }
+ }
+ return filtered
+}
+
+func readParquetRecords(t *testing.T, path string) []iorparquet.Record {
+ t.Helper()
+
+ f, err := os.Open(path)
+ if err != nil {
+ t.Fatalf("open parquet %q: %v", path, err)
+ }
+ defer f.Close()
+
+ reader := parquetgo.NewGenericReader[iorparquet.Record](f)
+ defer reader.Close()
+
+ var rows []iorparquet.Record
+ buf := make([]iorparquet.Record, 16)
+ for {
+ n, err := reader.Read(buf)
+ if n > 0 {
+ rows = append(rows, buf[:n]...)
+ }
+ if err == nil {
+ continue
+ }
+ if err == io.EOF {
+ return rows
+ }
+ t.Fatalf("read parquet rows: %v", err)
+ }
+}
+
+func aggregateRecordedFamilies(rows []iorparquet.Record) map[string]uint64 {
+ families := make(map[string]uint64)
+ for _, row := range rows {
+ families[row.Family]++
+ }
+ return families
+}
diff --git a/integrationtests/harness.go b/integrationtests/harness.go
index e3ee900..afde05e 100644
--- a/integrationtests/harness.go
+++ b/integrationtests/harness.go
@@ -70,6 +70,32 @@ func (h *TestHarness) RunWithIorArgs(scenario string, duration int, extraIorArgs
return result, workloadPID, nil
}
+// RunParquet executes a scenario in headless Parquet mode and returns the
+// recorded Parquet path.
+func (h *TestHarness) RunParquet(scenario string, duration int) (string, int, error) {
+ parquetPath := filepath.Join(h.OutputDir, scenario+".parquet")
+ workloadCmd, workloadPID, err := h.startWorkload(scenario)
+ if err != nil {
+ return "", 0, err
+ }
+
+ iorCmd, err := h.startIorParquet(parquetPath, duration)
+ if err != nil {
+ workloadCmd.Process.Kill()
+ workloadCmd.Wait()
+ return "", workloadPID, err
+ }
+
+ workloadErr, iorErr := waitBoth(workloadCmd, iorCmd, duration, iorShutdownGrace)
+ if iorErr != nil {
+ return "", workloadPID, fmt.Errorf("ior: %w", iorErr)
+ }
+ if workloadErr != nil {
+ return "", workloadPID, fmt.Errorf("workload: %w", workloadErr)
+ }
+ return parquetPath, workloadPID, nil
+}
+
func (h *TestHarness) startWorkload(scenario string) (*exec.Cmd, int, error) {
cmd := exec.Command(h.WorkloadBinary, "--scenario="+scenario)
cmd.Stderr = os.Stderr
@@ -128,6 +154,18 @@ func (h *TestHarness) startIor(pid int, scenario string, duration int, extraArgs
"-duration", strconv.Itoa(duration),
}
args = append(args, extraArgs...)
+ return h.startIorArgs(args)
+}
+
+func (h *TestHarness) startIorParquet(parquetPath string, duration int) (*exec.Cmd, error) {
+ args := []string{
+ "-parquet", parquetPath,
+ "-duration", strconv.Itoa(duration),
+ }
+ return h.startIorArgs(args)
+}
+
+func (h *TestHarness) startIorArgs(args []string) (*exec.Cmd, error) {
cmd := exec.Command(h.IorBinary, args...)
cmd.Dir = h.OutputDir
cmd.Stdout = os.Stdout