summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-02-25 08:25:42 +0200
committerPaul Buetow <paul@buetow.org>2026-02-25 08:25:42 +0200
commit8d4f799220632411784037783c9275964df98718 (patch)
treeef6fee8e9d2a0dd32b7a6e3ed96d2eacb711a6bd /internal
parent615f305f308c849c1a7b6cadb93e5e520455e422 (diff)
Add thread-safe eventstream ring buffer
Diffstat (limited to 'internal')
-rw-r--r--internal/tui/eventstream/ringbuffer.go59
-rw-r--r--internal/tui/eventstream/ringbuffer_test.go104
2 files changed, 163 insertions, 0 deletions
diff --git a/internal/tui/eventstream/ringbuffer.go b/internal/tui/eventstream/ringbuffer.go
new file mode 100644
index 0000000..a2ec1dc
--- /dev/null
+++ b/internal/tui/eventstream/ringbuffer.go
@@ -0,0 +1,59 @@
+package eventstream
+
+import "sync"
+
+const ringBufferCapacity = 10000
+
+type RingBuffer struct {
+ mu sync.RWMutex
+ buf []StreamEvent
+ start int
+ size int
+ totalPushed uint64
+}
+
+func NewRingBuffer() *RingBuffer {
+ return &RingBuffer{buf: make([]StreamEvent, ringBufferCapacity)}
+}
+
+func (r *RingBuffer) Push(ev StreamEvent) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ if r.size < ringBufferCapacity {
+ idx := (r.start + r.size) % ringBufferCapacity
+ r.buf[idx] = ev
+ r.size++
+ } else {
+ r.buf[r.start] = ev
+ r.start = (r.start + 1) % ringBufferCapacity
+ }
+ r.totalPushed++
+}
+
+func (r *RingBuffer) Snapshot() []StreamEvent {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+
+ if r.size == 0 {
+ return make([]StreamEvent, 0)
+ }
+
+ out := make([]StreamEvent, r.size)
+ for i := 0; i < r.size; i++ {
+ out[i] = r.buf[(r.start+i)%ringBufferCapacity]
+ }
+ return out
+}
+
+func (r *RingBuffer) Len() int {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+ return r.size
+}
+
+func (r *RingBuffer) TotalPushed() uint64 {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+ return r.totalPushed
+}
diff --git a/internal/tui/eventstream/ringbuffer_test.go b/internal/tui/eventstream/ringbuffer_test.go
new file mode 100644
index 0000000..c60e85a
--- /dev/null
+++ b/internal/tui/eventstream/ringbuffer_test.go
@@ -0,0 +1,104 @@
+package eventstream
+
+import (
+ "sync"
+ "testing"
+)
+
+func TestRingBufferWrapsAtCapacity(t *testing.T) {
+ rb := NewRingBuffer()
+
+ for i := 0; i < ringBufferCapacity+5; i++ {
+ rb.Push(StreamEvent{Seq: uint64(i)})
+ }
+
+ if got := rb.Len(); got != ringBufferCapacity {
+ t.Fatalf("Len() = %d, want %d", got, ringBufferCapacity)
+ }
+ if got := rb.TotalPushed(); got != uint64(ringBufferCapacity+5) {
+ t.Fatalf("TotalPushed() = %d, want %d", got, ringBufferCapacity+5)
+ }
+
+ snap := rb.Snapshot()
+ if len(snap) != ringBufferCapacity {
+ t.Fatalf("len(Snapshot()) = %d, want %d", len(snap), ringBufferCapacity)
+ }
+ if snap[0].Seq != 5 {
+ t.Fatalf("first seq = %d, want 5", snap[0].Seq)
+ }
+ if snap[len(snap)-1].Seq != uint64(ringBufferCapacity+4) {
+ t.Fatalf("last seq = %d, want %d", snap[len(snap)-1].Seq, ringBufferCapacity+4)
+ }
+}
+
+func TestRingBufferSnapshotOldestFirst(t *testing.T) {
+ rb := NewRingBuffer()
+
+ rb.Push(StreamEvent{Seq: 10})
+ rb.Push(StreamEvent{Seq: 11})
+ rb.Push(StreamEvent{Seq: 12})
+
+ snap := rb.Snapshot()
+ if len(snap) != 3 {
+ t.Fatalf("len(Snapshot()) = %d, want 3", len(snap))
+ }
+ if snap[0].Seq != 10 || snap[1].Seq != 11 || snap[2].Seq != 12 {
+ t.Fatalf("snapshot order = [%d,%d,%d], want [10,11,12]", snap[0].Seq, snap[1].Seq, snap[2].Seq)
+ }
+}
+
+func TestRingBufferConcurrentPushAndSnapshot(t *testing.T) {
+ rb := NewRingBuffer()
+ const pushes = 20000
+
+ var pushWG sync.WaitGroup
+ pushWG.Add(1)
+ go func() {
+ defer pushWG.Done()
+ for i := 0; i < pushes; i++ {
+ rb.Push(StreamEvent{Seq: uint64(i)})
+ }
+ }()
+
+ done := make(chan struct{})
+ var readWG sync.WaitGroup
+ for i := 0; i < 4; i++ {
+ readWG.Add(1)
+ go func() {
+ defer readWG.Done()
+ for {
+ select {
+ case <-done:
+ return
+ default:
+ _ = rb.Snapshot()
+ _ = rb.Len()
+ _ = rb.TotalPushed()
+ }
+ }
+ }()
+ }
+
+ pushWG.Wait()
+ close(done)
+ readWG.Wait()
+
+ if got := rb.TotalPushed(); got != pushes {
+ t.Fatalf("TotalPushed() = %d, want %d", got, pushes)
+ }
+ if got := rb.Len(); got > ringBufferCapacity {
+ t.Fatalf("Len() = %d, want <= %d", got, ringBufferCapacity)
+ }
+}
+
+func TestRingBufferSnapshotEmpty(t *testing.T) {
+ rb := NewRingBuffer()
+
+ snap := rb.Snapshot()
+ if snap == nil {
+ t.Fatalf("Snapshot() = nil, want empty slice")
+ }
+ if len(snap) != 0 {
+ t.Fatalf("len(Snapshot()) = %d, want 0", len(snap))
+ }
+}