diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-13 07:46:29 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-13 07:46:29 +0200 |
| commit | 973bc4be068d337ff9ab13c47d08485b1946d133 (patch) | |
| tree | 8491c159fa052d632ec2d8866eae05b5669a36db | |
| parent | b52fd1f297c178f17fe75f8fb03d5cbdd3ece71d (diff) | |
bench: add parquet recording benchmarks
| -rw-r--r-- | internal/bench_pipeline_test.go | 161 | ||||
| -rw-r--r-- | internal/parquet/bench_test.go | 68 | ||||
| -rw-r--r-- | internal/streamrow/bench_test.go | 34 |
3 files changed, 263 insertions, 0 deletions
diff --git a/internal/bench_pipeline_test.go b/internal/bench_pipeline_test.go index 80001f3..ad4edf1 100644 --- a/internal/bench_pipeline_test.go +++ b/internal/bench_pipeline_test.go @@ -3,15 +3,22 @@ package internal import ( "context" "fmt" + "path/filepath" + "sync" "testing" "ior/internal/benchutil" "ior/internal/event" + "ior/internal/flamegraph" + "ior/internal/parquet" + "ior/internal/statsengine" + "ior/internal/tui/eventstream" ) const ( benchPipelineBaseTid = 2000 benchPipelineSize = 10_000 + benchParquetSize = 2_000 ) func BenchmarkPipelineReadHeavy(b *testing.B) { @@ -48,6 +55,14 @@ func BenchmarkPipelineThreadScaling(b *testing.B) { } } +func BenchmarkPipelineHeadlessParquetCapture(b *testing.B) { + benchmarkPipelineHeadlessParquet(b, benchutil.DiverseAllTypes, benchParquetSize, 10) +} + +func BenchmarkPipelineTUIParquetRecording(b *testing.B) { + benchmarkPipelineTUIParquet(b, benchutil.DiverseAllTypes, benchParquetSize, 10) +} + func benchmarkPipelineMix(b *testing.B, mix benchutil.EventMix, events, numThreads int) { b.Helper() b.ReportAllocs() @@ -95,6 +110,152 @@ func benchmarkPipelineMix(b *testing.B, mix benchutil.EventMix, events, numThrea } } +func benchmarkPipelineHeadlessParquet(b *testing.B, mix benchutil.EventMix, events, numThreads int) { + b.Helper() + b.ReportAllocs() + + gen := benchutil.NewEventGenerator() + stream, err := mix.GenerateStream(gen, events, numThreads) + if err != nil { + b.Fatalf("generate stream: %v", err) + } + if len(stream) == 0 { + b.Fatal("generated empty benchmark stream") + } + + dir := b.TempDir() + var totalPairs int64 + + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + + rawCh := make(chan []byte, len(stream)) + for _, raw := range stream { + rawCh <- raw + } + close(rawCh) + + el := mustNewEventLoop(b, eventLoopConfig{}) + preseedBenchComms(el, numThreads) + + recorder := parquet.NewRecorder(parquet.RecorderConfig{}) + path := filepath.Join(dir, fmt.Sprintf("headless-%d.parquet", i)) + if err := recorder.Start(path, parquet.StartOptions{ + Metadata: parquet.FileMetadata{Mode: "headless"}, + }); err != nil { + b.Fatalf("recorder.Start() error = %v", err) + } + ctx, cancel := context.WithCancel(context.Background()) + sink := newHeadlessParquetSink(recorder, cancel) + sink.configure(el) + + b.StartTimer() + el.run(ctx, rawCh) + b.StopTimer() + + cancel() + if err := recorder.Stop(); err != nil { + b.Fatalf("recorder.Stop() error = %v", err) + } + if err := sink.err(); err != nil { + b.Fatalf("sink.err() = %v", err) + } + totalPairs += int64(events) + } + + if b.N > 0 { + b.ReportMetric(float64(totalPairs)/float64(b.N), "pairs/op") + } +} + +func benchmarkPipelineTUIParquet(b *testing.B, mix benchutil.EventMix, events, numThreads int) { + b.Helper() + b.ReportAllocs() + + gen := benchutil.NewEventGenerator() + stream, err := mix.GenerateStream(gen, events, numThreads) + if err != nil { + b.Fatalf("generate stream: %v", err) + } + if len(stream) == 0 { + b.Fatal("generated empty benchmark stream") + } + + dir := b.TempDir() + var totalPairs int64 + + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + + rawCh := make(chan []byte, len(stream)) + for _, raw := range stream { + rawCh <- raw + } + close(rawCh) + + el := mustNewEventLoop(b, eventLoopConfig{}) + preseedBenchComms(el, numThreads) + + engine := statsengine.NewEngine(64) + streamBuf := eventstream.NewRingBuffer() + streamSeq := eventstream.NewSequencer(0) + liveTrie := flamegraph.NewLiveTrie([]string{"comm", "tracepoint", "path"}, "count") + streamEvents := make(chan eventstream.StreamEvent, events) + + var streamWG sync.WaitGroup + streamWG.Add(1) + go func() { + defer streamWG.Done() + for row := range streamEvents { + streamBuf.Push(row) + } + }() + + recorder := parquet.NewRecorder(parquet.RecorderConfig{}) + path := filepath.Join(dir, fmt.Sprintf("tui-%d.parquet", i)) + if err := recorder.Start(path, parquet.StartOptions{ + Metadata: parquet.FileMetadata{Mode: "tui"}, + }); err != nil { + b.Fatalf("recorder.Start() error = %v", err) + } + + var recordErr error + el.printCb = func(ep *event.Pair) { + row := eventstream.NewStreamEvent(streamSeq.Next(), ep) + engine.Ingest(ep) + streamEvents <- row + if recordErr == nil { + recordErr = recorder.Record(row, 0) + } + liveTrie.Ingest(ep) + ep.Recycle() + } + + b.StartTimer() + el.run(context.Background(), rawCh) + b.StopTimer() + + close(streamEvents) + streamWG.Wait() + if recordErr != nil { + b.Fatalf("recorder.Record() error = %v", recordErr) + } + if err := recorder.Stop(); err != nil { + b.Fatalf("recorder.Stop() error = %v", err) + } + totalPairs += int64(events) + _ = engine + _ = liveTrie + _ = streamBuf + } + + if b.N > 0 { + b.ReportMetric(float64(totalPairs)/float64(b.N), "pairs/op") + } +} + func preseedBenchComms(el *eventLoop, numThreads int) { threadCount := numThreads if threadCount < 1 { diff --git a/internal/parquet/bench_test.go b/internal/parquet/bench_test.go new file mode 100644 index 0000000..7083e27 --- /dev/null +++ b/internal/parquet/bench_test.go @@ -0,0 +1,68 @@ +package parquet + +import ( + "path/filepath" + "testing" +) + +var benchmarkRecordSink Record + +func BenchmarkWriterThroughput(b *testing.B) { + rows := benchmarkRecords(256) + dir := b.TempDir() + writer, err := NewWriter(filepath.Join(dir, "writer-throughput.parquet"), WriterConfig{}, FileMetadata{Mode: "bench"}) + if err != nil { + b.Fatalf("NewWriter() error = %v", err) + } + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + if err := writer.WriteRows(rows); err != nil { + b.Fatalf("WriteRows() error = %v", err) + } + benchmarkRecordSink = rows[i%len(rows)] + } + + b.StopTimer() + if err := writer.Close(); err != nil { + b.Fatalf("Close() error = %v", err) + } +} + +func BenchmarkRecorderQueueHandoff(b *testing.B) { + row := testStreamRow(1, "read", false) + session := newRecordingSession(1) + recorder := &Recorder{ + active: session, + status: Status{Active: true}, + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + row.Seq = uint64(i + 1) + if err := recorder.Record(row, 0); err != nil { + b.Fatalf("Record() error = %v", err) + } + select { + case <-session.queue: + default: + b.Fatal("expected queued record request") + } + } + + b.StopTimer() + session.stop(nil) + recorder.finishSession(session, nil) +} + +func benchmarkRecords(n int) []Record { + rows := make([]Record, 0, n) + for i := 0; i < n; i++ { + row := testStreamRow(uint64(i+1), "read", i%7 == 0) + rows = append(rows, RecordFromStream(row, uint64(i%4))) + } + return rows +} diff --git a/internal/streamrow/bench_test.go b/internal/streamrow/bench_test.go new file mode 100644 index 0000000..ea75dd9 --- /dev/null +++ b/internal/streamrow/bench_test.go @@ -0,0 +1,34 @@ +package streamrow + +import ( + "testing" + + "ior/internal/event" + "ior/internal/file" + "ior/internal/types" +) + +var benchmarkRowSink Row + +func BenchmarkNew(b *testing.B) { + pair := benchmarkPair() + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + benchmarkRowSink = New(uint64(i+1), pair) + } +} + +func benchmarkPair() *event.Pair { + enter := &types.OpenEvent{TraceId: types.SYS_ENTER_OPENAT, Time: 1234, Pid: 42, Tid: 84} + exit := &types.RetEvent{TraceId: types.SYS_EXIT_OPENAT, Time: 1300, Ret: 64, Pid: 42, Tid: 84} + pair := event.NewPair(enter) + pair.ExitEv = exit + pair.File = file.NewFd(7, "/tmp/bench.txt", 0) + pair.Comm = "bench" + pair.Duration = 66 + pair.DurationToPrev = 19 + pair.Bytes = 512 + return pair +} |
