From 592a645a578d68af3f0f1daae69ee8d91b5ced00 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Fri, 27 Feb 2026 18:11:19 +0200 Subject: flamegraph: add opt-in live trie stress QA test --- internal/flamegraph/livetrie_test.go | 85 ++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) (limited to 'internal') diff --git a/internal/flamegraph/livetrie_test.go b/internal/flamegraph/livetrie_test.go index 6982a86..5d32209 100644 --- a/internal/flamegraph/livetrie_test.go +++ b/internal/flamegraph/livetrie_test.go @@ -3,11 +3,16 @@ package flamegraph import ( "bytes" "encoding/json" + "fmt" "ior/internal/event" "ior/internal/file" "ior/internal/types" + "os" + "runtime" "sync" + "sync/atomic" "testing" + "time" ) func TestLiveTrieIngestAndSnapshotRoundTrip(t *testing.T) { @@ -117,6 +122,86 @@ func TestLiveTrieConcurrentIngestAndSnapshot(t *testing.T) { } } +func TestLiveTrieStressHighRateConcurrentSnapshot(t *testing.T) { + if os.Getenv("IOR_STRESS_TEST") != "1" { + t.Skip("set IOR_STRESS_TEST=1 to run stress test") + } + + const ( + totalEvents = 50000 + snapshotReaders = 3 + maxMemGrowth = 512 << 20 + ) + + lt := NewLiveTrie([]string{"path", "pid"}, "count") + var startMem runtime.MemStats + runtime.ReadMemStats(&startMem) + + done := make(chan struct{}) + var snapshots atomic.Int64 + errCh := make(chan error, snapshotReaders) + var readersWG sync.WaitGroup + readersWG.Add(snapshotReaders) + for i := 0; i < snapshotReaders; i++ { + go func() { + defer readersWG.Done() + ticker := time.NewTicker(2 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-done: + return + case <-ticker.C: + payload, _ := lt.SnapshotJSON() + var snap trieSnapshot + if err := json.Unmarshal(payload, &snap); err != nil { + errCh <- fmt.Errorf("snapshot json invalid: %w", err) + return + } + snapshots.Add(1) + } + } + }() + } + + start := time.Now() + for i := 0; i < totalEvents; i++ { + pid := uint32(1000 + (i % 32)) + tid := uint32(100000 + i) + path := fmt.Sprintf("/stress/%05d", i) + lt.Ingest(newTestPair("qa-load", pid, tid, path, 1, 1, 1)) + } + ingestDuration := time.Since(start) + + close(done) + readersWG.Wait() + close(errCh) + for err := range errCh { + t.Fatal(err) + } + + if got := lt.Version(); got != totalEvents { + t.Fatalf("version = %d, want %d", got, totalEvents) + } + if snapshots.Load() == 0 { + t.Fatalf("expected at least one concurrent snapshot read") + } + + final := decodeLiveSnapshot(t, lt) + if final.Total != totalEvents { + t.Fatalf("final total = %d, want %d", final.Total, totalEvents) + } + + var endMem runtime.MemStats + runtime.ReadMemStats(&endMem) + if endMem.Alloc > startMem.Alloc+maxMemGrowth { + t.Fatalf("memory growth too high: start=%d end=%d limit=%d", startMem.Alloc, endMem.Alloc, startMem.Alloc+maxMemGrowth) + } + + rate := float64(totalEvents) / ingestDuration.Seconds() + t.Logf("stress ingest rate: %.0f events/sec (%d events in %s)", rate, totalEvents, ingestDuration) +} + func newTestPair(comm string, pid uint32, tid uint32, path string, duration uint64, gap uint64, bytesCnt uint64) *event.Pair { enter := &types.OpenEvent{ TraceId: types.SYS_ENTER_OPENAT, -- cgit v1.2.3