summaryrefslogtreecommitdiff
path: root/internal/mapr/client
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-20 11:06:50 +0200
committerPaul Buetow <paul@buetow.org>2026-03-20 11:06:50 +0200
commit13b21feb07c86f65760f7338f284f3b492364cd9 (patch)
treec9fa6fc4fb0c7fe8b927297d26e5f3b1448a3518 /internal/mapr/client
parentda8e581617a0240626d2bc922916416440e65bae (diff)
Optimize mapr parsing and stabilize aggregate shutdown
Diffstat (limited to 'internal/mapr/client')
-rw-r--r--internal/mapr/client/aggregate.go31
-rw-r--r--internal/mapr/client/aggregate_test.go26
2 files changed, 54 insertions, 3 deletions
diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go
index 8cbd339..9989e8f 100644
--- a/internal/mapr/client/aggregate.go
+++ b/internal/mapr/client/aggregate.go
@@ -92,15 +92,40 @@ func (a *Aggregate) Aggregate(message string) error {
return nil
}
+// Flush merges any pending per-server aggregate state into the shared global group.
+// The normal hot path uses MergeNoblock to avoid stalling on the global merge lock.
+// During shutdown we need a blocking flush so the last local batch is not lost.
+func (a *Aggregate) Flush() error {
+ if a.session == nil {
+ return fmt.Errorf("missing client mapreduce session state")
+ }
+
+ snapshot := a.session.Snapshot()
+ if snapshot.Query == nil || snapshot.GlobalGroup == nil {
+ return nil
+ }
+ if snapshot.Generation != a.generation {
+ a.group.InitSet()
+ a.generation = snapshot.Generation
+ return nil
+ }
+
+ if err := snapshot.GlobalGroup.Merge(snapshot.Query, a.group); err != nil {
+ return fmt.Errorf("unable to flush aggregate data for server %s: %w", a.server, err)
+ }
+ a.group.InitSet()
+ return nil
+}
+
// Create a map of key-value pairs from a part list such as ["foo=bar", "bar=baz"].
func (a *Aggregate) makeFields(parts []string) map[string]string {
fields := make(map[string]string, len(parts))
for _, part := range parts {
- kv := strings.SplitN(part, protocol.AggregateKVDelimiter, 2)
- if len(kv) != 2 {
+ key, value, ok := strings.Cut(part, protocol.AggregateKVDelimiter)
+ if !ok {
continue
}
- fields[kv[0]] = kv[1]
+ fields[key] = value
}
return fields
}
diff --git a/internal/mapr/client/aggregate_test.go b/internal/mapr/client/aggregate_test.go
index 8ac94a1..3387a63 100644
--- a/internal/mapr/client/aggregate_test.go
+++ b/internal/mapr/client/aggregate_test.go
@@ -57,6 +57,32 @@ func TestAggregateRejectsMalformedMessage(t *testing.T) {
}
}
+func TestAggregateFlushMergesPendingLocalState(t *testing.T) {
+ query := mustSessionStateQuery(t, "select status,count(status) from stats group by status")
+ state := NewSessionState(query)
+ aggregate := NewAggregate("srv1", state)
+ countStorage := aggregateCountStorage(t, query)
+
+ set := aggregate.group.GetSet("ERROR")
+ set.Samples = 3
+ set.FValues[countStorage] = 3
+
+ if err := aggregate.Flush(); err != nil {
+ t.Fatalf("Flush() error = %v", err)
+ }
+
+ result, numRows, err := state.Snapshot().GlobalGroup.Result(query, 10, nil)
+ if err != nil {
+ t.Fatalf("Result() error = %v", err)
+ }
+ if numRows != 1 {
+ t.Fatalf("numRows = %d, want 1", numRows)
+ }
+ if !strings.Contains(result, "3") {
+ t.Fatalf("expected flushed aggregate row, got %q", result)
+ }
+}
+
func aggregateCountStorage(t *testing.T, query *mapr.Query) string {
t.Helper()