From a1ce50df2c9ced992d5895e7f6d7dc1fde6ffe75 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 17 Jun 2020 12:59:15 +0100 Subject: initial log monitoring support --- internal/mapr/aggregateset.go | 12 ++++++++++++ internal/mapr/server/aggregate.go | 6 ++++++ 2 files changed, 18 insertions(+) (limited to 'internal/mapr') diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go index 7fb4c17..fdf8db2 100644 --- a/internal/mapr/aggregateset.go +++ b/internal/mapr/aggregateset.go @@ -2,9 +2,12 @@ package mapr import ( "context" + "encoding/base64" "fmt" "strconv" "strings" + + "github.com/mimecast/dtail/internal/io/logger" ) // AggregateSet represents aggregated key/value pairs from the @@ -82,6 +85,15 @@ func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<- for k, v := range s.SValues { sb.WriteString(k) sb.WriteString("=") + if k == "$line" { + decoded, err := base64.StdEncoding.DecodeString(v) + if err != nil { + logger.Error("Unable to decode $line", err, v) + } + sb.WriteString(string(decoded)) + sb.WriteString("|") + continue + } sb.WriteString(v) sb.WriteString("|") } diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 9403aa9..80a464d 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -192,6 +192,12 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { var addedSample bool for _, sc := range a.query.Select { if val, ok := fields[sc.Field]; ok { + /* + if sc.Field == "$line" { + // Complete log line as to arrive untouched on the client side. + val = base64.StdEncoding.EncodeToString([]byte(val)) + } + */ if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil { logger.Error(err) continue -- cgit v1.2.3