summaryrefslogtreecommitdiff
path: root/internal/mapr/client
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2020-07-02 13:29:45 +0100
committerPaul Buetow <pbuetow@mimecast.com>2020-07-02 13:29:45 +0100
commit41fb325372034c7a42bf0157cdc08894d8371247 (patch)
tree57e121a4e501e99648ded9340c20d9918b563b33 /internal/mapr/client
parent490615493acc8deb0b29c19cfdcd6ca067760c4b (diff)
send $line base64 encoded over the wire
Diffstat (limited to 'internal/mapr/client')
-rw-r--r--internal/mapr/client/aggregate.go12
1 files changed, 11 insertions, 1 deletions
diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go
index 1272a19..fd3a899 100644
--- a/internal/mapr/client/aggregate.go
+++ b/internal/mapr/client/aggregate.go
@@ -1,6 +1,7 @@
package client
import (
+ "encoding/base64"
"strconv"
"strings"
@@ -35,7 +36,7 @@ func (a *Aggregate) Aggregate(parts []string) {
groupKey := parts[0]
samples, err := strconv.Atoi(parts[1])
if err != nil {
- logger.FatalExit(parts, err)
+ logger.FatalExit("Unable to parse sample count", parts[1], err, parts)
}
fields := a.makeFields(parts[2:])
set := a.group.GetSet(groupKey)
@@ -74,6 +75,15 @@ func (a *Aggregate) makeFields(parts []string) map[string]string {
if len(kv) != 2 {
continue
}
+ if kv[0] == "$line" {
+ decoded, err := base64.StdEncoding.DecodeString(kv[1])
+ if err != nil {
+ logger.Error("Unable to decode $line", kv[1], err)
+ continue
+ }
+ fields[kv[0]] = string(decoded)
+ continue
+ }
fields[kv[0]] = kv[1]
}