summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-02 09:38:36 +0200
committerPaul Buetow <paul@buetow.org>2026-03-02 09:38:36 +0200
commit5e8d0454d1aa5388a7045bdcb3069e41e4474957 (patch)
tree8d12e2e7629ea8ec1b7efbe5a0fb022163f727dd
parentd8d7e7c229e44bbd3a9d0ac268829e40d0704957 (diff)
mapr client: replace runtime panics with errors
Task: 4e6d7744-3f5c-4880-9e5d-368ece96470d
-rw-r--r--internal/clients/maprclient.go35
-rw-r--r--internal/mapr/client/aggregate.go2
2 files changed, 21 insertions, 16 deletions
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index cfbffee..2af038f 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -99,11 +99,13 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i
go c.periodicReportResults(ctx)
status = c.baseClient.Start(ctx, statsCh)
-
+
// Always write final result for cumulative mode (includes outfile case)
if c.cumulative {
dlog.Client.Debug("Writing final mapreduce result")
- c.reportResults(true)
+ if err := c.reportResults(true); err != nil {
+ dlog.Client.Error("Unable to write final mapreduce result", err)
+ }
dlog.Client.Debug("Final result written")
}
@@ -149,22 +151,23 @@ func (c *MaprClient) periodicReportResults(ctx context.Context) {
select {
case <-time.After(c.query.Interval):
dlog.Client.Debug("Gathering interim mapreduce result")
- c.reportResults(false)
+ if err := c.reportResults(false); err != nil {
+ dlog.Client.Error("Unable to gather mapreduce result", err)
+ }
case <-ctx.Done():
return
}
}
}
-func (c *MaprClient) reportResults(finalResult bool) {
+func (c *MaprClient) reportResults(finalResult bool) error {
if c.query.HasOutfile() {
- c.writeResultsToOutfile(finalResult)
- return
+ return c.writeResultsToOutfile(finalResult)
}
- c.printResults()
+ return c.printResults()
}
-func (c *MaprClient) printResults() {
+func (c *MaprClient) printResults() error {
var result string
var err error
var numRows int
@@ -182,18 +185,18 @@ func (c *MaprClient) printResults() {
result, numRows, err = c.globalGroup.SwapOut().Result(c.query, rowsLimit)
}
if err != nil {
- dlog.Client.FatalPanic(err)
+ return fmt.Errorf("unable to render mapreduce result: %w", err)
}
if result == c.lastResult {
dlog.Client.Debug("Result hasn't changed compared to last time...")
- return
+ return nil
}
c.lastResult = result
if numRows == 0 {
dlog.Client.Debug("Empty result set this time...")
- return
+ return nil
}
rawQuery := c.query.RawQuery
@@ -210,19 +213,21 @@ func (c *MaprClient) printResults() {
"to %d rows! Use 'limit' clause to override!", numRows, rowsLimit))
}
dlog.Client.Raw(fmt.Sprintf("%s\n", result))
+ return nil
}
-func (c *MaprClient) writeResultsToOutfile(finalResult bool) {
+func (c *MaprClient) writeResultsToOutfile(finalResult bool) error {
dlog.Client.Debug("writeResultsToOutfile called", "finalResult", finalResult, "cumulative", c.cumulative)
if c.cumulative {
if err := c.globalGroup.WriteResult(c.query, finalResult); err != nil {
- dlog.Client.FatalPanic(err)
+ return fmt.Errorf("unable to write cumulative mapreduce result: %w", err)
}
dlog.Client.Debug("WriteResult completed for cumulative mode")
- return
+ return nil
}
if err := c.globalGroup.SwapOut().WriteResult(c.query, true); err != nil {
- dlog.Client.FatalPanic(err)
+ return fmt.Errorf("unable to write non-cumulative mapreduce result: %w", err)
}
dlog.Client.Debug("WriteResult completed for non-cumulative mode")
+ return nil
}
diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go
index 2e9b61a..2750643 100644
--- a/internal/mapr/client/aggregate.go
+++ b/internal/mapr/client/aggregate.go
@@ -67,7 +67,7 @@ func (a *Aggregate) Aggregate(message string) error {
// Merge data from group into global group.
isMerged, err := a.globalGroup.MergeNoblock(a.query, a.group)
if err != nil {
- panic(err)
+ return fmt.Errorf("unable to merge aggregate data for server %s: %w", a.server, err)
}
if isMerged {
// Re-init local group (make it empty again).