summaryrefslogtreecommitdiff
path: root/internal/flamegraph
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-02-27 18:11:19 +0200
committerPaul Buetow <paul@buetow.org>2026-02-27 18:11:19 +0200
commit592a645a578d68af3f0f1daae69ee8d91b5ced00 (patch)
tree0c0701ff8a2baef009813fd2b28673d94f2b2ef7 /internal/flamegraph
parent812267c2d82d887dc143ec10abda6d4b8474f757 (diff)
flamegraph: add opt-in live trie stress QA test
Diffstat (limited to 'internal/flamegraph')
-rw-r--r--internal/flamegraph/livetrie_test.go85
1 files changed, 85 insertions, 0 deletions
diff --git a/internal/flamegraph/livetrie_test.go b/internal/flamegraph/livetrie_test.go
index 6982a86..5d32209 100644
--- a/internal/flamegraph/livetrie_test.go
+++ b/internal/flamegraph/livetrie_test.go
@@ -3,11 +3,16 @@ package flamegraph
import (
"bytes"
"encoding/json"
+ "fmt"
"ior/internal/event"
"ior/internal/file"
"ior/internal/types"
+ "os"
+ "runtime"
"sync"
+ "sync/atomic"
"testing"
+ "time"
)
func TestLiveTrieIngestAndSnapshotRoundTrip(t *testing.T) {
@@ -117,6 +122,86 @@ func TestLiveTrieConcurrentIngestAndSnapshot(t *testing.T) {
}
}
+func TestLiveTrieStressHighRateConcurrentSnapshot(t *testing.T) {
+ if os.Getenv("IOR_STRESS_TEST") != "1" {
+ t.Skip("set IOR_STRESS_TEST=1 to run stress test")
+ }
+
+ const (
+ totalEvents = 50000
+ snapshotReaders = 3
+ maxMemGrowth = 512 << 20
+ )
+
+ lt := NewLiveTrie([]string{"path", "pid"}, "count")
+ var startMem runtime.MemStats
+ runtime.ReadMemStats(&startMem)
+
+ done := make(chan struct{})
+ var snapshots atomic.Int64
+ errCh := make(chan error, snapshotReaders)
+ var readersWG sync.WaitGroup
+ readersWG.Add(snapshotReaders)
+ for i := 0; i < snapshotReaders; i++ {
+ go func() {
+ defer readersWG.Done()
+ ticker := time.NewTicker(2 * time.Millisecond)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-done:
+ return
+ case <-ticker.C:
+ payload, _ := lt.SnapshotJSON()
+ var snap trieSnapshot
+ if err := json.Unmarshal(payload, &snap); err != nil {
+ errCh <- fmt.Errorf("snapshot json invalid: %w", err)
+ return
+ }
+ snapshots.Add(1)
+ }
+ }
+ }()
+ }
+
+ start := time.Now()
+ for i := 0; i < totalEvents; i++ {
+ pid := uint32(1000 + (i % 32))
+ tid := uint32(100000 + i)
+ path := fmt.Sprintf("/stress/%05d", i)
+ lt.Ingest(newTestPair("qa-load", pid, tid, path, 1, 1, 1))
+ }
+ ingestDuration := time.Since(start)
+
+ close(done)
+ readersWG.Wait()
+ close(errCh)
+ for err := range errCh {
+ t.Fatal(err)
+ }
+
+ if got := lt.Version(); got != totalEvents {
+ t.Fatalf("version = %d, want %d", got, totalEvents)
+ }
+ if snapshots.Load() == 0 {
+ t.Fatalf("expected at least one concurrent snapshot read")
+ }
+
+ final := decodeLiveSnapshot(t, lt)
+ if final.Total != totalEvents {
+ t.Fatalf("final total = %d, want %d", final.Total, totalEvents)
+ }
+
+ var endMem runtime.MemStats
+ runtime.ReadMemStats(&endMem)
+ if endMem.Alloc > startMem.Alloc+maxMemGrowth {
+ t.Fatalf("memory growth too high: start=%d end=%d limit=%d", startMem.Alloc, endMem.Alloc, startMem.Alloc+maxMemGrowth)
+ }
+
+ rate := float64(totalEvents) / ingestDuration.Seconds()
+ t.Logf("stress ingest rate: %.0f events/sec (%d events in %s)", rate, totalEvents, ingestDuration)
+}
+
func newTestPair(comm string, pid uint32, tid uint32, path string, duration uint64, gap uint64, bytesCnt uint64) *event.Pair {
enter := &types.OpenEvent{
TraceId: types.SYS_ENTER_OPENAT,