diff options
| author | Paul Buetow <paul@buetow.org> | 2026-02-25 08:25:42 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-02-25 08:25:42 +0200 |
| commit | 8d4f799220632411784037783c9275964df98718 (patch) | |
| tree | ef6fee8e9d2a0dd32b7a6e3ed96d2eacb711a6bd | |
| parent | 615f305f308c849c1a7b6cadb93e5e520455e422 (diff) | |
Add thread-safe eventstream ring buffer
| -rw-r--r-- | internal/tui/eventstream/ringbuffer.go | 59 | ||||
| -rw-r--r-- | internal/tui/eventstream/ringbuffer_test.go | 104 |
2 files changed, 163 insertions, 0 deletions
diff --git a/internal/tui/eventstream/ringbuffer.go b/internal/tui/eventstream/ringbuffer.go new file mode 100644 index 0000000..a2ec1dc --- /dev/null +++ b/internal/tui/eventstream/ringbuffer.go @@ -0,0 +1,59 @@ +package eventstream + +import "sync" + +const ringBufferCapacity = 10000 + +type RingBuffer struct { + mu sync.RWMutex + buf []StreamEvent + start int + size int + totalPushed uint64 +} + +func NewRingBuffer() *RingBuffer { + return &RingBuffer{buf: make([]StreamEvent, ringBufferCapacity)} +} + +func (r *RingBuffer) Push(ev StreamEvent) { + r.mu.Lock() + defer r.mu.Unlock() + + if r.size < ringBufferCapacity { + idx := (r.start + r.size) % ringBufferCapacity + r.buf[idx] = ev + r.size++ + } else { + r.buf[r.start] = ev + r.start = (r.start + 1) % ringBufferCapacity + } + r.totalPushed++ +} + +func (r *RingBuffer) Snapshot() []StreamEvent { + r.mu.RLock() + defer r.mu.RUnlock() + + if r.size == 0 { + return make([]StreamEvent, 0) + } + + out := make([]StreamEvent, r.size) + for i := 0; i < r.size; i++ { + out[i] = r.buf[(r.start+i)%ringBufferCapacity] + } + return out +} + +func (r *RingBuffer) Len() int { + r.mu.RLock() + defer r.mu.RUnlock() + return r.size +} + +func (r *RingBuffer) TotalPushed() uint64 { + r.mu.RLock() + defer r.mu.RUnlock() + return r.totalPushed +} diff --git a/internal/tui/eventstream/ringbuffer_test.go b/internal/tui/eventstream/ringbuffer_test.go new file mode 100644 index 0000000..c60e85a --- /dev/null +++ b/internal/tui/eventstream/ringbuffer_test.go @@ -0,0 +1,104 @@ +package eventstream + +import ( + "sync" + "testing" +) + +func TestRingBufferWrapsAtCapacity(t *testing.T) { + rb := NewRingBuffer() + + for i := 0; i < ringBufferCapacity+5; i++ { + rb.Push(StreamEvent{Seq: uint64(i)}) + } + + if got := rb.Len(); got != ringBufferCapacity { + t.Fatalf("Len() = %d, want %d", got, ringBufferCapacity) + } + if got := rb.TotalPushed(); got != uint64(ringBufferCapacity+5) { + t.Fatalf("TotalPushed() = %d, want %d", got, ringBufferCapacity+5) + } + + snap := rb.Snapshot() + if len(snap) != ringBufferCapacity { + t.Fatalf("len(Snapshot()) = %d, want %d", len(snap), ringBufferCapacity) + } + if snap[0].Seq != 5 { + t.Fatalf("first seq = %d, want 5", snap[0].Seq) + } + if snap[len(snap)-1].Seq != uint64(ringBufferCapacity+4) { + t.Fatalf("last seq = %d, want %d", snap[len(snap)-1].Seq, ringBufferCapacity+4) + } +} + +func TestRingBufferSnapshotOldestFirst(t *testing.T) { + rb := NewRingBuffer() + + rb.Push(StreamEvent{Seq: 10}) + rb.Push(StreamEvent{Seq: 11}) + rb.Push(StreamEvent{Seq: 12}) + + snap := rb.Snapshot() + if len(snap) != 3 { + t.Fatalf("len(Snapshot()) = %d, want 3", len(snap)) + } + if snap[0].Seq != 10 || snap[1].Seq != 11 || snap[2].Seq != 12 { + t.Fatalf("snapshot order = [%d,%d,%d], want [10,11,12]", snap[0].Seq, snap[1].Seq, snap[2].Seq) + } +} + +func TestRingBufferConcurrentPushAndSnapshot(t *testing.T) { + rb := NewRingBuffer() + const pushes = 20000 + + var pushWG sync.WaitGroup + pushWG.Add(1) + go func() { + defer pushWG.Done() + for i := 0; i < pushes; i++ { + rb.Push(StreamEvent{Seq: uint64(i)}) + } + }() + + done := make(chan struct{}) + var readWG sync.WaitGroup + for i := 0; i < 4; i++ { + readWG.Add(1) + go func() { + defer readWG.Done() + for { + select { + case <-done: + return + default: + _ = rb.Snapshot() + _ = rb.Len() + _ = rb.TotalPushed() + } + } + }() + } + + pushWG.Wait() + close(done) + readWG.Wait() + + if got := rb.TotalPushed(); got != pushes { + t.Fatalf("TotalPushed() = %d, want %d", got, pushes) + } + if got := rb.Len(); got > ringBufferCapacity { + t.Fatalf("Len() = %d, want <= %d", got, ringBufferCapacity) + } +} + +func TestRingBufferSnapshotEmpty(t *testing.T) { + rb := NewRingBuffer() + + snap := rb.Snapshot() + if snap == nil { + t.Fatalf("Snapshot() = nil, want empty slice") + } + if len(snap) != 0 { + t.Fatalf("len(Snapshot()) = %d, want 0", len(snap)) + } +} |
