summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/parquet-recording-perf-baseline.md27
-rw-r--r--internal/bench_pipeline_test.go15
-rw-r--r--internal/ior.go17
3 files changed, 30 insertions, 29 deletions
diff --git a/docs/parquet-recording-perf-baseline.md b/docs/parquet-recording-perf-baseline.md
index 37d10e7..e1731a7 100644
--- a/docs/parquet-recording-perf-baseline.md
+++ b/docs/parquet-recording-perf-baseline.md
@@ -84,3 +84,30 @@ These are the highest-value targets for the follow-up optimization task:
- Lower TUI recording allocations by reusing stream fanout buffers and reducing ring-buffer/session setup churn.
- Revisit recorder/session and parquet writer setup costs if recordings are started frequently in short sessions.
- Only optimize parquet compression or flush behavior after confirming they dominate a focused headless profile; they are not currently the primary cost center.
+
+## Verified Follow-up Win
+
+After profiling, the first optimization pass removed the extra TUI `streamEvents` channel hop and pushed directly into the mutex-protected ring buffer.
+
+Re-run command:
+
+```bash
+env GOTOOLCHAIN=auto mage benchProf
+```
+
+Optimized pipeline artifacts:
+
+- `bench-profiles/pipeline-20260313-055321-cpu.prof`
+- `bench-profiles/pipeline-20260313-055321-mem.prof`
+- `bench-profiles/pipeline-20260313-055321-block.prof`
+
+Benchmark comparison for the changed path:
+
+| Benchmark | Before | After | Change |
+| --- | --- | --- | --- |
+| `BenchmarkPipelineTUIParquetRecording` | `19.13 ms/op`, `994016 B/op`, `19873 allocs/op` | `16.51 ms/op`, `992334 B/op`, `19866 allocs/op` | about `13.7%` faster with a small allocation reduction |
+
+Notes:
+
+- `BenchmarkPipelineHeadlessParquetCapture` also moved between runs, but that path was not changed; treat that difference as benchmark noise rather than a verified optimization win.
+- Post-change CPU samples still show the event loop and fd/path resolution dominating overall cost, so the next optimization pass should stay focused on those areas instead of tuning parquet compression first.
diff --git a/internal/bench_pipeline_test.go b/internal/bench_pipeline_test.go
index ad4edf1..b520c9c 100644
--- a/internal/bench_pipeline_test.go
+++ b/internal/bench_pipeline_test.go
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"path/filepath"
- "sync"
"testing"
"ior/internal/benchutil"
@@ -202,16 +201,6 @@ func benchmarkPipelineTUIParquet(b *testing.B, mix benchutil.EventMix, events, n
streamBuf := eventstream.NewRingBuffer()
streamSeq := eventstream.NewSequencer(0)
liveTrie := flamegraph.NewLiveTrie([]string{"comm", "tracepoint", "path"}, "count")
- streamEvents := make(chan eventstream.StreamEvent, events)
-
- var streamWG sync.WaitGroup
- streamWG.Add(1)
- go func() {
- defer streamWG.Done()
- for row := range streamEvents {
- streamBuf.Push(row)
- }
- }()
recorder := parquet.NewRecorder(parquet.RecorderConfig{})
path := filepath.Join(dir, fmt.Sprintf("tui-%d.parquet", i))
@@ -225,7 +214,7 @@ func benchmarkPipelineTUIParquet(b *testing.B, mix benchutil.EventMix, events, n
el.printCb = func(ep *event.Pair) {
row := eventstream.NewStreamEvent(streamSeq.Next(), ep)
engine.Ingest(ep)
- streamEvents <- row
+ streamBuf.Push(row)
if recordErr == nil {
recordErr = recorder.Record(row, 0)
}
@@ -237,8 +226,6 @@ func benchmarkPipelineTUIParquet(b *testing.B, mix benchutil.EventMix, events, n
el.run(context.Background(), rawCh)
b.StopTimer()
- close(streamEvents)
- streamWG.Wait()
if recordErr != nil {
b.Fatalf("recorder.Record() error = %v", recordErr)
}
diff --git a/internal/ior.go b/internal/ior.go
index 1010445..d88f0e3 100644
--- a/internal/ior.go
+++ b/internal/ior.go
@@ -244,14 +244,6 @@ func tuiTraceStarterFromRunTrace(
bindings.SetEventStreamSource(streamSource)
bindings.SetLiveTrie(liveTrie)
}
- streamEvents := make(chan eventstream.StreamEvent, appconfig.DefaultChannelBufferSize)
-
- go func() {
- for ev := range streamEvents {
- streamBuf.Push(ev)
- }
- }()
-
startedCh := make(chan struct{})
errCh := make(chan error, 1)
@@ -264,7 +256,7 @@ func tuiTraceStarterFromRunTrace(
}
row := eventstream.NewStreamEvent(streamSeq.Next(), ep)
engine.Ingest(ep)
- streamEvents <- row
+ streamBuf.Push(row)
if recorder != nil {
if err := recorder.Record(row, filterEpoch); err != nil {
recorderWarningOnce.Do(func() {
@@ -280,14 +272,9 @@ func tuiTraceStarterFromRunTrace(
ep.Recycle()
}
el.warningCb = func(message string) {
- // Drop warning notifications if the stream channel is saturated.
- select {
- case streamEvents <- eventstream.NewWarningEvent(streamSeq.Next(), message):
- default:
- }
+ streamBuf.Push(eventstream.NewWarningEvent(streamSeq.Next(), message))
}
})
- close(streamEvents)
errCh <- err
close(errCh)
}()