summaryrefslogtreecommitdiff
path: root/internal/mapr
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2020-06-17 12:59:15 +0100
committerPaul Buetow <pbuetow@mimecast.com>2020-06-17 12:59:15 +0100
commitb7b528277014879e436ae7fe1f3851024938fbd3 (patch)
tree8be1cc0ecf3a25c390c9a7ad97afb5aa52a6c512 /internal/mapr
parent4da9ed0f4ded049e28607cc7ea78c8b091ca721b (diff)
initial log monitoring support
Diffstat (limited to 'internal/mapr')
-rw-r--r--internal/mapr/aggregateset.go12
-rw-r--r--internal/mapr/server/aggregate.go6
2 files changed, 18 insertions, 0 deletions
diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go
index 7fb4c17..fdf8db2 100644
--- a/internal/mapr/aggregateset.go
+++ b/internal/mapr/aggregateset.go
@@ -2,9 +2,12 @@ package mapr
import (
"context"
+ "encoding/base64"
"fmt"
"strconv"
"strings"
+
+ "github.com/mimecast/dtail/internal/io/logger"
)
// AggregateSet represents aggregated key/value pairs from the
@@ -82,6 +85,15 @@ func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<-
for k, v := range s.SValues {
sb.WriteString(k)
sb.WriteString("=")
+ if k == "$line" {
+ decoded, err := base64.StdEncoding.DecodeString(v)
+ if err != nil {
+ logger.Error("Unable to decode $line", err, v)
+ }
+ sb.WriteString(string(decoded))
+ sb.WriteString("|")
+ continue
+ }
sb.WriteString(v)
sb.WriteString("|")
}
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 9403aa9..80a464d 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -192,6 +192,12 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
var addedSample bool
for _, sc := range a.query.Select {
if val, ok := fields[sc.Field]; ok {
+ /*
+ if sc.Field == "$line" {
+ // Complete log line as to arrive untouched on the client side.
+ val = base64.StdEncoding.EncodeToString([]byte(val))
+ }
+ */
if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil {
logger.Error(err)
continue