diff options
| author | Paul Buetow <paul@buetow.org> | 2026-02-24 12:12:31 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-02-24 12:12:31 +0200 |
| commit | 610d91472b3b37010130f33bd835c23e859caf56 (patch) | |
| tree | 48cc2cb7e425c69135095ad748389afd0192c4d1 /internal/statsengine | |
| parent | 0d4ef22478a470d86ce907beedcaa726d0d46c73 (diff) | |
statsengine: build snapshots outside engine mutex
Diffstat (limited to 'internal/statsengine')
| -rw-r--r-- | internal/statsengine/engine.go | 94 | ||||
| -rw-r--r-- | internal/statsengine/filerank.go | 37 | ||||
| -rw-r--r-- | internal/statsengine/histogram.go | 23 | ||||
| -rw-r--r-- | internal/statsengine/process.go | 38 | ||||
| -rw-r--r-- | internal/statsengine/syscall.go | 61 |
5 files changed, 200 insertions, 53 deletions
diff --git a/internal/statsengine/engine.go b/internal/statsengine/engine.go index 79123a5..18f83dd 100644 --- a/internal/statsengine/engine.go +++ b/internal/statsengine/engine.go @@ -35,6 +35,30 @@ type Engine struct { throughputSeries *ringTimeSeries } +type snapshotInputs struct { + now time.Time + startedAt time.Time + + totalSyscalls uint64 + totalErrors uint64 + totalBytes uint64 + totalReadBytes uint64 + totalWriteBytes uint64 + totalLatency uint64 + totalGap uint64 + + latencySeries []float64 + gapSeries []float64 + throughputSeries []float64 + + syscalls []syscallSnapshotInput + files []fileSnapshotInput + processes []processSnapshotInput + + latencyHist histogramSnapshotInput + gapHist histogramSnapshotInput +} + // NewEngine creates a new stats engine. func NewEngine(topN int) *Engine { return newEngineWithClock(topN, time.Now) @@ -112,41 +136,55 @@ func (e *Engine) Snapshot() *Snapshot { } e.mu.Lock() - defer e.mu.Unlock() + in := snapshotInputs{ + now: e.now(), + startedAt: e.startedAt, + totalSyscalls: e.totalSyscalls, + totalErrors: e.totalErrors, + totalBytes: e.totalBytes, + totalReadBytes: e.totalReadBytes, + totalWriteBytes: e.totalWriteBytes, + totalLatency: e.totalLatency, + totalGap: e.totalGap, + latencySeries: e.latencySeries.Values(), + gapSeries: e.gapSeries.Values(), + throughputSeries: e.throughputSeries.Values(), + syscalls: e.syscalls.snapshotInputs(), + files: e.files.snapshotInputs(), + processes: e.processes.snapshotInputs(), + latencyHist: e.latencyHist.snapshotInputs(), + gapHist: e.gapHist.snapshotInputs(), + } + e.mu.Unlock() - now := e.now() - elapsed := nonNegativeDuration(now.Sub(e.startedAt)) + elapsed := nonNegativeDuration(in.now.Sub(in.startedAt)) rateDiv := elapsed.Seconds() - latencySeries := e.latencySeries.Values() - gapSeries := e.gapSeries.Values() - throughputSeries := e.throughputSeries.Values() - snapshot := NewSnapshot( - latencySeries, - gapSeries, - throughputSeries, - e.syscalls.Snapshot(elapsed), - e.files.Snapshot(), - e.processes.Snapshot(elapsed), - e.latencyHist.Snapshot(), - e.gapHist.Snapshot(), + in.latencySeries, + in.gapSeries, + in.throughputSeries, + buildSyscallSnapshots(in.syscalls, elapsed), + buildFileSnapshots(in.files), + buildProcessSnapshots(in.processes, elapsed), + buildHistogramSnapshot(in.latencyHist), + buildHistogramSnapshot(in.gapHist), ) - snapshot.GeneratedAt = now + snapshot.GeneratedAt = in.now snapshot.Elapsed = elapsed - snapshot.TotalSyscalls = e.totalSyscalls - snapshot.TotalErrors = e.totalErrors - snapshot.TotalBytes = e.totalBytes - snapshot.SyscallRatePerSec = safeRate(e.totalSyscalls, rateDiv) - snapshot.ErrorRatePerSec = safeRate(e.totalErrors, rateDiv) - snapshot.ReadBytesPerSec = safeRate(e.totalReadBytes, rateDiv) - snapshot.WriteBytesPerSec = safeRate(e.totalWriteBytes, rateDiv) - snapshot.LatencyMeanNs = safeMean(e.totalLatency, e.totalSyscalls) - snapshot.GapMeanNs = safeMean(e.totalGap, e.totalSyscalls) - snapshot.LatencyTrend = detectTrend(latencySeries) - snapshot.GapTrend = detectTrend(gapSeries) - snapshot.ThroughputTrend = detectTrend(throughputSeries) + snapshot.TotalSyscalls = in.totalSyscalls + snapshot.TotalErrors = in.totalErrors + snapshot.TotalBytes = in.totalBytes + snapshot.SyscallRatePerSec = safeRate(in.totalSyscalls, rateDiv) + snapshot.ErrorRatePerSec = safeRate(in.totalErrors, rateDiv) + snapshot.ReadBytesPerSec = safeRate(in.totalReadBytes, rateDiv) + snapshot.WriteBytesPerSec = safeRate(in.totalWriteBytes, rateDiv) + snapshot.LatencyMeanNs = safeMean(in.totalLatency, in.totalSyscalls) + snapshot.GapMeanNs = safeMean(in.totalGap, in.totalSyscalls) + snapshot.LatencyTrend = detectTrend(in.latencySeries) + snapshot.GapTrend = detectTrend(in.gapSeries) + snapshot.ThroughputTrend = detectTrend(in.throughputSeries) return &snapshot } diff --git a/internal/statsengine/filerank.go b/internal/statsengine/filerank.go index 9054ff7..6e8f27f 100644 --- a/internal/statsengine/filerank.go +++ b/internal/statsengine/filerank.go @@ -26,6 +26,15 @@ type fileRankStats struct { heapIndex int } +type fileSnapshotInput struct { + path string + accesses uint64 + bytesRead uint64 + bytesWritten uint64 + totalLatency uint64 + maxLatency uint64 +} + type fileRankHeap []*fileRankStats func newFileRanker() *fileRanker { @@ -87,18 +96,38 @@ func (r *fileRanker) Snapshot() []FileSnapshot { return nil } - out := make([]FileSnapshot, 0, len(r.topHeap)) + return buildFileSnapshots(r.snapshotInputs()) +} + +func (r *fileRanker) snapshotInputs() []fileSnapshotInput { + if r == nil { + return nil + } + inputs := make([]fileSnapshotInput, 0, len(r.topHeap)) for _, stats := range r.topHeap { - out = append(out, stats.snapshot()) + inputs = append(inputs, fileSnapshotInput{ + path: stats.path, + accesses: stats.accesses, + bytesRead: stats.bytesRead, + bytesWritten: stats.bytesWritten, + totalLatency: stats.totalLatency, + maxLatency: stats.maxLatency, + }) } + return inputs +} +func buildFileSnapshots(inputs []fileSnapshotInput) []FileSnapshot { + out := make([]FileSnapshot, 0, len(inputs)) + for _, in := range inputs { + out = append(out, in.toSnapshot()) + } sort.Slice(out, func(i, j int) bool { if out[i].Accesses != out[j].Accesses { return out[i].Accesses > out[j].Accesses } return out[i].Path < out[j].Path }) - return out } @@ -153,7 +182,7 @@ func (r *fileRanker) compactIfNeeded() { r.byPath = kept } -func (s *fileRankStats) snapshot() FileSnapshot { +func (s fileSnapshotInput) toSnapshot() FileSnapshot { avg := 0.0 if s.accesses > 0 { avg = float64(s.totalLatency) / float64(s.accesses) diff --git a/internal/statsengine/histogram.go b/internal/statsengine/histogram.go index 550efe0..42460ea 100644 --- a/internal/statsengine/histogram.go +++ b/internal/statsengine/histogram.go @@ -7,6 +7,11 @@ type histogram struct { total uint64 } +type histogramSnapshotInput struct { + counts [histogramBucketCount]uint64 + total uint64 +} + var histogramBoundariesNs = [histogramBucketCount - 1]uint64{ 1_000, 10_000, @@ -47,6 +52,20 @@ func (h *histogram) Snapshot() HistogramSnapshot { return NewHistogramSnapshot(0, nil) } + return buildHistogramSnapshot(h.snapshotInputs()) +} + +func (h *histogram) snapshotInputs() histogramSnapshotInput { + if h == nil { + return histogramSnapshotInput{} + } + return histogramSnapshotInput{ + counts: h.counts, + total: h.total, + } +} + +func buildHistogramSnapshot(in histogramSnapshotInput) HistogramSnapshot { buckets := make([]HistogramBucketSnapshot, 0, histogramBucketCount) for i := 0; i < histogramBucketCount; i++ { lower, upper := histogramBucketRange(i) @@ -54,11 +73,11 @@ func (h *histogram) Snapshot() HistogramSnapshot { Label: histogramLabels[i], LowerNs: lower, UpperNs: upper, - Count: h.counts[i], + Count: in.counts[i], }) } - return NewHistogramSnapshot(h.total, buckets) + return NewHistogramSnapshot(in.total, buckets) } func histogramBucketIndex(durationNs uint64) int { diff --git a/internal/statsengine/process.go b/internal/statsengine/process.go index 296312b..a0d29c1 100644 --- a/internal/statsengine/process.go +++ b/internal/statsengine/process.go @@ -18,6 +18,14 @@ type processStats struct { totalLatency uint64 } +type processSnapshotInput struct { + pid uint32 + comm string + count uint64 + totalBytes uint64 + totalLatency uint64 +} + func newProcessAccumulator() *processAccumulator { return &processAccumulator{byPID: make(map[uint32]*processStats)} } @@ -53,12 +61,33 @@ func (a *processAccumulator) Snapshot(elapsed time.Duration) []ProcessSnapshot { return nil } - rateDiv := elapsed.Seconds() - result := make([]ProcessSnapshot, 0, len(a.byPID)) + return buildProcessSnapshots(a.snapshotInputs(), elapsed) +} + +func (a *processAccumulator) snapshotInputs() []processSnapshotInput { + if a == nil { + return nil + } + + inputs := make([]processSnapshotInput, 0, len(a.byPID)) for _, stats := range a.byPID { - result = append(result, stats.toSnapshot(rateDiv)) + inputs = append(inputs, processSnapshotInput{ + pid: stats.pid, + comm: stats.comm, + count: stats.count, + totalBytes: stats.totalBytes, + totalLatency: stats.totalLatency, + }) } + return inputs +} +func buildProcessSnapshots(inputs []processSnapshotInput, elapsed time.Duration) []ProcessSnapshot { + rateDiv := elapsed.Seconds() + result := make([]ProcessSnapshot, 0, len(inputs)) + for _, in := range inputs { + result = append(result, in.toSnapshot(rateDiv)) + } sort.Slice(result, func(i, j int) bool { if result[i].Syscalls != result[j].Syscalls { return result[i].Syscalls > result[j].Syscalls @@ -68,11 +97,10 @@ func (a *processAccumulator) Snapshot(elapsed time.Duration) []ProcessSnapshot { } return result[i].PID < result[j].PID }) - return result } -func (s *processStats) toSnapshot(rateDiv float64) ProcessSnapshot { +func (s processSnapshotInput) toSnapshot(rateDiv float64) ProcessSnapshot { avg := 0.0 if s.count > 0 { avg = float64(s.totalLatency) / float64(s.count) diff --git a/internal/statsengine/syscall.go b/internal/statsengine/syscall.go index b3b8c4c..fe54cb4 100644 --- a/internal/statsengine/syscall.go +++ b/internal/statsengine/syscall.go @@ -32,6 +32,18 @@ type syscallStats struct { samples []uint64 } +type syscallSnapshotInput struct { + traceID types.TraceId + name string + count uint64 + errorCount uint64 + totalBytes uint64 + totalLatency uint64 + minLatency uint64 + maxLatency uint64 + sortedSamples []uint64 +} + func newSyscallAccumulator() *syscallAccumulator { return newSyscallAccumulatorWithConfig(syscallReservoirSampleCapDefault, rand.New(rand.NewSource(time.Now().UnixNano()))) } @@ -79,19 +91,45 @@ func (a *syscallAccumulator) Snapshot(elapsed time.Duration) []SyscallSnapshot { return nil } - rateDiv := elapsed.Seconds() - result := make([]SyscallSnapshot, 0, len(a.byID)) - for _, stats := range a.byID { - result = append(result, stats.toSnapshot(rateDiv)) + return buildSyscallSnapshots(a.snapshotInputs(), elapsed) +} + +func (a *syscallAccumulator) snapshotInputs() []syscallSnapshotInput { + if a == nil { + return nil } + inputs := make([]syscallSnapshotInput, 0, len(a.byID)) + for _, stats := range a.byID { + sorted := append([]uint64(nil), stats.samples...) + sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] }) + inputs = append(inputs, syscallSnapshotInput{ + traceID: stats.traceID, + name: stats.name, + count: stats.count, + errorCount: stats.errorCount, + totalBytes: stats.totalBytes, + totalLatency: stats.totalLatency, + minLatency: stats.minLatency, + maxLatency: stats.maxLatency, + sortedSamples: sorted, + }) + } + return inputs +} + +func buildSyscallSnapshots(inputs []syscallSnapshotInput, elapsed time.Duration) []SyscallSnapshot { + rateDiv := elapsed.Seconds() + result := make([]SyscallSnapshot, 0, len(inputs)) + for _, in := range inputs { + result = append(result, in.toSnapshot(rateDiv)) + } sort.Slice(result, func(i, j int) bool { if result[i].Count != result[j].Count { return result[i].Count > result[j].Count } return result[i].Name < result[j].Name }) - return result } @@ -118,12 +156,7 @@ func (s *syscallStats) addSample(duration uint64, cap int, rng *rand.Rand) { s.samples[idx] = duration } -func (s *syscallStats) toSnapshot(rateDiv float64) SyscallSnapshot { - sortedSamples := append([]uint64(nil), s.samples...) - sort.Slice(sortedSamples, func(i, j int) bool { - return sortedSamples[i] < sortedSamples[j] - }) - +func (s syscallSnapshotInput) toSnapshot(rateDiv float64) SyscallSnapshot { return SyscallSnapshot{ TraceID: s.traceID, Name: s.name, @@ -134,9 +167,9 @@ func (s *syscallStats) toSnapshot(rateDiv float64) SyscallSnapshot { LatencyMinNs: s.minLatency, LatencyMaxNs: s.maxLatency, LatencyMeanNs: float64(s.totalLatency) / float64(maxU64(s.count, 1)), - LatencyP50Ns: samplePercentile(sortedSamples, 0.50), - LatencyP95Ns: samplePercentile(sortedSamples, 0.95), - LatencyP99Ns: samplePercentile(sortedSamples, 0.99), + LatencyP50Ns: samplePercentile(s.sortedSamples, 0.50), + LatencyP95Ns: samplePercentile(s.sortedSamples, 0.95), + LatencyP99Ns: samplePercentile(s.sortedSamples, 0.99), } } |
