summaryrefslogtreecommitdiff
path: root/internal/mapr/server
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-10-09 21:10:29 +0300
committerPaul Buetow <paul@buetow.org>2021-10-10 13:36:41 +0300
commit97747ea0f3178f7f5890512d483fdccaa82846b0 (patch)
tree9ff1335ca26afc90e55fd6de416457e252d75a35 /internal/mapr/server
parent7a7169791a64190e1002e38bc9c04ad0d5c1ce1f (diff)
vetting and linting and some code restyling
Diffstat (limited to 'internal/mapr/server')
-rw-r--r--internal/mapr/server/aggregate.go30
1 files changed, 12 insertions, 18 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 1f5d1c3..97fee11 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -63,16 +63,14 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
}
}
- a := Aggregate{
+ return &Aggregate{
done: internal.NewDone(),
NextLinesCh: make(chan chan line.Line, 10),
serialize: make(chan struct{}),
hostname: s[0],
query: query,
parser: logParser,
- }
-
- return &a, nil
+ }, nil
}
// Shutdown the aggregation engine.
@@ -95,12 +93,10 @@ func (a *Aggregate) Start(ctx context.Context, maprMessages chan<- string) {
}()
fieldsCh := a.fieldsFromLines(myCtx)
-
// Add fields (e.g. via 'set' clause)
if len(a.query.Set) > 0 {
fieldsCh = a.setAdditionalFields(myCtx, fieldsCh)
}
-
// Periodically pre-aggregate data every a.query.Interval seconds.
go a.aggregateTimer(myCtx)
a.aggregateAndSerialize(myCtx, fieldsCh, maprMessages)
@@ -147,17 +143,18 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin
maprLine := strings.TrimSpace(line.Content.String())
fields, err := a.parser.MakeFields(maprLine)
- // Can not recycle here for some rason.
+ // Can't recycle it here yet, as field slices are still
+ // TODO: Add unit test reading from multiple mapreduce files lines.
+ // TODO: Add capability to recycle this bytes buffer.
//pool.RecycleBytesBuffer(line.Content)
if err != nil {
// Should fields be ignored anyway?
- if err != logformat.IgnoreFieldsErr {
+ if err != logformat.ErrIgnoreFields {
dlog.Common.Error(fields, err)
}
continue
}
-
if !a.query.WhereClause(fields) {
continue
}
@@ -175,12 +172,12 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin
return fieldsCh
}
-func (a *Aggregate) setAdditionalFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string {
- newFieldsCh := make(chan map[string]string)
+func (a *Aggregate) setAdditionalFields(ctx context.Context,
+ fieldsCh <-chan map[string]string) <-chan map[string]string {
+ newFieldsCh := make(chan map[string]string)
go func() {
defer close(newFieldsCh)
-
for {
fields, ok := <-fieldsCh
if !ok {
@@ -196,19 +193,18 @@ func (a *Aggregate) setAdditionalFields(ctx context.Context, fieldsCh <-chan map
}
}
}()
-
return newFieldsCh
}
-func (a *Aggregate) aggregateAndSerialize(ctx context.Context, fieldsCh <-chan map[string]string, maprMessages chan<- string) {
- group := mapr.NewGroupSet()
+func (a *Aggregate) aggregateAndSerialize(ctx context.Context,
+ fieldsCh <-chan map[string]string, maprMessages chan<- string) {
+ group := mapr.NewGroupSet()
serialize := func() {
dlog.Common.Info("Serializing mapreduce result")
group.Serialize(ctx, maprMessages)
group = mapr.NewGroupSet()
}
-
for {
select {
case fields, ok := <-fieldsCh:
@@ -227,7 +223,6 @@ func (a *Aggregate) aggregateAndSerialize(ctx context.Context, fieldsCh <-chan m
func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
var sb strings.Builder
-
for i, field := range a.query.GroupBy {
if i > 0 {
sb.WriteString(protocol.AggregateGroupKeyCombinator)
@@ -254,7 +249,6 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
set.Samples++
return
}
-
dlog.Common.Trace("Aggregated data locally without adding new samples")
}