summaryrefslogtreecommitdiff
path: root/internal/mapr/server/aggregate.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/mapr/server/aggregate.go')
-rw-r--r--internal/mapr/server/aggregate.go68
1 files changed, 52 insertions, 16 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 80a464d..1028943 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -44,15 +44,24 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
}
s := strings.Split(fqdn, ".")
- parserName := config.Server.MapreduceLogFormat
- if query.Table == "" {
- parserName = "generic"
+ var parserName string
+ switch query.LogFormat {
+ case "":
+ parserName = config.Server.MapreduceLogFormat
+ if query.Table == "" {
+ parserName = "generic"
+ }
+ default:
+ parserName = query.LogFormat
}
- logger.Info("Creating mapr log format parser", parserName)
- logParser, err := logformat.NewParser(parserName)
+ logger.Info("Creating log format parser", parserName)
+ logParser, err := logformat.NewParser(parserName, query)
if err != nil {
- logger.FatalExit("Could not create mapr log format parser", err)
+ logger.Error("Could not create log format parser. Falling back to 'generic'", err)
+ if logParser, err = logformat.NewParser("generic", query); err != nil {
+ logger.FatalExit("Could not create log format parser", err)
+ }
}
ctx, cancel := context.WithCancel(context.Background())
@@ -76,6 +85,12 @@ func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) {
defer a.cancel()
fieldsCh := a.linesToFields(ctx)
+
+ // Add fields (e.g. via 'set' clause)
+ if len(a.query.Set) > 0 {
+ fieldsCh = a.addMoreFields(ctx, fieldsCh)
+ }
+
go a.fieldsToMaprLines(ctx, fieldsCh, maprLines)
a.periodicAggregateTimer(ctx)
}
@@ -99,10 +114,10 @@ func (a *Aggregate) periodicAggregateTimer(ctx context.Context) {
}
func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string {
- fieldsCh := make(chan map[string]string)
+ ch := make(chan map[string]string)
go func() {
- defer close(fieldsCh)
+ defer close(ch)
for {
select {
@@ -113,6 +128,7 @@ func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string
maprLine := strings.TrimSpace(string(line.Content))
fields, err := a.parser.MakeFields(maprLine)
+ logger.Debug(fields, err)
if err != nil {
logger.Error(err)
@@ -123,7 +139,7 @@ func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string
}
select {
- case fieldsCh <- fields:
+ case ch <- fields:
case <-ctx.Done():
}
case <-ctx.Done():
@@ -134,7 +150,33 @@ func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string
}
}()
- return fieldsCh
+ return ch
+}
+
+func (a *Aggregate) addMoreFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string {
+ ch := make(chan map[string]string)
+
+ go func() {
+ defer close(ch)
+
+ for {
+ // fieldsCh will be closed via 'linesToFields' if ctx is done
+ fields, ok := <-fieldsCh
+ if !ok {
+ return
+ }
+ if err := a.query.SetClause(fields); err != nil {
+ logger.Error(err)
+ }
+
+ select {
+ case ch <- fields:
+ case <-ctx.Done():
+ }
+ }
+ }()
+
+ return ch
}
func (a *Aggregate) fieldsToMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) {
@@ -192,12 +234,6 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
var addedSample bool
for _, sc := range a.query.Select {
if val, ok := fields[sc.Field]; ok {
- /*
- if sc.Field == "$line" {
- // Complete log line as to arrive untouched on the client side.
- val = base64.StdEncoding.EncodeToString([]byte(val))
- }
- */
if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil {
logger.Error(err)
continue