summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-03 10:03:22 +0200
committerPaul Buetow <paul@buetow.org>2026-03-03 10:03:22 +0200
commitf2e589aacb25607215b03b21c5d8e0fffd6561e2 (patch)
tree57713a7f105e0390f414d36b400281518c49fccd
parent6d37606c75de3a993c167af33f12def0e2e4549c (diff)
Add full pipeline benchmarks
-rw-r--r--internal/bench_pipeline_test.go103
1 files changed, 103 insertions, 0 deletions
diff --git a/internal/bench_pipeline_test.go b/internal/bench_pipeline_test.go
new file mode 100644
index 0000000..822e5a2
--- /dev/null
+++ b/internal/bench_pipeline_test.go
@@ -0,0 +1,103 @@
+package internal
+
+import (
+ "context"
+ "fmt"
+ "testing"
+
+ "ior/internal/benchutil"
+ "ior/internal/event"
+)
+
+const (
+ benchPipelineBaseTid = 2000
+ benchPipelineSize = 10_000
+)
+
+func BenchmarkPipelineReadHeavy(b *testing.B) {
+ benchmarkPipelineMix(b, benchutil.ReadHeavy, benchPipelineSize, 10)
+}
+
+func BenchmarkPipelineWriteHeavy(b *testing.B) {
+ benchmarkPipelineMix(b, benchutil.WriteHeavy, benchPipelineSize, 10)
+}
+
+func BenchmarkPipelineMetadataHeavy(b *testing.B) {
+ benchmarkPipelineMix(b, benchutil.MetadataHeavy, benchPipelineSize, 10)
+}
+
+func BenchmarkPipelineDiverseAllTypes(b *testing.B) {
+ benchmarkPipelineMix(b, benchutil.DiverseAllTypes, benchPipelineSize, 10)
+}
+
+func BenchmarkPipelineScaling(b *testing.B) {
+ for _, events := range []int{100, 1000, 10000, 100000} {
+ events := events
+ b.Run(fmt.Sprintf("events_%d", events), func(b *testing.B) {
+ benchmarkPipelineMix(b, benchutil.DiverseAllTypes, events, 10)
+ })
+ }
+}
+
+func BenchmarkPipelineThreadScaling(b *testing.B) {
+ for _, threads := range []int{1, 10, 100, 1000} {
+ threads := threads
+ b.Run(fmt.Sprintf("threads_%d", threads), func(b *testing.B) {
+ benchmarkPipelineMix(b, benchutil.DiverseAllTypes, benchPipelineSize, threads)
+ })
+ }
+}
+
+func benchmarkPipelineMix(b *testing.B, mix benchutil.EventMix, events, numThreads int) {
+ b.Helper()
+ b.ReportAllocs()
+
+ gen := benchutil.NewEventGenerator()
+ stream := mix.GenerateStream(gen, events, numThreads)
+ if len(stream) == 0 {
+ b.Fatal("generated empty benchmark stream")
+ }
+
+ var totalPairs int64
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ b.StopTimer()
+
+ rawCh := make(chan []byte, len(stream))
+ for _, raw := range stream {
+ rawCh <- raw
+ }
+ close(rawCh)
+
+ var pairCount int64
+ el := newEventLoop(eventLoopConfig{})
+ preseedBenchComms(el, numThreads)
+ el.printCb = func(ep *event.Pair) {
+ pairCount++
+ ep.Recycle()
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ b.StartTimer()
+ el.run(ctx, rawCh)
+ b.StopTimer()
+
+ cancel()
+ totalPairs += pairCount
+ }
+
+ if b.N > 0 {
+ b.ReportMetric(float64(totalPairs)/float64(b.N), "pairs/op")
+ }
+}
+
+func preseedBenchComms(el *eventLoop, numThreads int) {
+ threadCount := numThreads
+ if threadCount < 1 {
+ threadCount = 1
+ }
+ for i := 0; i < threadCount; i++ {
+ el.setCachedComm(benchPipelineBaseTid+uint32(i), "bench")
+ }
+}