blob: 7820502e60cb69ca52d628fb67f51e3aad9c62f1 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
package streamrow
import "sync"
const RingBufferCapacity = 10000
// RingBuffer is a fixed-capacity circular buffer of stream rows used by the
// tracing engine (write side) and the TUI stream view (read side). It
// satisfies the runtime.EventSink interface (Push + Len + Snapshot).
type RingBuffer struct {
mu sync.RWMutex
buf []Row
start int
size int
totalPushed uint64
}
// NewRingBuffer allocates an empty RingBuffer with the default capacity.
func NewRingBuffer() *RingBuffer {
return &RingBuffer{buf: make([]Row, RingBufferCapacity)}
}
// Push appends a row to the ring buffer, overwriting the oldest entry when full.
func (r *RingBuffer) Push(ev Row) {
r.mu.Lock()
defer r.mu.Unlock()
if r.size < RingBufferCapacity {
idx := (r.start + r.size) % RingBufferCapacity
r.buf[idx] = ev
r.size++
} else {
r.buf[r.start] = ev
r.start = (r.start + 1) % RingBufferCapacity
}
r.totalPushed++
}
// Snapshot returns a copy of all rows in insertion order.
func (r *RingBuffer) Snapshot() []Row {
r.mu.RLock()
defer r.mu.RUnlock()
if r.size == 0 {
return make([]Row, 0)
}
out := make([]Row, r.size)
for i := 0; i < r.size; i++ {
out[i] = r.buf[(r.start+i)%RingBufferCapacity]
}
return out
}
// Len returns the current number of rows in the buffer.
func (r *RingBuffer) Len() int {
r.mu.RLock()
defer r.mu.RUnlock()
return r.size
}
// TotalPushed returns the total number of rows pushed since construction or
// the last Reset, including those that have been overwritten.
func (r *RingBuffer) TotalPushed() uint64 {
r.mu.RLock()
defer r.mu.RUnlock()
return r.totalPushed
}
// Reset clears all rows and resets the total-pushed counter.
func (r *RingBuffer) Reset() {
r.mu.Lock()
defer r.mu.Unlock()
clear(r.buf)
r.start = 0
r.size = 0
r.totalPushed = 0
}
|