From 60e9a8fc836dc60ca5038890f3a0fa646ee2450b Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 27 May 2026 08:06:29 +0300 Subject: fix(flamegraph): lock metric field reads in AddRecord (1p) --- internal/flamegraph/livetrie.go | 5 ++- internal/flamegraph/livetrie_test.go | 66 ++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 1 deletion(-) (limited to 'internal/flamegraph') diff --git a/internal/flamegraph/livetrie.go b/internal/flamegraph/livetrie.go index a510a72..47299e6 100644 --- a/internal/flamegraph/livetrie.go +++ b/internal/flamegraph/livetrie.go @@ -102,19 +102,22 @@ func (lt *LiveTrie) Ingest(ep *event.Pair) { // AddRecord adds one already-decoded flamegraph record into the live trie. func (lt *LiveTrie) AddRecord(record IterRecord) { + lt.mu.Lock() + value, err := record.Cnt.ValueByName(lt.countField) if err != nil { + lt.mu.Unlock() return } heightValue := uint64(0) if lt.heightField != "" { heightValue, err = record.Cnt.ValueByName(lt.heightField) if err != nil { + lt.mu.Unlock() return } } - lt.mu.Lock() frames := lt.buildFrames(record) lt.addLocked(frames, value, heightValue) lt.version.Add(1) diff --git a/internal/flamegraph/livetrie_test.go b/internal/flamegraph/livetrie_test.go index a12f5d9..fc9e6a6 100644 --- a/internal/flamegraph/livetrie_test.go +++ b/internal/flamegraph/livetrie_test.go @@ -521,6 +521,72 @@ func TestLiveTrieConcurrentIngestAndSnapshot(t *testing.T) { } } +func TestLiveTrieConcurrentAddRecordAndMetricToggle(t *testing.T) { + lt := NewLiveTrie([]string{"comm", "pid"}, "count", "count") + + const iterations = 1500 + errCh := make(chan error, 2) + var wg sync.WaitGroup + wg.Add(3) + + go func() { + defer wg.Done() + for i := 0; i < iterations; i++ { + lt.AddRecord(IterRecord{ + Comm: "svc", + Pid: 42, + TraceID: types.SYS_ENTER_READ, + Cnt: Counter{ + Count: 1, + Bytes: 2, + Duration: 3, + DurationToPrev: 4, + }, + }) + } + }() + + go func() { + defer wg.Done() + for i := 0; i < iterations; i++ { + field := "count" + if i%2 == 1 { + field = "bytes" + } + if err := lt.SetCountField(field); err != nil { + errCh <- fmt.Errorf("set count field %q: %w", field, err) + return + } + } + }() + + go func() { + defer wg.Done() + for i := 0; i < iterations; i++ { + field := "count" + if i%2 == 1 { + field = "bytes" + } + if err := lt.SetHeightField(field); err != nil { + errCh <- fmt.Errorf("set height field %q: %w", field, err) + return + } + } + }() + + wg.Wait() + close(errCh) + for err := range errCh { + t.Fatal(err) + } + + payload, _ := lt.SnapshotJSON() + var snap SnapshotNode + if err := json.Unmarshal(payload, &snap); err != nil { + t.Fatalf("unmarshal snapshot after concurrent updates: %v", err) + } +} + func TestLiveTrieStressHighRateConcurrentSnapshot(t *testing.T) { if os.Getenv("IOR_STRESS_TEST") != "1" { t.Skip("set IOR_STRESS_TEST=1 to run stress test") -- cgit v1.2.3