diff options
| author | Paul Buetow <paul@buetow.org> | 2026-02-27 18:11:19 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-02-27 18:11:19 +0200 |
| commit | 592a645a578d68af3f0f1daae69ee8d91b5ced00 (patch) | |
| tree | 0c0701ff8a2baef009813fd2b28673d94f2b2ef7 /internal/flamegraph | |
| parent | 812267c2d82d887dc143ec10abda6d4b8474f757 (diff) | |
flamegraph: add opt-in live trie stress QA test
Diffstat (limited to 'internal/flamegraph')
| -rw-r--r-- | internal/flamegraph/livetrie_test.go | 85 |
1 file changed, 85 insertions, 0 deletions
diff --git a/internal/flamegraph/livetrie_test.go b/internal/flamegraph/livetrie_test.go index 6982a86..5d32209 100644 --- a/internal/flamegraph/livetrie_test.go +++ b/internal/flamegraph/livetrie_test.go @@ -3,11 +3,16 @@ package flamegraph import ( "bytes" "encoding/json" + "fmt" "ior/internal/event" "ior/internal/file" "ior/internal/types" + "os" + "runtime" "sync" + "sync/atomic" "testing" + "time" ) func TestLiveTrieIngestAndSnapshotRoundTrip(t *testing.T) { @@ -117,6 +122,86 @@ func TestLiveTrieConcurrentIngestAndSnapshot(t *testing.T) { } } +func TestLiveTrieStressHighRateConcurrentSnapshot(t *testing.T) { + if os.Getenv("IOR_STRESS_TEST") != "1" { + t.Skip("set IOR_STRESS_TEST=1 to run stress test") + } + + const ( + totalEvents = 50000 + snapshotReaders = 3 + maxMemGrowth = 512 << 20 + ) + + lt := NewLiveTrie([]string{"path", "pid"}, "count") + var startMem runtime.MemStats + runtime.ReadMemStats(&startMem) + + done := make(chan struct{}) + var snapshots atomic.Int64 + errCh := make(chan error, snapshotReaders) + var readersWG sync.WaitGroup + readersWG.Add(snapshotReaders) + for i := 0; i < snapshotReaders; i++ { + go func() { + defer readersWG.Done() + ticker := time.NewTicker(2 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-done: + return + case <-ticker.C: + payload, _ := lt.SnapshotJSON() + var snap trieSnapshot + if err := json.Unmarshal(payload, &snap); err != nil { + errCh <- fmt.Errorf("snapshot json invalid: %w", err) + return + } + snapshots.Add(1) + } + } + }() + } + + start := time.Now() + for i := 0; i < totalEvents; i++ { + pid := uint32(1000 + (i % 32)) + tid := uint32(100000 + i) + path := fmt.Sprintf("/stress/%05d", i) + lt.Ingest(newTestPair("qa-load", pid, tid, path, 1, 1, 1)) + } + ingestDuration := time.Since(start) + + close(done) + readersWG.Wait() + close(errCh) + for err := range errCh { + t.Fatal(err) + } + + if got := lt.Version(); got != totalEvents { + t.Fatalf("version = %d, want %d", got, totalEvents) 
+ } + if snapshots.Load() == 0 { + t.Fatalf("expected at least one concurrent snapshot read") + } + + final := decodeLiveSnapshot(t, lt) + if final.Total != totalEvents { + t.Fatalf("final total = %d, want %d", final.Total, totalEvents) + } + + var endMem runtime.MemStats + runtime.ReadMemStats(&endMem) + if endMem.Alloc > startMem.Alloc+maxMemGrowth { + t.Fatalf("memory growth too high: start=%d end=%d limit=%d", startMem.Alloc, endMem.Alloc, startMem.Alloc+maxMemGrowth) + } + + rate := float64(totalEvents) / ingestDuration.Seconds() + t.Logf("stress ingest rate: %.0f events/sec (%d events in %s)", rate, totalEvents, ingestDuration) +} + func newTestPair(comm string, pid uint32, tid uint32, path string, duration uint64, gap uint64, bytesCnt uint64) *event.Pair { enter := &types.OpenEvent{ TraceId: types.SYS_ENTER_OPENAT, |
