summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r--internal/flamegraph/livetrie.go5
-rw-r--r--internal/flamegraph/livetrie_test.go66
2 files changed, 70 insertions, 1 deletions
diff --git a/internal/flamegraph/livetrie.go b/internal/flamegraph/livetrie.go
index a510a72..47299e6 100644
--- a/internal/flamegraph/livetrie.go
+++ b/internal/flamegraph/livetrie.go
@@ -102,19 +102,22 @@ func (lt *LiveTrie) Ingest(ep *event.Pair) {
// AddRecord adds one already-decoded flamegraph record into the live trie.
func (lt *LiveTrie) AddRecord(record IterRecord) {
+ lt.mu.Lock()
+
value, err := record.Cnt.ValueByName(lt.countField)
if err != nil {
+ lt.mu.Unlock()
return
}
heightValue := uint64(0)
if lt.heightField != "" {
heightValue, err = record.Cnt.ValueByName(lt.heightField)
if err != nil {
+ lt.mu.Unlock()
return
}
}
- lt.mu.Lock()
frames := lt.buildFrames(record)
lt.addLocked(frames, value, heightValue)
lt.version.Add(1)
diff --git a/internal/flamegraph/livetrie_test.go b/internal/flamegraph/livetrie_test.go
index a12f5d9..fc9e6a6 100644
--- a/internal/flamegraph/livetrie_test.go
+++ b/internal/flamegraph/livetrie_test.go
@@ -521,6 +521,72 @@ func TestLiveTrieConcurrentIngestAndSnapshot(t *testing.T) {
}
}
+func TestLiveTrieConcurrentAddRecordAndMetricToggle(t *testing.T) {
+ lt := NewLiveTrie([]string{"comm", "pid"}, "count", "count")
+
+ const iterations = 1500
+ errCh := make(chan error, 2)
+ var wg sync.WaitGroup
+ wg.Add(3)
+
+ go func() {
+ defer wg.Done()
+ for i := 0; i < iterations; i++ {
+ lt.AddRecord(IterRecord{
+ Comm: "svc",
+ Pid: 42,
+ TraceID: types.SYS_ENTER_READ,
+ Cnt: Counter{
+ Count: 1,
+ Bytes: 2,
+ Duration: 3,
+ DurationToPrev: 4,
+ },
+ })
+ }
+ }()
+
+ go func() {
+ defer wg.Done()
+ for i := 0; i < iterations; i++ {
+ field := "count"
+ if i%2 == 1 {
+ field = "bytes"
+ }
+ if err := lt.SetCountField(field); err != nil {
+ errCh <- fmt.Errorf("set count field %q: %w", field, err)
+ return
+ }
+ }
+ }()
+
+ go func() {
+ defer wg.Done()
+ for i := 0; i < iterations; i++ {
+ field := "count"
+ if i%2 == 1 {
+ field = "bytes"
+ }
+ if err := lt.SetHeightField(field); err != nil {
+ errCh <- fmt.Errorf("set height field %q: %w", field, err)
+ return
+ }
+ }
+ }()
+
+ wg.Wait()
+ close(errCh)
+ for err := range errCh {
+ t.Fatal(err)
+ }
+
+ payload, _ := lt.SnapshotJSON()
+ var snap SnapshotNode
+ if err := json.Unmarshal(payload, &snap); err != nil {
+ t.Fatalf("unmarshal snapshot after concurrent updates: %v", err)
+ }
+}
+
func TestLiveTrieStressHighRateConcurrentSnapshot(t *testing.T) {
if os.Getenv("IOR_STRESS_TEST") != "1" {
t.Skip("set IOR_STRESS_TEST=1 to run stress test")