diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-12 23:33:15 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-12 23:33:15 +0200 |
| commit | 775d3e59c7a6c060d0a9ecf3536c0df383d241be (patch) | |
| tree | 981a6550849039ef4c443dce38a05829b79a11df /internal/streamrow | |
| parent | 2efe5330cb43f685f3159a28ce211392a0bbe3c3 (diff) | |
refactor: extract shared syscall stream row model
Diffstat (limited to 'internal/streamrow')
| -rw-r--r-- | internal/streamrow/row.go | 137 | ||||
| -rw-r--r-- | internal/streamrow/row_test.go | 48 |
2 files changed, 185 insertions, 0 deletions
diff --git a/internal/streamrow/row.go b/internal/streamrow/row.go new file mode 100644 index 0000000..457fcc0 --- /dev/null +++ b/internal/streamrow/row.go @@ -0,0 +1,137 @@ +package streamrow + +import ( + "sync/atomic" + "time" + + "ior/internal/event" + "ior/internal/types" +) + +// Row is the shared syscall stream row model used by live TUI views, +// snapshot export, and future recording outputs. +type Row struct { + Seq uint64 + TimeNs uint64 + Syscall string + Comm string + PID uint32 + TID uint32 + FileName string + DurationNs uint64 + GapNs uint64 + Bytes uint64 + RetVal int64 + IsError bool + FD int32 +} + +func (r Row) SyscallValue() string { + return r.Syscall +} + +func (r Row) CommValue() string { + return r.Comm +} + +func (r Row) FileValue() string { + return r.FileName +} + +func (r Row) PIDValue() uint32 { + return r.PID +} + +func (r Row) TIDValue() uint32 { + return r.TID +} + +func (r Row) FDValue() int32 { + return r.FD +} + +func (r Row) LatencyValue() uint64 { + return r.DurationNs +} + +func (r Row) GapValue() uint64 { + return r.GapNs +} + +func (r Row) BytesValue() uint64 { + return r.Bytes +} + +func (r Row) ReturnValue() int64 { + return r.RetVal +} + +func (r Row) ErrorValue() bool { + return r.IsError +} + +// UnknownFD marks events that are not associated with a file descriptor. +const UnknownFD int32 = -1 + +// Sequencer hands out strictly increasing row sequence numbers. +type Sequencer struct { + next atomic.Uint64 +} + +// NewSequencer constructs a monotonic sequence generator. The first call to +// Next returns start+1. +func NewSequencer(start uint64) *Sequencer { + s := &Sequencer{} + s.next.Store(start) + return s +} + +// Next returns the next sequence number. +func (s *Sequencer) Next() uint64 { + if s == nil { + return 0 + } + return s.next.Add(1) +} + +// New converts one syscall pair into the shared row model. +func New(seq uint64, pair *event.Pair) Row { + row := Row{ + Seq: seq, + TimeNs: pair.EnterEv.GetTime(), + Syscall: pair.EnterEv.GetTraceId().Name(), + Comm: pair.Comm, + PID: pair.EnterEv.GetPid(), + TID: pair.EnterEv.GetTid(), + FileName: pair.FileName(), + DurationNs: pair.Duration, + GapNs: pair.DurationToPrev, + Bytes: pair.Bytes, + FD: UnknownFD, + } + if fd, ok := pair.FileDescriptor(); ok { + row.FD = fd + } + + if retEv, ok := pair.ExitEv.(*types.RetEvent); ok { + row.RetVal = retEv.Ret + row.IsError = retEv.Ret < 0 + } + + return row +} + +// NewWarning creates a synthetic row for non-fatal runtime warnings. +func NewWarning(seq uint64, message string) Row { + now := uint64(time.Now().UnixNano()) + return Row{ + Seq: seq, + TimeNs: now, + Syscall: "warning", + Comm: "ior", + FileName: message, + FD: UnknownFD, + RetVal: -1, + IsError: true, + } +} diff --git a/internal/streamrow/row_test.go b/internal/streamrow/row_test.go new file mode 100644 index 0000000..729ba94 --- /dev/null +++ b/internal/streamrow/row_test.go @@ -0,0 +1,48 @@ +package streamrow + +import ( + "sync" + "testing" +) + +func TestSequencerStartsAfterSeed(t *testing.T) { + seq := NewSequencer(41) + if got, want := seq.Next(), uint64(42); got != want { + t.Fatalf("first Next() = %d, want %d", got, want) + } + if got, want := seq.Next(), uint64(43); got != want { + t.Fatalf("second Next() = %d, want %d", got, want) + } +} + +func TestSequencerIsMonotonicUnderConcurrency(t *testing.T) { + seq := NewSequencer(0) + + const workers = 8 + const perWorker = 64 + + got := make(chan uint64, workers*perWorker) + var wg sync.WaitGroup + for i := 0; i < workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < perWorker; j++ { + got <- seq.Next() + } + }() + } + wg.Wait() + close(got) + + seen := make(map[uint64]struct{}, workers*perWorker) + for n := range got { + if _, ok := seen[n]; ok { + t.Fatalf("duplicate sequence number %d", n) + } + seen[n] = struct{}{} + } + if got, want := len(seen), workers*perWorker; got != want { + t.Fatalf("unique sequence count = %d, want %d", got, want) + } +} |
