diff options
| author | Paul Buetow <paul@buetow.org> | 2026-05-27 08:06:29 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-05-27 08:06:29 +0300 |
| commit | 60e9a8fc836dc60ca5038890f3a0fa646ee2450b (patch) | |
| tree | f037d045ac00e49d496c88a8329e41ec5109042c /internal/flamegraph | |
| parent | 3d3fcacbdc28c4296b1477e34999011b5a7d93f2 (diff) | |
fix(flamegraph): lock metric field reads in AddRecord (1p)
Diffstat (limited to 'internal/flamegraph')
| -rw-r--r-- | internal/flamegraph/livetrie.go | 5 | ||||
| -rw-r--r-- | internal/flamegraph/livetrie_test.go | 66 |
2 files changed, 70 insertions, 1 deletions
diff --git a/internal/flamegraph/livetrie.go b/internal/flamegraph/livetrie.go index a510a72..47299e6 100644 --- a/internal/flamegraph/livetrie.go +++ b/internal/flamegraph/livetrie.go @@ -102,19 +102,22 @@ func (lt *LiveTrie) Ingest(ep *event.Pair) { // AddRecord adds one already-decoded flamegraph record into the live trie. func (lt *LiveTrie) AddRecord(record IterRecord) { + lt.mu.Lock() + value, err := record.Cnt.ValueByName(lt.countField) if err != nil { + lt.mu.Unlock() return } heightValue := uint64(0) if lt.heightField != "" { heightValue, err = record.Cnt.ValueByName(lt.heightField) if err != nil { + lt.mu.Unlock() return } } - lt.mu.Lock() frames := lt.buildFrames(record) lt.addLocked(frames, value, heightValue) lt.version.Add(1) diff --git a/internal/flamegraph/livetrie_test.go b/internal/flamegraph/livetrie_test.go index a12f5d9..fc9e6a6 100644 --- a/internal/flamegraph/livetrie_test.go +++ b/internal/flamegraph/livetrie_test.go @@ -521,6 +521,72 @@ func TestLiveTrieConcurrentIngestAndSnapshot(t *testing.T) { } } +func TestLiveTrieConcurrentAddRecordAndMetricToggle(t *testing.T) { + lt := NewLiveTrie([]string{"comm", "pid"}, "count", "count") + + const iterations = 1500 + errCh := make(chan error, 2) + var wg sync.WaitGroup + wg.Add(3) + + go func() { + defer wg.Done() + for i := 0; i < iterations; i++ { + lt.AddRecord(IterRecord{ + Comm: "svc", + Pid: 42, + TraceID: types.SYS_ENTER_READ, + Cnt: Counter{ + Count: 1, + Bytes: 2, + Duration: 3, + DurationToPrev: 4, + }, + }) + } + }() + + go func() { + defer wg.Done() + for i := 0; i < iterations; i++ { + field := "count" + if i%2 == 1 { + field = "bytes" + } + if err := lt.SetCountField(field); err != nil { + errCh <- fmt.Errorf("set count field %q: %w", field, err) + return + } + } + }() + + go func() { + defer wg.Done() + for i := 0; i < iterations; i++ { + field := "count" + if i%2 == 1 { + field = "bytes" + } + if err := lt.SetHeightField(field); err != nil { + errCh <- fmt.Errorf("set height field %q: %w", field, err) + return + } + } + }() + + wg.Wait() + close(errCh) + for err := range errCh { + t.Fatal(err) + } + + payload, _ := lt.SnapshotJSON() + var snap SnapshotNode + if err := json.Unmarshal(payload, &snap); err != nil { + t.Fatalf("unmarshal snapshot after concurrent updates: %v", err) + } +} + func TestLiveTrieStressHighRateConcurrentSnapshot(t *testing.T) { if os.Getenv("IOR_STRESS_TEST") != "1" { t.Skip("set IOR_STRESS_TEST=1 to run stress test") |
