summaryrefslogtreecommitdiff
path: root/internal/mapr/server
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-09-08 19:10:50 +0300
committerPaul Buetow <paul@buetow.org>2021-10-02 12:26:29 +0300
commit16dc57e1e1c28e9d762424e596223a980770e059 (patch)
treeea5a7d5caa7f4de7bd3b21e57d0e18c0d8507c7d /internal/mapr/server
parentc83c9e61a08c7ea1cb528bc26dfab25b46faa866 (diff)
mapreduce tables are in colors now too
Diffstat (limited to 'internal/mapr/server')
-rw-r--r--internal/mapr/server/aggregate.go17
1 files changed, 10 insertions, 7 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 9106f52..a6d6bb1 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -13,6 +13,7 @@ import (
"github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/logformat"
+ "github.com/mimecast/dtail/internal/protocol"
)
// Aggregate is for aggregating mapreduce data on the DTail server side.
@@ -89,7 +90,6 @@ func (a *Aggregate) Shutdown() {
// Start an aggregation.
func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) {
-
myCtx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -109,6 +109,7 @@ func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) {
fieldsCh = a.addFields(myCtx, fieldsCh)
}
+ // Periodically pre-aggregate data every a.query.Interval seconds.
go a.aggregateTimer(myCtx)
a.makeMaprLines(myCtx, fieldsCh, maprLines)
}
@@ -139,13 +140,16 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string {
maprLine := strings.TrimSpace(line.Content.String())
pool.RecycleBytesBuffer(line.Content)
- fields, err := a.parser.MakeFields(maprLine)
- logger.Debug(fields, err)
+ fields, err := a.parser.MakeFields(maprLine)
if err != nil {
- logger.Error(err)
+ // Should fields be ignored anyway?
+ if err != logformat.IgnoreFieldsErr {
+ logger.Error(fields, err)
+ }
continue
}
+
if !a.query.WhereClause(fields) {
continue
}
@@ -170,7 +174,7 @@ func (a *Aggregate) addFields(ctx context.Context, fieldsCh <-chan map[string]st
defer close(ch)
for {
- // fieldsCh will be closed via 'makeFields' if ctx is done
+ // fieldsCh will be closed via 'makeFields' when ctx is done
fields, ok := <-fieldsCh
if !ok {
return
@@ -219,12 +223,11 @@ func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[strin
}
func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
- //logger.Trace("Aggregating", group, fields)
var sb strings.Builder
for i, field := range a.query.GroupBy {
if i > 0 {
- sb.WriteString(" ")
+ sb.WriteString(protocol.AggregateGroupKeyCombinator)
}
if val, ok := fields[field]; ok {
sb.WriteString(val)