diff options
| author | Paul Buetow <paul@buetow.org> | 2021-09-08 19:10:50 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2021-10-02 12:26:29 +0300 |
| commit | 16dc57e1e1c28e9d762424e596223a980770e059 (patch) | |
| tree | ea5a7d5caa7f4de7bd3b21e57d0e18c0d8507c7d /internal/mapr/server | |
| parent | c83c9e61a08c7ea1cb528bc26dfab25b46faa866 (diff) | |
mapreduce tables are in colors now too
Diffstat (limited to 'internal/mapr/server')
| -rw-r--r-- | internal/mapr/server/aggregate.go | 17 |
1 files changed, 10 insertions, 7 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 9106f52..a6d6bb1 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -13,6 +13,7 @@ import ( "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/logformat" + "github.com/mimecast/dtail/internal/protocol" ) // Aggregate is for aggregating mapreduce data on the DTail server side. @@ -89,7 +90,6 @@ func (a *Aggregate) Shutdown() { // Start an aggregation. func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) { - myCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -109,6 +109,7 @@ func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) { fieldsCh = a.addFields(myCtx, fieldsCh) } + // Periodically pre-aggregate data every a.query.Interval seconds. go a.aggregateTimer(myCtx) a.makeMaprLines(myCtx, fieldsCh, maprLines) } @@ -139,13 +140,16 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string { maprLine := strings.TrimSpace(line.Content.String()) pool.RecycleBytesBuffer(line.Content) - fields, err := a.parser.MakeFields(maprLine) - logger.Debug(fields, err) + fields, err := a.parser.MakeFields(maprLine) if err != nil { - logger.Error(err) + // Should fields be ignored anyway? + if err != logformat.IgnoreFieldsErr { + logger.Error(fields, err) + } continue } + if !a.query.WhereClause(fields) { continue } @@ -170,7 +174,7 @@ func (a *Aggregate) addFields(ctx context.Context, fieldsCh <-chan map[string]st defer close(ch) for { - // fieldsCh will be closed via 'makeFields' if ctx is done + // fieldsCh will be closed via 'makeFields' when ctx is done fields, ok := <-fieldsCh if !ok { return @@ -219,12 +223,11 @@ func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[strin } func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { - //logger.Trace("Aggregating", group, fields) var sb strings.Builder for i, field := range a.query.GroupBy { if i > 0 { - sb.WriteString(" ") + sb.WriteString(protocol.AggregateGroupKeyCombinator) } if val, ok := fields[field]; ok { sb.WriteString(val) |
