From 5e8d0454d1aa5388a7045bdcb3069e41e4474957 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Mon, 2 Mar 2026 09:38:36 +0200 Subject: mapr client: replace runtime panics with errors Task: 4e6d7744-3f5c-4880-9e5d-368ece96470d --- internal/clients/maprclient.go | 35 ++++++++++++++++++++--------------- internal/mapr/client/aggregate.go | 2 +- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index cfbffee..2af038f 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -99,11 +99,13 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i go c.periodicReportResults(ctx) status = c.baseClient.Start(ctx, statsCh) - + // Always write final result for cumulative mode (includes outfile case) if c.cumulative { dlog.Client.Debug("Writing final mapreduce result") - c.reportResults(true) + if err := c.reportResults(true); err != nil { + dlog.Client.Error("Unable to write final mapreduce result", err) + } dlog.Client.Debug("Final result written") } @@ -149,22 +151,23 @@ func (c *MaprClient) periodicReportResults(ctx context.Context) { select { case <-time.After(c.query.Interval): dlog.Client.Debug("Gathering interim mapreduce result") - c.reportResults(false) + if err := c.reportResults(false); err != nil { + dlog.Client.Error("Unable to gather mapreduce result", err) + } case <-ctx.Done(): return } } } -func (c *MaprClient) reportResults(finalResult bool) { +func (c *MaprClient) reportResults(finalResult bool) error { if c.query.HasOutfile() { - c.writeResultsToOutfile(finalResult) - return + return c.writeResultsToOutfile(finalResult) } - c.printResults() + return c.printResults() } -func (c *MaprClient) printResults() { +func (c *MaprClient) printResults() error { var result string var err error var numRows int @@ -182,18 +185,18 @@ func (c *MaprClient) printResults() { result, numRows, err = c.globalGroup.SwapOut().Result(c.query, rowsLimit) } if err != nil { - dlog.Client.FatalPanic(err) + return fmt.Errorf("unable to render mapreduce result: %w", err) } if result == c.lastResult { dlog.Client.Debug("Result hasn't changed compared to last time...") - return + return nil } c.lastResult = result if numRows == 0 { dlog.Client.Debug("Empty result set this time...") - return + return nil } rawQuery := c.query.RawQuery @@ -210,19 +213,21 @@ func (c *MaprClient) printResults() { "to %d rows! Use 'limit' clause to override!", numRows, rowsLimit)) } dlog.Client.Raw(fmt.Sprintf("%s\n", result)) + return nil } -func (c *MaprClient) writeResultsToOutfile(finalResult bool) { +func (c *MaprClient) writeResultsToOutfile(finalResult bool) error { dlog.Client.Debug("writeResultsToOutfile called", "finalResult", finalResult, "cumulative", c.cumulative) if c.cumulative { if err := c.globalGroup.WriteResult(c.query, finalResult); err != nil { - dlog.Client.FatalPanic(err) + return fmt.Errorf("unable to write cumulative mapreduce result: %w", err) } dlog.Client.Debug("WriteResult completed for cumulative mode") - return + return nil } if err := c.globalGroup.SwapOut().WriteResult(c.query, true); err != nil { - dlog.Client.FatalPanic(err) + return fmt.Errorf("unable to write non-cumulative mapreduce result: %w", err) } dlog.Client.Debug("WriteResult completed for non-cumulative mode") + return nil } diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go index 2e9b61a..2750643 100644 --- a/internal/mapr/client/aggregate.go +++ b/internal/mapr/client/aggregate.go @@ -67,7 +67,7 @@ func (a *Aggregate) Aggregate(message string) error { // Merge data from group into global group. isMerged, err := a.globalGroup.MergeNoblock(a.query, a.group) if err != nil { - panic(err) + return fmt.Errorf("unable to merge aggregate data for server %s: %w", a.server, err) } if isMerged { // Re-init local group (make it empty again). -- cgit v1.2.3