summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-05-22 09:34:05 +0300
committerPaul Buetow <paul@buetow.org>2026-05-22 09:34:05 +0300
commitb0914837b96afb99971e600d6c46e55875f04854 (patch)
treefb9c92c675bd3e1c65f87b31ca508c0910fe3c30 /internal
parent74a241b702a595b86cabc5350bb5bf7c52d8e27b (diff)
xb make syscall aggregates per-cpu deltas
Diffstat (limited to 'internal')
-rw-r--r--internal/c/maps.h2
-rw-r--r--internal/syscall_aggregate_consumer.go201
-rw-r--r--internal/syscall_aggregate_consumer_test.go280
3 files changed, 467 insertions, 16 deletions
diff --git a/internal/c/maps.h b/internal/c/maps.h
index 79a1367..24f24e7 100644
--- a/internal/c/maps.h
+++ b/internal/c/maps.h
@@ -61,7 +61,7 @@ struct {
} syscall_enter_state_map SEC(".maps");
struct {
- __uint(type, BPF_MAP_TYPE_HASH);
+ __uint(type, BPF_MAP_TYPE_PERCPU_HASH);
__uint(max_entries, 4096);
__type(key, __u32);
__type(value, struct syscall_aggregate);
diff --git a/internal/syscall_aggregate_consumer.go b/internal/syscall_aggregate_consumer.go
index dadd38e..e246eac 100644
--- a/internal/syscall_aggregate_consumer.go
+++ b/internal/syscall_aggregate_consumer.go
@@ -5,7 +5,6 @@ import (
"encoding/binary"
"errors"
"fmt"
- "syscall"
"unsafe"
"ior/internal/flags"
@@ -20,6 +19,28 @@ const (
syscallSamplingRateMapName = "syscall_sampling_rate_map"
)
+var syscallAggregateLatencyBucketLowerNs = [8]uint64{
+ 0,
+ 1_000,
+ 10_000,
+ 100_000,
+ 1_000_000,
+ 10_000_000,
+ 100_000_000,
+ 1_000_000_000,
+}
+
+var syscallAggregateLatencyBucketUpperNs = [8]uint64{
+ 1_000,
+ 10_000,
+ 100_000,
+ 1_000_000,
+ 10_000_000,
+ 100_000_000,
+ 1_000_000_000,
+ 0,
+}
+
type rawSyscallAggregate struct {
Count uint64
Errors uint64
@@ -30,7 +51,27 @@ type rawSyscallAggregate struct {
}
type syscallAggregateConsumer struct {
- aggregateMap *bpf.BPFMap
+ aggregateMap syscallAggregateMap
+ last map[types.TraceId]rawSyscallAggregate
+}
+
+type syscallAggregateMap interface {
+ Iterator() syscallAggregateIterator
+ GetValue(unsafe.Pointer) ([]byte, error)
+}
+
+type syscallAggregateIterator interface {
+ Next() bool
+ Key() []byte
+ Err() error
+}
+
+type bpfSyscallAggregateMap struct {
+ *bpf.BPFMap
+}
+
+func (m bpfSyscallAggregateMap) Iterator() syscallAggregateIterator {
+ return m.BPFMap.Iterator()
}
func newSyscallAggregateConsumer(module *bpf.Module) (*syscallAggregateConsumer, error) {
@@ -41,7 +82,10 @@ func newSyscallAggregateConsumer(module *bpf.Module) (*syscallAggregateConsumer,
if err != nil {
return nil, fmt.Errorf("get %s: %w", syscallAggregateMapName, err)
}
- return &syscallAggregateConsumer{aggregateMap: aggregateMap}, nil
+ return &syscallAggregateConsumer{
+ aggregateMap: bpfSyscallAggregateMap{BPFMap: aggregateMap},
+ last: make(map[types.TraceId]rawSyscallAggregate),
+ }, nil
}
func (c *syscallAggregateConsumer) Drain() ([]statsengine.SyscallAggregate, error) {
@@ -57,25 +101,28 @@ func (c *syscallAggregateConsumer) Drain() ([]statsengine.SyscallAggregate, erro
continue
}
key := binary.LittleEndian.Uint32(keyRaw)
- valueRaw, err := c.aggregateMap.GetValueAndDeleteKey(unsafe.Pointer(&key))
+ valueRaw, err := c.aggregateMap.GetValue(unsafe.Pointer(&key))
if err != nil {
- if errors.Is(err, syscall.ENOENT) {
- continue
- }
return nil, fmt.Errorf("drain aggregate for trace id %d: %w", key, err)
}
- raw, err := decodeRawSyscallAggregate(valueRaw)
+ raw, err := decodeRawSyscallAggregateValue(valueRaw)
if err != nil {
return nil, fmt.Errorf("decode aggregate for trace id %d: %w", key, err)
}
+ traceID := types.TraceId(key)
+ delta := raw.diff(c.last[traceID])
+ c.last[traceID] = raw
+ if delta.Count == 0 {
+ continue
+ }
rows = append(rows, statsengine.SyscallAggregate{
- TraceID: types.TraceId(key),
- Count: raw.Count,
- Errors: raw.Errors,
- TotalLatencyNs: raw.TotalDuration,
- MinLatencyNs: raw.MinDuration,
- MaxLatencyNs: raw.MaxDuration,
- LatencyHistogramNs: raw.Histogram,
+ TraceID: traceID,
+ Count: delta.Count,
+ Errors: delta.Errors,
+ TotalLatencyNs: delta.TotalDuration,
+ MinLatencyNs: delta.MinDuration,
+ MaxLatencyNs: delta.MaxDuration,
+ LatencyHistogramNs: delta.Histogram,
})
}
if err := iter.Err(); err != nil {
@@ -96,6 +143,130 @@ func decodeRawSyscallAggregate(raw []byte) (rawSyscallAggregate, error) {
return out, nil
}
+func decodeRawSyscallAggregateValue(raw []byte) (rawSyscallAggregate, error) {
+ var sample rawSyscallAggregate
+ valueSize := binary.Size(sample)
+ if len(raw) == valueSize {
+ return decodeRawSyscallAggregate(raw)
+ }
+ return decodeRawSyscallAggregatePerCPU(raw)
+}
+
+func decodeRawSyscallAggregatePerCPU(raw []byte) (rawSyscallAggregate, error) {
+ var sample rawSyscallAggregate
+ valueSize := binary.Size(sample)
+ stride := roundUp(valueSize, 8)
+ if valueSize <= 0 || stride <= 0 || len(raw) == 0 || len(raw)%stride != 0 {
+ return rawSyscallAggregate{}, fmt.Errorf("invalid per-cpu aggregate value size %d (element size %d, stride %d)", len(raw), valueSize, stride)
+ }
+
+ var out rawSyscallAggregate
+ for offset := 0; offset < len(raw); offset += stride {
+ next, err := decodeRawSyscallAggregate(raw[offset : offset+valueSize])
+ if err != nil {
+ return rawSyscallAggregate{}, err
+ }
+ out = out.add(next)
+ }
+ return out, nil
+}
+
+func roundUp(value, alignment int) int {
+ if alignment <= 0 {
+ return value
+ }
+ remainder := value % alignment
+ if remainder == 0 {
+ return value
+ }
+ return value + alignment - remainder
+}
+
+func (r rawSyscallAggregate) add(next rawSyscallAggregate) rawSyscallAggregate {
+ if next.Count == 0 {
+ return r
+ }
+ if r.Count == 0 || next.MinDuration < r.MinDuration {
+ r.MinDuration = next.MinDuration
+ }
+ if next.MaxDuration > r.MaxDuration {
+ r.MaxDuration = next.MaxDuration
+ }
+ r.Count += next.Count
+ r.Errors += next.Errors
+ r.TotalDuration += next.TotalDuration
+ for i, value := range next.Histogram {
+ r.Histogram[i] += value
+ }
+ return r
+}
+
+func (r rawSyscallAggregate) diff(prev rawSyscallAggregate) rawSyscallAggregate {
+ if r.Count < prev.Count || prev.Count == 0 {
+ return r
+ }
+ histogram := diffHistogram(r.Histogram, prev.Histogram)
+ minLatency, maxLatency, ok := latencyExtremaFromHistogram(histogram)
+ if !ok {
+ minLatency = r.MinDuration
+ maxLatency = r.MaxDuration
+ }
+ if r.MinDuration < prev.MinDuration {
+ minLatency = r.MinDuration
+ }
+ if r.MaxDuration > prev.MaxDuration {
+ maxLatency = r.MaxDuration
+ }
+ return rawSyscallAggregate{
+ Count: r.Count - prev.Count,
+ Errors: subtractU64(r.Errors, prev.Errors),
+ TotalDuration: subtractU64(r.TotalDuration, prev.TotalDuration),
+ MinDuration: minLatency,
+ MaxDuration: maxLatency,
+ Histogram: histogram,
+ }
+}
+
+func latencyExtremaFromHistogram(histogram [8]uint64) (minLatency uint64, maxLatency uint64, ok bool) {
+ minIndex := -1
+ maxIndex := -1
+ for i, count := range histogram {
+ if count == 0 {
+ continue
+ }
+ if minIndex < 0 {
+ minIndex = i
+ }
+ maxIndex = i
+ }
+ if minIndex < 0 {
+ return 0, 0, false
+ }
+ minLatency = syscallAggregateLatencyBucketLowerNs[minIndex]
+ maxLatency = syscallAggregateLatencyBucketUpperNs[maxIndex]
+ if maxLatency == 0 {
+ maxLatency = syscallAggregateLatencyBucketLowerNs[maxIndex]
+ } else {
+ maxLatency--
+ }
+ return minLatency, maxLatency, true
+}
+
+func diffHistogram(current, previous [8]uint64) [8]uint64 {
+ var out [8]uint64
+ for i, value := range current {
+ out[i] = subtractU64(value, previous[i])
+ }
+ return out
+}
+
+func subtractU64(current, previous uint64) uint64 {
+ if current < previous {
+ return 0
+ }
+ return current - previous
+}
+
func applySyscallSamplingRates(cfg flags.Config, module *bpf.Module) error {
samplingMap, err := module.GetMap(syscallSamplingRateMapName)
if err != nil {
diff --git a/internal/syscall_aggregate_consumer_test.go b/internal/syscall_aggregate_consumer_test.go
index 362dfba..f8ef217 100644
--- a/internal/syscall_aggregate_consumer_test.go
+++ b/internal/syscall_aggregate_consumer_test.go
@@ -3,9 +3,12 @@ package internal
import (
"bytes"
"encoding/binary"
+ "fmt"
"testing"
+ "unsafe"
"ior/internal/flags"
+ "ior/internal/statsengine"
"ior/internal/types"
)
@@ -69,3 +72,280 @@ func TestDecodeRawSyscallAggregateRejectsBadSize(t *testing.T) {
t.Fatal("expected error for short value")
}
}
+
+func TestDecodeRawSyscallAggregatePerCPUSumsActiveCPUs(t *testing.T) {
+ raw := encodeRawAggregates(t,
+ rawSyscallAggregate{
+ Count: 2,
+ Errors: 1,
+ TotalDuration: 30,
+ MinDuration: 10,
+ MaxDuration: 20,
+ Histogram: [8]uint64{1, 0, 1},
+ },
+ rawSyscallAggregate{},
+ rawSyscallAggregate{
+ Count: 3,
+ Errors: 0,
+ TotalDuration: 90,
+ MinDuration: 5,
+ MaxDuration: 50,
+ Histogram: [8]uint64{0, 2, 1},
+ },
+ )
+
+ got, err := decodeRawSyscallAggregatePerCPU(raw)
+ if err != nil {
+ t.Fatalf("decodeRawSyscallAggregatePerCPU error: %v", err)
+ }
+
+ want := rawSyscallAggregate{
+ Count: 5,
+ Errors: 1,
+ TotalDuration: 120,
+ MinDuration: 5,
+ MaxDuration: 50,
+ Histogram: [8]uint64{1, 2, 2},
+ }
+ if got != want {
+ t.Fatalf("per-cpu aggregate = %+v, want %+v", got, want)
+ }
+}
+
+func TestDecodeRawSyscallAggregatePerCPURejectsBadSize(t *testing.T) {
+ raw := encodeRawAggregates(t, rawSyscallAggregate{Count: 1})
+ raw = append(raw, 0)
+
+ if _, err := decodeRawSyscallAggregatePerCPU(raw); err == nil {
+ t.Fatal("expected error for non-stride-aligned value")
+ }
+}
+
+func TestDecodeRawSyscallAggregatePerCPURejectsEmptyValue(t *testing.T) {
+ if _, err := decodeRawSyscallAggregatePerCPU(nil); err == nil {
+ t.Fatal("expected error for empty per-cpu value")
+ }
+}
+
+func TestSyscallAggregateConsumerDrainEmitsDeltas(t *testing.T) {
+ const traceID = uint32(types.SYS_ENTER_FUTEX)
+ fakeMap := newFakeSyscallAggregateMap(traceID, encodeRawAggregates(t,
+ rawSyscallAggregate{
+ Count: 2,
+ Errors: 1,
+ TotalDuration: 30,
+ MinDuration: 10,
+ MaxDuration: 20,
+ Histogram: [8]uint64{1, 0, 1},
+ },
+ rawSyscallAggregate{
+ Count: 3,
+ TotalDuration: 90,
+ MinDuration: 5,
+ MaxDuration: 50,
+ Histogram: [8]uint64{0, 2, 1},
+ },
+ ))
+ consumer := &syscallAggregateConsumer{
+ aggregateMap: fakeMap,
+ last: make(map[types.TraceId]rawSyscallAggregate),
+ }
+
+ rows, err := consumer.Drain()
+ if err != nil {
+ t.Fatalf("first Drain error: %v", err)
+ }
+ assertAggregateRows(t, rows, statsengine.SyscallAggregate{
+ TraceID: types.TraceId(traceID),
+ Count: 5,
+ Errors: 1,
+ TotalLatencyNs: 120,
+ MinLatencyNs: 5,
+ MaxLatencyNs: 50,
+ LatencyHistogramNs: [8]uint64{
+ 1, 2, 2,
+ },
+ })
+
+ fakeMap.values[traceID] = encodeRawAggregates(t,
+ rawSyscallAggregate{
+ Count: 4,
+ Errors: 2,
+ TotalDuration: 80,
+ MinDuration: 4,
+ MaxDuration: 40,
+ Histogram: [8]uint64{2, 1, 1},
+ },
+ rawSyscallAggregate{
+ Count: 3,
+ TotalDuration: 110,
+ MinDuration: 5,
+ MaxDuration: 70,
+ Histogram: [8]uint64{0, 2, 1, 1},
+ },
+ )
+ rows, err = consumer.Drain()
+ if err != nil {
+ t.Fatalf("second Drain error: %v", err)
+ }
+ assertAggregateRows(t, rows, statsengine.SyscallAggregate{
+ TraceID: types.TraceId(traceID),
+ Count: 2,
+ Errors: 1,
+ TotalLatencyNs: 70,
+ MinLatencyNs: 4,
+ MaxLatencyNs: 70,
+ LatencyHistogramNs: [8]uint64{
+ 1, 1, 0, 1,
+ },
+ })
+
+ fakeMap.values[traceID] = encodeRawAggregates(t,
+ rawSyscallAggregate{
+ Count: 5,
+ Errors: 2,
+ TotalDuration: 100,
+ MinDuration: 4,
+ MaxDuration: 40,
+ Histogram: [8]uint64{3, 1, 1},
+ },
+ rawSyscallAggregate{
+ Count: 3,
+ TotalDuration: 110,
+ MinDuration: 5,
+ MaxDuration: 70,
+ Histogram: [8]uint64{0, 2, 1, 1},
+ },
+ )
+ rows, err = consumer.Drain()
+ if err != nil {
+ t.Fatalf("third Drain error: %v", err)
+ }
+ assertAggregateRows(t, rows, statsengine.SyscallAggregate{
+ TraceID: types.TraceId(traceID),
+ Count: 1,
+ Errors: 0,
+ TotalLatencyNs: 20,
+ MinLatencyNs: 0,
+ MaxLatencyNs: 999,
+ LatencyHistogramNs: [8]uint64{
+ 1,
+ },
+ })
+
+ rows, err = consumer.Drain()
+ if err != nil {
+ t.Fatalf("fourth Drain error: %v", err)
+ }
+ if len(rows) != 0 {
+ t.Fatalf("fourth Drain rows = %+v, want none for zero delta", rows)
+ }
+}
+
+func TestRawSyscallAggregateDiffReturnsOnlyNewCounts(t *testing.T) {
+ prev := rawSyscallAggregate{
+ Count: 5,
+ Errors: 1,
+ TotalDuration: 100,
+ MinDuration: 10,
+ MaxDuration: 40,
+ Histogram: [8]uint64{1, 2, 2},
+ }
+ current := rawSyscallAggregate{
+ Count: 9,
+ Errors: 3,
+ TotalDuration: 190,
+ MinDuration: 5,
+ MaxDuration: 80,
+ Histogram: [8]uint64{2, 5, 2, 1},
+ }
+
+ got := current.diff(prev)
+ want := rawSyscallAggregate{
+ Count: 4,
+ Errors: 2,
+ TotalDuration: 90,
+ MinDuration: 5,
+ MaxDuration: 80,
+ Histogram: [8]uint64{1, 3, 0, 1},
+ }
+ if got != want {
+ t.Fatalf("aggregate diff = %+v, want %+v", got, want)
+ }
+}
+
+func encodeRawAggregates(t *testing.T, values ...rawSyscallAggregate) []byte {
+ t.Helper()
+
+ var buf bytes.Buffer
+ for _, value := range values {
+ if err := binary.Write(&buf, binary.LittleEndian, value); err != nil {
+ t.Fatalf("binary write: %v", err)
+ }
+ }
+ return buf.Bytes()
+}
+
+func assertAggregateRows(t *testing.T, got []statsengine.SyscallAggregate, want statsengine.SyscallAggregate) {
+ t.Helper()
+
+ if len(got) != 1 {
+ t.Fatalf("Drain rows = %+v, want one row", got)
+ }
+ if got[0] != want {
+ t.Fatalf("Drain row = %+v, want %+v", got[0], want)
+ }
+}
+
+type fakeSyscallAggregateMap struct {
+ keys [][]byte
+ values map[uint32][]byte
+}
+
+func newFakeSyscallAggregateMap(traceID uint32, value []byte) *fakeSyscallAggregateMap {
+ key := make([]byte, 4)
+ binary.LittleEndian.PutUint32(key, traceID)
+ return &fakeSyscallAggregateMap{
+ keys: [][]byte{key},
+ values: map[uint32][]byte{
+ traceID: value,
+ },
+ }
+}
+
+func (m *fakeSyscallAggregateMap) Iterator() syscallAggregateIterator {
+ return &fakeSyscallAggregateIterator{keys: m.keys}
+}
+
+func (m *fakeSyscallAggregateMap) GetValue(keyPtr unsafe.Pointer) ([]byte, error) {
+ key := *(*uint32)(keyPtr)
+ value, ok := m.values[key]
+ if !ok {
+ return nil, fmt.Errorf("missing value for key %d", key)
+ }
+ return append([]byte(nil), value...), nil
+}
+
+type fakeSyscallAggregateIterator struct {
+ keys [][]byte
+ next int
+}
+
+func (i *fakeSyscallAggregateIterator) Next() bool {
+ if i.next >= len(i.keys) {
+ return false
+ }
+ i.next++
+ return i.next <= len(i.keys)
+}
+
+func (i *fakeSyscallAggregateIterator) Key() []byte {
+ if i.next == 0 || i.next > len(i.keys) {
+ return nil
+ }
+ return i.keys[i.next-1]
+}
+
+func (i *fakeSyscallAggregateIterator) Err() error {
+ return nil
+}