summaryrefslogtreecommitdiff
path: root/internal/eventloop_runtime.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/eventloop_runtime.go')
-rw-r--r--internal/eventloop_runtime.go82
1 files changed, 7 insertions, 75 deletions
diff --git a/internal/eventloop_runtime.go b/internal/eventloop_runtime.go
index 24214ca..c4fa6b0 100644
--- a/internal/eventloop_runtime.go
+++ b/internal/eventloop_runtime.go
@@ -4,13 +4,10 @@ import (
"context"
"fmt"
"runtime/debug"
- "strings"
"time"
"ior/internal/event"
"ior/internal/file"
- "ior/internal/globalfilter"
- "ior/internal/statsengine"
"ior/internal/types"
)
@@ -49,84 +46,19 @@ func (e *eventLoop) startAggregateDrainLoop(ctx context.Context) func() {
return func() {}
}
- done := make(chan struct{})
- stop := make(chan struct{})
- go func() {
- defer close(done)
- ticker := time.NewTicker(e.cfg.aggregateDrainEvery)
- defer ticker.Stop()
- for {
- select {
- case <-ctx.Done():
- return
- case <-stop:
- return
- case <-ticker.C:
- e.drainAggregatesOnce()
- }
- }
- }()
- return func() {
- close(stop)
- <-done
- e.drainAggregatesOnce()
- }
+ drainer := newAggregateDrainer(e.aggregateSrc, e.cfg.aggregateOnlyTraceIDs, e.Filter)
+ return drainer.Start(ctx, e.cfg.aggregateDrainEvery, e.handleAggregateDrainResult)
}
-func (e *eventLoop) drainAggregatesOnce() {
- rows, err := e.aggregateSrc.Drain()
- if err != nil {
- e.notifyWarning(fmt.Sprintf("syscall aggregate drain failed: %v", err))
+func (e *eventLoop) handleAggregateDrainResult(result aggregateDrainResult) {
+ if result.warning != "" {
+ e.notifyWarning(result.warning)
return
}
- rows = e.filterAggregateRowsForIngest(rows)
- if len(rows) == 0 {
+ if len(result.rows) == 0 {
return
}
- e.aggregateSink.IngestSyscallAggregates(rows)
-}
-
-func (e *eventLoop) filterAggregateRowsForIngest(rows []statsengine.SyscallAggregate) []statsengine.SyscallAggregate {
- if len(rows) == 0 {
- return nil
- }
- if !aggregateIngestAllowedForFilter(e.Filter()) {
- return nil
- }
- if len(e.cfg.aggregateOnlyTraceIDs) == 0 {
- return nil
- }
-
- filtered := make([]statsengine.SyscallAggregate, 0, len(rows))
- for _, row := range rows {
- if _, ok := e.cfg.aggregateOnlyTraceIDs[row.TraceID]; ok {
- filtered = append(filtered, row)
- }
- }
- return filtered
-}
-
-func aggregateIngestAllowedForFilter(filter globalfilter.Filter) bool {
- if filter.ErrorsOnly {
- return false
- }
- if hasPattern(filter.Syscall) || hasPattern(filter.Comm) || hasPattern(filter.File) {
- return false
- }
- if filter.FD != nil || filter.LatencyNs != nil || filter.GapNs != nil || filter.Bytes != nil || filter.RetVal != nil {
- return false
- }
- if filter.PID != nil {
- return false
- }
- if filter.TID != nil {
- return false
- }
- return true
-}
-
-func hasPattern(filter *globalfilter.StringFilter) bool {
- return filter != nil && strings.TrimSpace(filter.Pattern) != ""
+ e.aggregateSink.IngestSyscallAggregates(result.rows)
}
func (e *eventLoop) runSynchronously(ctx context.Context, rawCh <-chan []byte) {