summaryrefslogtreecommitdiff
path: root/internal/clients
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2020-07-03 12:15:59 +0100
committerPaul Buetow <pbuetow@mimecast.com>2020-07-03 12:15:59 +0100
commiteb9c8d4ae7f8fb7e65f912ff4838c7737b5487d0 (patch)
treef5e6c0be15200b18f306878037c3896e1084cf53 /internal/clients
parent912f7dce7222d345dc6cc6cc593a45ee7e2e15f8 (diff)
refactor mapr client
Diffstat (limited to 'internal/clients')
-rw-r--r--internal/clients/maprclient.go89
1 files changed, 56 insertions, 33 deletions
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index c4e445b..e28dadb 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -14,6 +14,14 @@ import (
"github.com/mimecast/dtail/internal/omode"
)
+type MaprClientMode int
+
+const (
+ DefaultMode MaprClientMode = iota
+ CumulativeMode MaprClientMode = iota
+ NonCumulativeMode MaprClientMode = iota
+)
+
// MaprClient is used for running mapreduce aggregations on remote files.
type MaprClient struct {
baseClient
@@ -23,12 +31,12 @@ type MaprClient struct {
globalGroup *mapr.GlobalGroupSet
// The query object (constructed from queryStr)
query *mapr.Query
- // Additative result or new result every run?
+ // Additative result or new result every interval run?
cumulative bool
}
// NewMaprClient returns a new mapreduce client.
-func NewMaprClient(args Args, queryStr string) (*MaprClient, error) {
+func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (*MaprClient, error) {
if queryStr == "" {
return nil, errors.New("No mapreduce query specified, use '-query' flag")
}
@@ -39,10 +47,18 @@ func NewMaprClient(args Args, queryStr string) (*MaprClient, error) {
}
// Don't retry connection if in tail mode and no outfile specified.
- retry := args.Mode == omode.TailClient && query.Outfile == ""
-
- // Result is comulative if we are in MapClient mode or with outfile
- cumulative := args.Mode == omode.MapClient || query.Outfile != ""
+ retry := args.Mode == omode.TailClient && !query.HasOutfile()
+
+ var cumulative bool
+ switch maprClientMode {
+ case CumulativeMode:
+ cumulative = true
+ case NonCumulativeMode:
+ cumulative = false
+ default:
+ // Result is comulative if we are in MapClient mode or with outfile
+ cumulative = args.Mode == omode.MapClient || query.HasOutfile()
+ }
c := MaprClient{
baseClient: baseClient{
@@ -72,14 +88,12 @@ func NewMaprClient(args Args, queryStr string) (*MaprClient, error) {
// Start starts the mapreduce client.
func (c *MaprClient) Start(ctx context.Context) (status int) {
- if c.query.Outfile == "" {
- // Only print out periodic results if we don't write an outfile
- go c.periodicPrintResults(ctx)
- }
+ go c.periodicReportResults(ctx)
status = c.baseClient.Start(ctx)
if c.cumulative {
- c.recievedFinalResult()
+ logger.Info("Received final mapreduce result")
+ c.reportResults()
}
return
@@ -108,35 +122,27 @@ func (c MaprClient) makeCommands() (commands []string) {
return
}
-func (c *MaprClient) recievedFinalResult() {
- logger.Info("Received final mapreduce result")
-
- if c.query.Outfile == "" {
- c.printResults()
- return
- }
-
- logger.Info(fmt.Sprintf("Writing final mapreduce result to '%s'", c.query.Outfile))
- err := c.globalGroup.WriteResult(c.query)
- if err != nil {
- logger.FatalExit(err)
- return
- }
- logger.Info(fmt.Sprintf("Wrote final mapreduce result to '%s'", c.query.Outfile))
-}
-
-func (c *MaprClient) periodicPrintResults(ctx context.Context) {
+func (c *MaprClient) periodicReportResults(ctx context.Context) {
for {
select {
case <-time.After(c.query.Interval):
logger.Info("Gathering interim mapreduce result")
- c.printResults()
+ c.reportResults()
case <-ctx.Done():
return
}
}
}
+func (c *MaprClient) reportResults() {
+ if c.query.HasOutfile() {
+ c.writeResultsToOutfile()
+ return
+ }
+
+ c.printResults()
+}
+
func (c *MaprClient) printResults() {
var result string
var err error
@@ -150,8 +156,25 @@ func (c *MaprClient) printResults() {
if err != nil {
logger.FatalExit(err)
}
- if numLines > 0 {
- logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery))
- logger.Raw(result)
+
+ if numLines == 0 {
+ logger.Info("Empty result set this time...")
+ return
+ }
+
+ logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery))
+ logger.Raw(result)
+}
+
+func (c *MaprClient) writeResultsToOutfile() {
+ if c.cumulative {
+ if err := c.globalGroup.WriteResult(c.query); err != nil {
+ logger.FatalExit(err)
+ }
+ return
+ }
+
+ if err := c.globalGroup.SwapOut().WriteResult(c.query); err != nil {
+ logger.FatalExit(err)
}
}