diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-13 09:29:59 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-13 09:29:59 +0200 |
| commit | d8f88d455990636bb797643dee7d39a95bbbd62c (patch) | |
| tree | 8c8447fc975ec6deebe48218d27e3defa1b3dcce /internal/mapr/client/aggregate.go | |
| parent | 7a79d0a8bf58b05dfbae331d00275739530b9584 (diff) | |
task 4abe7505: reset dmap generation state
Diffstat (limited to 'internal/mapr/client/aggregate.go')
| -rw-r--r-- | internal/mapr/client/aggregate.go | 40 |
1 files changed, 28 insertions, 12 deletions
diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go index 2750643..8cbd339 100644 --- a/internal/mapr/client/aggregate.go +++ b/internal/mapr/client/aggregate.go @@ -12,30 +12,46 @@ import ( // Aggregate mapreduce data on the DTail client side. type Aggregate struct { - // This is the mapr query specified on the command line. - query *mapr.Query // This represents aggregated data of a single remote server. group *mapr.GroupSet - // This represents the merged aggregated data of all servers. - globalGroup *mapr.GlobalGroupSet + // Shared per-client session state. + session *SessionState + // The currently tracked shared generation. + generation uint64 // The server we aggregate the data for (logging and debugging purposes only) server string } // NewAggregate create new client aggregator. -func NewAggregate(server string, query *mapr.Query, - globalGroup *mapr.GlobalGroupSet) *Aggregate { +func NewAggregate(server string, session *SessionState) *Aggregate { + generation := uint64(0) + if session != nil { + generation = session.Snapshot().Generation + } return &Aggregate{ - query: query, - group: mapr.NewGroupSet(), - globalGroup: globalGroup, - server: server, + group: mapr.NewGroupSet(), + session: session, + generation: generation, + server: server, } } // Aggregate data from mapr log line into local (and global) group sets. func (a *Aggregate) Aggregate(message string) error { + if a.session == nil { + return fmt.Errorf("missing client mapreduce session state") + } + + snapshot := a.session.Snapshot() + if snapshot.Query == nil || snapshot.GlobalGroup == nil { + return fmt.Errorf("missing client mapreduce query state") + } + if snapshot.Generation != a.generation { + a.group.InitSet() + a.generation = snapshot.Generation + } + parts := strings.Split(message, protocol.AggregateDelimiter) if len(parts) < 4 { return fmt.Errorf("aggregate message without any real data") @@ -51,7 +67,7 @@ func (a *Aggregate) Aggregate(message string) error { set := a.group.GetSet(groupKey) var addedSamples bool - for _, sc := range a.query.Select { + for _, sc := range snapshot.Query.Select { if val, ok := fields[sc.FieldStorage]; ok { if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, true); err != nil { dlog.Client.Error(err) @@ -65,7 +81,7 @@ func (a *Aggregate) Aggregate(message string) error { } // Merge data from group into global group. - isMerged, err := a.globalGroup.MergeNoblock(a.query, a.group) + isMerged, err := snapshot.GlobalGroup.MergeNoblock(snapshot.Query, a.group) if err != nil { return fmt.Errorf("unable to merge aggregate data for server %s: %w", a.server, err) } |
