summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- cmd/ioworkload/scenario_family.go 90
-rw-r--r-- cmd/ioworkload/scenarios.go 1
-rw-r--r-- docs/parquet-querying.md 1
-rw-r--r-- docs/tutorial/tutorial.md 2
-rw-r--r-- integrationtests/family_test.go 115
-rw-r--r-- integrationtests/harness.go 38
-rw-r--r-- internal/tui/eventstream/export.go 4
-rw-r--r-- internal/tui/eventstream/export_test.go 39
8 files changed, 287 insertions, 3 deletions
diff --git a/cmd/ioworkload/scenario_family.go b/cmd/ioworkload/scenario_family.go
new file mode 100644
index 0000000..c550b4b
--- /dev/null
+++ b/cmd/ioworkload/scenario_family.go
@@ -0,0 +1,90 @@
+package main
+
+import (
+ "fmt"
+ "path/filepath"
+ "syscall"
+)
+
+// familyMixed emits representative syscalls from multiple broad families so
+// integration tests can verify family tagging and aggregation from a real trace.
+func familyMixed() error {
+ if err := familyMixedFS(); err != nil {
+ return err
+ }
+ if err := familyMixedMemory(); err != nil {
+ return err
+ }
+ if err := familyMixedIPC(); err != nil {
+ return err
+ }
+ if err := familyMixedNetwork(); err != nil {
+ return err
+ }
+ if err := familyMixedProcessSchedTime(); err != nil {
+ return err
+ }
+ return nil
+}
+
+func familyMixedFS() error {
+ dir, cleanup, err := makeTempDir("family-mixed")
+ if err != nil {
+ return err
+ }
+ defer cleanup()
+
+ path := filepath.Join(dir, "family-file.txt")
+ fd, err := syscall.Open(path, syscall.O_RDWR|syscall.O_CREAT|syscall.O_TRUNC, 0o644)
+ if err != nil {
+ return fmt.Errorf("open family file: %w", err)
+ }
+ defer syscall.Close(fd)
+ if _, err := syscall.Write(fd, []byte("family")); err != nil {
+ return fmt.Errorf("write family file: %w", err)
+ }
+ return nil
+}
+
+func familyMixedMemory() error {
+ mapped, err := syscall.Mmap(-1, 0, 4096, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_PRIVATE|syscall.MAP_ANON)
+ if err != nil {
+ return fmt.Errorf("mmap anonymous page: %w", err)
+ }
+ defer syscall.Munmap(mapped)
+ mapped[0] = 1
+ return nil
+}
+
+func familyMixedIPC() error {
+ var pipe [2]int
+ if err := syscall.Pipe2(pipe[:], syscall.O_CLOEXEC); err != nil {
+ return fmt.Errorf("pipe2: %w", err)
+ }
+ defer syscall.Close(pipe[0])
+ defer syscall.Close(pipe[1])
+ return nil
+}
+
+func familyMixedNetwork() error {
+ fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM|syscall.SOCK_CLOEXEC, 0)
+ if err != nil {
+ return fmt.Errorf("socketpair: %w", err)
+ }
+ defer syscall.Close(fds[0])
+ defer syscall.Close(fds[1])
+ return nil
+}
+
+func familyMixedProcessSchedTime() error {
+ if _, _, errno := syscall.RawSyscall(syscall.SYS_GETPID, 0, 0, 0); errno != 0 {
+ return fmt.Errorf("getpid: %w", errno)
+ }
+ if _, _, errno := syscall.RawSyscall(syscall.SYS_SCHED_YIELD, 0, 0, 0); errno != 0 {
+ return fmt.Errorf("sched_yield: %w", errno)
+ }
+ if err := syscall.Nanosleep(&syscall.Timespec{Nsec: 1000}, nil); err != nil {
+ return fmt.Errorf("nanosleep: %w", err)
+ }
+ return nil
+}
diff --git a/cmd/ioworkload/scenarios.go b/cmd/ioworkload/scenarios.go
index fc06825..326c954 100644
--- a/cmd/ioworkload/scenarios.go
+++ b/cmd/ioworkload/scenarios.go
@@ -25,6 +25,7 @@ var scenarios = map[string]func() error{
"readwrite-pread-invalid": readwritePreadInvalid,
"readwrite-pwrite-invalid": readwritePwriteInvalid,
"retbytes-phase-a": retbytesPhaseA,
+ "family-mixed": familyMixed,
"close-basic": closeBasic,
"close-range": closeRange,
"close-invalid-fd": closeInvalidFd,
diff --git a/docs/parquet-querying.md b/docs/parquet-querying.md
index 783c4e0..4c31474 100644
--- a/docs/parquet-querying.md
+++ b/docs/parquet-querying.md
@@ -26,6 +26,7 @@ state, no installation needed beyond Docker.
| `pid` | UInt32 | Process ID |
| `tid` | UInt32 | Thread ID |
| `syscall` | String | Syscall name (e.g. `read`, `openat`) |
+| `family` | String | Broad syscall family (e.g. `FS`, `Network`, `Memory`) |
| `fd` | Int32 | File descriptor |
| `ret` | Int64 | Return value (negative = errno) |
| `bytes` | UInt64 | Bytes transferred (0 if not applicable) |
diff --git a/docs/tutorial/tutorial.md b/docs/tutorial/tutorial.md
index 4c556e0..7aa5f13 100644
--- a/docs/tutorial/tutorial.md
+++ b/docs/tutorial/tutorial.md
@@ -109,7 +109,7 @@ Press `6`. Two histograms: syscall **latency** (how long the syscall ran) and th
### 7 · Stream
-Press `7`. A live tail of every traced event row: comm, PID, TID, syscall, family, file, FD, return value, bytes, latency, gap. This is the workhorse view; the next section explores it in depth.
+Press `7`. A live tail of every traced event row: comm, PID, TID, syscall, file, FD, return value, bytes, latency, gap. This is the workhorse view; the next section explores it in depth.
![Stream tab live-tailing rows](./assets/07-stream-live.gif)
diff --git a/integrationtests/family_test.go b/integrationtests/family_test.go
new file mode 100644
index 0000000..ad35b46
--- /dev/null
+++ b/integrationtests/family_test.go
@@ -0,0 +1,115 @@
+package integrationtests
+
+import (
+ "io"
+ "os"
+ "testing"
+
+ iorparquet "ior/internal/parquet"
+
+ parquetgo "github.com/parquet-go/parquet-go"
+)
+
+func TestFamilyParquetRecordingAndAggregation(t *testing.T) {
+ h := newTestHarness(t)
+ path, pid, err := h.RunParquet("family-mixed", defaultDuration)
+ if err != nil {
+ t.Fatalf("run family-mixed parquet scenario: %v", err)
+ }
+
+ rows := filterRecordsByPID(readParquetRecords(t, path), uint32(pid))
+ if len(rows) == 0 {
+ t.Fatalf("expected parquet rows for workload PID %d", pid)
+ }
+
+ expectedSyscallFamilies := map[string]string{
+ "openat": "FS",
+ "write": "FS",
+ "mmap": "Memory",
+ "munmap": "Memory",
+ "pipe2": "IPC",
+ "socketpair": "Network",
+ "getpid": "Process",
+ "sched_yield": "Sched",
+ }
+ seenSyscalls := make(map[string]bool, len(expectedSyscallFamilies))
+ for _, row := range rows {
+ if row.Family == "" {
+ t.Fatalf("record has empty family tag: %+v", row)
+ }
+ wantFamily, ok := expectedSyscallFamilies[row.Syscall]
+ if !ok {
+ continue
+ }
+ if row.Family != wantFamily {
+ t.Fatalf("%s family = %q, want %q in row %+v", row.Syscall, row.Family, wantFamily, row)
+ }
+ seenSyscalls[row.Syscall] = true
+ }
+ for syscall := range expectedSyscallFamilies {
+ if !seenSyscalls[syscall] {
+ t.Fatalf("expected traced syscall %q in parquet rows", syscall)
+ }
+ }
+
+ families := aggregateRecordedFamilies(rows)
+ var total uint64
+ for _, count := range families {
+ total += count
+ }
+ if total != uint64(len(rows)) {
+ t.Fatalf("aggregated family total = %d, want row count %d", total, len(rows))
+ }
+ for _, family := range []string{"FS", "Memory", "IPC", "Network", "Process", "Sched", "Time"} {
+ if families[family] == 0 {
+ t.Fatalf("expected family %q in aggregate counts, got %+v", family, families)
+ }
+ }
+}
+
+func filterRecordsByPID(rows []iorparquet.Record, pid uint32) []iorparquet.Record {
+ filtered := make([]iorparquet.Record, 0, len(rows))
+ for _, row := range rows {
+ if row.PID == pid {
+ filtered = append(filtered, row)
+ }
+ }
+ return filtered
+}
+
+func readParquetRecords(t *testing.T, path string) []iorparquet.Record {
+ t.Helper()
+
+ f, err := os.Open(path)
+ if err != nil {
+ t.Fatalf("open parquet %q: %v", path, err)
+ }
+ defer f.Close()
+
+ reader := parquetgo.NewGenericReader[iorparquet.Record](f)
+ defer reader.Close()
+
+ var rows []iorparquet.Record
+ buf := make([]iorparquet.Record, 16)
+ for {
+ n, err := reader.Read(buf)
+ if n > 0 {
+ rows = append(rows, buf[:n]...)
+ }
+ if err == nil {
+ continue
+ }
+ if err == io.EOF {
+ return rows
+ }
+ t.Fatalf("read parquet rows: %v", err)
+ }
+}
+
+func aggregateRecordedFamilies(rows []iorparquet.Record) map[string]uint64 {
+ families := make(map[string]uint64)
+ for _, row := range rows {
+ families[row.Family]++
+ }
+ return families
+}
diff --git a/integrationtests/harness.go b/integrationtests/harness.go
index e3ee900..afde05e 100644
--- a/integrationtests/harness.go
+++ b/integrationtests/harness.go
@@ -70,6 +70,32 @@ func (h *TestHarness) RunWithIorArgs(scenario string, duration int, extraIorArgs
return result, workloadPID, nil
}
+// RunParquet executes a scenario in headless Parquet mode and returns the
+// recorded Parquet path.
+func (h *TestHarness) RunParquet(scenario string, duration int) (string, int, error) {
+ parquetPath := filepath.Join(h.OutputDir, scenario+".parquet")
+ workloadCmd, workloadPID, err := h.startWorkload(scenario)
+ if err != nil {
+ return "", 0, err
+ }
+
+ iorCmd, err := h.startIorParquet(parquetPath, duration)
+ if err != nil {
+ workloadCmd.Process.Kill()
+ workloadCmd.Wait()
+ return "", workloadPID, err
+ }
+
+ workloadErr, iorErr := waitBoth(workloadCmd, iorCmd, duration, iorShutdownGrace)
+ if iorErr != nil {
+ return "", workloadPID, fmt.Errorf("ior: %w", iorErr)
+ }
+ if workloadErr != nil {
+ return "", workloadPID, fmt.Errorf("workload: %w", workloadErr)
+ }
+ return parquetPath, workloadPID, nil
+}
+
func (h *TestHarness) startWorkload(scenario string) (*exec.Cmd, int, error) {
cmd := exec.Command(h.WorkloadBinary, "--scenario="+scenario)
cmd.Stderr = os.Stderr
@@ -128,6 +154,18 @@ func (h *TestHarness) startIor(pid int, scenario string, duration int, extraArgs
"-duration", strconv.Itoa(duration),
}
args = append(args, extraArgs...)
+ return h.startIorArgs(args)
+}
+
+func (h *TestHarness) startIorParquet(parquetPath string, duration int) (*exec.Cmd, error) {
+ args := []string{
+ "-parquet", parquetPath,
+ "-duration", strconv.Itoa(duration),
+ }
+ return h.startIorArgs(args)
+}
+
+func (h *TestHarness) startIorArgs(args []string) (*exec.Cmd, error) {
cmd := exec.Command(h.IorBinary, args...)
cmd.Dir = h.OutputDir
cmd.Stdout = os.Stdout
diff --git a/internal/tui/eventstream/export.go b/internal/tui/eventstream/export.go
index accec8a..73bc1fa 100644
--- a/internal/tui/eventstream/export.go
+++ b/internal/tui/eventstream/export.go
@@ -182,7 +182,7 @@ func exportRowsToCSV(rows []StreamEvent, exportDir, filename string) (string, er
// writeStreamCSV writes the CSV header and all event rows to w, calling fail
// on the first write error to close the underlying file before returning.
func writeStreamCSV(w *csv.Writer, rows []StreamEvent, fail func(error) (string, error)) error {
- header := []string{"seq", "time_ns", "gap_ns", "latency_ns", "comm", "pid", "tid", "syscall", "family", "fd", "ret", "bytes", "file", "error"}
+ header := []string{"seq", "time_ns", "gap_ns", "latency_ns", "comm", "pid", "tid", "syscall", "fd", "ret", "bytes", "file", "error", "family"}
if err := w.Write(header); err != nil {
_, err = fail(err)
return err
@@ -198,12 +198,12 @@ func writeStreamCSV(w *csv.Writer, rows []StreamEvent, fail func(error) (string,
fmt.Sprintf("%d", ev.PID),
fmt.Sprintf("%d", ev.TID),
ev.Syscall,
- ev.Family,
fmt.Sprintf("%d", ev.FD),
fmt.Sprintf("%d", ev.RetVal),
fmt.Sprintf("%d", ev.Bytes),
ev.FileName,
fmt.Sprintf("%t", ev.IsError),
+ ev.Family,
}
if err := w.Write(record); err != nil {
_, err = fail(err)
diff --git a/internal/tui/eventstream/export_test.go b/internal/tui/eventstream/export_test.go
index f398fce..6cea2b1 100644
--- a/internal/tui/eventstream/export_test.go
+++ b/internal/tui/eventstream/export_test.go
@@ -1,6 +1,8 @@
package eventstream
import (
+ "bytes"
+ "encoding/csv"
"os"
"path/filepath"
"reflect"
@@ -196,6 +198,43 @@ func TestExportRowsToCSVPathTraversal(t *testing.T) {
_ = traversal // silence unused-variable warning
}
+func TestWriteStreamCSVAppendsFamilyColumn(t *testing.T) {
+ var buf bytes.Buffer
+ rows := []StreamEvent{{
+ Seq: 7,
+ TimeNs: 100,
+ GapNs: 3,
+ DurationNs: 5,
+ Comm: "worker",
+ PID: 10,
+ TID: 11,
+ Syscall: "socketpair",
+ FD: 4,
+ RetVal: 0,
+ Bytes: 0,
+ FileName: "/tmp/sock",
+ IsError: false,
+ Family: "Network",
+ }}
+ fail := func(err error) (string, error) { return "", err }
+
+ if err := writeStreamCSV(csv.NewWriter(&buf), rows, fail); err != nil {
+ t.Fatalf("writeStreamCSV() error = %v", err)
+ }
+
+ records, err := csv.NewReader(bytes.NewReader(buf.Bytes())).ReadAll()
+ if err != nil {
+ t.Fatalf("read CSV: %v", err)
+ }
+ wantHeader := []string{"seq", "time_ns", "gap_ns", "latency_ns", "comm", "pid", "tid", "syscall", "fd", "ret", "bytes", "file", "error", "family"}
+ if !reflect.DeepEqual(records[0], wantHeader) {
+ t.Fatalf("header = %#v, want %#v", records[0], wantHeader)
+ }
+ if records[1][8] != "4" || records[1][12] != "false" || records[1][13] != "Network" {
+ t.Fatalf("family should be appended without shifting legacy columns, got %#v", records[1])
+ }
+}
+
// TestShellSplitVariousCases covers the tokenizer with a table-driven approach.
func TestShellSplitVariousCases(t *testing.T) {
cases := []struct {