summaryrefslogtreecommitdiff
path: root/internal/statsengine
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-02-24 12:12:31 +0200
committerPaul Buetow <paul@buetow.org>2026-02-24 12:12:31 +0200
commit610d91472b3b37010130f33bd835c23e859caf56 (patch)
tree48cc2cb7e425c69135095ad748389afd0192c4d1 /internal/statsengine
parent0d4ef22478a470d86ce907beedcaa726d0d46c73 (diff)
statsengine: build snapshots outside engine mutex
Diffstat (limited to 'internal/statsengine')
-rw-r--r--internal/statsengine/engine.go94
-rw-r--r--internal/statsengine/filerank.go37
-rw-r--r--internal/statsengine/histogram.go23
-rw-r--r--internal/statsengine/process.go38
-rw-r--r--internal/statsengine/syscall.go61
5 files changed, 200 insertions, 53 deletions
diff --git a/internal/statsengine/engine.go b/internal/statsengine/engine.go
index 79123a5..18f83dd 100644
--- a/internal/statsengine/engine.go
+++ b/internal/statsengine/engine.go
@@ -35,6 +35,30 @@ type Engine struct {
throughputSeries *ringTimeSeries
}
+type snapshotInputs struct {
+ now time.Time
+ startedAt time.Time
+
+ totalSyscalls uint64
+ totalErrors uint64
+ totalBytes uint64
+ totalReadBytes uint64
+ totalWriteBytes uint64
+ totalLatency uint64
+ totalGap uint64
+
+ latencySeries []float64
+ gapSeries []float64
+ throughputSeries []float64
+
+ syscalls []syscallSnapshotInput
+ files []fileSnapshotInput
+ processes []processSnapshotInput
+
+ latencyHist histogramSnapshotInput
+ gapHist histogramSnapshotInput
+}
+
// NewEngine creates a new stats engine.
func NewEngine(topN int) *Engine {
return newEngineWithClock(topN, time.Now)
@@ -112,41 +136,55 @@ func (e *Engine) Snapshot() *Snapshot {
}
e.mu.Lock()
- defer e.mu.Unlock()
+ in := snapshotInputs{
+ now: e.now(),
+ startedAt: e.startedAt,
+ totalSyscalls: e.totalSyscalls,
+ totalErrors: e.totalErrors,
+ totalBytes: e.totalBytes,
+ totalReadBytes: e.totalReadBytes,
+ totalWriteBytes: e.totalWriteBytes,
+ totalLatency: e.totalLatency,
+ totalGap: e.totalGap,
+ latencySeries: e.latencySeries.Values(),
+ gapSeries: e.gapSeries.Values(),
+ throughputSeries: e.throughputSeries.Values(),
+ syscalls: e.syscalls.snapshotInputs(),
+ files: e.files.snapshotInputs(),
+ processes: e.processes.snapshotInputs(),
+ latencyHist: e.latencyHist.snapshotInputs(),
+ gapHist: e.gapHist.snapshotInputs(),
+ }
+ e.mu.Unlock()
- now := e.now()
- elapsed := nonNegativeDuration(now.Sub(e.startedAt))
+ elapsed := nonNegativeDuration(in.now.Sub(in.startedAt))
rateDiv := elapsed.Seconds()
- latencySeries := e.latencySeries.Values()
- gapSeries := e.gapSeries.Values()
- throughputSeries := e.throughputSeries.Values()
-
snapshot := NewSnapshot(
- latencySeries,
- gapSeries,
- throughputSeries,
- e.syscalls.Snapshot(elapsed),
- e.files.Snapshot(),
- e.processes.Snapshot(elapsed),
- e.latencyHist.Snapshot(),
- e.gapHist.Snapshot(),
+ in.latencySeries,
+ in.gapSeries,
+ in.throughputSeries,
+ buildSyscallSnapshots(in.syscalls, elapsed),
+ buildFileSnapshots(in.files),
+ buildProcessSnapshots(in.processes, elapsed),
+ buildHistogramSnapshot(in.latencyHist),
+ buildHistogramSnapshot(in.gapHist),
)
- snapshot.GeneratedAt = now
+ snapshot.GeneratedAt = in.now
snapshot.Elapsed = elapsed
- snapshot.TotalSyscalls = e.totalSyscalls
- snapshot.TotalErrors = e.totalErrors
- snapshot.TotalBytes = e.totalBytes
- snapshot.SyscallRatePerSec = safeRate(e.totalSyscalls, rateDiv)
- snapshot.ErrorRatePerSec = safeRate(e.totalErrors, rateDiv)
- snapshot.ReadBytesPerSec = safeRate(e.totalReadBytes, rateDiv)
- snapshot.WriteBytesPerSec = safeRate(e.totalWriteBytes, rateDiv)
- snapshot.LatencyMeanNs = safeMean(e.totalLatency, e.totalSyscalls)
- snapshot.GapMeanNs = safeMean(e.totalGap, e.totalSyscalls)
- snapshot.LatencyTrend = detectTrend(latencySeries)
- snapshot.GapTrend = detectTrend(gapSeries)
- snapshot.ThroughputTrend = detectTrend(throughputSeries)
+ snapshot.TotalSyscalls = in.totalSyscalls
+ snapshot.TotalErrors = in.totalErrors
+ snapshot.TotalBytes = in.totalBytes
+ snapshot.SyscallRatePerSec = safeRate(in.totalSyscalls, rateDiv)
+ snapshot.ErrorRatePerSec = safeRate(in.totalErrors, rateDiv)
+ snapshot.ReadBytesPerSec = safeRate(in.totalReadBytes, rateDiv)
+ snapshot.WriteBytesPerSec = safeRate(in.totalWriteBytes, rateDiv)
+ snapshot.LatencyMeanNs = safeMean(in.totalLatency, in.totalSyscalls)
+ snapshot.GapMeanNs = safeMean(in.totalGap, in.totalSyscalls)
+ snapshot.LatencyTrend = detectTrend(in.latencySeries)
+ snapshot.GapTrend = detectTrend(in.gapSeries)
+ snapshot.ThroughputTrend = detectTrend(in.throughputSeries)
return &snapshot
}
diff --git a/internal/statsengine/filerank.go b/internal/statsengine/filerank.go
index 9054ff7..6e8f27f 100644
--- a/internal/statsengine/filerank.go
+++ b/internal/statsengine/filerank.go
@@ -26,6 +26,15 @@ type fileRankStats struct {
heapIndex int
}
+type fileSnapshotInput struct {
+ path string
+ accesses uint64
+ bytesRead uint64
+ bytesWritten uint64
+ totalLatency uint64
+ maxLatency uint64
+}
+
type fileRankHeap []*fileRankStats
func newFileRanker() *fileRanker {
@@ -87,18 +96,38 @@ func (r *fileRanker) Snapshot() []FileSnapshot {
return nil
}
- out := make([]FileSnapshot, 0, len(r.topHeap))
+ return buildFileSnapshots(r.snapshotInputs())
+}
+
+func (r *fileRanker) snapshotInputs() []fileSnapshotInput {
+ if r == nil {
+ return nil
+ }
+ inputs := make([]fileSnapshotInput, 0, len(r.topHeap))
for _, stats := range r.topHeap {
- out = append(out, stats.snapshot())
+ inputs = append(inputs, fileSnapshotInput{
+ path: stats.path,
+ accesses: stats.accesses,
+ bytesRead: stats.bytesRead,
+ bytesWritten: stats.bytesWritten,
+ totalLatency: stats.totalLatency,
+ maxLatency: stats.maxLatency,
+ })
}
+ return inputs
+}
+func buildFileSnapshots(inputs []fileSnapshotInput) []FileSnapshot {
+ out := make([]FileSnapshot, 0, len(inputs))
+ for _, in := range inputs {
+ out = append(out, in.toSnapshot())
+ }
sort.Slice(out, func(i, j int) bool {
if out[i].Accesses != out[j].Accesses {
return out[i].Accesses > out[j].Accesses
}
return out[i].Path < out[j].Path
})
-
return out
}
@@ -153,7 +182,7 @@ func (r *fileRanker) compactIfNeeded() {
r.byPath = kept
}
-func (s *fileRankStats) snapshot() FileSnapshot {
+func (s fileSnapshotInput) toSnapshot() FileSnapshot {
avg := 0.0
if s.accesses > 0 {
avg = float64(s.totalLatency) / float64(s.accesses)
diff --git a/internal/statsengine/histogram.go b/internal/statsengine/histogram.go
index 550efe0..42460ea 100644
--- a/internal/statsengine/histogram.go
+++ b/internal/statsengine/histogram.go
@@ -7,6 +7,11 @@ type histogram struct {
total uint64
}
+type histogramSnapshotInput struct {
+ counts [histogramBucketCount]uint64
+ total uint64
+}
+
var histogramBoundariesNs = [histogramBucketCount - 1]uint64{
1_000,
10_000,
@@ -47,6 +52,20 @@ func (h *histogram) Snapshot() HistogramSnapshot {
return NewHistogramSnapshot(0, nil)
}
+ return buildHistogramSnapshot(h.snapshotInputs())
+}
+
+func (h *histogram) snapshotInputs() histogramSnapshotInput {
+ if h == nil {
+ return histogramSnapshotInput{}
+ }
+ return histogramSnapshotInput{
+ counts: h.counts,
+ total: h.total,
+ }
+}
+
+func buildHistogramSnapshot(in histogramSnapshotInput) HistogramSnapshot {
buckets := make([]HistogramBucketSnapshot, 0, histogramBucketCount)
for i := 0; i < histogramBucketCount; i++ {
lower, upper := histogramBucketRange(i)
@@ -54,11 +73,11 @@ func (h *histogram) Snapshot() HistogramSnapshot {
Label: histogramLabels[i],
LowerNs: lower,
UpperNs: upper,
- Count: h.counts[i],
+ Count: in.counts[i],
})
}
- return NewHistogramSnapshot(h.total, buckets)
+ return NewHistogramSnapshot(in.total, buckets)
}
func histogramBucketIndex(durationNs uint64) int {
diff --git a/internal/statsengine/process.go b/internal/statsengine/process.go
index 296312b..a0d29c1 100644
--- a/internal/statsengine/process.go
+++ b/internal/statsengine/process.go
@@ -18,6 +18,14 @@ type processStats struct {
totalLatency uint64
}
+type processSnapshotInput struct {
+ pid uint32
+ comm string
+ count uint64
+ totalBytes uint64
+ totalLatency uint64
+}
+
func newProcessAccumulator() *processAccumulator {
return &processAccumulator{byPID: make(map[uint32]*processStats)}
}
@@ -53,12 +61,33 @@ func (a *processAccumulator) Snapshot(elapsed time.Duration) []ProcessSnapshot {
return nil
}
- rateDiv := elapsed.Seconds()
- result := make([]ProcessSnapshot, 0, len(a.byPID))
+ return buildProcessSnapshots(a.snapshotInputs(), elapsed)
+}
+
+func (a *processAccumulator) snapshotInputs() []processSnapshotInput {
+ if a == nil {
+ return nil
+ }
+
+ inputs := make([]processSnapshotInput, 0, len(a.byPID))
for _, stats := range a.byPID {
- result = append(result, stats.toSnapshot(rateDiv))
+ inputs = append(inputs, processSnapshotInput{
+ pid: stats.pid,
+ comm: stats.comm,
+ count: stats.count,
+ totalBytes: stats.totalBytes,
+ totalLatency: stats.totalLatency,
+ })
}
+ return inputs
+}
+func buildProcessSnapshots(inputs []processSnapshotInput, elapsed time.Duration) []ProcessSnapshot {
+ rateDiv := elapsed.Seconds()
+ result := make([]ProcessSnapshot, 0, len(inputs))
+ for _, in := range inputs {
+ result = append(result, in.toSnapshot(rateDiv))
+ }
sort.Slice(result, func(i, j int) bool {
if result[i].Syscalls != result[j].Syscalls {
return result[i].Syscalls > result[j].Syscalls
@@ -68,11 +97,10 @@ func (a *processAccumulator) Snapshot(elapsed time.Duration) []ProcessSnapshot {
}
return result[i].PID < result[j].PID
})
-
return result
}
-func (s *processStats) toSnapshot(rateDiv float64) ProcessSnapshot {
+func (s processSnapshotInput) toSnapshot(rateDiv float64) ProcessSnapshot {
avg := 0.0
if s.count > 0 {
avg = float64(s.totalLatency) / float64(s.count)
diff --git a/internal/statsengine/syscall.go b/internal/statsengine/syscall.go
index b3b8c4c..fe54cb4 100644
--- a/internal/statsengine/syscall.go
+++ b/internal/statsengine/syscall.go
@@ -32,6 +32,18 @@ type syscallStats struct {
samples []uint64
}
+type syscallSnapshotInput struct {
+ traceID types.TraceId
+ name string
+ count uint64
+ errorCount uint64
+ totalBytes uint64
+ totalLatency uint64
+ minLatency uint64
+ maxLatency uint64
+ sortedSamples []uint64
+}
+
func newSyscallAccumulator() *syscallAccumulator {
return newSyscallAccumulatorWithConfig(syscallReservoirSampleCapDefault, rand.New(rand.NewSource(time.Now().UnixNano())))
}
@@ -79,19 +91,45 @@ func (a *syscallAccumulator) Snapshot(elapsed time.Duration) []SyscallSnapshot {
return nil
}
- rateDiv := elapsed.Seconds()
- result := make([]SyscallSnapshot, 0, len(a.byID))
- for _, stats := range a.byID {
- result = append(result, stats.toSnapshot(rateDiv))
+ return buildSyscallSnapshots(a.snapshotInputs(), elapsed)
+}
+
+func (a *syscallAccumulator) snapshotInputs() []syscallSnapshotInput {
+ if a == nil {
+ return nil
}
+ inputs := make([]syscallSnapshotInput, 0, len(a.byID))
+ for _, stats := range a.byID {
+ sorted := append([]uint64(nil), stats.samples...)
+ sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
+ inputs = append(inputs, syscallSnapshotInput{
+ traceID: stats.traceID,
+ name: stats.name,
+ count: stats.count,
+ errorCount: stats.errorCount,
+ totalBytes: stats.totalBytes,
+ totalLatency: stats.totalLatency,
+ minLatency: stats.minLatency,
+ maxLatency: stats.maxLatency,
+ sortedSamples: sorted,
+ })
+ }
+ return inputs
+}
+
+func buildSyscallSnapshots(inputs []syscallSnapshotInput, elapsed time.Duration) []SyscallSnapshot {
+ rateDiv := elapsed.Seconds()
+ result := make([]SyscallSnapshot, 0, len(inputs))
+ for _, in := range inputs {
+ result = append(result, in.toSnapshot(rateDiv))
+ }
sort.Slice(result, func(i, j int) bool {
if result[i].Count != result[j].Count {
return result[i].Count > result[j].Count
}
return result[i].Name < result[j].Name
})
-
return result
}
@@ -118,12 +156,7 @@ func (s *syscallStats) addSample(duration uint64, cap int, rng *rand.Rand) {
s.samples[idx] = duration
}
-func (s *syscallStats) toSnapshot(rateDiv float64) SyscallSnapshot {
- sortedSamples := append([]uint64(nil), s.samples...)
- sort.Slice(sortedSamples, func(i, j int) bool {
- return sortedSamples[i] < sortedSamples[j]
- })
-
+func (s syscallSnapshotInput) toSnapshot(rateDiv float64) SyscallSnapshot {
return SyscallSnapshot{
TraceID: s.traceID,
Name: s.name,
@@ -134,9 +167,9 @@ func (s *syscallStats) toSnapshot(rateDiv float64) SyscallSnapshot {
LatencyMinNs: s.minLatency,
LatencyMaxNs: s.maxLatency,
LatencyMeanNs: float64(s.totalLatency) / float64(maxU64(s.count, 1)),
- LatencyP50Ns: samplePercentile(sortedSamples, 0.50),
- LatencyP95Ns: samplePercentile(sortedSamples, 0.95),
- LatencyP99Ns: samplePercentile(sortedSamples, 0.99),
+ LatencyP50Ns: samplePercentile(s.sortedSamples, 0.50),
+ LatencyP95Ns: samplePercentile(s.sortedSamples, 0.95),
+ LatencyP99Ns: samplePercentile(s.sortedSamples, 0.99),
}
}