summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-02-27 18:17:44 +0200
committerPaul Buetow <paul@buetow.org>2026-02-27 18:17:44 +0200
commit126b5fcce718f8befccdceb21e3e0ae80cd7b32e (patch)
tree21357f6c4d8e8bc9bcbc25b5d35a9be116713682 /internal
parent592a645a578d68af3f0f1daae69ee8d91b5ced00 (diff)
flamegraph: add SSE edge-case QA tests
Diffstat (limited to 'internal')
-rw-r--r--internal/flamegraph/liveserver.go2
-rw-r--r--internal/flamegraph/liveserver_test.go198
2 files changed, 199 insertions, 1 deletions
diff --git a/internal/flamegraph/liveserver.go b/internal/flamegraph/liveserver.go
index 6b964a1..6b9a72b 100644
--- a/internal/flamegraph/liveserver.go
+++ b/internal/flamegraph/liveserver.go
@@ -71,7 +71,7 @@ func handleSSE(lt *LiveTrie, interval time.Duration) http.HandlerFunc {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
- lastVersion, err := sendSnapshot(w, flusher, lt, 0)
+ lastVersion, err := sendSnapshot(w, flusher, lt, ^uint64(0))
if err != nil {
return
}
diff --git a/internal/flamegraph/liveserver_test.go b/internal/flamegraph/liveserver_test.go
new file mode 100644
index 0000000..09472c5
--- /dev/null
+++ b/internal/flamegraph/liveserver_test.go
@@ -0,0 +1,198 @@
+package flamegraph
+
+import (
+ "bufio"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+)
+
+func TestHandleSSEContentTypeFormatAndEmptyTrie(t *testing.T) {
+ lt := NewLiveTrie([]string{"comm"}, "count")
+ srv := httptest.NewServer(handleSSE(lt, 5*time.Millisecond))
+ defer srv.Close()
+
+ resp := connectSSE(t, srv.URL)
+ defer resp.Body.Close()
+
+ contentType := resp.Header.Get("Content-Type")
+ if !strings.HasPrefix(contentType, "text/event-stream") {
+ t.Fatalf("Content-Type = %q, want text/event-stream", contentType)
+ }
+
+ data := readFirstSSEData(t, resp.Body)
+ snap := decodeSSESnapshot(t, data)
+ if snap.Total != 0 {
+ t.Fatalf("empty trie snapshot total = %d, want 0", snap.Total)
+ }
+}
+
+func TestHandleSSEMultipleClientsReceiveInitialSnapshot(t *testing.T) {
+ lt := NewLiveTrie([]string{"comm"}, "count")
+ lt.Ingest(newTestPair("multi", 42, 1001, "/tmp/multi", 1, 1, 1))
+ srv := httptest.NewServer(handleSSE(lt, 5*time.Millisecond))
+ defer srv.Close()
+
+ const clients = 4
+ var wg sync.WaitGroup
+ errCh := make(chan error, clients)
+
+ wg.Add(clients)
+ for i := 0; i < clients; i++ {
+ go func() {
+ defer wg.Done()
+ resp := connectSSE(t, srv.URL)
+ defer resp.Body.Close()
+ data := readFirstSSEData(t, resp.Body)
+ snap := decodeSSESnapshot(t, data)
+ if snap.Total == 0 {
+ errCh <- fmt.Errorf("received empty snapshot")
+ }
+ }()
+ }
+
+ wg.Wait()
+ close(errCh)
+ for err := range errCh {
+ t.Fatal(err)
+ }
+}
+
+func TestHandleSSEReconnectAfterDisconnectGetsLatestSnapshot(t *testing.T) {
+ lt := NewLiveTrie([]string{"path"}, "count")
+ lt.Ingest(newTestPair("reconnect", 1, 1001, "/tmp/a", 1, 1, 1))
+ srv := httptest.NewServer(handleSSE(lt, 5*time.Millisecond))
+ defer srv.Close()
+
+ resp1 := connectSSE(t, srv.URL)
+ first := decodeSSESnapshot(t, readFirstSSEData(t, resp1.Body))
+ _ = resp1.Body.Close()
+ if first.Total != 1 {
+ t.Fatalf("first snapshot total = %d, want 1", first.Total)
+ }
+
+ lt.Ingest(newTestPair("reconnect", 1, 1002, "/tmp/b", 1, 1, 1))
+
+ resp2 := connectSSE(t, srv.URL)
+ defer resp2.Body.Close()
+ second := decodeSSESnapshot(t, readFirstSSEData(t, resp2.Body))
+ if second.Total != 2 {
+ t.Fatalf("reconnected snapshot total = %d, want 2", second.Total)
+ }
+}
+
+func TestHandleSSERestartedServerAcceptsNewConnection(t *testing.T) {
+ lt := NewLiveTrie([]string{"comm"}, "count")
+ lt.Ingest(newTestPair("restart", 1, 1001, "/tmp/a", 1, 1, 1))
+
+ srv1 := httptest.NewServer(handleSSE(lt, 5*time.Millisecond))
+ resp1 := connectSSE(t, srv1.URL)
+ first := decodeSSESnapshot(t, readFirstSSEData(t, resp1.Body))
+ _ = resp1.Body.Close()
+ srv1.Close()
+ if first.Total != 1 {
+ t.Fatalf("first server snapshot total = %d, want 1", first.Total)
+ }
+
+ lt.Ingest(newTestPair("restart", 1, 1002, "/tmp/b", 1, 1, 1))
+
+ srv2 := httptest.NewServer(handleSSE(lt, 5*time.Millisecond))
+ defer srv2.Close()
+ resp2 := connectSSE(t, srv2.URL)
+ defer resp2.Body.Close()
+ second := decodeSSESnapshot(t, readFirstSSEData(t, resp2.Body))
+ if second.Total != 2 {
+ t.Fatalf("second server snapshot total = %d, want 2", second.Total)
+ }
+}
+
+func TestHandleSSEDelayedClientLargeTrieGetsValidSnapshot(t *testing.T) {
+ lt := NewLiveTrie([]string{"path"}, "count")
+ const events = 12000
+ for i := 0; i < events; i++ {
+ lt.Ingest(newTestPair("late", 7, uint32(10000+i), fmt.Sprintf("/late/%05d", i), 1, 1, 1))
+ }
+
+ srv := httptest.NewServer(handleSSE(lt, 5*time.Millisecond))
+ defer srv.Close()
+
+ resp := connectSSE(t, srv.URL)
+ defer resp.Body.Close()
+ snap := decodeSSESnapshot(t, readFirstSSEData(t, resp.Body))
+ if snap.Total != events {
+ t.Fatalf("late client snapshot total = %d, want %d", snap.Total, events)
+ }
+}
+
+func connectSSE(t *testing.T, url string) *http.Response {
+ t.Helper()
+ client := &http.Client{Timeout: 5 * time.Second}
+ resp, err := client.Get(url)
+ if err != nil {
+ t.Fatalf("connect sse: %v", err)
+ }
+ if resp.StatusCode != http.StatusOK {
+ _ = resp.Body.Close()
+ t.Fatalf("unexpected status: %s", resp.Status)
+ }
+ return resp
+}
+
+func readFirstSSEData(t *testing.T, body io.ReadCloser) string {
+ t.Helper()
+ type result struct {
+ data string
+ err error
+ }
+ ch := make(chan result, 1)
+
+ go func() {
+ reader := bufio.NewReader(body)
+ line, err := reader.ReadString('\n')
+ if err != nil {
+ ch <- result{err: err}
+ return
+ }
+ if !strings.HasPrefix(line, "data: ") {
+ ch <- result{err: fmt.Errorf("invalid sse data line: %q", line)}
+ return
+ }
+ separator, err := reader.ReadString('\n')
+ if err != nil {
+ ch <- result{err: err}
+ return
+ }
+ if separator != "\n" {
+ ch <- result{err: fmt.Errorf("missing sse blank-line separator: %q", separator)}
+ return
+ }
+ ch <- result{data: strings.TrimSuffix(strings.TrimPrefix(line, "data: "), "\n")}
+ }()
+
+ select {
+ case out := <-ch:
+ if out.err != nil {
+ t.Fatalf("read sse event: %v", out.err)
+ }
+ return out.data
+ case <-time.After(3 * time.Second):
+ _ = body.Close()
+ t.Fatalf("timeout waiting for first sse event")
+ return ""
+ }
+}
+
+func decodeSSESnapshot(t *testing.T, data string) trieSnapshot {
+ t.Helper()
+ var snap trieSnapshot
+ if err := json.Unmarshal([]byte(data), &snap); err != nil {
+ t.Fatalf("invalid snapshot json: %v", err)
+ }
+ return snap
+}