summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2020-07-02 13:29:45 +0100
committerPaul Buetow <pbuetow@mimecast.com>2020-07-02 13:29:45 +0100
commit41fb325372034c7a42bf0157cdc08894d8371247 (patch)
tree57e121a4e501e99648ded9340c20d9918b563b33
parent490615493acc8deb0b29c19cfdcd6ca067760c4b (diff)
send $line base64 encoded over the wire
-rw-r--r--internal/clients/handlers/maprhandler.go5
-rw-r--r--internal/mapr/aggregateset.go8
-rw-r--r--internal/mapr/client/aggregate.go12
3 files changed, 17 insertions, 8 deletions
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go
index df44ec9..44daf7d 100644
--- a/internal/clients/handlers/maprhandler.go
+++ b/internal/clients/handlers/maprhandler.go
@@ -64,6 +64,11 @@ func (h *MaprHandler) handleAggregateMessage(message string) {
// Index 0 contains 'AGGREGATE', 1 contains server host.
// Aggregation data begins from index 2.
logger.Debug("Received aggregate data", h.server, h.count, parts)
+ /*
+ for k, v := range parts {
+ logger.Debug(k, v)
+ }
+ */
h.aggregate.Aggregate(parts[2:])
logger.Debug("Aggregated aggregate data", h.server, h.count)
}
diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go
index fdf8db2..d8705bd 100644
--- a/internal/mapr/aggregateset.go
+++ b/internal/mapr/aggregateset.go
@@ -6,8 +6,6 @@ import (
"fmt"
"strconv"
"strings"
-
- "github.com/mimecast/dtail/internal/io/logger"
)
// AggregateSet represents aggregated key/value pairs from the
@@ -86,11 +84,7 @@ func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<-
sb.WriteString(k)
sb.WriteString("=")
if k == "$line" {
- decoded, err := base64.StdEncoding.DecodeString(v)
- if err != nil {
- logger.Error("Unable to decode $line", err, v)
- }
- sb.WriteString(string(decoded))
+ sb.WriteString(base64.StdEncoding.EncodeToString([]byte(v)))
sb.WriteString("|")
continue
}
diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go
index 1272a19..fd3a899 100644
--- a/internal/mapr/client/aggregate.go
+++ b/internal/mapr/client/aggregate.go
@@ -1,6 +1,7 @@
package client
import (
+ "encoding/base64"
"strconv"
"strings"
@@ -35,7 +36,7 @@ func (a *Aggregate) Aggregate(parts []string) {
groupKey := parts[0]
samples, err := strconv.Atoi(parts[1])
if err != nil {
- logger.FatalExit(parts, err)
+ logger.FatalExit("Unable to parse sample count", parts[1], err, parts)
}
fields := a.makeFields(parts[2:])
set := a.group.GetSet(groupKey)
@@ -74,6 +75,15 @@ func (a *Aggregate) makeFields(parts []string) map[string]string {
if len(kv) != 2 {
continue
}
+ if kv[0] == "$line" {
+ decoded, err := base64.StdEncoding.DecodeString(kv[1])
+ if err != nil {
+ logger.Error("Unable to decode $line", kv[1], err)
+ continue
+ }
+ fields[kv[0]] = string(decoded)
+ continue
+ }
fields[kv[0]] = kv[1]
}