summaryrefslogtreecommitdiff
path: root/internal/tui/eventstream/ringbuffer_test.go
blob: c60e85a46a21e698a5624f174c6e0c121be9f767 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package eventstream

import (
	"sync"
	"testing"
)

// TestRingBufferWrapsAtCapacity pushes five more events than
// ringBufferCapacity, numbered from zero, and then requires that:
//   - Len reports exactly ringBufferCapacity,
//   - TotalPushed reports every push that was made,
//   - Snapshot has ringBufferCapacity entries whose first Seq is 5 and whose
//     last Seq is ringBufferCapacity+4.
func TestRingBufferWrapsAtCapacity(t *testing.T) {
	buf := NewRingBuffer()

	// Overshoot the capacity by five so the five lowest sequence numbers are
	// expected to be gone from the snapshot.
	total := ringBufferCapacity + 5
	for seq := 0; seq < total; seq++ {
		buf.Push(StreamEvent{Seq: uint64(seq)})
	}

	length := buf.Len()
	if length != ringBufferCapacity {
		t.Fatalf("Len() = %d, want %d", length, ringBufferCapacity)
	}

	pushed := buf.TotalPushed()
	if pushed != uint64(total) {
		t.Fatalf("TotalPushed() = %d, want %d", pushed, total)
	}

	events := buf.Snapshot()
	if count := len(events); count != ringBufferCapacity {
		t.Fatalf("len(Snapshot()) = %d, want %d", count, ringBufferCapacity)
	}

	oldest := events[0].Seq
	newest := events[len(events)-1].Seq
	if oldest != 5 {
		t.Fatalf("first seq = %d, want 5", oldest)
	}
	if newest != uint64(total-1) {
		t.Fatalf("last seq = %d, want %d", newest, total-1)
	}
}

// TestRingBufferSnapshotOldestFirst pushes events with Seq 10, 11 and 12 in
// that order and requires Snapshot to return exactly those three events in
// the same order.
func TestRingBufferSnapshotOldestFirst(t *testing.T) {
	buf := NewRingBuffer()

	for _, seq := range []uint64{10, 11, 12} {
		buf.Push(StreamEvent{Seq: seq})
	}

	events := buf.Snapshot()
	if count := len(events); count != 3 {
		t.Fatalf("len(Snapshot()) = %d, want 3", count)
	}

	// The length check above guarantees indices 0..2 are in range.
	inOrder := events[0].Seq == 10 && events[1].Seq == 11 && events[2].Seq == 12
	if !inOrder {
		t.Fatalf("snapshot order = [%d,%d,%d], want [10,11,12]", events[0].Seq, events[1].Seq, events[2].Seq)
	}
}

// TestRingBufferConcurrentPushAndSnapshot runs one goroutine that pushes a
// fixed number of events while four other goroutines repeatedly call
// Snapshot, Len and TotalPushed. Once the pusher finishes, the readers are
// stopped and the test requires TotalPushed to equal the number of pushes and
// Len to be no greater than ringBufferCapacity.
func TestRingBufferConcurrentPushAndSnapshot(t *testing.T) {
	const (
		pushes  = 20000
		readers = 4
	)
	buf := NewRingBuffer()

	// Single writer: pushes sequence numbers 0..pushes-1.
	var writer sync.WaitGroup
	writer.Add(1)
	go func() {
		defer writer.Done()
		for seq := uint64(0); seq < pushes; seq++ {
			buf.Push(StreamEvent{Seq: seq})
		}
	}()

	// Readers spin on the accessors until stop is closed; the results are
	// deliberately discarded.
	stop := make(chan struct{})
	poll := func() {
		for {
			select {
			case <-stop:
				return
			default:
			}
			_ = buf.Snapshot()
			_ = buf.Len()
			_ = buf.TotalPushed()
		}
	}

	var readerGroup sync.WaitGroup
	readerGroup.Add(readers)
	for r := 0; r < readers; r++ {
		go func() {
			defer readerGroup.Done()
			poll()
		}()
	}

	// Let the writer finish first, then release and join the readers so no
	// goroutine outlives the test.
	writer.Wait()
	close(stop)
	readerGroup.Wait()

	pushed := buf.TotalPushed()
	if pushed != pushes {
		t.Fatalf("TotalPushed() = %d, want %d", pushed, pushes)
	}

	length := buf.Len()
	if length > ringBufferCapacity {
		t.Fatalf("Len() = %d, want <= %d", length, ringBufferCapacity)
	}
}

// TestRingBufferSnapshotEmpty requires that Snapshot on a freshly constructed
// buffer returns a non-nil slice of length zero.
func TestRingBufferSnapshotEmpty(t *testing.T) {
	events := NewRingBuffer().Snapshot()

	switch {
	case events == nil:
		t.Fatalf("Snapshot() = nil, want empty slice")
	case len(events) != 0:
		t.Fatalf("len(Snapshot()) = %d, want 0", len(events))
	}
}