diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2021-10-15 12:38:39 +0300 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2021-10-15 12:38:39 +0300 |
| commit | 55ba72efa4e5d2363f8e0c2cf729c596e760e1c3 (patch) | |
| tree | 72618e384626d9fc368994e3f24be9e9892d0610 /internal/mapr/aggregateset.go | |
| parent | dccbee7dc355438d87baff45e054848e508b004d (diff) | |
| parent | d3549a3316a9917520ab5e6b0cd7b1846c59ad4b (diff) | |
merge from github.com/snonux/dtail
Diffstat (limited to 'internal/mapr/aggregateset.go')
| -rw-r--r-- | internal/mapr/aggregateset.go | 29 |
1 files changed, 16 insertions, 13 deletions
diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go index a6cc6eb..c50c7a1 100644 --- a/internal/mapr/aggregateset.go +++ b/internal/mapr/aggregateset.go @@ -5,6 +5,10 @@ import ( "fmt" "strconv" "strings" + + "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/io/pool" + "github.com/mimecast/dtail/internal/protocol" ) // AggregateSet represents aggregated key/value pairs from the @@ -33,8 +37,7 @@ func (s *AggregateSet) String() string { // Merge one aggregate set into this one. func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error { s.Samples += set.Samples - //logger.Trace("Merge", set) - + //dlog.Common.Trace("Merge", set) for _, sc := range query.Select { storage := sc.FieldStorage switch sc.Operation { @@ -66,24 +69,27 @@ func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error { // Serialize the aggregate set so it can be sent over the wire. func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<- string) { - //logger.Trace("Serialising mapr.AggregateSet", s) - var sb strings.Builder + dlog.Common.Trace("Serialising mapr.AggregateSet", s) + sb := pool.BuilderBuffer.Get().(*strings.Builder) + defer pool.RecycleBuilderBuffer(sb) sb.WriteString(groupKey) - sb.WriteString("ā") - sb.WriteString(fmt.Sprintf("%dā", s.Samples)) + sb.WriteString(protocol.AggregateDelimiter) + sb.WriteString(fmt.Sprintf("%d", s.Samples)) + sb.WriteString(protocol.AggregateDelimiter) for k, v := range s.FValues { sb.WriteString(k) - sb.WriteString("=") - sb.WriteString(fmt.Sprintf("%vā", v)) + sb.WriteString(protocol.AggregateKVDelimiter) + sb.WriteString(fmt.Sprintf("%v", v)) + sb.WriteString(protocol.AggregateDelimiter) } for k, v := range s.SValues { sb.WriteString(k) - sb.WriteString("=") + sb.WriteString(protocol.AggregateKVDelimiter) sb.WriteString(v) - sb.WriteString("ā") + sb.WriteString(protocol.AggregateDelimiter) } select { @@ -108,7 +114,6 @@ func (s *AggregateSet) addFloatMin(key string, value float64) { s.FValues[key] = value return } - if f > value { s.FValues[key] = value } @@ -121,7 +126,6 @@ func (s *AggregateSet) addFloatMax(key string, value float64) { s.FValues[key] = value return } - if f < value { s.FValues[key] = value } @@ -140,7 +144,6 @@ func (s *AggregateSet) setFloat(key string, value float64) { // Aggregate data to the aggregate set. func (s *AggregateSet) Aggregate(key string, agg AggregateOperation, value string, clientAggregation bool) (err error) { var f float64 - // First check if we can aggregate anything without converting value to float. switch agg { case Count: |
