From 16dc57e1e1c28e9d762424e596223a980770e059 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 8 Sep 2021 19:10:50 +0300 Subject: mapreduce tables are in colors now too --- internal/mapr/client/aggregate.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) (limited to 'internal/mapr/client') diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go index 10b34d4..5cc09a1 100644 --- a/internal/mapr/client/aggregate.go +++ b/internal/mapr/client/aggregate.go @@ -1,11 +1,13 @@ package client import ( + "fmt" "strconv" "strings" "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr" + "github.com/mimecast/dtail/internal/protocol" ) // Aggregate mapreduce data on the DTail client side. @@ -31,12 +33,18 @@ func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGrou } // Aggregate data from mapr log line into local (and global) group sets. -func (a *Aggregate) Aggregate(parts []string) { +func (a *Aggregate) Aggregate(message string) error { + parts := strings.Split(message, protocol.AggregateDelimiter) + if len(parts) < 4 { + return fmt.Errorf("aggregate message without any real data") + } + groupKey := parts[0] samples, err := strconv.Atoi(parts[1]) if err != nil { - logger.FatalExit("Unable to parse sample count", parts[1], err, parts) + return fmt.Errorf("unable to parse sample count '%s': %v", parts[1], err) } + fields := a.makeFields(parts[2:]) set := a.group.GetSet(groupKey) @@ -63,6 +71,8 @@ func (a *Aggregate) Aggregate(parts []string) { // Re-init local group (make it empty again). a.group.InitSet() } + + return nil } // Create a map of key-value pairs from a part list such as ["foo=bar", "bar=baz"]. @@ -70,8 +80,8 @@ func (a *Aggregate) makeFields(parts []string) map[string]string { fields := make(map[string]string, len(parts)) for _, part := range parts { - kv := strings.SplitN(part, "=", 2) - if len(kv) < 2 { + kv := strings.SplitN(part, protocol.AggregateKVDelimiter, 2) + if len(kv) != 2 { continue } fields[kv[0]] = kv[1] -- cgit v1.2.3 From fe3e68afd99d8ea246be52893730f987e138ec24 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 19 Sep 2021 13:22:59 +0300 Subject: move args to config package logger package rewrite as dlog --- internal/mapr/client/aggregate.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'internal/mapr/client') diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go index 5cc09a1..d0c1d70 100644 --- a/internal/mapr/client/aggregate.go +++ b/internal/mapr/client/aggregate.go @@ -5,7 +5,7 @@ import ( "strconv" "strings" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/protocol" ) @@ -52,7 +52,7 @@ func (a *Aggregate) Aggregate(message string) error { for _, sc := range a.query.Select { if val, ok := fields[sc.FieldStorage]; ok { if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, true); err != nil { - logger.Error(err) + dlog.Common.Error(err) continue } addedSamples = true -- cgit v1.2.3 From 97747ea0f3178f7f5890512d483fdccaa82846b0 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 9 Oct 2021 21:10:29 +0300 Subject: vetting and linting and some code restyling --- internal/mapr/client/aggregate.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) (limited to 'internal/mapr/client') diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go index d0c1d70..02a6a5a 100644 --- a/internal/mapr/client/aggregate.go +++ b/internal/mapr/client/aggregate.go @@ -23,7 +23,9 @@ type Aggregate struct { } // NewAggregate create new client aggregator. -func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *Aggregate { +func NewAggregate(server string, query *mapr.Query, + globalGroup *mapr.GlobalGroupSet) *Aggregate { + return &Aggregate{ query: query, group: mapr.NewGroupSet(), @@ -47,8 +49,8 @@ func (a *Aggregate) Aggregate(message string) error { fields := a.makeFields(parts[2:]) set := a.group.GetSet(groupKey) - var addedSamples bool + for _, sc := range a.query.Select { if val, ok := fields[sc.FieldStorage]; ok { if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, true); err != nil { @@ -71,14 +73,12 @@ func (a *Aggregate) Aggregate(message string) error { // Re-init local group (make it empty again). a.group.InitSet() } - return nil } // Create a map of key-value pairs from a part list such as ["foo=bar", "bar=baz"]. func (a *Aggregate) makeFields(parts []string) map[string]string { fields := make(map[string]string, len(parts)) - for _, part := range parts { kv := strings.SplitN(part, protocol.AggregateKVDelimiter, 2) if len(kv) != 2 { @@ -86,6 +86,5 @@ func (a *Aggregate) makeFields(parts []string) map[string]string { } fields[kv[0]] = kv[1] } - return fields } -- cgit v1.2.3