diff options
| author | Paul Buetow <paul@buetow.org> | 2026-05-22 09:34:05 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-05-22 09:34:05 +0300 |
| commit | b0914837b96afb99971e600d6c46e55875f04854 (patch) | |
| tree | fb9c92c675bd3e1c65f87b31ca508c0910fe3c30 /internal/syscall_aggregate_consumer.go | |
| parent | 74a241b702a595b86cabc5350bb5bf7c52d8e27b (diff) | |
xb make syscall aggregates per-cpu deltas
Diffstat (limited to 'internal/syscall_aggregate_consumer.go')
| -rw-r--r-- | internal/syscall_aggregate_consumer.go | 201 |
1 file changed, 186 insertions, 15 deletions
diff --git a/internal/syscall_aggregate_consumer.go b/internal/syscall_aggregate_consumer.go index dadd38e..e246eac 100644 --- a/internal/syscall_aggregate_consumer.go +++ b/internal/syscall_aggregate_consumer.go @@ -5,7 +5,6 @@ import ( "encoding/binary" "errors" "fmt" - "syscall" "unsafe" "ior/internal/flags" @@ -20,6 +19,28 @@ const ( syscallSamplingRateMapName = "syscall_sampling_rate_map" ) +var syscallAggregateLatencyBucketLowerNs = [8]uint64{ + 0, + 1_000, + 10_000, + 100_000, + 1_000_000, + 10_000_000, + 100_000_000, + 1_000_000_000, +} + +var syscallAggregateLatencyBucketUpperNs = [8]uint64{ + 1_000, + 10_000, + 100_000, + 1_000_000, + 10_000_000, + 100_000_000, + 1_000_000_000, + 0, +} + type rawSyscallAggregate struct { Count uint64 Errors uint64 @@ -30,7 +51,27 @@ type rawSyscallAggregate struct { } type syscallAggregateConsumer struct { - aggregateMap *bpf.BPFMap + aggregateMap syscallAggregateMap + last map[types.TraceId]rawSyscallAggregate +} + +type syscallAggregateMap interface { + Iterator() syscallAggregateIterator + GetValue(unsafe.Pointer) ([]byte, error) +} + +type syscallAggregateIterator interface { + Next() bool + Key() []byte + Err() error +} + +type bpfSyscallAggregateMap struct { + *bpf.BPFMap +} + +func (m bpfSyscallAggregateMap) Iterator() syscallAggregateIterator { + return m.BPFMap.Iterator() } func newSyscallAggregateConsumer(module *bpf.Module) (*syscallAggregateConsumer, error) { @@ -41,7 +82,10 @@ func newSyscallAggregateConsumer(module *bpf.Module) (*syscallAggregateConsumer, if err != nil { return nil, fmt.Errorf("get %s: %w", syscallAggregateMapName, err) } - return &syscallAggregateConsumer{aggregateMap: aggregateMap}, nil + return &syscallAggregateConsumer{ + aggregateMap: bpfSyscallAggregateMap{BPFMap: aggregateMap}, + last: make(map[types.TraceId]rawSyscallAggregate), + }, nil } func (c *syscallAggregateConsumer) Drain() ([]statsengine.SyscallAggregate, error) { @@ -57,25 +101,28 @@ func (c *syscallAggregateConsumer) 
Drain() ([]statsengine.SyscallAggregate, erro continue } key := binary.LittleEndian.Uint32(keyRaw) - valueRaw, err := c.aggregateMap.GetValueAndDeleteKey(unsafe.Pointer(&key)) + valueRaw, err := c.aggregateMap.GetValue(unsafe.Pointer(&key)) if err != nil { - if errors.Is(err, syscall.ENOENT) { - continue - } return nil, fmt.Errorf("drain aggregate for trace id %d: %w", key, err) } - raw, err := decodeRawSyscallAggregate(valueRaw) + raw, err := decodeRawSyscallAggregateValue(valueRaw) if err != nil { return nil, fmt.Errorf("decode aggregate for trace id %d: %w", key, err) } + traceID := types.TraceId(key) + delta := raw.diff(c.last[traceID]) + c.last[traceID] = raw + if delta.Count == 0 { + continue + } rows = append(rows, statsengine.SyscallAggregate{ - TraceID: types.TraceId(key), - Count: raw.Count, - Errors: raw.Errors, - TotalLatencyNs: raw.TotalDuration, - MinLatencyNs: raw.MinDuration, - MaxLatencyNs: raw.MaxDuration, - LatencyHistogramNs: raw.Histogram, + TraceID: traceID, + Count: delta.Count, + Errors: delta.Errors, + TotalLatencyNs: delta.TotalDuration, + MinLatencyNs: delta.MinDuration, + MaxLatencyNs: delta.MaxDuration, + LatencyHistogramNs: delta.Histogram, }) } if err := iter.Err(); err != nil { @@ -96,6 +143,130 @@ func decodeRawSyscallAggregate(raw []byte) (rawSyscallAggregate, error) { return out, nil } +func decodeRawSyscallAggregateValue(raw []byte) (rawSyscallAggregate, error) { + var sample rawSyscallAggregate + valueSize := binary.Size(sample) + if len(raw) == valueSize { + return decodeRawSyscallAggregate(raw) + } + return decodeRawSyscallAggregatePerCPU(raw) +} + +func decodeRawSyscallAggregatePerCPU(raw []byte) (rawSyscallAggregate, error) { + var sample rawSyscallAggregate + valueSize := binary.Size(sample) + stride := roundUp(valueSize, 8) + if valueSize <= 0 || stride <= 0 || len(raw) == 0 || len(raw)%stride != 0 { + return rawSyscallAggregate{}, fmt.Errorf("invalid per-cpu aggregate value size %d (element size %d, stride %d)", len(raw), 
valueSize, stride) + } + + var out rawSyscallAggregate + for offset := 0; offset < len(raw); offset += stride { + next, err := decodeRawSyscallAggregate(raw[offset : offset+valueSize]) + if err != nil { + return rawSyscallAggregate{}, err + } + out = out.add(next) + } + return out, nil +} + +func roundUp(value, alignment int) int { + if alignment <= 0 { + return value + } + remainder := value % alignment + if remainder == 0 { + return value + } + return value + alignment - remainder +} + +func (r rawSyscallAggregate) add(next rawSyscallAggregate) rawSyscallAggregate { + if next.Count == 0 { + return r + } + if r.Count == 0 || next.MinDuration < r.MinDuration { + r.MinDuration = next.MinDuration + } + if next.MaxDuration > r.MaxDuration { + r.MaxDuration = next.MaxDuration + } + r.Count += next.Count + r.Errors += next.Errors + r.TotalDuration += next.TotalDuration + for i, value := range next.Histogram { + r.Histogram[i] += value + } + return r +} + +func (r rawSyscallAggregate) diff(prev rawSyscallAggregate) rawSyscallAggregate { + if r.Count < prev.Count || prev.Count == 0 { + return r + } + histogram := diffHistogram(r.Histogram, prev.Histogram) + minLatency, maxLatency, ok := latencyExtremaFromHistogram(histogram) + if !ok { + minLatency = r.MinDuration + maxLatency = r.MaxDuration + } + if r.MinDuration < prev.MinDuration { + minLatency = r.MinDuration + } + if r.MaxDuration > prev.MaxDuration { + maxLatency = r.MaxDuration + } + return rawSyscallAggregate{ + Count: r.Count - prev.Count, + Errors: subtractU64(r.Errors, prev.Errors), + TotalDuration: subtractU64(r.TotalDuration, prev.TotalDuration), + MinDuration: minLatency, + MaxDuration: maxLatency, + Histogram: histogram, + } +} + +func latencyExtremaFromHistogram(histogram [8]uint64) (minLatency uint64, maxLatency uint64, ok bool) { + minIndex := -1 + maxIndex := -1 + for i, count := range histogram { + if count == 0 { + continue + } + if minIndex < 0 { + minIndex = i + } + maxIndex = i + } + if minIndex < 
0 { + return 0, 0, false + } + minLatency = syscallAggregateLatencyBucketLowerNs[minIndex] + maxLatency = syscallAggregateLatencyBucketUpperNs[maxIndex] + if maxLatency == 0 { + maxLatency = syscallAggregateLatencyBucketLowerNs[maxIndex] + } else { + maxLatency-- + } + return minLatency, maxLatency, true +} + +func diffHistogram(current, previous [8]uint64) [8]uint64 { + var out [8]uint64 + for i, value := range current { + out[i] = subtractU64(value, previous[i]) + } + return out +} + +func subtractU64(current, previous uint64) uint64 { + if current < previous { + return 0 + } + return current - previous +} + func applySyscallSamplingRates(cfg flags.Config, module *bpf.Module) error { samplingMap, err := module.GetMap(syscallSamplingRateMapName) if err != nil { |
