summaryrefslogtreecommitdiff
path: root/internal/streamrow/ringbuffer.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-05-12 22:12:32 +0300
committerPaul Buetow <paul@buetow.org>2026-05-12 22:12:32 +0300
commit8a4cb57703845c1d8ffbc9318a4125818a72a545 (patch)
tree5bd9dbc8f77b99de7dced2e867c36ccbf653e533 /internal/streamrow/ringbuffer.go
parenta256cbf9f54ab89aeae0aa9408c1c2b25622fa9d (diff)
invert dependency: internal no longer imports internal/tui
Introduce internal/runtime as a neutral contract package that both the core engine (internal) and the TUI layer (internal/tui) depend on. - internal/runtime: defines TraceStarter, StreamSource, EventSink, LiveTrieSource, SnapshotSource, ProbeManager, RuntimePublisher, RuntimeState, TraceRuntimeBindings, and all context key/helper functions (RuntimeBindingsFromContext, RuntimePublisherFromContext, ContextWithRuntimeBindings, ContextWithTraceFilters, TraceFiltersFromContext). These were previously defined in internal/tui, forcing the core package to import the TUI layer. - internal/streamrow: add RingBuffer (moved from internal/tui/eventstream) so the core tracing engine can use the ring buffer without importing the TUI layer. - internal/tui/eventstream/ringbuffer.go: change to a thin re-export of streamrow.RingBuffer via a type alias, preserving the existing API for all callers within the TUI layer. - internal/tui/tui.go: replace locally-defined interface declarations (TraceStarter, SnapshotSource, ProbeManager, RuntimePublisher, RuntimeState, TraceRuntimeBindings) with type aliases from internal/runtime. Delegate all context helper functions to runtime. - internal/ior.go, internal/ior_bpfsetup.go: remove imports of internal/tui and internal/tui/eventstream; use internal/runtime and internal/streamrow instead. Add SetTUIRunners injection point so the concrete TUI runner functions are wired in by cmd/ior/main.go. - cmd/ior/main.go: call internal.SetTUIRunners with the concrete TUI runner functions before internal.Run, completing the wiring without creating a cycle. - Test files (internal/ior_mode_test.go, internal/bench_pipeline_test.go): updated to use runtime.* and streamrow.* types in place of tui.* and eventstream.* types. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'internal/streamrow/ringbuffer.go')
-rw-r--r--internal/streamrow/ringbuffer.go79
1 files changed, 79 insertions, 0 deletions
diff --git a/internal/streamrow/ringbuffer.go b/internal/streamrow/ringbuffer.go
new file mode 100644
index 0000000..7820502
--- /dev/null
+++ b/internal/streamrow/ringbuffer.go
@@ -0,0 +1,79 @@
+package streamrow
+
+import "sync"
+
+const RingBufferCapacity = 10000
+
+// RingBuffer is a fixed-capacity circular buffer of stream rows used by the
+// tracing engine (write side) and the TUI stream view (read side). It
+// satisfies the runtime.EventSink interface (Push + Len + Snapshot).
+type RingBuffer struct {
+ mu sync.RWMutex
+ buf []Row
+ start int
+ size int
+ totalPushed uint64
+}
+
+// NewRingBuffer allocates an empty RingBuffer with the default capacity.
+func NewRingBuffer() *RingBuffer {
+ return &RingBuffer{buf: make([]Row, RingBufferCapacity)}
+}
+
+// Push appends a row to the ring buffer, overwriting the oldest entry when full.
+func (r *RingBuffer) Push(ev Row) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ if r.size < RingBufferCapacity {
+ idx := (r.start + r.size) % RingBufferCapacity
+ r.buf[idx] = ev
+ r.size++
+ } else {
+ r.buf[r.start] = ev
+ r.start = (r.start + 1) % RingBufferCapacity
+ }
+ r.totalPushed++
+}
+
+// Snapshot returns a copy of all rows in insertion order.
+func (r *RingBuffer) Snapshot() []Row {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+
+ if r.size == 0 {
+ return make([]Row, 0)
+ }
+
+ out := make([]Row, r.size)
+ for i := 0; i < r.size; i++ {
+ out[i] = r.buf[(r.start+i)%RingBufferCapacity]
+ }
+ return out
+}
+
+// Len returns the current number of rows in the buffer.
+func (r *RingBuffer) Len() int {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+ return r.size
+}
+
+// TotalPushed returns the total number of rows pushed since construction or
+// the last Reset, including those that have been overwritten.
+func (r *RingBuffer) TotalPushed() uint64 {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+ return r.totalPushed
+}
+
+// Reset clears all rows and resets the total-pushed counter.
+func (r *RingBuffer) Reset() {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ clear(r.buf)
+ r.start = 0
+ r.size = 0
+ r.totalPushed = 0
+}