summaryrefslogtreecommitdiff
path: root/internal/aggregate_drainer.go
blob: 87c5e1adbefc38e9e694e1f2058c4fb29eb2fe38 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package internal

import (
	"context"
	"fmt"
	"strings"
	"time"

	"ior/internal/globalfilter"
	"ior/internal/statsengine"
	"ior/internal/types"
)

type aggregateDrainResult struct {
	rows    []statsengine.SyscallAggregate
	warning string
}

type aggregateDrainer struct {
	source                syscallAggregateSource
	filter                func() globalfilter.Filter
	aggregateOnlyTraceIDs map[types.TraceId]struct{}
}

func newAggregateDrainer(
	source syscallAggregateSource,
	aggregateOnlyTraceIDs map[types.TraceId]struct{},
	filter func() globalfilter.Filter,
) *aggregateDrainer {
	return &aggregateDrainer{
		source:                source,
		filter:                filter,
		aggregateOnlyTraceIDs: aggregateOnlyTraceIDs,
	}
}

func (d *aggregateDrainer) Tick() aggregateDrainResult {
	if d == nil || d.source == nil {
		return aggregateDrainResult{}
	}
	rows, err := d.source.Drain()
	if err != nil {
		return aggregateDrainResult{warning: fmt.Sprintf("syscall aggregate drain failed: %v", err)}
	}
	rows = d.filterRowsForIngest(rows)
	if len(rows) == 0 {
		return aggregateDrainResult{}
	}
	return aggregateDrainResult{rows: rows}
}

func (d *aggregateDrainer) Start(ctx context.Context, every time.Duration, handle func(aggregateDrainResult)) func() {
	if d == nil || d.source == nil {
		return func() {}
	}

	done := make(chan struct{})
	stop := make(chan struct{})
	go func() {
		defer close(done)
		ticker := time.NewTicker(every)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-stop:
				return
			case <-ticker.C:
				handle(d.Tick())
			}
		}
	}()
	return func() {
		close(stop)
		<-done
		handle(d.Tick())
	}
}

func (d *aggregateDrainer) filterRowsForIngest(rows []statsengine.SyscallAggregate) []statsengine.SyscallAggregate {
	if len(rows) == 0 {
		return nil
	}
	if !aggregateIngestAllowedForFilter(d.currentFilter()) {
		return nil
	}
	if len(d.aggregateOnlyTraceIDs) == 0 {
		return nil
	}

	filtered := make([]statsengine.SyscallAggregate, 0, len(rows))
	for _, row := range rows {
		if _, ok := d.aggregateOnlyTraceIDs[row.TraceID]; ok {
			filtered = append(filtered, row)
		}
	}
	return filtered
}

func (d *aggregateDrainer) currentFilter() globalfilter.Filter {
	if d == nil || d.filter == nil {
		return globalfilter.Filter{}
	}
	return d.filter()
}

func aggregateIngestAllowedForFilter(filter globalfilter.Filter) bool {
	if filter.ErrorsOnly {
		return false
	}
	if hasPattern(filter.Syscall) || hasPattern(filter.Comm) || hasPattern(filter.File) {
		return false
	}
	if filter.FD != nil || filter.LatencyNs != nil || filter.GapNs != nil || filter.Bytes != nil || filter.RetVal != nil {
		return false
	}
	if filter.PID != nil {
		return false
	}
	if filter.TID != nil {
		return false
	}
	return true
}

func hasPattern(filter *globalfilter.StringFilter) bool {
	return filter != nil && strings.TrimSpace(filter.Pattern) != ""
}