diff options
| -rw-r--r-- | integrationtests/mmap_test.go | 54 | ||||
| -rw-r--r-- | internal/parquet/recorder_test.go | 29 | ||||
| -rw-r--r-- | internal/parquet/schema.go | 62 | ||||
| -rw-r--r-- | internal/streamrow/row.go | 33 | ||||
| -rw-r--r-- | internal/streamrow/row_test.go | 4 |
5 files changed, 123 insertions, 59 deletions
diff --git a/integrationtests/mmap_test.go b/integrationtests/mmap_test.go index 3ef84c5..9d1b3ad 100644 --- a/integrationtests/mmap_test.go +++ b/integrationtests/mmap_test.go @@ -2,6 +2,13 @@ package integrationtests import "testing" +const ( + mmapParquetDuration = 6 + mmapWorkloadStartupEnv = "IOR_WORKLOAD_STARTUP_DELAY_MS=1000" + mmapScenarioAddressSpaceBytes = 8192 + mmapMinAddressSpaceBytesTotal = mmapScenarioAddressSpaceBytes * 2 +) + func TestMmapBasic(t *testing.T) { runScenario(t, "mmap-basic", []ExpectedEvent{ { @@ -68,3 +75,50 @@ func TestMmapMremapMunmap(t *testing.T) { Comm: "ioworkload", }, 0) } + +func TestMmapMremapMunmapAddressSpaceBytesInParquet(t *testing.T) { + h := newTestHarness(t) + h.WorkloadEnv = []string{mmapWorkloadStartupEnv} + path, pid, err := h.RunParquet("mmap-mremap-munmap", mmapParquetDuration) + if err != nil { + t.Fatalf("run mmap-mremap-munmap parquet scenario: %v", err) + } + + rows := filterRecordsByPID(readParquetRecords(t, path), uint32(pid)) + if len(rows) == 0 { + t.Fatalf("expected parquet rows for workload PID %d", pid) + } + + var foundMremap, foundMunmap bool + var addressSpaceTotal uint64 + for _, row := range rows { + switch row.Syscall { + case "mremap": + if row.Bytes != 0 { + t.Fatalf("mremap bytes = %d, want 0 (I/O bytes must stay separate)", row.Bytes) + } + if row.AddressSpaceBytes == mmapScenarioAddressSpaceBytes { + foundMremap = true + } + addressSpaceTotal += row.AddressSpaceBytes + case "munmap": + if row.Bytes != 0 { + t.Fatalf("munmap bytes = %d, want 0 (I/O bytes must stay separate)", row.Bytes) + } + if row.AddressSpaceBytes == mmapScenarioAddressSpaceBytes { + foundMunmap = true + } + addressSpaceTotal += row.AddressSpaceBytes + } + } + + if !foundMremap { + t.Fatalf("expected mremap row with AddressSpaceBytes=%d", mmapScenarioAddressSpaceBytes) + } + if !foundMunmap { + t.Fatalf("expected munmap row with AddressSpaceBytes=%d", mmapScenarioAddressSpaceBytes) + } + if addressSpaceTotal < mmapMinAddressSpaceBytesTotal { + t.Fatalf("mremap+munmap AddressSpaceBytes total = %d, want >= %d", addressSpaceTotal, mmapMinAddressSpaceBytesTotal) + } +} diff --git a/internal/parquet/recorder_test.go b/internal/parquet/recorder_test.go index 76d70a5..96ae87c 100644 --- a/internal/parquet/recorder_test.go +++ b/internal/parquet/recorder_test.go @@ -134,20 +134,21 @@ func TestRecorderStopReturnsTerminalErrorOnRepeatedCalls(t *testing.T) { func testStreamRow(seq uint64, syscall string, isError bool) streamrow.Row { return streamrow.Row{ - Seq: seq, - TimeNs: seq * 10, - Syscall: syscall, - Family: "FS", - Comm: "ior-test", - PID: 100 + uint32(seq), - TID: 200 + uint32(seq), - FileName: "/tmp/file", - DurationNs: seq + 1, - GapNs: seq + 2, - Bytes: seq + 3, - RetVal: int64(seq), - IsError: isError, - FD: int32(seq), + Seq: seq, + TimeNs: seq * 10, + Syscall: syscall, + Family: "FS", + Comm: "ior-test", + PID: 100 + uint32(seq), + TID: 200 + uint32(seq), + FileName: "/tmp/file", + DurationNs: seq + 1, + GapNs: seq + 2, + Bytes: seq + 3, + AddressSpaceBytes: seq + 4, + RetVal: int64(seq), + IsError: isError, + FD: int32(seq), } } diff --git a/internal/parquet/schema.go b/internal/parquet/schema.go index 03937bf..87d4e2f 100644 --- a/internal/parquet/schema.go +++ b/internal/parquet/schema.go @@ -13,21 +13,22 @@ import ( // Record is the persisted Parquet schema for one syscall stream row. type Record struct { - Seq uint64 `parquet:"seq"` - TimeNS uint64 `parquet:"time_ns"` - GapNS uint64 `parquet:"gap_ns"` - LatencyNS uint64 `parquet:"latency_ns"` - Comm string `parquet:"comm"` - PID uint32 `parquet:"pid"` - TID uint32 `parquet:"tid"` - Syscall string `parquet:"syscall"` - Family string `parquet:"family"` - FD int32 `parquet:"fd"` - Ret int64 `parquet:"ret"` - Bytes uint64 `parquet:"bytes"` - File string `parquet:"file"` - IsError bool `parquet:"is_error"` - FilterEpoch uint64 `parquet:"filter_epoch"` + Seq uint64 `parquet:"seq"` + TimeNS uint64 `parquet:"time_ns"` + GapNS uint64 `parquet:"gap_ns"` + LatencyNS uint64 `parquet:"latency_ns"` + Comm string `parquet:"comm"` + PID uint32 `parquet:"pid"` + TID uint32 `parquet:"tid"` + Syscall string `parquet:"syscall"` + Family string `parquet:"family"` + FD int32 `parquet:"fd"` + Ret int64 `parquet:"ret"` + Bytes uint64 `parquet:"bytes"` + AddressSpaceBytes uint64 `parquet:"address_space_bytes"` + File string `parquet:"file"` + IsError bool `parquet:"is_error"` + FilterEpoch uint64 `parquet:"filter_epoch"` } // FileMetadata captures constant metadata written once into the parquet file. @@ -55,21 +56,22 @@ func NewFileMetadata(mode string) FileMetadata { // RecordFromStream converts one shared stream row into the persisted format. func RecordFromStream(row streamrow.Row, filterEpoch uint64) Record { return Record{ - Seq: row.Seq, - TimeNS: row.TimeNs, - GapNS: row.GapNs, - LatencyNS: row.DurationNs, - Comm: row.Comm, - PID: row.PID, - TID: row.TID, - Syscall: row.Syscall, - Family: row.Family, - FD: row.FD, - Ret: row.RetVal, - Bytes: row.Bytes, - File: row.FileName, - IsError: row.IsError, - FilterEpoch: filterEpoch, + Seq: row.Seq, + TimeNS: row.TimeNs, + GapNS: row.GapNs, + LatencyNS: row.DurationNs, + Comm: row.Comm, + PID: row.PID, + TID: row.TID, + Syscall: row.Syscall, + Family: row.Family, + FD: row.FD, + Ret: row.RetVal, + Bytes: row.Bytes, + AddressSpaceBytes: row.AddressSpaceBytes, + File: row.FileName, + IsError: row.IsError, + FilterEpoch: filterEpoch, } } diff --git a/internal/streamrow/row.go b/internal/streamrow/row.go index 5bccb61..7497583 100644 --- a/internal/streamrow/row.go +++ b/internal/streamrow/row.go @@ -23,9 +23,11 @@ type Row struct { DurationNs uint64 GapNs uint64 Bytes uint64 - RetVal int64 - IsError bool - FD int32 + // AddressSpaceBytes tracks memory-region extent for memory-management syscalls. + AddressSpaceBytes uint64 + RetVal int64 + IsError bool + FD int32 } func (r Row) SyscallValue() string { @@ -99,18 +101,19 @@ func (s *Sequencer) Next() uint64 { // New converts one syscall pair into the shared row model. func New(seq uint64, pair *event.Pair) Row { row := Row{ - Seq: seq, - TimeNs: pair.EnterEv.GetTime(), - Syscall: pair.EnterEv.GetTraceId().Name(), - Family: string(pair.EnterEv.GetTraceId().Family()), - Comm: pair.Comm, - PID: pair.EnterEv.GetPid(), - TID: pair.EnterEv.GetTid(), - FileName: pair.FileName(), - DurationNs: pair.Duration, - GapNs: pair.DurationToPrev, - Bytes: pair.Bytes, - FD: UnknownFD, + Seq: seq, + TimeNs: pair.EnterEv.GetTime(), + Syscall: pair.EnterEv.GetTraceId().Name(), + Family: string(pair.EnterEv.GetTraceId().Family()), + Comm: pair.Comm, + PID: pair.EnterEv.GetPid(), + TID: pair.EnterEv.GetTid(), + FileName: pair.FileName(), + DurationNs: pair.Duration, + GapNs: pair.DurationToPrev, + Bytes: pair.Bytes, + AddressSpaceBytes: pair.AddressSpaceBytes, + FD: UnknownFD, } if fd, ok := pair.FileDescriptor(); ok { row.FD = fd diff --git a/internal/streamrow/row_test.go b/internal/streamrow/row_test.go index b239969..7573f43 100644 --- a/internal/streamrow/row_test.go +++ b/internal/streamrow/row_test.go @@ -61,6 +61,7 @@ func TestNewPopulatesFieldsFromPair(t *testing.T) { pair.Duration = 66 pair.DurationToPrev = 19 pair.Bytes = 512 + pair.AddressSpaceBytes = 2048 got := New(9, pair) if got.Seq != 9 || got.TimeNs != 1234 { @@ -78,6 +79,9 @@ func TestNewPopulatesFieldsFromPair(t *testing.T) { if got.DurationNs != 66 || got.GapNs != 19 || got.Bytes != 512 { t.Fatalf("DurationNs/GapNs/Bytes = %d/%d/%d, want 66/19/512", got.DurationNs, got.GapNs, got.Bytes) } + if got.AddressSpaceBytes != 2048 { + t.Fatalf("AddressSpaceBytes = %d, want 2048", got.AddressSpaceBytes) + } if got.RetVal != -2 || !got.IsError { t.Fatalf("RetVal/IsError = %d/%v, want -2/true", got.RetVal, got.IsError) } |
