diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2020-07-02 13:29:45 +0100 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2020-07-02 13:29:45 +0100 |
| commit | 41fb325372034c7a42bf0157cdc08894d8371247 (patch) | |
| tree | 57e121a4e501e99648ded9340c20d9918b563b33 | |
| parent | 490615493acc8deb0b29c19cfdcd6ca067760c4b (diff) | |
send $line base64 encoded over the wire
| -rw-r--r-- | internal/clients/handlers/maprhandler.go | 5 | ||||
| -rw-r--r-- | internal/mapr/aggregateset.go | 8 | ||||
| -rw-r--r-- | internal/mapr/client/aggregate.go | 12 |
3 files changed, 17 insertions, 8 deletions
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index df44ec9..44daf7d 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -64,6 +64,11 @@ func (h *MaprHandler) handleAggregateMessage(message string) { // Index 0 contains 'AGGREGATE', 1 contains server host. // Aggregation data begins from index 2. logger.Debug("Received aggregate data", h.server, h.count, parts) + /* + for k, v := range parts { + logger.Debug(k, v) + } + */ h.aggregate.Aggregate(parts[2:]) logger.Debug("Aggregated aggregate data", h.server, h.count) } diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go index fdf8db2..d8705bd 100644 --- a/internal/mapr/aggregateset.go +++ b/internal/mapr/aggregateset.go @@ -6,8 +6,6 @@ import ( "fmt" "strconv" "strings" - - "github.com/mimecast/dtail/internal/io/logger" ) // AggregateSet represents aggregated key/value pairs from the @@ -86,11 +84,7 @@ func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<- sb.WriteString(k) sb.WriteString("=") if k == "$line" { - decoded, err := base64.StdEncoding.DecodeString(v) - if err != nil { - logger.Error("Unable to decode $line", err, v) - } - sb.WriteString(string(decoded)) + sb.WriteString(base64.StdEncoding.EncodeToString([]byte(v))) sb.WriteString("|") continue } diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go index 1272a19..fd3a899 100644 --- a/internal/mapr/client/aggregate.go +++ b/internal/mapr/client/aggregate.go @@ -1,6 +1,7 @@ package client import ( + "encoding/base64" "strconv" "strings" @@ -35,7 +36,7 @@ func (a *Aggregate) Aggregate(parts []string) { groupKey := parts[0] samples, err := strconv.Atoi(parts[1]) if err != nil { - logger.FatalExit(parts, err) + logger.FatalExit("Unable to parse sample count", parts[1], err, parts) } fields := a.makeFields(parts[2:]) set := a.group.GetSet(groupKey) @@ -74,6 +75,15 @@ func (a *Aggregate) makeFields(parts []string) map[string]string { if len(kv) != 2 { continue } + if kv[0] == "$line" { + decoded, err := base64.StdEncoding.DecodeString(kv[1]) + if err != nil { + logger.Error("Unable to decode $line", kv[1], err) + continue + } + fields[kv[0]] = string(decoded) + continue + } fields[kv[0]] = kv[1] } |
