diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2020-07-03 12:15:59 +0100 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2020-07-03 12:15:59 +0100 |
| commit | eb9c8d4ae7f8fb7e65f912ff4838c7737b5487d0 (patch) | |
| tree | f5e6c0be15200b18f306878037c3896e1084cf53 /internal/clients | |
| parent | 912f7dce7222d345dc6cc6cc593a45ee7e2e15f8 (diff) | |
refactor mapr client
Diffstat (limited to 'internal/clients')
| -rw-r--r-- | internal/clients/maprclient.go | 89 |
1 files changed, 56 insertions, 33 deletions
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index c4e445b..e28dadb 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -14,6 +14,14 @@ import ( "github.com/mimecast/dtail/internal/omode" ) +type MaprClientMode int + +const ( + DefaultMode MaprClientMode = iota + CumulativeMode MaprClientMode = iota + NonCumulativeMode MaprClientMode = iota +) + // MaprClient is used for running mapreduce aggregations on remote files. type MaprClient struct { baseClient @@ -23,12 +31,12 @@ type MaprClient struct { globalGroup *mapr.GlobalGroupSet // The query object (constructed from queryStr) query *mapr.Query - // Additative result or new result every run? + // Additative result or new result every interval run? cumulative bool } // NewMaprClient returns a new mapreduce client. -func NewMaprClient(args Args, queryStr string) (*MaprClient, error) { +func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (*MaprClient, error) { if queryStr == "" { return nil, errors.New("No mapreduce query specified, use '-query' flag") } @@ -39,10 +47,18 @@ func NewMaprClient(args Args, queryStr string) (*MaprClient, error) { } // Don't retry connection if in tail mode and no outfile specified. - retry := args.Mode == omode.TailClient && query.Outfile == "" - - // Result is comulative if we are in MapClient mode or with outfile - cumulative := args.Mode == omode.MapClient || query.Outfile != "" + retry := args.Mode == omode.TailClient && !query.HasOutfile() + + var cumulative bool + switch maprClientMode { + case CumulativeMode: + cumulative = true + case NonCumulativeMode: + cumulative = false + default: + // Result is comulative if we are in MapClient mode or with outfile + cumulative = args.Mode == omode.MapClient || query.HasOutfile() + } c := MaprClient{ baseClient: baseClient{ @@ -72,14 +88,12 @@ func NewMaprClient(args Args, queryStr string) (*MaprClient, error) { // Start starts the mapreduce client. func (c *MaprClient) Start(ctx context.Context) (status int) { - if c.query.Outfile == "" { - // Only print out periodic results if we don't write an outfile - go c.periodicPrintResults(ctx) - } + go c.periodicReportResults(ctx) status = c.baseClient.Start(ctx) if c.cumulative { - c.recievedFinalResult() + logger.Info("Received final mapreduce result") + c.reportResults() } return @@ -108,35 +122,27 @@ func (c MaprClient) makeCommands() (commands []string) { return } -func (c *MaprClient) recievedFinalResult() { - logger.Info("Received final mapreduce result") - - if c.query.Outfile == "" { - c.printResults() - return - } - - logger.Info(fmt.Sprintf("Writing final mapreduce result to '%s'", c.query.Outfile)) - err := c.globalGroup.WriteResult(c.query) - if err != nil { - logger.FatalExit(err) - return - } - logger.Info(fmt.Sprintf("Wrote final mapreduce result to '%s'", c.query.Outfile)) -} - -func (c *MaprClient) periodicPrintResults(ctx context.Context) { +func (c *MaprClient) periodicReportResults(ctx context.Context) { for { select { case <-time.After(c.query.Interval): logger.Info("Gathering interim mapreduce result") - c.printResults() + c.reportResults() case <-ctx.Done(): return } } } +func (c *MaprClient) reportResults() { + if c.query.HasOutfile() { + c.writeResultsToOutfile() + return + } + + c.printResults() +} + func (c *MaprClient) printResults() { var result string var err error @@ -150,8 +156,25 @@ func (c *MaprClient) printResults() { if err != nil { logger.FatalExit(err) } - if numLines > 0 { - logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery)) - logger.Raw(result) + + if numLines == 0 { + logger.Info("Empty result set this time...") + return + } + + logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery)) + logger.Raw(result) +} + +func (c *MaprClient) writeResultsToOutfile() { + if c.cumulative { + if err := c.globalGroup.WriteResult(c.query); err != nil { + logger.FatalExit(err) + } + return + } + + if err := c.globalGroup.SwapOut().WriteResult(c.query); err != nil { + logger.FatalExit(err) } } |
