diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-20 11:06:50 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-20 11:06:50 +0200 |
| commit | 13b21feb07c86f65760f7338f284f3b492364cd9 (patch) | |
| tree | c9fa6fc4fb0c7fe8b927297d26e5f3b1448a3518 /internal/mapr/client | |
| parent | da8e581617a0240626d2bc922916416440e65bae (diff) | |
Optimize mapr parsing and stabilize aggregate shutdown
Diffstat (limited to 'internal/mapr/client')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | internal/mapr/client/aggregate.go | 31 |
| -rw-r--r-- | internal/mapr/client/aggregate_test.go | 26 |
2 files changed, 54 insertions, 3 deletions
```diff
diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go
index 8cbd339..9989e8f 100644
--- a/internal/mapr/client/aggregate.go
+++ b/internal/mapr/client/aggregate.go
@@ -92,15 +92,40 @@ func (a *Aggregate) Aggregate(message string) error {
 	return nil
 }
 
+// Flush merges any pending per-server aggregate state into the shared global group.
+// The normal hot path uses MergeNoblock to avoid stalling on the global merge lock.
+// During shutdown we need a blocking flush so the last local batch is not lost.
+func (a *Aggregate) Flush() error {
+	if a.session == nil {
+		return fmt.Errorf("missing client mapreduce session state")
+	}
+
+	snapshot := a.session.Snapshot()
+	if snapshot.Query == nil || snapshot.GlobalGroup == nil {
+		return nil
+	}
+	if snapshot.Generation != a.generation {
+		a.group.InitSet()
+		a.generation = snapshot.Generation
+		return nil
+	}
+
+	if err := snapshot.GlobalGroup.Merge(snapshot.Query, a.group); err != nil {
+		return fmt.Errorf("unable to flush aggregate data for server %s: %w", a.server, err)
+	}
+	a.group.InitSet()
+	return nil
+}
+
 // Create a map of key-value pairs from a part list such as ["foo=bar", "bar=baz"].
 func (a *Aggregate) makeFields(parts []string) map[string]string {
 	fields := make(map[string]string, len(parts))
 	for _, part := range parts {
-		kv := strings.SplitN(part, protocol.AggregateKVDelimiter, 2)
-		if len(kv) != 2 {
+		key, value, ok := strings.Cut(part, protocol.AggregateKVDelimiter)
+		if !ok {
 			continue
 		}
-		fields[kv[0]] = kv[1]
+		fields[key] = value
 	}
 	return fields
 }
diff --git a/internal/mapr/client/aggregate_test.go b/internal/mapr/client/aggregate_test.go
index 8ac94a1..3387a63 100644
--- a/internal/mapr/client/aggregate_test.go
+++ b/internal/mapr/client/aggregate_test.go
@@ -57,6 +57,32 @@ func TestAggregateRejectsMalformedMessage(t *testing.T) {
 	}
 }
 
+func TestAggregateFlushMergesPendingLocalState(t *testing.T) {
+	query := mustSessionStateQuery(t, "select status,count(status) from stats group by status")
+	state := NewSessionState(query)
+	aggregate := NewAggregate("srv1", state)
+	countStorage := aggregateCountStorage(t, query)
+
+	set := aggregate.group.GetSet("ERROR")
+	set.Samples = 3
+	set.FValues[countStorage] = 3
+
+	if err := aggregate.Flush(); err != nil {
+		t.Fatalf("Flush() error = %v", err)
+	}
+
+	result, numRows, err := state.Snapshot().GlobalGroup.Result(query, 10, nil)
+	if err != nil {
+		t.Fatalf("Result() error = %v", err)
+	}
+	if numRows != 1 {
+		t.Fatalf("numRows = %d, want 1", numRows)
+	}
+	if !strings.Contains(result, "3") {
+		t.Fatalf("expected flushed aggregate row, got %q", result)
+	}
+}
+
 func aggregateCountStorage(t *testing.T, query *mapr.Query) string {
 	t.Helper()
 
```
