diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-12 23:47:23 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-12 23:47:23 +0200 |
| commit | 767c2b54779cbf81b68217c6e83868cffb6a0965 (patch) | |
| tree | d657bcb88f22357f506a5134227d87958190f07a | |
| parent | 775d3e59c7a6c060d0a9ecf3536c0df383d241be (diff) | |
feat: add parquet recorder foundation
| -rw-r--r-- | go.mod | 9 | ||||
| -rw-r--r-- | go.sum | 32 | ||||
| -rw-r--r-- | internal/parquet/recorder.go | 372 | ||||
| -rw-r--r-- | internal/parquet/recorder_test.go | 175 | ||||
| -rw-r--r-- | internal/parquet/schema.go | 81 | ||||
| -rw-r--r-- | internal/parquet/writer.go | 209 | ||||
| -rw-r--r-- | internal/parquet/writer_test.go | 113 |
7 files changed, 991 insertions, 0 deletions
@@ -11,9 +11,11 @@ require ( github.com/charmbracelet/harmonica v0.2.0 github.com/charmbracelet/x/term v0.2.2 github.com/magefile/mage v1.15.0 + github.com/parquet-go/parquet-go v0.29.0 ) require ( + github.com/andybalholm/brotli v1.1.1 // indirect github.com/atotto/clipboard v0.1.4 // indirect github.com/charmbracelet/colorprofile v0.4.2 // indirect github.com/charmbracelet/ultraviolet v0.0.0-20260205113103-524a6607adb8 // indirect @@ -22,11 +24,18 @@ require ( github.com/charmbracelet/x/windows v0.2.2 // indirect github.com/clipperhouse/displaywidth v0.11.0 // indirect github.com/clipperhouse/uax29/v2 v2.7.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/lucasb-eyer/go-colorful v1.3.0 // indirect github.com/mattn/go-runewidth v0.0.20 // indirect github.com/muesli/cancelreader v0.2.2 // indirect + github.com/parquet-go/bitpack v1.0.0 // indirect + github.com/parquet-go/jsonlite v1.0.0 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/rivo/uniseg v0.4.7 // indirect + github.com/twpayne/go-geom v1.6.1 // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.41.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect ) @@ -4,8 +4,16 @@ charm.land/bubbletea/v2 v2.0.1 h1:B8e9zzK7x9JJ+XvHGF4xnYu9Xa0E0y0MyggY6dbaCfQ= charm.land/bubbletea/v2 v2.0.1/go.mod h1:3LRff2U4WIYXy7MTxfbAQ+AdfM3D8Xuvz2wbsOD9OHQ= charm.land/lipgloss/v2 v2.0.0 h1:sd8N/B3x892oiOjFfBQdXBQp3cAkvjGaU5TvVZC3ivo= charm.land/lipgloss/v2 v2.0.0/go.mod h1:w6SnmsBFBmEFBodiEDurGS/sdUY/u1+v72DqUzc6J14= +github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= +github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/DataDog/zstd v1.5.7 h1:ybO8RBeh29qrxIhCA9E8gKY6xfONU9T6G6aP9DTKfLE= github.com/DataDog/zstd v1.5.7/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= +github.com/alecthomas/assert/v2 v2.10.0 h1:jjRCHsj6hBJhkmhznrCzoNpbA3zqy0fYiUcYZP/GkPY= +github.com/alecthomas/assert/v2 v2.10.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= +github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc= +github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= +github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= +github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= github.com/aquasecurity/libbpfgo v0.9.2-libbpf-1.5.1 h1:TDN+16Nim3gimjuTxd+sFhb4v06mEeYH0JfRWAFowA0= github.com/aquasecurity/libbpfgo v0.9.2-libbpf-1.5.1/go.mod h1:JQNC5NuGwyYC7IZum6JqPNVHarFAuab+h4lO6t0jIhc= github.com/atotto/clipboard v0.1.4 h1:EH0zSVneZPSuFR11BlR9YppQTVDbh5+16AmcJi4g1z4= @@ -34,6 +42,14 @@ github.com/clipperhouse/uax29/v2 v2.7.0 h1:+gs4oBZ2gPfVrKPthwbMzWZDaAFPGYK72F0NJ github.com/clipperhouse/uax29/v2 v2.7.0/go.mod h1:EFJ2TJMRUaplDxHKj1qAEhCtQPW2tJSwu5BF98AuoVM= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= +github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/lucasb-eyer/go-colorful v1.3.0 h1:2/yBRLdWBZKrf7gB40FoiKfAWYQ0lqNcbuQwVHXptag= github.com/lucasb-eyer/go-colorful v1.3.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= @@ -42,20 +58,36 @@ github.com/mattn/go-runewidth v0.0.20 h1:WcT52H91ZUAwy8+HUkdM3THM6gXqXuLJi9O3rjc github.com/mattn/go-runewidth v0.0.20/go.mod h1:XBkDxAl56ILZc9knddidhrOlY5R/pDhgLpndooCuJAs= github.com/muesli/cancelreader v0.2.2 h1:3I4Kt4BQjOR54NavqnDogx/MIoWBFa0StPA8ELUXHmA= github.com/muesli/cancelreader v0.2.2/go.mod h1:3XuTXfFS2VjM+HTLZY9Ak0l6eUKfijIfMUZ4EgX0QYo= +github.com/parquet-go/bitpack v1.0.0 h1:AUqzlKzPPXf2bCdjfj4sTeacrUwsT7NlcYDMUQxPcQA= +github.com/parquet-go/bitpack v1.0.0/go.mod h1:XnVk9TH+O40eOOmvpAVZ7K2ocQFrQwysLMnc6M/8lgs= +github.com/parquet-go/jsonlite v1.0.0 h1:87QNdi56wOfsE5bdgas0vRzHPxfJgzrXGml1zZdd7VU= +github.com/parquet-go/jsonlite v1.0.0/go.mod h1:nDjpkpL4EOtqs6NQugUsi0Rleq9sW/OtC1NnZEnxzF0= +github.com/parquet-go/parquet-go v0.29.0 h1:xXlPtFVR51jpSVzf+cgHnNIcb7Xet+iuvkbe0HIm90Y= +github.com/parquet-go/parquet-go v0.29.0/go.mod h1:navtkAYr2LGoJVp141oXPlO/sxLvaOe3la2JEoD8+rg= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/twpayne/go-geom v1.6.1 h1:iLE+Opv0Ihm/ABIcvQFGIiFBXd76oBIar9drAwHFhR4= +github.com/twpayne/go-geom v1.6.1/go.mod h1:Kr+Nly6BswFsKM5sd31YaoWS5PeDDH2NftJTK7Gd028= github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no= github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM= +github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= +github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= kernel.org/pub/linux/libs/security/libcap/cap v1.2.76 h1:mrdLPj8ujM6eIKGtd1PkkuCIodpFFDM42Cfm0YODkIM= diff --git a/internal/parquet/recorder.go b/internal/parquet/recorder.go new file mode 100644 index 0000000..f489b68 --- /dev/null +++ b/internal/parquet/recorder.go @@ -0,0 +1,372 @@ +package parquet + +import ( + "errors" + "sync" + "time" + + "ior/internal/streamrow" +) + +const ( + defaultRecorderQueueCapacity = 4096 + defaultRecorderBatchSize = 256 + defaultRecorderFlushInterval = 250 * time.Millisecond +) + +var ( + // ErrRecorderActive indicates a start request while a recorder session is running. + ErrRecorderActive = errors.New("parquet recorder is already active") + // ErrRecorderNotActive indicates that rows cannot be accepted because no session is active. + ErrRecorderNotActive = errors.New("parquet recorder is not active") + // ErrRecorderQueueFull indicates bounded queue overflow. This is treated as a recording failure. + ErrRecorderQueueFull = errors.New("parquet recorder queue is full") +) + +type rowWriter interface { + WriteRows([]Record) error + Close() error + Abort() error + FinalPath() string + TempPath() string +} + +type writerFactory func(path string, cfg WriterConfig, meta FileMetadata) (rowWriter, error) + +// RecorderConfig controls queueing and batching behavior. +type RecorderConfig struct { + QueueCapacity int + BatchSize int + FlushInterval time.Duration + Writer WriterConfig + + newWriter writerFactory +} + +// StartOptions supplies per-session metadata. +type StartOptions struct { + Metadata FileMetadata +} + +// Status reports the last known recorder state. +type Status struct { + Active bool + Path string + TempPath string + RowsWritten uint64 + LastError error +} + +// Recorder manages one active parquet recording session at a time. +type Recorder struct { + mu sync.RWMutex + config RecorderConfig + active *recordingSession + status Status +} + +type recordingSession struct { + queue chan recordRequest + stopC chan struct{} + doneC chan error + + mu sync.Mutex + accepting bool + stopCause error + stopOnce sync.Once +} + +type recordRequest struct { + row streamrow.Row + filterEpoch uint64 +} + +// NewRecorder constructs a reusable parquet recorder controller. +func NewRecorder(config RecorderConfig) *Recorder { + return &Recorder{config: normalizeRecorderConfig(config)} +} + +// Start begins a new recording session. +func (r *Recorder) Start(path string, options StartOptions) error { + if r == nil { + return ErrRecorderNotActive + } + + cfg := normalizeRecorderConfig(r.config) + buildWriter := cfg.newWriter + if buildWriter == nil { + buildWriter = func(path string, cfg WriterConfig, meta FileMetadata) (rowWriter, error) { + return NewWriter(path, cfg, meta) + } + } + + writer, err := buildWriter(path, cfg.Writer, options.Metadata) + if err != nil { + return err + } + + session := newRecordingSession(cfg.QueueCapacity) + + r.mu.Lock() + if r.active != nil { + r.mu.Unlock() + _ = writer.Abort() + return ErrRecorderActive + } + r.active = session + r.status = Status{ + Active: true, + Path: writer.FinalPath(), + TempPath: writer.TempPath(), + } + r.mu.Unlock() + + go r.runSession(session, writer, cfg) + return nil +} + +// Record queues one shared stream row for persistence. +func (r *Recorder) Record(row streamrow.Row, filterEpoch uint64) error { + if r == nil { + return ErrRecorderNotActive + } + + r.mu.RLock() + session := r.active + lastErr := r.status.LastError + r.mu.RUnlock() + + if session == nil { + if lastErr != nil { + return lastErr + } + return ErrRecorderNotActive + } + return session.enqueue(recordRequest{row: row, filterEpoch: filterEpoch}) +} + +// Stop gracefully flushes and finalizes the active recording session. +func (r *Recorder) Stop() error { + if r == nil { + return nil + } + + r.mu.RLock() + session := r.active + lastErr := r.status.LastError + r.mu.RUnlock() + + if session == nil { + return lastErr + } + + session.stop(nil) + return <-session.doneC +} + +// Status returns a snapshot of the recorder state. +func (r *Recorder) Status() Status { + if r == nil { + return Status{} + } + + r.mu.RLock() + defer r.mu.RUnlock() + return r.status +} + +func (r *Recorder) runSession(session *recordingSession, writer rowWriter, cfg RecorderConfig) { + ticker := time.NewTicker(cfg.FlushInterval) + defer ticker.Stop() + + var written uint64 + batch := make([]Record, 0, cfg.BatchSize) + + for { + select { + case req := <-session.queue: + if err := r.bufferRecord(session, writer, &batch, &written, cfg.BatchSize, req); err != nil { + r.abortSession(session, writer, err) + return + } + + case <-ticker.C: + if err := r.flushBatch(session, writer, &batch, &written); err != nil { + r.abortSession(session, writer, err) + return + } + + case <-session.stopC: + r.completeSession(session, r.stopSession(session, writer, &batch, &written, cfg.BatchSize)) + return + } + } +} + +func (r *Recorder) bufferRecord( + session *recordingSession, + writer rowWriter, + batch *[]Record, + written *uint64, + batchSize int, + req recordRequest, +) error { + *batch = append(*batch, RecordFromStream(req.row, req.filterEpoch)) + if len(*batch) < batchSize { + return nil + } + return r.flushBatch(session, writer, batch, written) +} + +func (r *Recorder) flushBatch( + session *recordingSession, + writer rowWriter, + batch *[]Record, + written *uint64, +) error { + if len(*batch) == 0 { + return nil + } + if err := writer.WriteRows(*batch); err != nil { + return err + } + *written += uint64(len(*batch)) + r.updateRowsWritten(session, *written) + *batch = (*batch)[:0] + return nil +} + +func (r *Recorder) stopSession( + session *recordingSession, + writer rowWriter, + batch *[]Record, + written *uint64, + batchSize int, +) error { + if cause := session.cause(); cause != nil { + _ = writer.Abort() + return cause + } + if err := drainQueue(session, func(req recordRequest) error { + return r.bufferRecord(session, writer, batch, written, batchSize, req) + }); err != nil { + _ = writer.Abort() + return err + } + if err := r.flushBatch(session, writer, batch, written); err != nil { + _ = writer.Abort() + return err + } + return writer.Close() +} + +func (r *Recorder) abortSession(session *recordingSession, writer rowWriter, err error) { + session.stop(err) + _ = writer.Abort() + r.completeSession(session, err) +} + +func (r *Recorder) completeSession(session *recordingSession, err error) { + r.finishSession(session, err) + session.doneC <- err + close(session.doneC) +} + +func (r *Recorder) updateRowsWritten(session *recordingSession, rowsWritten uint64) { + r.mu.Lock() + defer r.mu.Unlock() + if r.active != session { + return + } + r.status.RowsWritten = rowsWritten +} + +func (r *Recorder) finishSession(session *recordingSession, err error) { + r.mu.Lock() + defer r.mu.Unlock() + if r.active != session { + return + } + r.status.Active = false + r.status.LastError = err + if err == nil { + r.status.TempPath = "" + } + r.active = nil +} + +func normalizeRecorderConfig(cfg RecorderConfig) RecorderConfig { + if cfg.QueueCapacity <= 0 { + cfg.QueueCapacity = defaultRecorderQueueCapacity + } + if cfg.BatchSize <= 0 { + cfg.BatchSize = defaultRecorderBatchSize + } + if cfg.FlushInterval <= 0 { + cfg.FlushInterval = defaultRecorderFlushInterval + } + cfg.Writer = normalizeWriterConfig(cfg.Writer) + return cfg +} + +func newRecordingSession(queueCapacity int) *recordingSession { + return &recordingSession{ + queue: make(chan recordRequest, queueCapacity), + stopC: make(chan struct{}), + doneC: make(chan error, 1), + accepting: true, + } +} + +func (s *recordingSession) enqueue(req recordRequest) error { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.accepting { + if s.stopCause != nil { + return s.stopCause + } + return ErrRecorderNotActive + } + + select { + case s.queue <- req: + return nil + default: + s.accepting = false + if s.stopCause == nil { + s.stopCause = ErrRecorderQueueFull + } + s.stopOnce.Do(func() { close(s.stopC) }) + return ErrRecorderQueueFull + } +} + +func (s *recordingSession) stop(cause error) { + s.mu.Lock() + s.accepting = false + if cause != nil && s.stopCause == nil { + s.stopCause = cause + } + s.mu.Unlock() + s.stopOnce.Do(func() { close(s.stopC) }) +} + +func (s *recordingSession) cause() error { + s.mu.Lock() + defer s.mu.Unlock() + return s.stopCause +} + +func drainQueue(session *recordingSession, consume func(recordRequest) error) error { + for { + select { + case req := <-session.queue: + if err := consume(req); err != nil { + return err + } + default: + return nil + } + } +} diff --git a/internal/parquet/recorder_test.go b/internal/parquet/recorder_test.go new file mode 100644 index 0000000..d4caa11 --- /dev/null +++ b/internal/parquet/recorder_test.go @@ -0,0 +1,175 @@ +package parquet + +import ( + "errors" + "path/filepath" + "reflect" + "sync" + "sync/atomic" + "testing" + "time" + + "ior/internal/streamrow" +) + +func TestRecorderRoundTrip(t *testing.T) { + recorder := NewRecorder(RecorderConfig{ + QueueCapacity: 8, + BatchSize: 2, + FlushInterval: time.Hour, + }) + + path := filepath.Join(t.TempDir(), "session") + if err := recorder.Start(path, StartOptions{Metadata: FileMetadata{Mode: "tui"}}); err != nil { + t.Fatalf("Start() error = %v", err) + } + + rows := []streamrow.Row{ + testStreamRow(1, "read", false), + testStreamRow(2, "write", false), + testStreamRow(3, "openat", true), + } + epochs := []uint64{0, 0, 3} + + for i := range rows { + if err := recorder.Record(rows[i], epochs[i]); err != nil { + t.Fatalf("Record(%d) error = %v", i, err) + } + } + + if err := recorder.Stop(); err != nil { + t.Fatalf("Stop() error = %v", err) + } + + status := recorder.Status() + if status.Active { + t.Fatalf("Status().Active = true, want false") + } + if status.RowsWritten != 3 { + t.Fatalf("Status().RowsWritten = %d, want 3", status.RowsWritten) + } + if status.LastError != nil { + t.Fatalf("Status().LastError = %v, want nil", status.LastError) + } + if status.TempPath != "" { + t.Fatalf("Status().TempPath = %q, want empty after successful stop", status.TempPath) + } + + want := []Record{ + RecordFromStream(rows[0], epochs[0]), + RecordFromStream(rows[1], epochs[1]), + RecordFromStream(rows[2], epochs[2]), + } + got := readAllRecords(t, status.Path) + if !reflect.DeepEqual(got, want) { + t.Fatalf("records mismatch\n got: %+v\nwant: %+v", got, want) + } +} + +func TestRecorderFailsOnQueueOverflow(t *testing.T) { + writer := newBlockingWriter() + recorder := NewRecorder(RecorderConfig{ + QueueCapacity: 1, + BatchSize: 1, + FlushInterval: time.Hour, + newWriter: func(string, WriterConfig, FileMetadata) (rowWriter, error) { + return writer, nil + }, + }) + + if err := recorder.Start("ignored", StartOptions{}); err != nil { + t.Fatalf("Start() error = %v", err) + } + if err := recorder.Record(testStreamRow(1, "read", false), 0); err != nil { + t.Fatalf("first Record() error = %v", err) + } + + <-writer.started + + if err := recorder.Record(testStreamRow(2, "write", false), 0); err != nil { + t.Fatalf("second Record() error = %v", err) + } + if err := recorder.Record(testStreamRow(3, "openat", false), 0); !errors.Is(err, ErrRecorderQueueFull) { + t.Fatalf("third Record() error = %v, want %v", err, ErrRecorderQueueFull) + } + + writer.releaseWrites() + + if err := recorder.Stop(); !errors.Is(err, ErrRecorderQueueFull) { + t.Fatalf("Stop() error = %v, want %v", err, ErrRecorderQueueFull) + } + + status := recorder.Status() + if status.Active { + t.Fatalf("Status().Active = true, want false") + } + if !errors.Is(status.LastError, ErrRecorderQueueFull) { + t.Fatalf("Status().LastError = %v, want %v", status.LastError, ErrRecorderQueueFull) + } + if !writer.aborted.Load() { + t.Fatalf("expected recorder failure path to abort the backing writer") + } +} + +func testStreamRow(seq uint64, syscall string, isError bool) streamrow.Row { + return streamrow.Row{ + Seq: seq, + TimeNs: seq * 10, + Syscall: syscall, + Comm: "ior-test", + PID: 100 + uint32(seq), + TID: 200 + uint32(seq), + FileName: "/tmp/file", + DurationNs: seq + 1, + GapNs: seq + 2, + Bytes: seq + 3, + RetVal: int64(seq), + IsError: isError, + FD: int32(seq), + } +} + +type blockingWriter struct { + started chan struct{} + release chan struct{} + + startOnce sync.Once + releaseOnce sync.Once + + aborted atomic.Bool +} + +func newBlockingWriter() *blockingWriter { + return &blockingWriter{ + started: make(chan struct{}), + release: make(chan struct{}), + } +} + +func (w *blockingWriter) WriteRows([]Record) error { + w.startOnce.Do(func() { close(w.started) }) + <-w.release + return nil +} + +func (w *blockingWriter) Close() error { + return nil +} + +func (w *blockingWriter) Abort() error { + w.aborted.Store(true) + w.releaseWrites() + return nil +} + +func (w *blockingWriter) FinalPath() string { + return "ignored.parquet" +} + +func (w *blockingWriter) TempPath() string { + return "ignored.parquet.tmp" +} + +func (w *blockingWriter) releaseWrites() { + w.releaseOnce.Do(func() { close(w.release) }) +} diff --git a/internal/parquet/schema.go b/internal/parquet/schema.go new file mode 100644 index 0000000..2ede444 --- /dev/null +++ b/internal/parquet/schema.go @@ -0,0 +1,81 @@ +package parquet + +import ( + "strconv" + + "ior/internal/flags" + "ior/internal/streamrow" + + parquetgo "github.com/parquet-go/parquet-go" +) + +// Record is the persisted Parquet schema for one syscall stream row. +type Record struct { + Seq uint64 `parquet:"seq"` + TimeNS uint64 `parquet:"time_ns"` + GapNS uint64 `parquet:"gap_ns"` + LatencyNS uint64 `parquet:"latency_ns"` + Comm string `parquet:"comm"` + PID uint32 `parquet:"pid"` + TID uint32 `parquet:"tid"` + Syscall string `parquet:"syscall"` + FD int32 `parquet:"fd"` + Ret int64 `parquet:"ret"` + Bytes uint64 `parquet:"bytes"` + File string `parquet:"file"` + IsError bool `parquet:"is_error"` + FilterEpoch uint64 `parquet:"filter_epoch"` +} + +// FileMetadata captures constant metadata written once into the parquet file. +type FileMetadata struct { + Hostname string + StartedAtUnixNano uint64 + Mode string + IORVersion string +} + +// RecordFromStream converts one shared stream row into the persisted format. +func RecordFromStream(row streamrow.Row, filterEpoch uint64) Record { + return Record{ + Seq: row.Seq, + TimeNS: row.TimeNs, + GapNS: row.GapNs, + LatencyNS: row.DurationNs, + Comm: row.Comm, + PID: row.PID, + TID: row.TID, + Syscall: row.Syscall, + FD: row.FD, + Ret: row.RetVal, + Bytes: row.Bytes, + File: row.FileName, + IsError: row.IsError, + FilterEpoch: filterEpoch, + } +} + +func writerMetadataOptions(meta FileMetadata) []parquetgo.WriterOption { + meta = normalizeMetadata(meta) + options := make([]parquetgo.WriterOption, 0, 4) + if meta.Hostname != "" { + options = append(options, parquetgo.KeyValueMetadata("ior.hostname", meta.Hostname)) + } + if meta.StartedAtUnixNano != 0 { + options = append(options, parquetgo.KeyValueMetadata("ior.started_at_unix_nano", strconv.FormatUint(meta.StartedAtUnixNano, 10))) + } + if meta.Mode != "" { + options = append(options, parquetgo.KeyValueMetadata("ior.mode", meta.Mode)) + } + if meta.IORVersion != "" { + options = append(options, parquetgo.KeyValueMetadata("ior.version", meta.IORVersion)) + } + return options +} + +func normalizeMetadata(meta FileMetadata) FileMetadata { + if meta.IORVersion == "" { + meta.IORVersion = flags.Version + } + return meta +} diff --git a/internal/parquet/writer.go b/internal/parquet/writer.go new file mode 100644 index 0000000..82428d9 --- /dev/null +++ b/internal/parquet/writer.go @@ -0,0 +1,209 @@ +package parquet + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + + parquetgo "github.com/parquet-go/parquet-go" +) + +const ( + defaultMaxRowsPerRowGroup = int64(8192) + defaultPageBufferSize = 256 * 1024 +) + +var errWriterClosed = errors.New("parquet writer is closed") + +// WriterConfig tunes parquet file layout details. +type WriterConfig struct { + MaxRowsPerRowGroup int64 + PageBufferSize int +} + +// DefaultWriterConfig returns the recommended writer defaults for ior traces. +func DefaultWriterConfig() WriterConfig { + return WriterConfig{ + MaxRowsPerRowGroup: defaultMaxRowsPerRowGroup, + PageBufferSize: defaultPageBufferSize, + } +} + +type writerState int + +const ( + writerStateOpen writerState = iota + writerStateClosed + writerStateAborted +) + +// Writer wraps the parquet library behind repo-local file lifecycle semantics. +type Writer struct { + mu sync.Mutex + + finalPath string + tempPath string + file *os.File + writer *parquetgo.GenericWriter[Record] + state writerState +} + +// NewWriter creates a new parquet writer that writes to a temporary file first +// and only publishes the final path once Close succeeds. +func NewWriter(path string, cfg WriterConfig, meta FileMetadata) (*Writer, error) { + finalPath, tempPath, err := normalizeOutputPaths(path) + if err != nil { + return nil, err + } + + cfg = normalizeWriterConfig(cfg) + file, err := os.Create(tempPath) + if err != nil { + return nil, err + } + + options := []parquetgo.WriterOption{ + parquetgo.Compression(&parquetgo.Zstd), + parquetgo.CreatedBy("ior", normalizeMetadata(meta).IORVersion, ""), + parquetgo.MaxRowsPerRowGroup(cfg.MaxRowsPerRowGroup), + parquetgo.PageBufferSize(cfg.PageBufferSize), + } + options = append(options, writerMetadataOptions(meta)...) + + return &Writer{ + finalPath: finalPath, + tempPath: tempPath, + file: file, + writer: parquetgo.NewGenericWriter[Record](file, options...), + state: writerStateOpen, + }, nil +} + +// FinalPath returns the finalized parquet path. +func (w *Writer) FinalPath() string { + if w == nil { + return "" + } + return w.finalPath +} + +// TempPath returns the temporary parquet path used before finalization. +func (w *Writer) TempPath() string { + if w == nil { + return "" + } + return w.tempPath +} + +// WriteRows appends a batch of records to the parquet file. +func (w *Writer) WriteRows(rows []Record) error { + if len(rows) == 0 { + return nil + } + + w.mu.Lock() + defer w.mu.Unlock() + + if w.state != writerStateOpen { + return errWriterClosed + } + + written, err := w.writer.Write(rows) + if err != nil { + return fmt.Errorf("write parquet rows: %w", err) + } + if written != len(rows) { + return fmt.Errorf("write parquet rows: wrote %d of %d rows", written, len(rows)) + } + return nil +} + +// Close finalizes the parquet footer and publishes the file atomically. +func (w *Writer) Close() error { + if w == nil { + return nil + } + + w.mu.Lock() + if w.state != writerStateOpen { + w.mu.Unlock() + return nil + } + file := w.file + writer := w.writer + tempPath := w.tempPath + finalPath := w.finalPath + w.state = writerStateClosed + w.mu.Unlock() + + if err := writer.Close(); err != nil { + closeErr := file.Close() + removeErr := os.Remove(tempPath) + return errors.Join(fmt.Errorf("close parquet writer: %w", err), closeErr, removeErr) + } + if err := file.Close(); err != nil { + removeErr := os.Remove(tempPath) + return errors.Join(fmt.Errorf("close parquet file: %w", err), removeErr) + } + if err := os.Rename(tempPath, finalPath); err != nil { + return fmt.Errorf("rename parquet file %q to %q: %w", tempPath, finalPath, err) + } + return nil +} + +// Abort discards the temporary parquet file. +func (w *Writer) Abort() error { + if w == nil { + return nil + } + + w.mu.Lock() + if w.state != writerStateOpen { + w.mu.Unlock() + return nil + } + file := w.file + tempPath := w.tempPath + w.state = writerStateAborted + w.mu.Unlock() + + closeErr := file.Close() + removeErr := os.Remove(tempPath) + if errors.Is(removeErr, os.ErrNotExist) { + removeErr = nil + } + return errors.Join(closeErr, removeErr) +} + +func normalizeWriterConfig(cfg WriterConfig) WriterConfig { + defaults := DefaultWriterConfig() + if cfg.MaxRowsPerRowGroup <= 0 { + cfg.MaxRowsPerRowGroup = defaults.MaxRowsPerRowGroup + } + if cfg.PageBufferSize <= 0 { + cfg.PageBufferSize = defaults.PageBufferSize + } + return cfg +} + +func normalizeOutputPaths(path string) (string, string, error) { + clean := filepath.Clean(strings.TrimSpace(path)) + if clean == "." || clean == "" { + return "", "", errors.New("parquet output path cannot be empty") + } + + lower := strings.ToLower(clean) + switch { + case strings.HasSuffix(lower, ".parquet.tmp"): + finalPath := strings.TrimSuffix(clean, ".tmp") + return finalPath, clean, nil + case strings.HasSuffix(lower, ".parquet"): + return clean, clean + ".tmp", nil + default: + finalPath := clean + ".parquet" + return finalPath, finalPath + ".tmp", nil + } +} diff --git a/internal/parquet/writer_test.go b/internal/parquet/writer_test.go new file mode 100644 index 0000000..0bd7d97 --- /dev/null +++ b/internal/parquet/writer_test.go @@ -0,0 +1,113 @@ +package parquet + +import ( + "io" + "os" + "path/filepath" + "reflect" + "testing" + + parquetgo "github.com/parquet-go/parquet-go" +) + +func TestWriterRoundTripAndFinalize(t *testing.T) { + dir := t.TempDir() + writer, err := NewWriter(filepath.Join(dir, "trace"), WriterConfig{}, FileMetadata{ + Hostname: "test-host", + StartedAtUnixNano: 1234, + Mode: "tui", + }) + if err != nil { + t.Fatalf("NewWriter() error = %v", err) + } + + rows := []Record{ + { + Seq: 1, + TimeNS: 10, + GapNS: 2, + LatencyNS: 5, + Comm: "cat", + PID: 11, + TID: 12, + Syscall: "read", + FD: 3, + Ret: 42, + Bytes: 42, + File: "/tmp/input", + IsError: false, + FilterEpoch: 7, + }, + { + Seq: 2, + TimeNS: 20, + GapNS: 3, + LatencyNS: 6, + Comm: "cp", + PID: 21, + TID: 22, + Syscall: "write", + FD: 4, + Ret: -1, + Bytes: 99, + File: "/tmp/output", + IsError: true, + FilterEpoch: 8, + }, + } + + if err := writer.WriteRows(rows); err != nil { + t.Fatalf("WriteRows() error = %v", err) + } + if _, err := os.Stat(writer.TempPath()); err != nil { + t.Fatalf("Stat(%q) error = %v, want temp file present", writer.TempPath(), err) + } + if _, err := os.Stat(writer.FinalPath()); !os.IsNotExist(err) { + t.Fatalf("Stat(%q) error = %v, want not-exist before Close", writer.FinalPath(), err) + } + + if err := writer.Close(); err != nil { + t.Fatalf("Close() error = %v", err) + } + + if _, err := os.Stat(writer.TempPath()); !os.IsNotExist(err) { + t.Fatalf("Stat(%q) error = %v, want temp removed after Close", writer.TempPath(), err) + } + if _, err := os.Stat(writer.FinalPath()); err != nil { + t.Fatalf("Stat(%q) error = %v, want finalized parquet file", writer.FinalPath(), err) + } + + got := readAllRecords(t, writer.FinalPath()) + if !reflect.DeepEqual(got, rows) { + t.Fatalf("records mismatch\n got: %+v\nwant: %+v", got, rows) + } +} + +func readAllRecords(t *testing.T, path string) []Record { + t.Helper() + + f, err := os.Open(path) + if err != nil { + t.Fatalf("Open(%q) error = %v", path, err) + } + defer f.Close() + + reader := parquetgo.NewGenericReader[Record](f) + defer reader.Close() + + var rows []Record + buf := make([]Record, 4) + for { + n, err := reader.Read(buf) + if n > 0 { + rows = append(rows, buf[:n]...) + } + if err == nil { + continue + } + if err == io.EOF { + return rows + } + t.Fatalf("Read() error = %v", err) + } +} |
