diff options
| author | Paul Buetow <paul@buetow.org> | 2021-10-09 21:10:29 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2021-10-10 13:36:41 +0300 |
| commit | 97747ea0f3178f7f5890512d483fdccaa82846b0 (patch) | |
| tree | 9ff1335ca26afc90e55fd6de416457e252d75a35 /internal/mapr/server/aggregate.go | |
| parent | 7a7169791a64190e1002e38bc9c04ad0d5c1ce1f (diff) | |
vetting and linting and some code restyling
Diffstat (limited to 'internal/mapr/server/aggregate.go')
| -rw-r--r-- | internal/mapr/server/aggregate.go | 30 |
1 files changed, 12 insertions, 18 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 1f5d1c3..97fee11 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -63,16 +63,14 @@ func NewAggregate(queryStr string) (*Aggregate, error) { } } - a := Aggregate{ + return &Aggregate{ done: internal.NewDone(), NextLinesCh: make(chan chan line.Line, 10), serialize: make(chan struct{}), hostname: s[0], query: query, parser: logParser, - } - - return &a, nil + }, nil } // Shutdown the aggregation engine. @@ -95,12 +93,10 @@ func (a *Aggregate) Start(ctx context.Context, maprMessages chan<- string) { }() fieldsCh := a.fieldsFromLines(myCtx) - // Add fields (e.g. via 'set' clause) if len(a.query.Set) > 0 { fieldsCh = a.setAdditionalFields(myCtx, fieldsCh) } - // Periodically pre-aggregate data every a.query.Interval seconds. go a.aggregateTimer(myCtx) a.aggregateAndSerialize(myCtx, fieldsCh, maprMessages) @@ -147,17 +143,18 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin maprLine := strings.TrimSpace(line.Content.String()) fields, err := a.parser.MakeFields(maprLine) - // Can not recycle here for some rason. + // Can't recycle it here yet, as field slices are still + // TODO: Add unit test reading from multiple mapreduce files lines. + // TODO: Add capability to recycle this bytes buffer. //pool.RecycleBytesBuffer(line.Content) if err != nil { // Should fields be ignored anyway? - if err != logformat.IgnoreFieldsErr { + if err != logformat.ErrIgnoreFields { dlog.Common.Error(fields, err) } continue } - if !a.query.WhereClause(fields) { continue } @@ -175,12 +172,12 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin return fieldsCh } -func (a *Aggregate) setAdditionalFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string { - newFieldsCh := make(chan map[string]string) +func (a *Aggregate) setAdditionalFields(ctx context.Context, + fieldsCh <-chan map[string]string) <-chan map[string]string { + newFieldsCh := make(chan map[string]string) go func() { defer close(newFieldsCh) - for { fields, ok := <-fieldsCh if !ok { @@ -196,19 +193,18 @@ func (a *Aggregate) setAdditionalFields(ctx context.Context, fieldsCh <-chan map } } }() - return newFieldsCh } -func (a *Aggregate) aggregateAndSerialize(ctx context.Context, fieldsCh <-chan map[string]string, maprMessages chan<- string) { - group := mapr.NewGroupSet() +func (a *Aggregate) aggregateAndSerialize(ctx context.Context, + fieldsCh <-chan map[string]string, maprMessages chan<- string) { + group := mapr.NewGroupSet() serialize := func() { dlog.Common.Info("Serializing mapreduce result") group.Serialize(ctx, maprMessages) group = mapr.NewGroupSet() } - for { select { case fields, ok := <-fieldsCh: @@ -227,7 +223,6 @@ func (a *Aggregate) aggregateAndSerialize(ctx context.Context, fieldsCh <-chan m func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { var sb strings.Builder - for i, field := range a.query.GroupBy { if i > 0 { sb.WriteString(protocol.AggregateGroupKeyCombinator) @@ -254,7 +249,6 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { set.Samples++ return } - dlog.Common.Trace("Aggregated data locally without adding new samples") } |
