diff options
Diffstat (limited to 'internal/statsengine/engine.go')
| -rw-r--r-- | internal/statsengine/engine.go | 128 |
1 files changed, 67 insertions, 61 deletions
diff --git a/internal/statsengine/engine.go b/internal/statsengine/engine.go index fb85558..6681c66 100644 --- a/internal/statsengine/engine.go +++ b/internal/statsengine/engine.go @@ -159,14 +159,22 @@ func (e *Engine) updateErrorAndByteClasses(pair *event.Pair) { } } -// Snapshot returns an immutable point-in-time view of all stats. -func (e *Engine) Snapshot() *Snapshot { - if e == nil { - return nil - } +// subSnapshots holds the concurrently built per-category snapshot slices. +type subSnapshots struct { + syscalls []SyscallSnapshot + files []FileSnapshot + processes []ProcessSnapshot + latencyHist HistogramSnapshot + gapHist HistogramSnapshot +} +// captureSnapshotInputs copies all engine state under the lock so that the +// subsequent (lock-free) computation does not block ingestion. +func (e *Engine) captureSnapshotInputs() snapshotInputs { e.mu.Lock() - in := snapshotInputs{ + defer e.mu.Unlock() + + return snapshotInputs{ now: e.now(), startedAt: e.startedAt, totalSyscalls: e.totalSyscalls, @@ -185,70 +193,68 @@ func (e *Engine) Snapshot() *Snapshot { latencyHist: e.latencyHist.snapshotInputs(), gapHist: e.gapHist.snapshotInputs(), } - e.mu.Unlock() - - elapsed := nonNegativeDuration(in.now.Sub(in.startedAt)) - rateDiv := elapsed.Seconds() +} +// buildSubSnapshots runs all five per-category snapshot builders concurrently +// and returns their results bundled together. +func buildSubSnapshots(in snapshotInputs, elapsed time.Duration) subSnapshots { var ( - syscallsSnap []SyscallSnapshot - filesSnap []FileSnapshot - processesSnap []ProcessSnapshot - latencyHistSnap HistogramSnapshot - gapHistSnap HistogramSnapshot + ss subSnapshots + wg sync.WaitGroup ) - var wg sync.WaitGroup wg.Add(5) - go func() { - defer wg.Done() - syscallsSnap = buildSyscallSnapshots(in.syscalls, elapsed) - }() - go func() { - defer wg.Done() - filesSnap = buildFileSnapshots(in.files) - }() - go func() { - defer wg.Done() - processesSnap = buildProcessSnapshots(in.processes, elapsed) - }() - go func() { - defer wg.Done() - latencyHistSnap = buildHistogramSnapshot(in.latencyHist) - }() - go func() { - defer wg.Done() - gapHistSnap = buildHistogramSnapshot(in.gapHist) - }() + go func() { defer wg.Done(); ss.syscalls = buildSyscallSnapshots(in.syscalls, elapsed) }() + go func() { defer wg.Done(); ss.files = buildFileSnapshots(in.files) }() + go func() { defer wg.Done(); ss.processes = buildProcessSnapshots(in.processes, elapsed) }() + go func() { defer wg.Done(); ss.latencyHist = buildHistogramSnapshot(in.latencyHist) }() + go func() { defer wg.Done(); ss.gapHist = buildHistogramSnapshot(in.gapHist) }() wg.Wait() - snapshot := NewSnapshot( - in.latencySeries, - in.gapSeries, - in.throughputSeries, - syscallsSnap, - filesSnap, - processesSnap, - latencyHistSnap, - gapHistSnap, + return ss +} + +// populateSnapshotFields fills in the scalar fields of the snapshot from the +// captured inputs and pre-computed elapsed/rateDiv values. +func populateSnapshotFields(snap *Snapshot, in snapshotInputs, elapsed time.Duration) { + rateDiv := elapsed.Seconds() + + snap.GeneratedAt = in.now + snap.Elapsed = elapsed + snap.TotalSyscalls = in.totalSyscalls + snap.TotalErrors = in.totalErrors + snap.TotalBytes = in.totalBytes + snap.SyscallRatePerSec = safeRate(in.totalSyscalls, rateDiv) + snap.ErrorRatePerSec = safeRate(in.totalErrors, rateDiv) + snap.ReadBytesPerSec = safeRate(in.totalReadBytes, rateDiv) + snap.WriteBytesPerSec = safeRate(in.totalWriteBytes, rateDiv) + snap.LatencyMeanNs = safeMean(in.totalLatency, in.totalSyscalls) + snap.GapMeanNs = safeMean(in.totalGap, in.totalSyscalls) + snap.LatencyTrend = detectTrend(in.latencySeries) + snap.GapTrend = detectTrend(in.gapSeries) + snap.ThroughputTrend = detectTrend(in.throughputSeries) +} + +// Snapshot returns an immutable point-in-time view of all stats. +// It captures engine state under the lock, then builds sub-snapshots +// concurrently, and finally assembles the result without holding the lock. +func (e *Engine) Snapshot() *Snapshot { + if e == nil { + return nil + } + + in := e.captureSnapshotInputs() + elapsed := nonNegativeDuration(in.now.Sub(in.startedAt)) + ss := buildSubSnapshots(in, elapsed) + + snap := NewSnapshot( + in.latencySeries, in.gapSeries, in.throughputSeries, + ss.syscalls, ss.files, ss.processes, + ss.latencyHist, ss.gapHist, ) + populateSnapshotFields(&snap, in, elapsed) - snapshot.GeneratedAt = in.now - snapshot.Elapsed = elapsed - snapshot.TotalSyscalls = in.totalSyscalls - snapshot.TotalErrors = in.totalErrors - snapshot.TotalBytes = in.totalBytes - snapshot.SyscallRatePerSec = safeRate(in.totalSyscalls, rateDiv) - snapshot.ErrorRatePerSec = safeRate(in.totalErrors, rateDiv) - snapshot.ReadBytesPerSec = safeRate(in.totalReadBytes, rateDiv) - snapshot.WriteBytesPerSec = safeRate(in.totalWriteBytes, rateDiv) - snapshot.LatencyMeanNs = safeMean(in.totalLatency, in.totalSyscalls) - snapshot.GapMeanNs = safeMean(in.totalGap, in.totalSyscalls) - snapshot.LatencyTrend = detectTrend(in.latencySeries) - snapshot.GapTrend = detectTrend(in.gapSeries) - snapshot.ThroughputTrend = detectTrend(in.throughputSeries) - - return &snapshot + return &snap } func safeMean(total uint64, count uint64) float64 { |
