diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-20 11:06:50 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-20 11:06:50 +0200 |
| commit | 13b21feb07c86f65760f7338f284f3b492364cd9 (patch) | |
| tree | c9fa6fc4fb0c7fe8b927297d26e5f3b1448a3518 /internal/mapr/server/aggregate.go | |
| parent | da8e581617a0240626d2bc922916416440e65bae (diff) | |
Optimize mapr parsing and stabilize aggregate shutdown
Diffstat (limited to 'internal/mapr/server/aggregate.go')
| -rw-r--r-- | internal/mapr/server/aggregate.go | 14 |
1 files changed, 2 insertions, 12 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 9a736a5..c9d4641 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -12,7 +12,6 @@ import ( "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/logformat" - "github.com/mimecast/dtail/internal/protocol" ) // Aggregate is for aggregating mapreduce data on the DTail server side. @@ -282,7 +281,7 @@ func (a *Aggregate) aggregateAndSerialize(ctx context.Context, serialize := func() { dlog.Server.Info("Serializing mapreduce result") group.Serialize(ctx, maprMessages) - group = mapr.NewGroupSet() + group.InitSet() } for { select { @@ -301,16 +300,7 @@ func (a *Aggregate) aggregateAndSerialize(ctx context.Context, } func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { - var sb strings.Builder - for i, field := range a.query.GroupBy { - if i > 0 { - sb.WriteString(protocol.AggregateGroupKeyCombinator) - } - if val, ok := fields[field]; ok { - sb.WriteString(val) - } - } - groupKey := sb.String() + groupKey := buildGroupKey(a.query.GroupBy, fields) set := group.GetSet(groupKey) var addedSample bool |
