summaryrefslogtreecommitdiff
path: root/internal/mapr/client/aggregate.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/mapr/client/aggregate.go')
-rw-r--r--internal/mapr/client/aggregate.go29
1 files changed, 19 insertions, 10 deletions
diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go
index 10b34d4..02a6a5a 100644
--- a/internal/mapr/client/aggregate.go
+++ b/internal/mapr/client/aggregate.go
@@ -1,11 +1,13 @@
package client
import (
+ "fmt"
"strconv"
"strings"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr"
+ "github.com/mimecast/dtail/internal/protocol"
)
// Aggregate mapreduce data on the DTail client side.
@@ -21,7 +23,9 @@ type Aggregate struct {
}
// NewAggregate create new client aggregator.
-func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *Aggregate {
+func NewAggregate(server string, query *mapr.Query,
+ globalGroup *mapr.GlobalGroupSet) *Aggregate {
+
return &Aggregate{
query: query,
group: mapr.NewGroupSet(),
@@ -31,20 +35,26 @@ func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGrou
}
// Aggregate data from mapr log line into local (and global) group sets.
-func (a *Aggregate) Aggregate(parts []string) {
+func (a *Aggregate) Aggregate(message string) error {
+ parts := strings.Split(message, protocol.AggregateDelimiter)
+ if len(parts) < 4 {
+ return fmt.Errorf("aggregate message without any real data")
+ }
+
groupKey := parts[0]
samples, err := strconv.Atoi(parts[1])
if err != nil {
- logger.FatalExit("Unable to parse sample count", parts[1], err, parts)
+ return fmt.Errorf("unable to parse sample count '%s': %v", parts[1], err)
}
+
fields := a.makeFields(parts[2:])
set := a.group.GetSet(groupKey)
-
var addedSamples bool
+
for _, sc := range a.query.Select {
if val, ok := fields[sc.FieldStorage]; ok {
if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, true); err != nil {
- logger.Error(err)
+ dlog.Common.Error(err)
continue
}
addedSamples = true
@@ -63,19 +73,18 @@ func (a *Aggregate) Aggregate(parts []string) {
// Re-init local group (make it empty again).
a.group.InitSet()
}
+ return nil
}
// Create a map of key-value pairs from a part list such as ["foo=bar", "bar=baz"].
func (a *Aggregate) makeFields(parts []string) map[string]string {
fields := make(map[string]string, len(parts))
-
for _, part := range parts {
- kv := strings.SplitN(part, "=", 2)
- if len(kv) < 2 {
+ kv := strings.SplitN(part, protocol.AggregateKVDelimiter, 2)
+ if len(kv) != 2 {
continue
}
fields[kv[0]] = kv[1]
}
-
return fields
}