From 8a4cb57703845c1d8ffbc9318a4125818a72a545 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Tue, 12 May 2026 22:12:32 +0300 Subject: invert dependency: internal no longer imports internal/tui Introduce internal/runtime as a neutral contract package that both the core engine (internal) and the TUI layer (internal/tui) depend on. - internal/runtime: defines TraceStarter, StreamSource, EventSink, LiveTrieSource, SnapshotSource, ProbeManager, RuntimePublisher, RuntimeState, TraceRuntimeBindings, and all context key/helper functions (RuntimeBindingsFromContext, RuntimePublisherFromContext, ContextWithRuntimeBindings, ContextWithTraceFilters, TraceFiltersFromContext). These were previously defined in internal/tui, forcing the core package to import the TUI layer. - internal/streamrow: add RingBuffer (moved from internal/tui/eventstream) so the core tracing engine can use the ring buffer without importing the TUI layer. - internal/tui/eventstream/ringbuffer.go: change to a thin re-export of streamrow.RingBuffer via a type alias, preserving the existing API for all callers within the TUI layer. - internal/tui/tui.go: replace locally-defined interface declarations (TraceStarter, SnapshotSource, ProbeManager, RuntimePublisher, RuntimeState, TraceRuntimeBindings) with type aliases from internal/runtime. Delegate all context helper functions to runtime. - internal/ior.go, internal/ior_bpfsetup.go: remove imports of internal/tui and internal/tui/eventstream; use internal/runtime and internal/streamrow instead. Add SetTUIRunners injection point so the concrete TUI runner functions are wired in by cmd/ior/main.go. - cmd/ior/main.go: call internal.SetTUIRunners with the concrete TUI runner functions before internal.Run, completing the wiring without creating a cycle. - Test files (internal/ior_mode_test.go, internal/bench_pipeline_test.go): updated to use runtime.* and streamrow.* types in place of tui.* and eventstream.* types. Co-Authored-By: Claude Sonnet 4.6 --- internal/streamrow/ringbuffer.go | 79 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 internal/streamrow/ringbuffer.go (limited to 'internal/streamrow/ringbuffer.go') diff --git a/internal/streamrow/ringbuffer.go b/internal/streamrow/ringbuffer.go new file mode 100644 index 0000000..7820502 --- /dev/null +++ b/internal/streamrow/ringbuffer.go @@ -0,0 +1,79 @@ +package streamrow + +import "sync" + +const RingBufferCapacity = 10000 + +// RingBuffer is a fixed-capacity circular buffer of stream rows used by the +// tracing engine (write side) and the TUI stream view (read side). It +// satisfies the runtime.EventSink interface (Push + Len + Snapshot). +type RingBuffer struct { + mu sync.RWMutex + buf []Row + start int + size int + totalPushed uint64 +} + +// NewRingBuffer allocates an empty RingBuffer with the default capacity. +func NewRingBuffer() *RingBuffer { + return &RingBuffer{buf: make([]Row, RingBufferCapacity)} +} + +// Push appends a row to the ring buffer, overwriting the oldest entry when full. +func (r *RingBuffer) Push(ev Row) { + r.mu.Lock() + defer r.mu.Unlock() + + if r.size < RingBufferCapacity { + idx := (r.start + r.size) % RingBufferCapacity + r.buf[idx] = ev + r.size++ + } else { + r.buf[r.start] = ev + r.start = (r.start + 1) % RingBufferCapacity + } + r.totalPushed++ +} + +// Snapshot returns a copy of all rows in insertion order. +func (r *RingBuffer) Snapshot() []Row { + r.mu.RLock() + defer r.mu.RUnlock() + + if r.size == 0 { + return make([]Row, 0) + } + + out := make([]Row, r.size) + for i := 0; i < r.size; i++ { + out[i] = r.buf[(r.start+i)%RingBufferCapacity] + } + return out +} + +// Len returns the current number of rows in the buffer. +func (r *RingBuffer) Len() int { + r.mu.RLock() + defer r.mu.RUnlock() + return r.size +} + +// TotalPushed returns the total number of rows pushed since construction or +// the last Reset, including those that have been overwritten. +func (r *RingBuffer) TotalPushed() uint64 { + r.mu.RLock() + defer r.mu.RUnlock() + return r.totalPushed +} + +// Reset clears all rows and resets the total-pushed counter. +func (r *RingBuffer) Reset() { + r.mu.Lock() + defer r.mu.Unlock() + + clear(r.buf) + r.start = 0 + r.size = 0 + r.totalPushed = 0 +} -- cgit v1.2.3