summaryrefslogtreecommitdiff
path: root/internal/clients
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2020-03-04 16:32:27 +0000
committerPaul Buetow <pbuetow@mimecast.com>2020-03-04 16:32:27 +0000
commit238dd3930e9c58397a86f649c77912ee32e4d7f0 (patch)
treeb4cda0b8c677188b600478522471628b5d4efea4 /internal/clients
parent89d3ebfc4e0c947977e5f455ee76f3ce29cc92c7 (diff)
can tail probe with a given timeout and then write a mapreduce result
Diffstat (limited to 'internal/clients')
-rw-r--r--internal/clients/baseclient.go2
-rw-r--r--internal/clients/maprclient.go39
-rw-r--r--internal/clients/runclient.go2
-rw-r--r--internal/clients/tailclient.go1
4 files changed, 26 insertions, 18 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index 75da187..10a5559 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -123,7 +123,7 @@ func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) {
// Put it back on the channel
active <- struct{}{}
- if c.Mode == omode.TailClient {
+ if c.Mode == omode.TailClient && c.retry {
<-ctx.Done()
}
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index b581844..32340b3 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -24,7 +24,7 @@ type MaprClient struct {
// The query object (constructed from queryStr)
query *mapr.Query
// Additative result or new result every run?
- additative bool
+ cumulative bool
}
// NewMaprClient returns a new mapreduce client.
@@ -33,23 +33,28 @@ func NewMaprClient(args Args, queryStr string) (*MaprClient, error) {
return nil, errors.New("No mapreduce query specified, use '-query' flag")
}
+ query, err := mapr.NewQuery(queryStr)
+ if err != nil {
+ logger.FatalExit(queryStr, "Can't parse mapr query", err)
+ }
+
+ // Don't retry connection if in tail mode and no outfile specified.
+ retry := args.Mode == omode.TailClient && query.Outfile == ""
+
+ // Result is comulative if we are in MapClient mode or with outfile
+ cumulative := args.Mode == omode.MapClient || query.Outfile != ""
+
c := MaprClient{
baseClient: baseClient{
Args: args,
throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()),
- retry: args.Mode == omode.TailClient,
+ retry: retry,
},
+ query: query,
queryStr: queryStr,
- additative: args.Mode == omode.MapClient,
+ cumulative: cumulative,
}
- query, err := mapr.NewQuery(c.queryStr)
- if err != nil {
- logger.FatalExit(c.queryStr, "Can't parse mapr query", err)
- }
-
- c.query = query
-
switch c.query.Table {
case "*":
c.Regex = fmt.Sprintf("\\|MAPREDUCE:\\|")
@@ -73,7 +78,7 @@ func (c *MaprClient) Start(ctx context.Context) (status int) {
}
status = c.baseClient.Start(ctx)
- if c.additative {
+ if c.cumulative {
c.recievedFinalResult()
}
@@ -87,12 +92,16 @@ func (c MaprClient) makeHandler(server string) handlers.Handler {
func (c MaprClient) makeCommands() (commands []string) {
commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery))
- modeStr := "tail"
- if c.additative {
- modeStr = "cat"
+ modeStr := "cat"
+ if c.Mode == omode.TailClient {
+ modeStr = "tail"
}
for _, file := range strings.Split(c.What, ",") {
+ if c.Timeout > 0 {
+ commands = append(commands, fmt.Sprintf("timeout %d %s %s regex %s", c.Timeout, modeStr, file, c.Regex))
+ continue
+ }
commands = append(commands, fmt.Sprintf("%s %s regex %s", modeStr, file, c.Regex))
}
@@ -133,7 +142,7 @@ func (c *MaprClient) printResults() {
var err error
var numLines int
- if c.additative {
+ if c.cumulative {
result, numLines, err = c.globalGroup.Result(c.query)
} else {
result, numLines, err = c.globalGroup.SwapOut().Result(c.query)
diff --git a/internal/clients/runclient.go b/internal/clients/runclient.go
index 543df15..9f8e478 100644
--- a/internal/clients/runclient.go
+++ b/internal/clients/runclient.go
@@ -53,8 +53,6 @@ func (c RunClient) makeCommands() (commands []string) {
}
commands = append(commands, fmt.Sprintf("run%s %s", c.options(), c.What))
- logger.Debug(commands)
-
return
}
diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go
index 4d81fd5..15e77cc 100644
--- a/internal/clients/tailclient.go
+++ b/internal/clients/tailclient.go
@@ -38,5 +38,6 @@ func (c TailClient) makeCommands() (commands []string) {
for _, file := range strings.Split(c.What, ",") {
commands = append(commands, fmt.Sprintf("%s %s regex %s", c.Mode.String(), file, c.Regex))
}
+
return
}