1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
package internal
import (
"context"
"fmt"
"strings"
"time"
"ior/internal/globalfilter"
"ior/internal/statsengine"
"ior/internal/types"
)
type aggregateDrainResult struct {
rows []statsengine.SyscallAggregate
warning string
}
type aggregateDrainer struct {
source syscallAggregateSource
filter func() globalfilter.Filter
aggregateOnlyTraceIDs map[types.TraceId]struct{}
}
func newAggregateDrainer(
source syscallAggregateSource,
aggregateOnlyTraceIDs map[types.TraceId]struct{},
filter func() globalfilter.Filter,
) *aggregateDrainer {
return &aggregateDrainer{
source: source,
filter: filter,
aggregateOnlyTraceIDs: aggregateOnlyTraceIDs,
}
}
func (d *aggregateDrainer) Tick() aggregateDrainResult {
if d == nil || d.source == nil {
return aggregateDrainResult{}
}
rows, err := d.source.Drain()
if err != nil {
return aggregateDrainResult{warning: fmt.Sprintf("syscall aggregate drain failed: %v", err)}
}
rows = d.filterRowsForIngest(rows)
if len(rows) == 0 {
return aggregateDrainResult{}
}
return aggregateDrainResult{rows: rows}
}
func (d *aggregateDrainer) Start(ctx context.Context, every time.Duration, handle func(aggregateDrainResult)) func() {
if d == nil || d.source == nil {
return func() {}
}
done := make(chan struct{})
stop := make(chan struct{})
go func() {
defer close(done)
ticker := time.NewTicker(every)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-stop:
return
case <-ticker.C:
handle(d.Tick())
}
}
}()
return func() {
close(stop)
<-done
handle(d.Tick())
}
}
func (d *aggregateDrainer) filterRowsForIngest(rows []statsengine.SyscallAggregate) []statsengine.SyscallAggregate {
if len(rows) == 0 {
return nil
}
if !aggregateIngestAllowedForFilter(d.currentFilter()) {
return nil
}
if len(d.aggregateOnlyTraceIDs) == 0 {
return nil
}
filtered := make([]statsengine.SyscallAggregate, 0, len(rows))
for _, row := range rows {
if _, ok := d.aggregateOnlyTraceIDs[row.TraceID]; ok {
filtered = append(filtered, row)
}
}
return filtered
}
func (d *aggregateDrainer) currentFilter() globalfilter.Filter {
if d == nil || d.filter == nil {
return globalfilter.Filter{}
}
return d.filter()
}
func aggregateIngestAllowedForFilter(filter globalfilter.Filter) bool {
if filter.ErrorsOnly {
return false
}
if hasPattern(filter.Syscall) || hasPattern(filter.Comm) || hasPattern(filter.File) {
return false
}
if filter.FD != nil || filter.LatencyNs != nil || filter.GapNs != nil || filter.Bytes != nil || filter.RetVal != nil {
return false
}
if filter.PID != nil {
return false
}
if filter.TID != nil {
return false
}
return true
}
func hasPattern(filter *globalfilter.StringFilter) bool {
return filter != nil && strings.TrimSpace(filter.Pattern) != ""
}
|