From 7fff78dc69be2fdaa6646d98283a5a5991fdc757 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Thu, 2 Jul 2020 13:29:45 +0100 Subject: send $line base64 encoded over the wire --- internal/clients/handlers/maprhandler.go | 5 +++++ internal/mapr/aggregateset.go | 8 +------- internal/mapr/client/aggregate.go | 12 +++++++++++- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index df44ec9..44daf7d 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -64,6 +64,11 @@ func (h *MaprHandler) handleAggregateMessage(message string) { // Index 0 contains 'AGGREGATE', 1 contains server host. // Aggregation data begins from index 2. logger.Debug("Received aggregate data", h.server, h.count, parts) + /* + for k, v := range parts { + logger.Debug(k, v) + } + */ h.aggregate.Aggregate(parts[2:]) logger.Debug("Aggregated aggregate data", h.server, h.count) } diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go index fdf8db2..d8705bd 100644 --- a/internal/mapr/aggregateset.go +++ b/internal/mapr/aggregateset.go @@ -6,8 +6,6 @@ import ( "fmt" "strconv" "strings" - - "github.com/mimecast/dtail/internal/io/logger" ) // AggregateSet represents aggregated key/value pairs from the @@ -86,11 +84,7 @@ func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<- sb.WriteString(k) sb.WriteString("=") if k == "$line" { - decoded, err := base64.StdEncoding.DecodeString(v) - if err != nil { - logger.Error("Unable to decode $line", err, v) - } - sb.WriteString(string(decoded)) + sb.WriteString(base64.StdEncoding.EncodeToString([]byte(v))) sb.WriteString("|") continue } diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go index 1272a19..fd3a899 100644 --- a/internal/mapr/client/aggregate.go +++ b/internal/mapr/client/aggregate.go @@ -1,6 +1,7 @@ package client import ( + "encoding/base64" "strconv" "strings" @@ -35,7 +36,7 @@ func (a *Aggregate) Aggregate(parts []string) { groupKey := parts[0] samples, err := strconv.Atoi(parts[1]) if err != nil { - logger.FatalExit(parts, err) + logger.FatalExit("Unable to parse sample count", parts[1], err, parts) } fields := a.makeFields(parts[2:]) set := a.group.GetSet(groupKey) @@ -74,6 +75,15 @@ func (a *Aggregate) makeFields(parts []string) map[string]string { if len(kv) != 2 { continue } + if kv[0] == "$line" { + decoded, err := base64.StdEncoding.DecodeString(kv[1]) + if err != nil { + logger.Error("Unable to decode $line", kv[1], err) + continue + } + fields[kv[0]] = string(decoded) + continue + } fields[kv[0]] = kv[1] } -- cgit v1.2.3