summaryrefslogtreecommitdiff
path: root/internal/mapr
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-09-08 19:10:50 +0300
committerPaul Buetow <paul@buetow.org>2021-10-02 12:26:29 +0300
commit16dc57e1e1c28e9d762424e596223a980770e059 (patch)
treeea5a7d5caa7f4de7bd3b21e57d0e18c0d8507c7d /internal/mapr
parentc83c9e61a08c7ea1cb528bc26dfab25b46faa866 (diff)
mapreduce tables are in colors now too
Diffstat (limited to 'internal/mapr')
-rw-r--r--internal/mapr/aggregateset.go17
-rw-r--r--internal/mapr/client/aggregate.go18
-rw-r--r--internal/mapr/globalgroupset.go1
-rw-r--r--internal/mapr/groupset.go258
-rw-r--r--internal/mapr/logformat/default.go28
-rw-r--r--internal/mapr/logformat/default_test.go15
-rw-r--r--internal/mapr/logformat/generickv.go31
-rw-r--r--internal/mapr/logformat/parser.go2
-rw-r--r--internal/mapr/server/aggregate.go17
9 files changed, 272 insertions, 115 deletions
diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go
index 029d87b..47f4925 100644
--- a/internal/mapr/aggregateset.go
+++ b/internal/mapr/aggregateset.go
@@ -6,6 +6,8 @@ import (
"strconv"
"strings"
+ "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/protocol"
)
@@ -68,22 +70,25 @@ func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error {
// Serialize the aggregate set so it can be sent over the wire.
func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<- string) {
- //logger.Trace("Serialising mapr.AggregateSet", s)
- var sb strings.Builder
+ logger.Trace("Serialising mapr.AggregateSet", s)
+ sb := pool.BuilderBuffer.Get().(*strings.Builder)
+ defer pool.RecycleBuilderBuffer(sb)
sb.WriteString(groupKey)
sb.WriteString(protocol.AggregateDelimiter)
- sb.WriteString(fmt.Sprintf("%d%s", s.Samples, protocol.AggregateDelimiter))
+ sb.WriteString(fmt.Sprintf("%d", s.Samples))
+ sb.WriteString(protocol.AggregateDelimiter)
for k, v := range s.FValues {
sb.WriteString(k)
- sb.WriteString("=")
- sb.WriteString(fmt.Sprintf("%v%s", v, protocol.AggregateDelimiter))
+ sb.WriteString(protocol.AggregateKVDelimiter)
+ sb.WriteString(fmt.Sprintf("%v", v))
+ sb.WriteString(protocol.AggregateDelimiter)
}
for k, v := range s.SValues {
sb.WriteString(k)
- sb.WriteString("=")
+ sb.WriteString(protocol.AggregateKVDelimiter)
sb.WriteString(v)
sb.WriteString(protocol.AggregateDelimiter)
}
diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go
index 10b34d4..5cc09a1 100644
--- a/internal/mapr/client/aggregate.go
+++ b/internal/mapr/client/aggregate.go
@@ -1,11 +1,13 @@
package client
import (
+ "fmt"
"strconv"
"strings"
"github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/mapr"
+ "github.com/mimecast/dtail/internal/protocol"
)
// Aggregate mapreduce data on the DTail client side.
@@ -31,12 +33,18 @@ func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGrou
}
// Aggregate data from mapr log line into local (and global) group sets.
-func (a *Aggregate) Aggregate(parts []string) {
+func (a *Aggregate) Aggregate(message string) error {
+ parts := strings.Split(message, protocol.AggregateDelimiter)
+ if len(parts) < 4 {
+ return fmt.Errorf("aggregate message without any real data")
+ }
+
groupKey := parts[0]
samples, err := strconv.Atoi(parts[1])
if err != nil {
- logger.FatalExit("Unable to parse sample count", parts[1], err, parts)
+ return fmt.Errorf("unable to parse sample count '%s': %v", parts[1], err)
}
+
fields := a.makeFields(parts[2:])
set := a.group.GetSet(groupKey)
@@ -63,6 +71,8 @@ func (a *Aggregate) Aggregate(parts []string) {
// Re-init local group (make it empty again).
a.group.InitSet()
}
+
+ return nil
}
// Create a map of key-value pairs from a part list such as ["foo=bar", "bar=baz"].
@@ -70,8 +80,8 @@ func (a *Aggregate) makeFields(parts []string) map[string]string {
fields := make(map[string]string, len(parts))
for _, part := range parts {
- kv := strings.SplitN(part, "=", 2)
- if len(kv) < 2 {
+ kv := strings.SplitN(part, protocol.AggregateKVDelimiter, 2)
+ if len(kv) != 2 {
continue
}
fields[kv[0]] = kv[1]
diff --git a/internal/mapr/globalgroupset.go b/internal/mapr/globalgroupset.go
index cfab506..21bf990 100644
--- a/internal/mapr/globalgroupset.go
+++ b/internal/mapr/globalgroupset.go
@@ -48,6 +48,7 @@ func (g *GlobalGroupSet) MergeNoblock(query *Query, group *GroupSet) (bool, erro
// Merge a group set into the global group set.
func (g *GlobalGroupSet) merge(query *Query, group *GroupSet) error {
+
for groupKey, set := range group.sets {
s := g.GetSet(groupKey)
if err := s.Merge(query, set); err != nil {
diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go
index 6ee2811..d29559a 100644
--- a/internal/mapr/groupset.go
+++ b/internal/mapr/groupset.go
@@ -4,13 +4,16 @@ import (
"context"
"errors"
"fmt"
- "io/ioutil"
"os"
"sort"
"strconv"
"strings"
+ "github.com/mimecast/dtail/internal/color"
+ "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/pool"
+ "github.com/mimecast/dtail/internal/protocol"
)
// GroupSet represents a map of aggregate sets. The group sets
@@ -22,6 +25,14 @@ type GroupSet struct {
sets map[string]*AggregateSet
}
+// Internal helper type
+type result struct {
+ groupKey string
+ values []string
+ widths []int
+ orderBy float64
+}
+
// NewGroupSet returns a new empty group set.
func NewGroupSet() *GroupSet {
g := GroupSet{}
@@ -58,17 +69,118 @@ func (g *GroupSet) Serialize(ctx context.Context, ch chan<- string) {
// Result returns a nicely formated result of the query from the group set.
func (g *GroupSet) Result(query *Query) (string, int, error) {
- return g.limitedResult(query, query.Limit, "\t", " ", false)
+ rows, widths, err := g.result(query, true)
+ if err != nil {
+ return "", 0, err
+ }
+
+ sb := pool.BuilderBuffer.Get().(*strings.Builder)
+ defer pool.RecycleBuilderBuffer(sb)
+
+ // Generate header now
+ lastIndex := len(query.Select) - 1
+ for i, sc := range query.Select {
+ format := fmt.Sprintf(" %%%ds ", widths[i])
+ str := fmt.Sprintf(format, sc.FieldStorage)
+ if config.Client.TermColorsEnable {
+ attrs := []color.Attribute{config.Client.TermColors.MaprTable.HeaderAttr}
+ if sc.FieldStorage == query.OrderBy {
+ attrs = append(attrs, config.Client.TermColors.MaprTable.HeaderSortKeyAttr)
+ }
+ for _, groupBy := range query.GroupBy {
+ if sc.FieldStorage == groupBy {
+ attrs = append(attrs, config.Client.TermColors.MaprTable.HeaderGroupKeyAttr)
+ break
+ }
+ }
+ color.PaintWithAttrs(sb, str,
+ config.Client.TermColors.MaprTable.HeaderFg,
+ config.Client.TermColors.MaprTable.HeaderBg,
+ attrs)
+ } else {
+ sb.WriteString(str)
+ }
+
+ if i == lastIndex {
+ continue
+ }
+ if config.Client.TermColorsEnable {
+ color.PaintWithAttr(sb, protocol.FieldDelimiter,
+ config.Client.TermColors.MaprTable.HeaderDelimiterFg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterBg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterAttr)
+ } else {
+ sb.WriteString(protocol.FieldDelimiter)
+ }
+ }
+ sb.WriteString("\n")
+
+ for i := 0; i < len(query.Select); i++ {
+ str := fmt.Sprintf("-%s-", strings.Repeat("-", widths[i]))
+ if config.Client.TermColorsEnable {
+ color.PaintWithAttr(sb, str,
+ config.Client.TermColors.MaprTable.HeaderDelimiterFg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterBg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterAttr)
+ } else {
+ sb.WriteString(str)
+ }
+ if i == lastIndex {
+ continue
+ }
+ if config.Client.TermColorsEnable {
+ color.PaintWithAttr(sb, protocol.FieldDelimiter,
+ config.Client.TermColors.MaprTable.HeaderDelimiterFg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterBg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterAttr)
+ } else {
+ sb.WriteString(protocol.FieldDelimiter)
+ }
+ }
+ sb.WriteString("\n")
+
+ // And now write the data
+ for i, r := range rows {
+ if i == query.Limit {
+ break
+ }
+ for j, value := range r.values {
+ format := fmt.Sprintf(" %%%ds ", widths[j])
+ str := fmt.Sprintf(format, value)
+ if config.Client.TermColorsEnable {
+ color.PaintWithAttr(sb, str,
+ config.Client.TermColors.MaprTable.DataFg,
+ config.Client.TermColors.MaprTable.DataBg,
+ config.Client.TermColors.MaprTable.DataAttr)
+ } else {
+ sb.WriteString(str)
+ }
+
+ if j == lastIndex {
+ continue
+ }
+ if config.Client.TermColorsEnable {
+ color.PaintWithAttr(sb, protocol.FieldDelimiter,
+ config.Client.TermColors.MaprTable.DelimiterFg,
+ config.Client.TermColors.MaprTable.DelimiterBg,
+ config.Client.TermColors.MaprTable.DelimiterAttr)
+ } else {
+ sb.WriteString(protocol.FieldDelimiter)
+ }
+ }
+ sb.WriteString("\n")
+ }
+
+ return sb.String(), len(rows), nil
}
-// WriteResult writes the result to an outfile.
+// WriteResult writes the result to an CSV outfile.
func (g *GroupSet) WriteResult(query *Query) error {
if !query.HasOutfile() {
return errors.New("No outfile specified")
}
- // -1: Don't limit the result, include all data sets
- result, _, err := g.limitedResult(query, -1, "", ",", true)
+ rows, _, err := g.result(query, false)
if err != nil {
return err
}
@@ -76,9 +188,37 @@ func (g *GroupSet) WriteResult(query *Query) error {
logger.Info("Writing outfile", query.Outfile)
tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile)
- if err := ioutil.WriteFile(tmpOutfile, []byte(result), 0644); err != nil {
+ file, err := os.Create(tmpOutfile)
+ if err != nil {
return err
}
+ defer file.Close()
+
+ // Generate header now
+ lastIndex := len(query.Select) - 1
+ for i, sc := range query.Select {
+ file.WriteString(sc.FieldStorage)
+ if i == lastIndex {
+ continue
+ }
+ file.WriteString(protocol.CSVDelimiter)
+ }
+ file.WriteString("\n")
+
+ // And now write the data
+ for i, r := range rows {
+ if i == query.Limit {
+ break
+ }
+ for j, value := range r.values {
+ file.WriteString(value)
+ if j == lastIndex {
+ continue
+ }
+ file.WriteString(protocol.CSVDelimiter)
+ }
+ file.WriteString("\n")
+ }
if err := os.Rename(tmpOutfile, query.Outfile); err != nil {
os.Remove(tmpOutfile)
@@ -88,32 +228,22 @@ func (g *GroupSet) WriteResult(query *Query) error {
return nil
}
-// Return a nicely formated result of the query from the group set.
-func (g *GroupSet) limitedResult(query *Query, limit int, lineStarter, fieldSeparator string, addHeader bool) (string, int, error) {
- type result struct {
- groupKey string
- resultStr string
- orderBy float64
- }
+// Return a sorted result slice of the query from the group set.
+func (g *GroupSet) result(query *Query, gatherWidths bool) ([]result, []int, error) {
+ var rows []result
+ widths := make([]int, len(query.Select))
- var resultSlice []result
+ var valueStr string
+ var value float64
for groupKey, set := range g.sets {
- var sb strings.Builder
r := result{groupKey: groupKey}
- lastIndex := len(query.Select) - 1
for i, sc := range query.Select {
- storage := sc.FieldStorage
- orderByThis := storage == query.OrderBy
-
switch sc.Operation {
case Count:
- value := set.FValues[storage]
- sb.WriteString(fmt.Sprintf("%d", int(value)))
- if orderByThis {
- r.orderBy = value
- }
+ value = set.FValues[sc.FieldStorage]
+ valueStr = fmt.Sprintf("%d", int(value))
case Len:
fallthrough
case Sum:
@@ -121,74 +251,48 @@ func (g *GroupSet) limitedResult(query *Query, limit int, lineStarter, fieldSepa
case Min:
fallthrough
case Max:
- value := set.FValues[storage]
- sb.WriteString(fmt.Sprintf("%f", value))
- if orderByThis {
- r.orderBy = value
- }
+ value = set.FValues[sc.FieldStorage]
+ valueStr = fmt.Sprintf("%f", value)
case Last:
- value := set.SValues[storage]
- if orderByThis {
- f, err := strconv.ParseFloat(value, 64)
- if err == nil {
- r.orderBy = f
- }
- }
- sb.WriteString(value)
+ valueStr = set.SValues[sc.FieldStorage]
+ value, _ = strconv.ParseFloat(valueStr, 64)
case Avg:
- value := set.FValues[storage] / float64(set.Samples)
- sb.WriteString(fmt.Sprintf("%f", value))
- if orderByThis {
- r.orderBy = value
- }
+ value = set.FValues[sc.FieldStorage] / float64(set.Samples)
+ valueStr = fmt.Sprintf("%f", value)
default:
- return "", 0, fmt.Errorf("Unknown aggregation method '%v'", sc.Operation)
+ return rows, widths, fmt.Errorf("Unknown aggregation method '%v'", sc.Operation)
}
- if i != lastIndex {
- sb.WriteString(fieldSeparator)
+
+ if sc.FieldStorage == query.OrderBy {
+ r.orderBy = value
+ }
+ r.values = append(r.values, valueStr)
+
+ if !gatherWidths {
+ continue
+ }
+ if widths[i] < len(sc.FieldStorage) {
+ widths[i] = len(sc.FieldStorage)
+ }
+ if widths[i] < len(valueStr) {
+ widths[i] = len(valueStr)
}
}
- r.resultStr = sb.String()
- resultSlice = append(resultSlice, r)
+ rows = append(rows, r)
}
if query.OrderBy != "" {
if query.ReverseOrder {
- sort.SliceStable(resultSlice, func(i, j int) bool {
- return resultSlice[i].orderBy < resultSlice[j].orderBy
+ sort.SliceStable(rows, func(i, j int) bool {
+ return rows[i].orderBy < rows[j].orderBy
})
} else {
- sort.SliceStable(resultSlice, func(i, j int) bool {
- return resultSlice[i].orderBy > resultSlice[j].orderBy
+ sort.SliceStable(rows, func(i, j int) bool {
+ return rows[i].orderBy > rows[j].orderBy
})
}
}
- var sb strings.Builder
-
- // Write header first
- if addHeader {
- lastIndex := len(query.Select) - 1
- sb.WriteString(lineStarter)
- for i, sc := range query.Select {
- sb.WriteString(sc.FieldStorage)
- if i != lastIndex {
- sb.WriteString(fieldSeparator)
- }
- }
- sb.WriteString("\n")
- }
-
- // And now write the data
- for i, r := range resultSlice {
- if i == limit {
- break
- }
- sb.WriteString(lineStarter)
- sb.WriteString(r.resultStr)
- sb.WriteString("\n")
- }
-
- return sb.String(), len(resultSlice), nil
+ return rows, widths, nil
}
diff --git a/internal/mapr/logformat/default.go b/internal/mapr/logformat/default.go
index da976bd..c67137e 100644
--- a/internal/mapr/logformat/default.go
+++ b/internal/mapr/logformat/default.go
@@ -1,16 +1,22 @@
package logformat
import (
+ "fmt"
"strings"
- "github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/protocol"
)
-// MakeFieldsDEFAULT is the default log file mapreduce parser.
+// MakeFieldsDEFAULT is the default DTail log file key-value parser.
func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) {
splitted := strings.Split(maprLine, protocol.FieldDelimiter)
- fields := make(map[string]string, len(splitted))
+
+ if len(splitted) < 3 || !strings.HasPrefix(splitted[3], "MAPREDUCE:") || !strings.HasPrefix(splitted[0], "INFO") {
+ // Not a DTail mapreduce log line.
+ return nil, IgnoreFieldsErr
+ }
+
+ fields := make(map[string]string, len(splitted)+8)
fields["*"] = "*"
fields["$line"] = maprLine
@@ -19,20 +25,14 @@ func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) {
fields["$timezone"] = p.timeZoneName
fields["$timeoffset"] = p.timeZoneOffset
- kvStart := 0
- // DTail mapreduce format
- if len(splitted) > 3 && strings.HasPrefix(splitted[3], "MAPREDUCE:") {
- fields["$severity"] = splitted[0]
- // TODO: Parse time like we do at Mimecast
- fields["$time"] = splitted[1]
- kvStart = 4
- }
+ fields["$severity"] = splitted[0]
+ // TODO: Parse time like we do at Mimecast
+ fields["$time"] = splitted[1]
- for _, kv := range splitted[kvStart:] {
+ for _, kv := range splitted[4:] {
keyAndValue := strings.SplitN(kv, "=", 2)
if len(keyAndValue) != 2 {
- logger.Debug("Unable to parse key-value token, ignoring it", kv)
- continue
+ return fields, fmt.Errorf("Unable to parse key-value token '%s'", kv)
}
fields[strings.ToLower(keyAndValue[0])] = keyAndValue[1]
}
diff --git a/internal/mapr/logformat/default_test.go b/internal/mapr/logformat/default_test.go
index 6284008..79911d1 100644
--- a/internal/mapr/logformat/default_test.go
+++ b/internal/mapr/logformat/default_test.go
@@ -11,8 +11,8 @@ func TestDefaultLogFormat(t *testing.T) {
}
inputs := []string{
- "foo=bar|baz=bay",
"INFO|20210907-065632|SERVER|MAPREDUCE:TEST|foo=bar|baz=bay",
+ "INFO|20210907-065732|CLIENT|MAPREDUCE:YOMAN|baz=bay|foo=bar",
}
for _, input := range inputs {
@@ -23,20 +23,21 @@ func TestDefaultLogFormat(t *testing.T) {
}
if bar, ok := fields["foo"]; !ok {
- t.Errorf("Expected field 'foo', but no such field there\n")
+ t.Errorf("Expected field 'foo', but no such field there in '%s'\n", input)
} else if bar != "bar" {
- t.Errorf("Expected 'bar' stored in field 'foo', but got '%s'\n", bar)
+ t.Errorf("Expected 'bar' stored in field 'foo', but got '%s' in '%s'\n", bar, input)
}
if bay, ok := fields["baz"]; !ok {
- t.Errorf("Expected field 'baz', but no such field there\n")
+ t.Errorf("Expected field 'baz', but no such field there in '%s'\n", input)
} else if bay != "bay" {
- t.Errorf("Expected 'bay' stored in field 'baz', but got '%s'\n", bay)
+ t.Errorf("Expected 'bay' stored in field 'baz', but got '%s' in '%s'\n", bay, input)
}
}
- if _, err := parser.MakeFields("foo=bar|bazbay"); err == nil {
- t.Errorf("Expected error but didn't")
+ fields, err := parser.MakeFields("foozoo=bar|bazbay")
+ if _, ok := fields["foo"]; ok {
+ t.Errorf("Expected fiending field 'foo', but found it\n")
}
}
diff --git a/internal/mapr/logformat/generickv.go b/internal/mapr/logformat/generickv.go
new file mode 100644
index 0000000..23d75cb
--- /dev/null
+++ b/internal/mapr/logformat/generickv.go
@@ -0,0 +1,31 @@
+package logformat
+
+import (
+ "strings"
+
+ "github.com/mimecast/dtail/internal/protocol"
+)
+
+// MakeFieldsGENERICKV is the generic key-value logfile parser.
+func (p *Parser) MakeFieldsGENERIGKV(maprLine string) (map[string]string, error) {
+ splitted := strings.Split(maprLine, protocol.FieldDelimiter)
+ fields := make(map[string]string, len(splitted))
+
+ fields["*"] = "*"
+ fields["$line"] = maprLine
+ fields["$empty"] = ""
+ fields["$hostname"] = p.hostname
+ fields["$timezone"] = p.timeZoneName
+ fields["$timeoffset"] = p.timeZoneOffset
+
+ for _, kv := range splitted[0:] {
+ keyAndValue := strings.SplitN(kv, "=", 2)
+ if len(keyAndValue) != 2 {
+ //logger.Debug("Unable to parse key-value token, ignoring it", kv)
+ continue
+ }
+ fields[strings.ToLower(keyAndValue[0])] = keyAndValue[1]
+ }
+
+ return fields, nil
+}
diff --git a/internal/mapr/logformat/parser.go b/internal/mapr/logformat/parser.go
index c53729a..6582b5f 100644
--- a/internal/mapr/logformat/parser.go
+++ b/internal/mapr/logformat/parser.go
@@ -12,6 +12,8 @@ import (
"github.com/mimecast/dtail/internal/mapr"
)
+var IgnoreFieldsErr error = errors.New("Ignore this field set")
+
// Parser is used to parse the mapreduce information from the server log files.
type Parser struct {
hostname string
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 9106f52..a6d6bb1 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -13,6 +13,7 @@ import (
"github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/logformat"
+ "github.com/mimecast/dtail/internal/protocol"
)
// Aggregate is for aggregating mapreduce data on the DTail server side.
@@ -89,7 +90,6 @@ func (a *Aggregate) Shutdown() {
// Start an aggregation.
func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) {
-
myCtx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -109,6 +109,7 @@ func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) {
fieldsCh = a.addFields(myCtx, fieldsCh)
}
+ // Periodically pre-aggregate data every a.query.Interval seconds.
go a.aggregateTimer(myCtx)
a.makeMaprLines(myCtx, fieldsCh, maprLines)
}
@@ -139,13 +140,16 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string {
maprLine := strings.TrimSpace(line.Content.String())
pool.RecycleBytesBuffer(line.Content)
- fields, err := a.parser.MakeFields(maprLine)
- logger.Debug(fields, err)
+ fields, err := a.parser.MakeFields(maprLine)
if err != nil {
- logger.Error(err)
+ // Should fields be ignored anyway?
+ if err != logformat.IgnoreFieldsErr {
+ logger.Error(fields, err)
+ }
continue
}
+
if !a.query.WhereClause(fields) {
continue
}
@@ -170,7 +174,7 @@ func (a *Aggregate) addFields(ctx context.Context, fieldsCh <-chan map[string]st
defer close(ch)
for {
- // fieldsCh will be closed via 'makeFields' if ctx is done
+ // fieldsCh will be closed via 'makeFields' when ctx is done
fields, ok := <-fieldsCh
if !ok {
return
@@ -219,12 +223,11 @@ func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[strin
}
func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
- //logger.Trace("Aggregating", group, fields)
var sb strings.Builder
for i, field := range a.query.GroupBy {
if i > 0 {
- sb.WriteString(" ")
+ sb.WriteString(protocol.AggregateGroupKeyCombinator)
}
if val, ok := fields[field]; ok {
sb.WriteString(val)