diff options
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/c/maps.h | 2 | ||||
| -rw-r--r-- | internal/syscall_aggregate_consumer.go | 201 | ||||
| -rw-r--r-- | internal/syscall_aggregate_consumer_test.go | 280 |
3 files changed, 467 insertions, 16 deletions
diff --git a/internal/c/maps.h b/internal/c/maps.h index 79a1367..24f24e7 100644 --- a/internal/c/maps.h +++ b/internal/c/maps.h @@ -61,7 +61,7 @@ struct { } syscall_enter_state_map SEC(".maps"); struct { - __uint(type, BPF_MAP_TYPE_HASH); + __uint(type, BPF_MAP_TYPE_PERCPU_HASH); __uint(max_entries, 4096); __type(key, __u32); __type(value, struct syscall_aggregate); diff --git a/internal/syscall_aggregate_consumer.go b/internal/syscall_aggregate_consumer.go index dadd38e..e246eac 100644 --- a/internal/syscall_aggregate_consumer.go +++ b/internal/syscall_aggregate_consumer.go @@ -5,7 +5,6 @@ import ( "encoding/binary" "errors" "fmt" - "syscall" "unsafe" "ior/internal/flags" @@ -20,6 +19,28 @@ const ( syscallSamplingRateMapName = "syscall_sampling_rate_map" ) +var syscallAggregateLatencyBucketLowerNs = [8]uint64{ + 0, + 1_000, + 10_000, + 100_000, + 1_000_000, + 10_000_000, + 100_000_000, + 1_000_000_000, +} + +var syscallAggregateLatencyBucketUpperNs = [8]uint64{ + 1_000, + 10_000, + 100_000, + 1_000_000, + 10_000_000, + 100_000_000, + 1_000_000_000, + 0, +} + type rawSyscallAggregate struct { Count uint64 Errors uint64 @@ -30,7 +51,27 @@ type rawSyscallAggregate struct { } type syscallAggregateConsumer struct { - aggregateMap *bpf.BPFMap + aggregateMap syscallAggregateMap + last map[types.TraceId]rawSyscallAggregate +} + +type syscallAggregateMap interface { + Iterator() syscallAggregateIterator + GetValue(unsafe.Pointer) ([]byte, error) +} + +type syscallAggregateIterator interface { + Next() bool + Key() []byte + Err() error +} + +type bpfSyscallAggregateMap struct { + *bpf.BPFMap +} + +func (m bpfSyscallAggregateMap) Iterator() syscallAggregateIterator { + return m.BPFMap.Iterator() } func newSyscallAggregateConsumer(module *bpf.Module) (*syscallAggregateConsumer, error) { @@ -41,7 +82,10 @@ func newSyscallAggregateConsumer(module *bpf.Module) (*syscallAggregateConsumer, if err != nil { return nil, fmt.Errorf("get %s: %w", syscallAggregateMapName, err) } - return &syscallAggregateConsumer{aggregateMap: aggregateMap}, nil + return &syscallAggregateConsumer{ + aggregateMap: bpfSyscallAggregateMap{BPFMap: aggregateMap}, + last: make(map[types.TraceId]rawSyscallAggregate), + }, nil } func (c *syscallAggregateConsumer) Drain() ([]statsengine.SyscallAggregate, error) { @@ -57,25 +101,28 @@ func (c *syscallAggregateConsumer) Drain() ([]statsengine.SyscallAggregate, erro continue } key := binary.LittleEndian.Uint32(keyRaw) - valueRaw, err := c.aggregateMap.GetValueAndDeleteKey(unsafe.Pointer(&key)) + valueRaw, err := c.aggregateMap.GetValue(unsafe.Pointer(&key)) if err != nil { - if errors.Is(err, syscall.ENOENT) { - continue - } return nil, fmt.Errorf("drain aggregate for trace id %d: %w", key, err) } - raw, err := decodeRawSyscallAggregate(valueRaw) + raw, err := decodeRawSyscallAggregateValue(valueRaw) if err != nil { return nil, fmt.Errorf("decode aggregate for trace id %d: %w", key, err) } + traceID := types.TraceId(key) + delta := raw.diff(c.last[traceID]) + c.last[traceID] = raw + if delta.Count == 0 { + continue + } rows = append(rows, statsengine.SyscallAggregate{ - TraceID: types.TraceId(key), - Count: raw.Count, - Errors: raw.Errors, - TotalLatencyNs: raw.TotalDuration, - MinLatencyNs: raw.MinDuration, - MaxLatencyNs: raw.MaxDuration, - LatencyHistogramNs: raw.Histogram, + TraceID: traceID, + Count: delta.Count, + Errors: delta.Errors, + TotalLatencyNs: delta.TotalDuration, + MinLatencyNs: delta.MinDuration, + MaxLatencyNs: delta.MaxDuration, + LatencyHistogramNs: delta.Histogram, }) } if err := iter.Err(); err != nil { @@ -96,6 +143,130 @@ func decodeRawSyscallAggregate(raw []byte) (rawSyscallAggregate, error) { return out, nil } +func decodeRawSyscallAggregateValue(raw []byte) (rawSyscallAggregate, error) { + var sample rawSyscallAggregate + valueSize := binary.Size(sample) + if len(raw) == valueSize { + return decodeRawSyscallAggregate(raw) + } + return decodeRawSyscallAggregatePerCPU(raw) +} + +func decodeRawSyscallAggregatePerCPU(raw []byte) (rawSyscallAggregate, error) { + var sample rawSyscallAggregate + valueSize := binary.Size(sample) + stride := roundUp(valueSize, 8) + if valueSize <= 0 || stride <= 0 || len(raw) == 0 || len(raw)%stride != 0 { + return rawSyscallAggregate{}, fmt.Errorf("invalid per-cpu aggregate value size %d (element size %d, stride %d)", len(raw), valueSize, stride) + } + + var out rawSyscallAggregate + for offset := 0; offset < len(raw); offset += stride { + next, err := decodeRawSyscallAggregate(raw[offset : offset+valueSize]) + if err != nil { + return rawSyscallAggregate{}, err + } + out = out.add(next) + } + return out, nil +} + +func roundUp(value, alignment int) int { + if alignment <= 0 { + return value + } + remainder := value % alignment + if remainder == 0 { + return value + } + return value + alignment - remainder +} + +func (r rawSyscallAggregate) add(next rawSyscallAggregate) rawSyscallAggregate { + if next.Count == 0 { + return r + } + if r.Count == 0 || next.MinDuration < r.MinDuration { + r.MinDuration = next.MinDuration + } + if next.MaxDuration > r.MaxDuration { + r.MaxDuration = next.MaxDuration + } + r.Count += next.Count + r.Errors += next.Errors + r.TotalDuration += next.TotalDuration + for i, value := range next.Histogram { + r.Histogram[i] += value + } + return r +} + +func (r rawSyscallAggregate) diff(prev rawSyscallAggregate) rawSyscallAggregate { + if r.Count < prev.Count || prev.Count == 0 { + return r + } + histogram := diffHistogram(r.Histogram, prev.Histogram) + minLatency, maxLatency, ok := latencyExtremaFromHistogram(histogram) + if !ok { + minLatency = r.MinDuration + maxLatency = r.MaxDuration + } + if r.MinDuration < prev.MinDuration { + minLatency = r.MinDuration + } + if r.MaxDuration > prev.MaxDuration { + maxLatency = r.MaxDuration + } + return rawSyscallAggregate{ + Count: r.Count - prev.Count, + Errors: subtractU64(r.Errors, prev.Errors), + TotalDuration: subtractU64(r.TotalDuration, prev.TotalDuration), + MinDuration: minLatency, + MaxDuration: maxLatency, + Histogram: histogram, + } +} + +func latencyExtremaFromHistogram(histogram [8]uint64) (minLatency uint64, maxLatency uint64, ok bool) { + minIndex := -1 + maxIndex := -1 + for i, count := range histogram { + if count == 0 { + continue + } + if minIndex < 0 { + minIndex = i + } + maxIndex = i + } + if minIndex < 0 { + return 0, 0, false + } + minLatency = syscallAggregateLatencyBucketLowerNs[minIndex] + maxLatency = syscallAggregateLatencyBucketUpperNs[maxIndex] + if maxLatency == 0 { + maxLatency = syscallAggregateLatencyBucketLowerNs[maxIndex] + } else { + maxLatency-- + } + return minLatency, maxLatency, true +} + +func diffHistogram(current, previous [8]uint64) [8]uint64 { + var out [8]uint64 + for i, value := range current { + out[i] = subtractU64(value, previous[i]) + } + return out +} + +func subtractU64(current, previous uint64) uint64 { + if current < previous { + return 0 + } + return current - previous +} + func applySyscallSamplingRates(cfg flags.Config, module *bpf.Module) error { samplingMap, err := module.GetMap(syscallSamplingRateMapName) if err != nil { diff --git a/internal/syscall_aggregate_consumer_test.go b/internal/syscall_aggregate_consumer_test.go index 362dfba..f8ef217 100644 --- a/internal/syscall_aggregate_consumer_test.go +++ b/internal/syscall_aggregate_consumer_test.go @@ -3,9 +3,12 @@ package internal import ( "bytes" "encoding/binary" + "fmt" "testing" + "unsafe" "ior/internal/flags" + "ior/internal/statsengine" "ior/internal/types" ) @@ -69,3 +72,280 @@ func TestDecodeRawSyscallAggregateRejectsBadSize(t *testing.T) { t.Fatal("expected error for short value") } } + +func TestDecodeRawSyscallAggregatePerCPUSumsActiveCPUs(t *testing.T) { + raw := encodeRawAggregates(t, + rawSyscallAggregate{ + Count: 2, + Errors: 1, + TotalDuration: 30, + MinDuration: 10, + MaxDuration: 20, + Histogram: [8]uint64{1, 0, 1}, + }, + rawSyscallAggregate{}, + rawSyscallAggregate{ + Count: 3, + Errors: 0, + TotalDuration: 90, + MinDuration: 5, + MaxDuration: 50, + Histogram: [8]uint64{0, 2, 1}, + }, + ) + + got, err := decodeRawSyscallAggregatePerCPU(raw) + if err != nil { + t.Fatalf("decodeRawSyscallAggregatePerCPU error: %v", err) + } + + want := rawSyscallAggregate{ + Count: 5, + Errors: 1, + TotalDuration: 120, + MinDuration: 5, + MaxDuration: 50, + Histogram: [8]uint64{1, 2, 2}, + } + if got != want { + t.Fatalf("per-cpu aggregate = %+v, want %+v", got, want) + } +} + +func TestDecodeRawSyscallAggregatePerCPURejectsBadSize(t *testing.T) { + raw := encodeRawAggregates(t, rawSyscallAggregate{Count: 1}) + raw = append(raw, 0) + + if _, err := decodeRawSyscallAggregatePerCPU(raw); err == nil { + t.Fatal("expected error for non-stride-aligned value") + } +} + +func TestDecodeRawSyscallAggregatePerCPURejectsEmptyValue(t *testing.T) { + if _, err := decodeRawSyscallAggregatePerCPU(nil); err == nil { + t.Fatal("expected error for empty per-cpu value") + } +} + +func TestSyscallAggregateConsumerDrainEmitsDeltas(t *testing.T) { + const traceID = uint32(types.SYS_ENTER_FUTEX) + fakeMap := newFakeSyscallAggregateMap(traceID, encodeRawAggregates(t, + rawSyscallAggregate{ + Count: 2, + Errors: 1, + TotalDuration: 30, + MinDuration: 10, + MaxDuration: 20, + Histogram: [8]uint64{1, 0, 1}, + }, + rawSyscallAggregate{ + Count: 3, + TotalDuration: 90, + MinDuration: 5, + MaxDuration: 50, + Histogram: [8]uint64{0, 2, 1}, + }, + )) + consumer := &syscallAggregateConsumer{ + aggregateMap: fakeMap, + last: make(map[types.TraceId]rawSyscallAggregate), + } + + rows, err := consumer.Drain() + if err != nil { + t.Fatalf("first Drain error: %v", err) + } + assertAggregateRows(t, rows, statsengine.SyscallAggregate{ + TraceID: types.TraceId(traceID), + Count: 5, + Errors: 1, + TotalLatencyNs: 120, + MinLatencyNs: 5, + MaxLatencyNs: 50, + LatencyHistogramNs: [8]uint64{ + 1, 2, 2, + }, + }) + + fakeMap.values[traceID] = encodeRawAggregates(t, + rawSyscallAggregate{ + Count: 4, + Errors: 2, + TotalDuration: 80, + MinDuration: 4, + MaxDuration: 40, + Histogram: [8]uint64{2, 1, 1}, + }, + rawSyscallAggregate{ + Count: 3, + TotalDuration: 110, + MinDuration: 5, + MaxDuration: 70, + Histogram: [8]uint64{0, 2, 1, 1}, + }, + ) + rows, err = consumer.Drain() + if err != nil { + t.Fatalf("second Drain error: %v", err) + } + assertAggregateRows(t, rows, statsengine.SyscallAggregate{ + TraceID: types.TraceId(traceID), + Count: 2, + Errors: 1, + TotalLatencyNs: 70, + MinLatencyNs: 4, + MaxLatencyNs: 70, + LatencyHistogramNs: [8]uint64{ + 1, 1, 0, 1, + }, + }) + + fakeMap.values[traceID] = encodeRawAggregates(t, + rawSyscallAggregate{ + Count: 5, + Errors: 2, + TotalDuration: 100, + MinDuration: 4, + MaxDuration: 40, + Histogram: [8]uint64{3, 1, 1}, + }, + rawSyscallAggregate{ + Count: 3, + TotalDuration: 110, + MinDuration: 5, + MaxDuration: 70, + Histogram: [8]uint64{0, 2, 1, 1}, + }, + ) + rows, err = consumer.Drain() + if err != nil { + t.Fatalf("third Drain error: %v", err) + } + assertAggregateRows(t, rows, statsengine.SyscallAggregate{ + TraceID: types.TraceId(traceID), + Count: 1, + Errors: 0, + TotalLatencyNs: 20, + MinLatencyNs: 0, + MaxLatencyNs: 999, + LatencyHistogramNs: [8]uint64{ + 1, + }, + }) + + rows, err = consumer.Drain() + if err != nil { + t.Fatalf("fourth Drain error: %v", err) + } + if len(rows) != 0 { + t.Fatalf("fourth Drain rows = %+v, want none for zero delta", rows) + } +} + +func TestRawSyscallAggregateDiffReturnsOnlyNewCounts(t *testing.T) { + prev := rawSyscallAggregate{ + Count: 5, + Errors: 1, + TotalDuration: 100, + MinDuration: 10, + MaxDuration: 40, + Histogram: [8]uint64{1, 2, 2}, + } + current := rawSyscallAggregate{ + Count: 9, + Errors: 3, + TotalDuration: 190, + MinDuration: 5, + MaxDuration: 80, + Histogram: [8]uint64{2, 5, 2, 1}, + } + + got := current.diff(prev) + want := rawSyscallAggregate{ + Count: 4, + Errors: 2, + TotalDuration: 90, + MinDuration: 5, + MaxDuration: 80, + Histogram: [8]uint64{1, 3, 0, 1}, + } + if got != want { + t.Fatalf("aggregate diff = %+v, want %+v", got, want) + } +} + +func encodeRawAggregates(t *testing.T, values ...rawSyscallAggregate) []byte { + t.Helper() + + var buf bytes.Buffer + for _, value := range values { + if err := binary.Write(&buf, binary.LittleEndian, value); err != nil { + t.Fatalf("binary write: %v", err) + } + } + return buf.Bytes() +} + +func assertAggregateRows(t *testing.T, got []statsengine.SyscallAggregate, want statsengine.SyscallAggregate) { + t.Helper() + + if len(got) != 1 { + t.Fatalf("Drain rows = %+v, want one row", got) + } + if got[0] != want { + t.Fatalf("Drain row = %+v, want %+v", got[0], want) + } +} + +type fakeSyscallAggregateMap struct { + keys [][]byte + values map[uint32][]byte +} + +func newFakeSyscallAggregateMap(traceID uint32, value []byte) *fakeSyscallAggregateMap { + key := make([]byte, 4) + binary.LittleEndian.PutUint32(key, traceID) + return &fakeSyscallAggregateMap{ + keys: [][]byte{key}, + values: map[uint32][]byte{ + traceID: value, + }, + } +} + +func (m *fakeSyscallAggregateMap) Iterator() syscallAggregateIterator { + return &fakeSyscallAggregateIterator{keys: m.keys} +} + +func (m *fakeSyscallAggregateMap) GetValue(keyPtr unsafe.Pointer) ([]byte, error) { + key := *(*uint32)(keyPtr) + value, ok := m.values[key] + if !ok { + return nil, fmt.Errorf("missing value for key %d", key) + } + return append([]byte(nil), value...), nil +} + +type fakeSyscallAggregateIterator struct { + keys [][]byte + next int +} + +func (i *fakeSyscallAggregateIterator) Next() bool { + if i.next >= len(i.keys) { + return false + } + i.next++ + return i.next <= len(i.keys) +} + +func (i *fakeSyscallAggregateIterator) Key() []byte { + if i.next == 0 || i.next > len(i.keys) { + return nil + } + return i.keys[i.next-1] +} + +func (i *fakeSyscallAggregateIterator) Err() error { + return nil +} |
