summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-13 07:46:29 +0200
committerPaul Buetow <paul@buetow.org>2026-03-13 07:46:29 +0200
commit973bc4be068d337ff9ab13c47d08485b1946d133 (patch)
tree8491c159fa052d632ec2d8866eae05b5669a36db
parentb52fd1f297c178f17fe75f8fb03d5cbdd3ece71d (diff)
bench: add parquet recording benchmarks
-rw-r--r--internal/bench_pipeline_test.go161
-rw-r--r--internal/parquet/bench_test.go68
-rw-r--r--internal/streamrow/bench_test.go34
3 files changed, 263 insertions, 0 deletions
diff --git a/internal/bench_pipeline_test.go b/internal/bench_pipeline_test.go
index 80001f3..ad4edf1 100644
--- a/internal/bench_pipeline_test.go
+++ b/internal/bench_pipeline_test.go
@@ -3,15 +3,22 @@ package internal
import (
"context"
"fmt"
+ "path/filepath"
+ "sync"
"testing"
"ior/internal/benchutil"
"ior/internal/event"
+ "ior/internal/flamegraph"
+ "ior/internal/parquet"
+ "ior/internal/statsengine"
+ "ior/internal/tui/eventstream"
)
const (
benchPipelineBaseTid = 2000
benchPipelineSize = 10_000
+ benchParquetSize = 2_000
)
func BenchmarkPipelineReadHeavy(b *testing.B) {
@@ -48,6 +55,14 @@ func BenchmarkPipelineThreadScaling(b *testing.B) {
}
}
+func BenchmarkPipelineHeadlessParquetCapture(b *testing.B) {
+ benchmarkPipelineHeadlessParquet(b, benchutil.DiverseAllTypes, benchParquetSize, 10)
+}
+
+func BenchmarkPipelineTUIParquetRecording(b *testing.B) {
+ benchmarkPipelineTUIParquet(b, benchutil.DiverseAllTypes, benchParquetSize, 10)
+}
+
func benchmarkPipelineMix(b *testing.B, mix benchutil.EventMix, events, numThreads int) {
b.Helper()
b.ReportAllocs()
@@ -95,6 +110,152 @@ func benchmarkPipelineMix(b *testing.B, mix benchutil.EventMix, events, numThrea
}
}
+func benchmarkPipelineHeadlessParquet(b *testing.B, mix benchutil.EventMix, events, numThreads int) {
+ b.Helper()
+ b.ReportAllocs()
+
+ gen := benchutil.NewEventGenerator()
+ stream, err := mix.GenerateStream(gen, events, numThreads)
+ if err != nil {
+ b.Fatalf("generate stream: %v", err)
+ }
+ if len(stream) == 0 {
+ b.Fatal("generated empty benchmark stream")
+ }
+
+ dir := b.TempDir()
+ var totalPairs int64
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ b.StopTimer()
+
+ rawCh := make(chan []byte, len(stream))
+ for _, raw := range stream {
+ rawCh <- raw
+ }
+ close(rawCh)
+
+ el := mustNewEventLoop(b, eventLoopConfig{})
+ preseedBenchComms(el, numThreads)
+
+ recorder := parquet.NewRecorder(parquet.RecorderConfig{})
+ path := filepath.Join(dir, fmt.Sprintf("headless-%d.parquet", i))
+ if err := recorder.Start(path, parquet.StartOptions{
+ Metadata: parquet.FileMetadata{Mode: "headless"},
+ }); err != nil {
+ b.Fatalf("recorder.Start() error = %v", err)
+ }
+ ctx, cancel := context.WithCancel(context.Background())
+ sink := newHeadlessParquetSink(recorder, cancel)
+ sink.configure(el)
+
+ b.StartTimer()
+ el.run(ctx, rawCh)
+ b.StopTimer()
+
+ cancel()
+ if err := recorder.Stop(); err != nil {
+ b.Fatalf("recorder.Stop() error = %v", err)
+ }
+ if err := sink.err(); err != nil {
+ b.Fatalf("sink.err() = %v", err)
+ }
+ totalPairs += int64(events)
+ }
+
+ if b.N > 0 {
+ b.ReportMetric(float64(totalPairs)/float64(b.N), "pairs/op")
+ }
+}
+
+func benchmarkPipelineTUIParquet(b *testing.B, mix benchutil.EventMix, events, numThreads int) {
+ b.Helper()
+ b.ReportAllocs()
+
+ gen := benchutil.NewEventGenerator()
+ stream, err := mix.GenerateStream(gen, events, numThreads)
+ if err != nil {
+ b.Fatalf("generate stream: %v", err)
+ }
+ if len(stream) == 0 {
+ b.Fatal("generated empty benchmark stream")
+ }
+
+ dir := b.TempDir()
+ var totalPairs int64
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ b.StopTimer()
+
+ rawCh := make(chan []byte, len(stream))
+ for _, raw := range stream {
+ rawCh <- raw
+ }
+ close(rawCh)
+
+ el := mustNewEventLoop(b, eventLoopConfig{})
+ preseedBenchComms(el, numThreads)
+
+ engine := statsengine.NewEngine(64)
+ streamBuf := eventstream.NewRingBuffer()
+ streamSeq := eventstream.NewSequencer(0)
+ liveTrie := flamegraph.NewLiveTrie([]string{"comm", "tracepoint", "path"}, "count")
+ streamEvents := make(chan eventstream.StreamEvent, events)
+
+ var streamWG sync.WaitGroup
+ streamWG.Add(1)
+ go func() {
+ defer streamWG.Done()
+ for row := range streamEvents {
+ streamBuf.Push(row)
+ }
+ }()
+
+ recorder := parquet.NewRecorder(parquet.RecorderConfig{})
+ path := filepath.Join(dir, fmt.Sprintf("tui-%d.parquet", i))
+ if err := recorder.Start(path, parquet.StartOptions{
+ Metadata: parquet.FileMetadata{Mode: "tui"},
+ }); err != nil {
+ b.Fatalf("recorder.Start() error = %v", err)
+ }
+
+ var recordErr error
+ el.printCb = func(ep *event.Pair) {
+ row := eventstream.NewStreamEvent(streamSeq.Next(), ep)
+ engine.Ingest(ep)
+ streamEvents <- row
+ if recordErr == nil {
+ recordErr = recorder.Record(row, 0)
+ }
+ liveTrie.Ingest(ep)
+ ep.Recycle()
+ }
+
+ b.StartTimer()
+ el.run(context.Background(), rawCh)
+ b.StopTimer()
+
+ close(streamEvents)
+ streamWG.Wait()
+ if recordErr != nil {
+ b.Fatalf("recorder.Record() error = %v", recordErr)
+ }
+ if err := recorder.Stop(); err != nil {
+ b.Fatalf("recorder.Stop() error = %v", err)
+ }
+ totalPairs += int64(events)
+ _ = engine
+ _ = liveTrie
+ _ = streamBuf
+ }
+
+ if b.N > 0 {
+ b.ReportMetric(float64(totalPairs)/float64(b.N), "pairs/op")
+ }
+}
+
func preseedBenchComms(el *eventLoop, numThreads int) {
threadCount := numThreads
if threadCount < 1 {
diff --git a/internal/parquet/bench_test.go b/internal/parquet/bench_test.go
new file mode 100644
index 0000000..7083e27
--- /dev/null
+++ b/internal/parquet/bench_test.go
@@ -0,0 +1,68 @@
+package parquet
+
+import (
+ "path/filepath"
+ "testing"
+)
+
+var benchmarkRecordSink Record
+
+func BenchmarkWriterThroughput(b *testing.B) {
+ rows := benchmarkRecords(256)
+ dir := b.TempDir()
+ writer, err := NewWriter(filepath.Join(dir, "writer-throughput.parquet"), WriterConfig{}, FileMetadata{Mode: "bench"})
+ if err != nil {
+ b.Fatalf("NewWriter() error = %v", err)
+ }
+
+ b.ReportAllocs()
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ if err := writer.WriteRows(rows); err != nil {
+ b.Fatalf("WriteRows() error = %v", err)
+ }
+ benchmarkRecordSink = rows[i%len(rows)]
+ }
+
+ b.StopTimer()
+ if err := writer.Close(); err != nil {
+ b.Fatalf("Close() error = %v", err)
+ }
+}
+
+func BenchmarkRecorderQueueHandoff(b *testing.B) {
+ row := testStreamRow(1, "read", false)
+ session := newRecordingSession(1)
+ recorder := &Recorder{
+ active: session,
+ status: Status{Active: true},
+ }
+
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ row.Seq = uint64(i + 1)
+ if err := recorder.Record(row, 0); err != nil {
+ b.Fatalf("Record() error = %v", err)
+ }
+ select {
+ case <-session.queue:
+ default:
+ b.Fatal("expected queued record request")
+ }
+ }
+
+ b.StopTimer()
+ session.stop(nil)
+ recorder.finishSession(session, nil)
+}
+
+func benchmarkRecords(n int) []Record {
+ rows := make([]Record, 0, n)
+ for i := 0; i < n; i++ {
+ row := testStreamRow(uint64(i+1), "read", i%7 == 0)
+ rows = append(rows, RecordFromStream(row, uint64(i%4)))
+ }
+ return rows
+}
diff --git a/internal/streamrow/bench_test.go b/internal/streamrow/bench_test.go
new file mode 100644
index 0000000..ea75dd9
--- /dev/null
+++ b/internal/streamrow/bench_test.go
@@ -0,0 +1,34 @@
+package streamrow
+
+import (
+ "testing"
+
+ "ior/internal/event"
+ "ior/internal/file"
+ "ior/internal/types"
+)
+
+var benchmarkRowSink Row
+
+func BenchmarkNew(b *testing.B) {
+ pair := benchmarkPair()
+
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ benchmarkRowSink = New(uint64(i+1), pair)
+ }
+}
+
+func benchmarkPair() *event.Pair {
+ enter := &types.OpenEvent{TraceId: types.SYS_ENTER_OPENAT, Time: 1234, Pid: 42, Tid: 84}
+ exit := &types.RetEvent{TraceId: types.SYS_EXIT_OPENAT, Time: 1300, Ret: 64, Pid: 42, Tid: 84}
+ pair := event.NewPair(enter)
+ pair.ExitEv = exit
+ pair.File = file.NewFd(7, "/tmp/bench.txt", 0)
+ pair.Comm = "bench"
+ pair.Duration = 66
+ pair.DurationToPrev = 19
+ pair.Bytes = 512
+ return pair
+}