blob: 87dacaef7dc775c6a8967f81db8cfbd40e675ff7 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
package eventstream
import "sync"
// ringBufferCapacity is the fixed number of slots in a RingBuffer's backing
// storage; it is both the allocation size used by NewRingBuffer and the
// modulus for index wrap-around.
const ringBufferCapacity = 10000
// RingBuffer is a fixed-capacity circular buffer of StreamEvent values.
// Once ringBufferCapacity events are held, each new event replaces the
// oldest one. All access to its fields goes through mu.
type RingBuffer struct {
	// mu guards every field below: writers take the exclusive lock,
	// readers take the shared lock.
	mu sync.RWMutex

	// buf is the backing storage; NewRingBuffer sizes it to
	// ringBufferCapacity slots.
	buf []StreamEvent

	// start is the index in buf of the oldest retained event, and size is
	// the number of events currently retained (0..ringBufferCapacity).
	start, size int

	// totalPushed counts Push calls since construction or the last Reset,
	// including events that have since been overwritten.
	totalPushed uint64
}
// NewRingBuffer returns an empty RingBuffer whose backing storage is
// pre-allocated with ringBufferCapacity slots.
func NewRingBuffer() *RingBuffer {
	rb := new(RingBuffer)
	rb.buf = make([]StreamEvent, ringBufferCapacity)
	return rb
}
// Push stores ev as the newest event. While fewer than ringBufferCapacity
// events are held the buffer grows by one; once it is full, the oldest event
// is overwritten and the window advances. Every call increments the counter
// reported by TotalPushed. The write lock is held for the whole call.
//
// A RingBuffer that was not built by NewRingBuffer has no backing storage;
// it is allocated here on first use so that the zero value does not panic.
func (r *RingBuffer) Push(ev StreamEvent) {
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.buf == nil {
		r.buf = make([]StreamEvent, ringBufferCapacity)
	}

	// The slot just past the newest event. When the buffer is full this wraps
	// around to r.start, i.e. the oldest event, which is the one to replace.
	r.buf[(r.start+r.size)%ringBufferCapacity] = ev

	if r.size < ringBufferCapacity {
		r.size++
	} else {
		// Full: the oldest slot was just overwritten, so the window's start
		// moves forward by one.
		r.start = (r.start + 1) % ringBufferCapacity
	}
	r.totalPushed++
}
// Snapshot returns a newly allocated slice holding the retained events in
// order from oldest to newest. The result is never nil; it has length zero
// when the buffer is empty. The returned slice does not share storage with
// the buffer (elements are copied by plain assignment, i.e. shallowly).
// The read lock is held while copying.
func (r *RingBuffer) Snapshot() []StreamEvent {
	r.mu.RLock()
	defer r.mu.RUnlock()

	out := make([]StreamEvent, r.size)

	// The retained window begins at r.start and may wrap past the end of the
	// backing array, so it is copied as at most two contiguous runs: first
	// from r.start toward the end of buf (copy stops once out is full), then
	// whatever remains from the front of buf.
	n := copy(out, r.buf[r.start:])
	copy(out[n:], r.buf[:r.size-n])
	return out
}
// Len reports how many events are currently retained in the buffer.
// The value is read under the read lock.
func (r *RingBuffer) Len() int {
	r.mu.RLock()
	n := r.size
	r.mu.RUnlock()
	return n
}
// TotalPushed reports the number of Push calls made since the buffer was
// created or last Reset, including events that have since been overwritten.
// The value is read under the read lock.
func (r *RingBuffer) TotalPushed() uint64 {
	r.mu.RLock()
	total := r.totalPushed
	r.mu.RUnlock()
	return total
}
// Reset returns the buffer to its empty state: the window position, the
// retained-event count and the TotalPushed counter all go back to zero, and
// every slot of the backing storage is overwritten with the zero StreamEvent.
// The backing storage itself is kept for reuse. The write lock is held for
// the whole call.
func (r *RingBuffer) Reset() {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.start, r.size, r.totalPushed = 0, 0, 0
	clear(r.buf)
}
|