summaryrefslogtreecommitdiff
path: root/internal/syscall_aggregate_consumer.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-05-22 09:34:05 +0300
committerPaul Buetow <paul@buetow.org>2026-05-22 09:34:05 +0300
commitb0914837b96afb99971e600d6c46e55875f04854 (patch)
treefb9c92c675bd3e1c65f87b31ca508c0910fe3c30 /internal/syscall_aggregate_consumer.go
parent74a241b702a595b86cabc5350bb5bf7c52d8e27b (diff)
xb make syscall aggregates per-cpu deltas
Diffstat (limited to 'internal/syscall_aggregate_consumer.go')
-rw-r--r--internal/syscall_aggregate_consumer.go201
1 files changed, 186 insertions, 15 deletions
diff --git a/internal/syscall_aggregate_consumer.go b/internal/syscall_aggregate_consumer.go
index dadd38e..e246eac 100644
--- a/internal/syscall_aggregate_consumer.go
+++ b/internal/syscall_aggregate_consumer.go
@@ -5,7 +5,6 @@ import (
"encoding/binary"
"errors"
"fmt"
- "syscall"
"unsafe"
"ior/internal/flags"
@@ -20,6 +19,28 @@ const (
syscallSamplingRateMapName = "syscall_sampling_rate_map"
)
+var syscallAggregateLatencyBucketLowerNs = [8]uint64{
+ 0,
+ 1_000,
+ 10_000,
+ 100_000,
+ 1_000_000,
+ 10_000_000,
+ 100_000_000,
+ 1_000_000_000,
+}
+
+var syscallAggregateLatencyBucketUpperNs = [8]uint64{
+ 1_000,
+ 10_000,
+ 100_000,
+ 1_000_000,
+ 10_000_000,
+ 100_000_000,
+ 1_000_000_000,
+ 0,
+}
+
type rawSyscallAggregate struct {
Count uint64
Errors uint64
@@ -30,7 +51,27 @@ type rawSyscallAggregate struct {
}
type syscallAggregateConsumer struct {
- aggregateMap *bpf.BPFMap
+ aggregateMap syscallAggregateMap
+ last map[types.TraceId]rawSyscallAggregate
+}
+
+type syscallAggregateMap interface {
+ Iterator() syscallAggregateIterator
+ GetValue(unsafe.Pointer) ([]byte, error)
+}
+
+type syscallAggregateIterator interface {
+ Next() bool
+ Key() []byte
+ Err() error
+}
+
+type bpfSyscallAggregateMap struct {
+ *bpf.BPFMap
+}
+
+func (m bpfSyscallAggregateMap) Iterator() syscallAggregateIterator {
+ return m.BPFMap.Iterator()
}
func newSyscallAggregateConsumer(module *bpf.Module) (*syscallAggregateConsumer, error) {
@@ -41,7 +82,10 @@ func newSyscallAggregateConsumer(module *bpf.Module) (*syscallAggregateConsumer,
if err != nil {
return nil, fmt.Errorf("get %s: %w", syscallAggregateMapName, err)
}
- return &syscallAggregateConsumer{aggregateMap: aggregateMap}, nil
+ return &syscallAggregateConsumer{
+ aggregateMap: bpfSyscallAggregateMap{BPFMap: aggregateMap},
+ last: make(map[types.TraceId]rawSyscallAggregate),
+ }, nil
}
func (c *syscallAggregateConsumer) Drain() ([]statsengine.SyscallAggregate, error) {
@@ -57,25 +101,28 @@ func (c *syscallAggregateConsumer) Drain() ([]statsengine.SyscallAggregate, erro
continue
}
key := binary.LittleEndian.Uint32(keyRaw)
- valueRaw, err := c.aggregateMap.GetValueAndDeleteKey(unsafe.Pointer(&key))
+ valueRaw, err := c.aggregateMap.GetValue(unsafe.Pointer(&key))
if err != nil {
- if errors.Is(err, syscall.ENOENT) {
- continue
- }
return nil, fmt.Errorf("drain aggregate for trace id %d: %w", key, err)
}
- raw, err := decodeRawSyscallAggregate(valueRaw)
+ raw, err := decodeRawSyscallAggregateValue(valueRaw)
if err != nil {
return nil, fmt.Errorf("decode aggregate for trace id %d: %w", key, err)
}
+ traceID := types.TraceId(key)
+ delta := raw.diff(c.last[traceID])
+ c.last[traceID] = raw
+ if delta.Count == 0 {
+ continue
+ }
rows = append(rows, statsengine.SyscallAggregate{
- TraceID: types.TraceId(key),
- Count: raw.Count,
- Errors: raw.Errors,
- TotalLatencyNs: raw.TotalDuration,
- MinLatencyNs: raw.MinDuration,
- MaxLatencyNs: raw.MaxDuration,
- LatencyHistogramNs: raw.Histogram,
+ TraceID: traceID,
+ Count: delta.Count,
+ Errors: delta.Errors,
+ TotalLatencyNs: delta.TotalDuration,
+ MinLatencyNs: delta.MinDuration,
+ MaxLatencyNs: delta.MaxDuration,
+ LatencyHistogramNs: delta.Histogram,
})
}
if err := iter.Err(); err != nil {
@@ -96,6 +143,130 @@ func decodeRawSyscallAggregate(raw []byte) (rawSyscallAggregate, error) {
return out, nil
}
+func decodeRawSyscallAggregateValue(raw []byte) (rawSyscallAggregate, error) {
+ var sample rawSyscallAggregate
+ valueSize := binary.Size(sample)
+ if len(raw) == valueSize {
+ return decodeRawSyscallAggregate(raw)
+ }
+ return decodeRawSyscallAggregatePerCPU(raw)
+}
+
+func decodeRawSyscallAggregatePerCPU(raw []byte) (rawSyscallAggregate, error) {
+ var sample rawSyscallAggregate
+ valueSize := binary.Size(sample)
+ stride := roundUp(valueSize, 8)
+ if valueSize <= 0 || stride <= 0 || len(raw) == 0 || len(raw)%stride != 0 {
+ return rawSyscallAggregate{}, fmt.Errorf("invalid per-cpu aggregate value size %d (element size %d, stride %d)", len(raw), valueSize, stride)
+ }
+
+ var out rawSyscallAggregate
+ for offset := 0; offset < len(raw); offset += stride {
+ next, err := decodeRawSyscallAggregate(raw[offset : offset+valueSize])
+ if err != nil {
+ return rawSyscallAggregate{}, err
+ }
+ out = out.add(next)
+ }
+ return out, nil
+}
+
+func roundUp(value, alignment int) int {
+ if alignment <= 0 {
+ return value
+ }
+ remainder := value % alignment
+ if remainder == 0 {
+ return value
+ }
+ return value + alignment - remainder
+}
+
+func (r rawSyscallAggregate) add(next rawSyscallAggregate) rawSyscallAggregate {
+ if next.Count == 0 {
+ return r
+ }
+ if r.Count == 0 || next.MinDuration < r.MinDuration {
+ r.MinDuration = next.MinDuration
+ }
+ if next.MaxDuration > r.MaxDuration {
+ r.MaxDuration = next.MaxDuration
+ }
+ r.Count += next.Count
+ r.Errors += next.Errors
+ r.TotalDuration += next.TotalDuration
+ for i, value := range next.Histogram {
+ r.Histogram[i] += value
+ }
+ return r
+}
+
+func (r rawSyscallAggregate) diff(prev rawSyscallAggregate) rawSyscallAggregate {
+ if r.Count < prev.Count || prev.Count == 0 {
+ return r
+ }
+ histogram := diffHistogram(r.Histogram, prev.Histogram)
+ minLatency, maxLatency, ok := latencyExtremaFromHistogram(histogram)
+ if !ok {
+ minLatency = r.MinDuration
+ maxLatency = r.MaxDuration
+ }
+ if r.MinDuration < prev.MinDuration {
+ minLatency = r.MinDuration
+ }
+ if r.MaxDuration > prev.MaxDuration {
+ maxLatency = r.MaxDuration
+ }
+ return rawSyscallAggregate{
+ Count: r.Count - prev.Count,
+ Errors: subtractU64(r.Errors, prev.Errors),
+ TotalDuration: subtractU64(r.TotalDuration, prev.TotalDuration),
+ MinDuration: minLatency,
+ MaxDuration: maxLatency,
+ Histogram: histogram,
+ }
+}
+
+func latencyExtremaFromHistogram(histogram [8]uint64) (minLatency uint64, maxLatency uint64, ok bool) {
+ minIndex := -1
+ maxIndex := -1
+ for i, count := range histogram {
+ if count == 0 {
+ continue
+ }
+ if minIndex < 0 {
+ minIndex = i
+ }
+ maxIndex = i
+ }
+ if minIndex < 0 {
+ return 0, 0, false
+ }
+ minLatency = syscallAggregateLatencyBucketLowerNs[minIndex]
+ maxLatency = syscallAggregateLatencyBucketUpperNs[maxIndex]
+ if maxLatency == 0 {
+ maxLatency = syscallAggregateLatencyBucketLowerNs[maxIndex]
+ } else {
+ maxLatency--
+ }
+ return minLatency, maxLatency, true
+}
+
+func diffHistogram(current, previous [8]uint64) [8]uint64 {
+ var out [8]uint64
+ for i, value := range current {
+ out[i] = subtractU64(value, previous[i])
+ }
+ return out
+}
+
+func subtractU64(current, previous uint64) uint64 {
+ if current < previous {
+ return 0
+ }
+ return current - previous
+}
+
func applySyscallSamplingRates(cfg flags.Config, module *bpf.Module) error {
samplingMap, err := module.GetMap(syscallSamplingRateMapName)
if err != nil {