diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2020-07-03 14:13:13 +0100 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2020-08-13 11:37:24 +0100 |
| commit | c5a0ba7d29da7effa0ae18bffa10fc0be359b8e7 (patch) | |
| tree | de4874740a5ddeb6eb29c887f6e121c61a1f8f3c /internal/mapr/server | |
| parent | 8f9f9766cecec4a42ffb4d14ba9b7efc2ed204ad (diff) | |
bump up version to 3.0.0. can run continuous background mapreduce queries, useful for log file monitorig for example. breaking protocol change which allows to mapreduce aggreate messages containing the default field separator |. add of more unit tests. add logformat mapreduce query keyword. add set mapreduce clause support and support to evaluate built-in functions such as md5sum() and maskdigits().v3.0.0
Diffstat (limited to 'internal/mapr/server')
| -rw-r--r-- | internal/mapr/server/aggregate.go | 68 |
1 files changed, 52 insertions, 16 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 80a464d..1028943 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -44,15 +44,24 @@ func NewAggregate(queryStr string) (*Aggregate, error) { } s := strings.Split(fqdn, ".") - parserName := config.Server.MapreduceLogFormat - if query.Table == "" { - parserName = "generic" + var parserName string + switch query.LogFormat { + case "": + parserName = config.Server.MapreduceLogFormat + if query.Table == "" { + parserName = "generic" + } + default: + parserName = query.LogFormat } - logger.Info("Creating mapr log format parser", parserName) - logParser, err := logformat.NewParser(parserName) + logger.Info("Creating log format parser", parserName) + logParser, err := logformat.NewParser(parserName, query) if err != nil { - logger.FatalExit("Could not create mapr log format parser", err) + logger.Error("Could not create log format parser. Falling back to 'generic'", err) + if logParser, err = logformat.NewParser("generic", query); err != nil { + logger.FatalExit("Could not create log format parser", err) + } } ctx, cancel := context.WithCancel(context.Background()) @@ -76,6 +85,12 @@ func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) { defer a.cancel() fieldsCh := a.linesToFields(ctx) + + // Add fields (e.g. via 'set' clause) + if len(a.query.Set) > 0 { + fieldsCh = a.addMoreFields(ctx, fieldsCh) + } + go a.fieldsToMaprLines(ctx, fieldsCh, maprLines) a.periodicAggregateTimer(ctx) } @@ -99,10 +114,10 @@ func (a *Aggregate) periodicAggregateTimer(ctx context.Context) { } func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string { - fieldsCh := make(chan map[string]string) + ch := make(chan map[string]string) go func() { - defer close(fieldsCh) + defer close(ch) for { select { @@ -113,6 +128,7 @@ func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string maprLine := strings.TrimSpace(string(line.Content)) fields, err := a.parser.MakeFields(maprLine) + logger.Debug(fields, err) if err != nil { logger.Error(err) @@ -123,7 +139,7 @@ func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string } select { - case fieldsCh <- fields: + case ch <- fields: case <-ctx.Done(): } case <-ctx.Done(): @@ -134,7 +150,33 @@ func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string } }() - return fieldsCh + return ch +} + +func (a *Aggregate) addMoreFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string { + ch := make(chan map[string]string) + + go func() { + defer close(ch) + + for { + // fieldsCh will be closed via 'linesToFields' if ctx is done + fields, ok := <-fieldsCh + if !ok { + return + } + if err := a.query.SetClause(fields); err != nil { + logger.Error(err) + } + + select { + case ch <- fields: + case <-ctx.Done(): + } + } + }() + + return ch } func (a *Aggregate) fieldsToMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) { @@ -192,12 +234,6 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { var addedSample bool for _, sc := range a.query.Select { if val, ok := fields[sc.Field]; ok { - /* - if sc.Field == "$line" { - // Complete log line as to arrive untouched on the client side. - val = base64.StdEncoding.EncodeToString([]byte(val)) - } - */ if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil { logger.Error(err) continue |
