summaryrefslogtreecommitdiff
path: root/internal/mapr/server/aggregate.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-20 11:06:50 +0200
committerPaul Buetow <paul@buetow.org>2026-03-20 11:06:50 +0200
commit13b21feb07c86f65760f7338f284f3b492364cd9 (patch)
treec9fa6fc4fb0c7fe8b927297d26e5f3b1448a3518 /internal/mapr/server/aggregate.go
parentda8e581617a0240626d2bc922916416440e65bae (diff)
Optimize mapr parsing and stabilize aggregate shutdown
Diffstat (limited to 'internal/mapr/server/aggregate.go')
-rw-r--r--internal/mapr/server/aggregate.go14
1 files changed, 2 insertions, 12 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 9a736a5..c9d4641 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -12,7 +12,6 @@ import (
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/logformat"
- "github.com/mimecast/dtail/internal/protocol"
)
// Aggregate is for aggregating mapreduce data on the DTail server side.
@@ -282,7 +281,7 @@ func (a *Aggregate) aggregateAndSerialize(ctx context.Context,
serialize := func() {
dlog.Server.Info("Serializing mapreduce result")
group.Serialize(ctx, maprMessages)
- group = mapr.NewGroupSet()
+ group.InitSet()
}
for {
select {
@@ -301,16 +300,7 @@ func (a *Aggregate) aggregateAndSerialize(ctx context.Context,
}
func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
- var sb strings.Builder
- for i, field := range a.query.GroupBy {
- if i > 0 {
- sb.WriteString(protocol.AggregateGroupKeyCombinator)
- }
- if val, ok := fields[field]; ok {
- sb.WriteString(val)
- }
- }
- groupKey := sb.String()
+ groupKey := buildGroupKey(a.query.GroupBy, fields)
set := group.GetSet(groupKey)
var addedSample bool