summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r--internal/parquet/recorder_test.go29
-rw-r--r--internal/parquet/schema.go62
-rw-r--r--internal/streamrow/row.go33
-rw-r--r--internal/streamrow/row_test.go4
4 files changed, 69 insertions, 59 deletions
diff --git a/internal/parquet/recorder_test.go b/internal/parquet/recorder_test.go
index 76d70a5..96ae87c 100644
--- a/internal/parquet/recorder_test.go
+++ b/internal/parquet/recorder_test.go
@@ -134,20 +134,21 @@ func TestRecorderStopReturnsTerminalErrorOnRepeatedCalls(t *testing.T) {
func testStreamRow(seq uint64, syscall string, isError bool) streamrow.Row {
return streamrow.Row{
- Seq: seq,
- TimeNs: seq * 10,
- Syscall: syscall,
- Family: "FS",
- Comm: "ior-test",
- PID: 100 + uint32(seq),
- TID: 200 + uint32(seq),
- FileName: "/tmp/file",
- DurationNs: seq + 1,
- GapNs: seq + 2,
- Bytes: seq + 3,
- RetVal: int64(seq),
- IsError: isError,
- FD: int32(seq),
+ Seq: seq,
+ TimeNs: seq * 10,
+ Syscall: syscall,
+ Family: "FS",
+ Comm: "ior-test",
+ PID: 100 + uint32(seq),
+ TID: 200 + uint32(seq),
+ FileName: "/tmp/file",
+ DurationNs: seq + 1,
+ GapNs: seq + 2,
+ Bytes: seq + 3,
+ AddressSpaceBytes: seq + 4,
+ RetVal: int64(seq),
+ IsError: isError,
+ FD: int32(seq),
}
}
diff --git a/internal/parquet/schema.go b/internal/parquet/schema.go
index 03937bf..87d4e2f 100644
--- a/internal/parquet/schema.go
+++ b/internal/parquet/schema.go
@@ -13,21 +13,22 @@ import (
// Record is the persisted Parquet schema for one syscall stream row.
type Record struct {
- Seq uint64 `parquet:"seq"`
- TimeNS uint64 `parquet:"time_ns"`
- GapNS uint64 `parquet:"gap_ns"`
- LatencyNS uint64 `parquet:"latency_ns"`
- Comm string `parquet:"comm"`
- PID uint32 `parquet:"pid"`
- TID uint32 `parquet:"tid"`
- Syscall string `parquet:"syscall"`
- Family string `parquet:"family"`
- FD int32 `parquet:"fd"`
- Ret int64 `parquet:"ret"`
- Bytes uint64 `parquet:"bytes"`
- File string `parquet:"file"`
- IsError bool `parquet:"is_error"`
- FilterEpoch uint64 `parquet:"filter_epoch"`
+ Seq uint64 `parquet:"seq"`
+ TimeNS uint64 `parquet:"time_ns"`
+ GapNS uint64 `parquet:"gap_ns"`
+ LatencyNS uint64 `parquet:"latency_ns"`
+ Comm string `parquet:"comm"`
+ PID uint32 `parquet:"pid"`
+ TID uint32 `parquet:"tid"`
+ Syscall string `parquet:"syscall"`
+ Family string `parquet:"family"`
+ FD int32 `parquet:"fd"`
+ Ret int64 `parquet:"ret"`
+ Bytes uint64 `parquet:"bytes"`
+ AddressSpaceBytes uint64 `parquet:"address_space_bytes"`
+ File string `parquet:"file"`
+ IsError bool `parquet:"is_error"`
+ FilterEpoch uint64 `parquet:"filter_epoch"`
}
// FileMetadata captures constant metadata written once into the parquet file.
@@ -55,21 +56,22 @@ func NewFileMetadata(mode string) FileMetadata {
// RecordFromStream converts one shared stream row into the persisted format.
func RecordFromStream(row streamrow.Row, filterEpoch uint64) Record {
return Record{
- Seq: row.Seq,
- TimeNS: row.TimeNs,
- GapNS: row.GapNs,
- LatencyNS: row.DurationNs,
- Comm: row.Comm,
- PID: row.PID,
- TID: row.TID,
- Syscall: row.Syscall,
- Family: row.Family,
- FD: row.FD,
- Ret: row.RetVal,
- Bytes: row.Bytes,
- File: row.FileName,
- IsError: row.IsError,
- FilterEpoch: filterEpoch,
+ Seq: row.Seq,
+ TimeNS: row.TimeNs,
+ GapNS: row.GapNs,
+ LatencyNS: row.DurationNs,
+ Comm: row.Comm,
+ PID: row.PID,
+ TID: row.TID,
+ Syscall: row.Syscall,
+ Family: row.Family,
+ FD: row.FD,
+ Ret: row.RetVal,
+ Bytes: row.Bytes,
+ AddressSpaceBytes: row.AddressSpaceBytes,
+ File: row.FileName,
+ IsError: row.IsError,
+ FilterEpoch: filterEpoch,
}
}
diff --git a/internal/streamrow/row.go b/internal/streamrow/row.go
index 5bccb61..7497583 100644
--- a/internal/streamrow/row.go
+++ b/internal/streamrow/row.go
@@ -23,9 +23,11 @@ type Row struct {
DurationNs uint64
GapNs uint64
Bytes uint64
- RetVal int64
- IsError bool
- FD int32
+ // AddressSpaceBytes tracks memory-region extent for memory-management syscalls.
+ AddressSpaceBytes uint64
+ RetVal int64
+ IsError bool
+ FD int32
}
func (r Row) SyscallValue() string {
@@ -99,18 +101,19 @@ func (s *Sequencer) Next() uint64 {
// New converts one syscall pair into the shared row model.
func New(seq uint64, pair *event.Pair) Row {
row := Row{
- Seq: seq,
- TimeNs: pair.EnterEv.GetTime(),
- Syscall: pair.EnterEv.GetTraceId().Name(),
- Family: string(pair.EnterEv.GetTraceId().Family()),
- Comm: pair.Comm,
- PID: pair.EnterEv.GetPid(),
- TID: pair.EnterEv.GetTid(),
- FileName: pair.FileName(),
- DurationNs: pair.Duration,
- GapNs: pair.DurationToPrev,
- Bytes: pair.Bytes,
- FD: UnknownFD,
+ Seq: seq,
+ TimeNs: pair.EnterEv.GetTime(),
+ Syscall: pair.EnterEv.GetTraceId().Name(),
+ Family: string(pair.EnterEv.GetTraceId().Family()),
+ Comm: pair.Comm,
+ PID: pair.EnterEv.GetPid(),
+ TID: pair.EnterEv.GetTid(),
+ FileName: pair.FileName(),
+ DurationNs: pair.Duration,
+ GapNs: pair.DurationToPrev,
+ Bytes: pair.Bytes,
+ AddressSpaceBytes: pair.AddressSpaceBytes,
+ FD: UnknownFD,
}
if fd, ok := pair.FileDescriptor(); ok {
row.FD = fd
diff --git a/internal/streamrow/row_test.go b/internal/streamrow/row_test.go
index b239969..7573f43 100644
--- a/internal/streamrow/row_test.go
+++ b/internal/streamrow/row_test.go
@@ -61,6 +61,7 @@ func TestNewPopulatesFieldsFromPair(t *testing.T) {
pair.Duration = 66
pair.DurationToPrev = 19
pair.Bytes = 512
+ pair.AddressSpaceBytes = 2048
got := New(9, pair)
if got.Seq != 9 || got.TimeNs != 1234 {
@@ -78,6 +79,9 @@ func TestNewPopulatesFieldsFromPair(t *testing.T) {
if got.DurationNs != 66 || got.GapNs != 19 || got.Bytes != 512 {
t.Fatalf("DurationNs/GapNs/Bytes = %d/%d/%d, want 66/19/512", got.DurationNs, got.GapNs, got.Bytes)
}
+ if got.AddressSpaceBytes != 2048 {
+ t.Fatalf("AddressSpaceBytes = %d, want 2048", got.AddressSpaceBytes)
+ }
if got.RetVal != -2 || !got.IsError {
t.Fatalf("RetVal/IsError = %d/%v, want -2/true", got.RetVal, got.IsError)
}