summaryrefslogtreecommitdiff
path: root/internal/mapr/aggregateset.go
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2021-10-15 12:38:39 +0300
committerPaul Buetow <pbuetow@mimecast.com>2021-10-15 12:38:39 +0300
commit55ba72efa4e5d2363f8e0c2cf729c596e760e1c3 (patch)
tree72618e384626d9fc368994e3f24be9e9892d0610 /internal/mapr/aggregateset.go
parentdccbee7dc355438d87baff45e054848e508b004d (diff)
parentd3549a3316a9917520ab5e6b0cd7b1846c59ad4b (diff)
merge from github.com/snonux/dtail
Diffstat (limited to 'internal/mapr/aggregateset.go')
-rw-r--r--internal/mapr/aggregateset.go29
1 files changed, 16 insertions, 13 deletions
diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go
index a6cc6eb..c50c7a1 100644
--- a/internal/mapr/aggregateset.go
+++ b/internal/mapr/aggregateset.go
@@ -5,6 +5,10 @@ import (
"fmt"
"strconv"
"strings"
+
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/io/pool"
+ "github.com/mimecast/dtail/internal/protocol"
)
// AggregateSet represents aggregated key/value pairs from the
@@ -33,8 +37,7 @@ func (s *AggregateSet) String() string {
// Merge one aggregate set into this one.
func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error {
s.Samples += set.Samples
- //logger.Trace("Merge", set)
-
+ //dlog.Common.Trace("Merge", set)
for _, sc := range query.Select {
storage := sc.FieldStorage
switch sc.Operation {
@@ -66,24 +69,27 @@ func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error {
// Serialize the aggregate set so it can be sent over the wire.
func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<- string) {
- //logger.Trace("Serialising mapr.AggregateSet", s)
- var sb strings.Builder
+ dlog.Common.Trace("Serialising mapr.AggregateSet", s)
+ sb := pool.BuilderBuffer.Get().(*strings.Builder)
+ defer pool.RecycleBuilderBuffer(sb)
sb.WriteString(groupKey)
- sb.WriteString("āž”")
- sb.WriteString(fmt.Sprintf("%dāž”", s.Samples))
+ sb.WriteString(protocol.AggregateDelimiter)
+ sb.WriteString(fmt.Sprintf("%d", s.Samples))
+ sb.WriteString(protocol.AggregateDelimiter)
for k, v := range s.FValues {
sb.WriteString(k)
- sb.WriteString("=")
- sb.WriteString(fmt.Sprintf("%vāž”", v))
+ sb.WriteString(protocol.AggregateKVDelimiter)
+ sb.WriteString(fmt.Sprintf("%v", v))
+ sb.WriteString(protocol.AggregateDelimiter)
}
for k, v := range s.SValues {
sb.WriteString(k)
- sb.WriteString("=")
+ sb.WriteString(protocol.AggregateKVDelimiter)
sb.WriteString(v)
- sb.WriteString("āž”")
+ sb.WriteString(protocol.AggregateDelimiter)
}
select {
@@ -108,7 +114,6 @@ func (s *AggregateSet) addFloatMin(key string, value float64) {
s.FValues[key] = value
return
}
-
if f > value {
s.FValues[key] = value
}
@@ -121,7 +126,6 @@ func (s *AggregateSet) addFloatMax(key string, value float64) {
s.FValues[key] = value
return
}
-
if f < value {
s.FValues[key] = value
}
@@ -140,7 +144,6 @@ func (s *AggregateSet) setFloat(key string, value float64) {
// Aggregate data to the aggregate set.
func (s *AggregateSet) Aggregate(key string, agg AggregateOperation, value string, clientAggregation bool) (err error) {
var f float64
-
// First check if we can aggregate anything without converting value to float.
switch agg {
case Count: