diff options
Diffstat (limited to 'internal/mapr/server/aggregate.go')
| -rw-r--r-- | internal/mapr/server/aggregate.go | 68 |
1 files changed, 52 insertions, 16 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 80a464d..1028943 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -44,15 +44,24 @@ func NewAggregate(queryStr string) (*Aggregate, error) { } s := strings.Split(fqdn, ".") - parserName := config.Server.MapreduceLogFormat - if query.Table == "" { - parserName = "generic" + var parserName string + switch query.LogFormat { + case "": + parserName = config.Server.MapreduceLogFormat + if query.Table == "" { + parserName = "generic" + } + default: + parserName = query.LogFormat } - logger.Info("Creating mapr log format parser", parserName) - logParser, err := logformat.NewParser(parserName) + logger.Info("Creating log format parser", parserName) + logParser, err := logformat.NewParser(parserName, query) if err != nil { - logger.FatalExit("Could not create mapr log format parser", err) + logger.Error("Could not create log format parser. Falling back to 'generic'", err) + if logParser, err = logformat.NewParser("generic", query); err != nil { + logger.FatalExit("Could not create log format parser", err) + } } ctx, cancel := context.WithCancel(context.Background()) @@ -76,6 +85,12 @@ func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) { defer a.cancel() fieldsCh := a.linesToFields(ctx) + + // Add fields (e.g. via 'set' clause) + if len(a.query.Set) > 0 { + fieldsCh = a.addMoreFields(ctx, fieldsCh) + } + go a.fieldsToMaprLines(ctx, fieldsCh, maprLines) a.periodicAggregateTimer(ctx) } @@ -99,10 +114,10 @@ func (a *Aggregate) periodicAggregateTimer(ctx context.Context) { } func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string { - fieldsCh := make(chan map[string]string) + ch := make(chan map[string]string) go func() { - defer close(fieldsCh) + defer close(ch) for { select { @@ -113,6 +128,7 @@ func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string maprLine := strings.TrimSpace(string(line.Content)) fields, err := a.parser.MakeFields(maprLine) + logger.Debug(fields, err) if err != nil { logger.Error(err) @@ -123,7 +139,7 @@ func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string } select { - case fieldsCh <- fields: + case ch <- fields: case <-ctx.Done(): } case <-ctx.Done(): @@ -134,7 +150,33 @@ func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string } }() - return fieldsCh + return ch +} + +func (a *Aggregate) addMoreFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string { + ch := make(chan map[string]string) + + go func() { + defer close(ch) + + for { + // fieldsCh will be closed via 'linesToFields' if ctx is done + fields, ok := <-fieldsCh + if !ok { + return + } + if err := a.query.SetClause(fields); err != nil { + logger.Error(err) + } + + select { + case ch <- fields: + case <-ctx.Done(): + } + } + }() + + return ch } func (a *Aggregate) fieldsToMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) { @@ -192,12 +234,6 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { var addedSample bool for _, sc := range a.query.Select { if val, ok := fields[sc.Field]; ok { - /* - if sc.Field == "$line" { - // Complete log line as to arrive untouched on the client side. - val = base64.StdEncoding.EncodeToString([]byte(val)) - } - */ if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil { logger.Error(err) continue |
