From d699ef44a9ded5d419a470a4f6715ecff7f0fcd1 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Tue, 19 May 2026 09:36:51 +0300 Subject: t6 address family recording review --- cmd/ioworkload/scenario_family.go | 90 +++++++++++++++++++++++++ cmd/ioworkload/scenarios.go | 1 + docs/parquet-querying.md | 1 + docs/tutorial/tutorial.md | 2 +- integrationtests/family_test.go | 115 ++++++++++++++++++++++++++++++++ integrationtests/harness.go | 38 +++++++++++ internal/tui/eventstream/export.go | 4 +- internal/tui/eventstream/export_test.go | 39 +++++++++++ 8 files changed, 287 insertions(+), 3 deletions(-) create mode 100644 cmd/ioworkload/scenario_family.go create mode 100644 integrationtests/family_test.go diff --git a/cmd/ioworkload/scenario_family.go b/cmd/ioworkload/scenario_family.go new file mode 100644 index 0000000..c550b4b --- /dev/null +++ b/cmd/ioworkload/scenario_family.go @@ -0,0 +1,90 @@ +package main + +import ( + "fmt" + "path/filepath" + "syscall" +) + +// familyMixed emits representative syscalls from multiple broad families so +// integration tests can verify family tagging and aggregation from a real trace. +func familyMixed() error { + if err := familyMixedFS(); err != nil { + return err + } + if err := familyMixedMemory(); err != nil { + return err + } + if err := familyMixedIPC(); err != nil { + return err + } + if err := familyMixedNetwork(); err != nil { + return err + } + if err := familyMixedProcessSchedTime(); err != nil { + return err + } + return nil +} + +func familyMixedFS() error { + dir, cleanup, err := makeTempDir("family-mixed") + if err != nil { + return err + } + defer cleanup() + + path := filepath.Join(dir, "family-file.txt") + fd, err := syscall.Open(path, syscall.O_RDWR|syscall.O_CREAT|syscall.O_TRUNC, 0o644) + if err != nil { + return fmt.Errorf("open family file: %w", err) + } + defer syscall.Close(fd) + if _, err := syscall.Write(fd, []byte("family")); err != nil { + return fmt.Errorf("write family file: %w", err) + } + return nil +} + +func familyMixedMemory() error { + mapped, err := syscall.Mmap(-1, 0, 4096, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_PRIVATE|syscall.MAP_ANON) + if err != nil { + return fmt.Errorf("mmap anonymous page: %w", err) + } + defer syscall.Munmap(mapped) + mapped[0] = 1 + return nil +} + +func familyMixedIPC() error { + var pipe [2]int + if err := syscall.Pipe2(pipe[:], syscall.O_CLOEXEC); err != nil { + return fmt.Errorf("pipe2: %w", err) + } + defer syscall.Close(pipe[0]) + defer syscall.Close(pipe[1]) + return nil +} + +func familyMixedNetwork() error { + fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM|syscall.SOCK_CLOEXEC, 0) + if err != nil { + return fmt.Errorf("socketpair: %w", err) + } + defer syscall.Close(fds[0]) + defer syscall.Close(fds[1]) + return nil +} + +func familyMixedProcessSchedTime() error { + if _, _, errno := syscall.RawSyscall(syscall.SYS_GETPID, 0, 0, 0); errno != 0 { + return fmt.Errorf("getpid: %w", errno) + } + if _, _, errno := syscall.RawSyscall(syscall.SYS_SCHED_YIELD, 0, 0, 0); errno != 0 { + return fmt.Errorf("sched_yield: %w", errno) + } + if err := syscall.Nanosleep(&syscall.Timespec{Nsec: 1000}, nil); err != nil { + return fmt.Errorf("nanosleep: %w", err) + } + return nil +} diff --git a/cmd/ioworkload/scenarios.go b/cmd/ioworkload/scenarios.go index fc06825..326c954 100644 --- a/cmd/ioworkload/scenarios.go +++ b/cmd/ioworkload/scenarios.go @@ -25,6 +25,7 @@ var scenarios = map[string]func() error{ "readwrite-pread-invalid": readwritePreadInvalid, "readwrite-pwrite-invalid": readwritePwriteInvalid, "retbytes-phase-a": retbytesPhaseA, + "family-mixed": familyMixed, "close-basic": closeBasic, "close-range": closeRange, "close-invalid-fd": closeInvalidFd, diff --git a/docs/parquet-querying.md b/docs/parquet-querying.md index 783c4e0..4c31474 100644 --- a/docs/parquet-querying.md +++ b/docs/parquet-querying.md @@ -26,6 +26,7 @@ state, no installation needed beyond Docker. | `pid` | UInt32 | Process ID | | `tid` | UInt32 | Thread ID | | `syscall` | String | Syscall name (e.g. `read`, `openat`) | +| `family` | String | Broad syscall family (e.g. `FS`, `Network`, `Memory`) | | `fd` | Int32 | File descriptor | | `ret` | Int64 | Return value (negative = errno) | | `bytes` | UInt64 | Bytes transferred (0 if not applicable) | diff --git a/docs/tutorial/tutorial.md b/docs/tutorial/tutorial.md index 4c556e0..7aa5f13 100644 --- a/docs/tutorial/tutorial.md +++ b/docs/tutorial/tutorial.md @@ -109,7 +109,7 @@ Press `6`. Two histograms: syscall **latency** (how long the syscall ran) and th ### 7 ยท Stream -Press `7`. A live tail of every traced event row: comm, PID, TID, syscall, family, file, FD, return value, bytes, latency, gap. This is the workhorse view; the next section explores it in depth. +Press `7`. A live tail of every traced event row: comm, PID, TID, syscall, file, FD, return value, bytes, latency, gap. This is the workhorse view; the next section explores it in depth. ![Stream tab live-tailing rows](./assets/07-stream-live.gif) diff --git a/integrationtests/family_test.go b/integrationtests/family_test.go new file mode 100644 index 0000000..ad35b46 --- /dev/null +++ b/integrationtests/family_test.go @@ -0,0 +1,115 @@ +package integrationtests + +import ( + "io" + "os" + "testing" + + iorparquet "ior/internal/parquet" + + parquetgo "github.com/parquet-go/parquet-go" +) + +func TestFamilyParquetRecordingAndAggregation(t *testing.T) { + h := newTestHarness(t) + path, pid, err := h.RunParquet("family-mixed", defaultDuration) + if err != nil { + t.Fatalf("run family-mixed parquet scenario: %v", err) + } + + rows := filterRecordsByPID(readParquetRecords(t, path), uint32(pid)) + if len(rows) == 0 { + t.Fatalf("expected parquet rows for workload PID %d", pid) + } + + expectedSyscallFamilies := map[string]string{ + "openat": "FS", + "write": "FS", + "mmap": "Memory", + "munmap": "Memory", + "pipe2": "IPC", + "socketpair": "Network", + "getpid": "Process", + "sched_yield": "Sched", + } + seenSyscalls := make(map[string]bool, len(expectedSyscallFamilies)) + for _, row := range rows { + if row.Family == "" { + t.Fatalf("record has empty family tag: %+v", row) + } + wantFamily, ok := expectedSyscallFamilies[row.Syscall] + if !ok { + continue + } + if row.Family != wantFamily { + t.Fatalf("%s family = %q, want %q in row %+v", row.Syscall, row.Family, wantFamily, row) + } + seenSyscalls[row.Syscall] = true + } + for syscall := range expectedSyscallFamilies { + if !seenSyscalls[syscall] { + t.Fatalf("expected traced syscall %q in parquet rows", syscall) + } + } + + families := aggregateRecordedFamilies(rows) + var total uint64 + for _, count := range families { + total += count + } + if total != uint64(len(rows)) { + t.Fatalf("aggregated family total = %d, want row count %d", total, len(rows)) + } + for _, family := range []string{"FS", "Memory", "IPC", "Network", "Process", "Sched", "Time"} { + if families[family] == 0 { + t.Fatalf("expected family %q in aggregate counts, got %+v", family, families) + } + } +} + +func filterRecordsByPID(rows []iorparquet.Record, pid uint32) []iorparquet.Record { + filtered := make([]iorparquet.Record, 0, len(rows)) + for _, row := range rows { + if row.PID == pid { + filtered = append(filtered, row) + } + } + return filtered +} + +func readParquetRecords(t *testing.T, path string) []iorparquet.Record { + t.Helper() + + f, err := os.Open(path) + if err != nil { + t.Fatalf("open parquet %q: %v", path, err) + } + defer f.Close() + + reader := parquetgo.NewGenericReader[iorparquet.Record](f) + defer reader.Close() + + var rows []iorparquet.Record + buf := make([]iorparquet.Record, 16) + for { + n, err := reader.Read(buf) + if n > 0 { + rows = append(rows, buf[:n]...) + } + if err == nil { + continue + } + if err == io.EOF { + return rows + } + t.Fatalf("read parquet rows: %v", err) + } +} + +func aggregateRecordedFamilies(rows []iorparquet.Record) map[string]uint64 { + families := make(map[string]uint64) + for _, row := range rows { + families[row.Family]++ + } + return families +} diff --git a/integrationtests/harness.go b/integrationtests/harness.go index e3ee900..afde05e 100644 --- a/integrationtests/harness.go +++ b/integrationtests/harness.go @@ -70,6 +70,32 @@ func (h *TestHarness) RunWithIorArgs(scenario string, duration int, extraIorArgs return result, workloadPID, nil } +// RunParquet executes a scenario in headless Parquet mode and returns the +// recorded Parquet path. +func (h *TestHarness) RunParquet(scenario string, duration int) (string, int, error) { + parquetPath := filepath.Join(h.OutputDir, scenario+".parquet") + workloadCmd, workloadPID, err := h.startWorkload(scenario) + if err != nil { + return "", 0, err + } + + iorCmd, err := h.startIorParquet(parquetPath, duration) + if err != nil { + workloadCmd.Process.Kill() + workloadCmd.Wait() + return "", workloadPID, err + } + + workloadErr, iorErr := waitBoth(workloadCmd, iorCmd, duration, iorShutdownGrace) + if iorErr != nil { + return "", workloadPID, fmt.Errorf("ior: %w", iorErr) + } + if workloadErr != nil { + return "", workloadPID, fmt.Errorf("workload: %w", workloadErr) + } + return parquetPath, workloadPID, nil +} + func (h *TestHarness) startWorkload(scenario string) (*exec.Cmd, int, error) { cmd := exec.Command(h.WorkloadBinary, "--scenario="+scenario) cmd.Stderr = os.Stderr @@ -128,6 +154,18 @@ func (h *TestHarness) startIor(pid int, scenario string, duration int, extraArgs "-duration", strconv.Itoa(duration), } args = append(args, extraArgs...) + return h.startIorArgs(args) +} + +func (h *TestHarness) startIorParquet(parquetPath string, duration int) (*exec.Cmd, error) { + args := []string{ + "-parquet", parquetPath, + "-duration", strconv.Itoa(duration), + } + return h.startIorArgs(args) +} + +func (h *TestHarness) startIorArgs(args []string) (*exec.Cmd, error) { cmd := exec.Command(h.IorBinary, args...) cmd.Dir = h.OutputDir cmd.Stdout = os.Stdout diff --git a/internal/tui/eventstream/export.go b/internal/tui/eventstream/export.go index accec8a..73bc1fa 100644 --- a/internal/tui/eventstream/export.go +++ b/internal/tui/eventstream/export.go @@ -182,7 +182,7 @@ func exportRowsToCSV(rows []StreamEvent, exportDir, filename string) (string, er // writeStreamCSV writes the CSV header and all event rows to w, calling fail // on the first write error to close the underlying file before returning. func writeStreamCSV(w *csv.Writer, rows []StreamEvent, fail func(error) (string, error)) error { - header := []string{"seq", "time_ns", "gap_ns", "latency_ns", "comm", "pid", "tid", "syscall", "family", "fd", "ret", "bytes", "file", "error"} + header := []string{"seq", "time_ns", "gap_ns", "latency_ns", "comm", "pid", "tid", "syscall", "fd", "ret", "bytes", "file", "error", "family"} if err := w.Write(header); err != nil { _, err = fail(err) return err @@ -198,12 +198,12 @@ func writeStreamCSV(w *csv.Writer, rows []StreamEvent, fail func(error) (string, fmt.Sprintf("%d", ev.PID), fmt.Sprintf("%d", ev.TID), ev.Syscall, - ev.Family, fmt.Sprintf("%d", ev.FD), fmt.Sprintf("%d", ev.RetVal), fmt.Sprintf("%d", ev.Bytes), ev.FileName, fmt.Sprintf("%t", ev.IsError), + ev.Family, } if err := w.Write(record); err != nil { _, err = fail(err) diff --git a/internal/tui/eventstream/export_test.go b/internal/tui/eventstream/export_test.go index f398fce..6cea2b1 100644 --- a/internal/tui/eventstream/export_test.go +++ b/internal/tui/eventstream/export_test.go @@ -1,6 +1,8 @@ package eventstream import ( + "bytes" + "encoding/csv" "os" "path/filepath" "reflect" @@ -196,6 +198,43 @@ func TestExportRowsToCSVPathTraversal(t *testing.T) { _ = traversal // silence unused-variable warning } +func TestWriteStreamCSVAppendsFamilyColumn(t *testing.T) { + var buf bytes.Buffer + rows := []StreamEvent{{ + Seq: 7, + TimeNs: 100, + GapNs: 3, + DurationNs: 5, + Comm: "worker", + PID: 10, + TID: 11, + Syscall: "socketpair", + FD: 4, + RetVal: 0, + Bytes: 0, + FileName: "/tmp/sock", + IsError: false, + Family: "Network", + }} + fail := func(err error) (string, error) { return "", err } + + if err := writeStreamCSV(csv.NewWriter(&buf), rows, fail); err != nil { + t.Fatalf("writeStreamCSV() error = %v", err) + } + + records, err := csv.NewReader(bytes.NewReader(buf.Bytes())).ReadAll() + if err != nil { + t.Fatalf("read CSV: %v", err) + } + wantHeader := []string{"seq", "time_ns", "gap_ns", "latency_ns", "comm", "pid", "tid", "syscall", "fd", "ret", "bytes", "file", "error", "family"} + if !reflect.DeepEqual(records[0], wantHeader) { + t.Fatalf("header = %#v, want %#v", records[0], wantHeader) + } + if records[1][8] != "4" || records[1][12] != "false" || records[1][13] != "Network" { + t.Fatalf("family should be appended without shifting legacy columns, got %#v", records[1]) + } +} + // TestShellSplitVariousCases covers the tokenizer with a table-driven approach. func TestShellSplitVariousCases(t *testing.T) { cases := []struct { -- cgit v1.2.3