diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2020-03-04 16:32:27 +0000 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2020-03-04 16:32:27 +0000 |
| commit | 238dd3930e9c58397a86f649c77912ee32e4d7f0 (patch) | |
| tree | b4cda0b8c677188b600478522471628b5d4efea4 /internal/clients | |
| parent | 89d3ebfc4e0c947977e5f455ee76f3ce29cc92c7 (diff) | |
can tail probe with a given timeout and then write a mapreduce result
Diffstat (limited to 'internal/clients')
| -rw-r--r-- | internal/clients/baseclient.go | 2 | ||||
| -rw-r--r-- | internal/clients/maprclient.go | 39 | ||||
| -rw-r--r-- | internal/clients/runclient.go | 2 | ||||
| -rw-r--r-- | internal/clients/tailclient.go | 1 |
4 files changed, 26 insertions, 18 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index 75da187..10a5559 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -123,7 +123,7 @@ func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) { // Put it back on the channel active <- struct{}{} - if c.Mode == omode.TailClient { + if c.Mode == omode.TailClient && c.retry { <-ctx.Done() } diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index b581844..32340b3 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -24,7 +24,7 @@ type MaprClient struct { // The query object (constructed from queryStr) query *mapr.Query // Additative result or new result every run? - additative bool + cumulative bool } // NewMaprClient returns a new mapreduce client. @@ -33,23 +33,28 @@ func NewMaprClient(args Args, queryStr string) (*MaprClient, error) { return nil, errors.New("No mapreduce query specified, use '-query' flag") } + query, err := mapr.NewQuery(queryStr) + if err != nil { + logger.FatalExit(queryStr, "Can't parse mapr query", err) + } + + // Don't retry connection if in tail mode and no outfile specified. + retry := args.Mode == omode.TailClient && query.Outfile == "" + + // Result is comulative if we are in MapClient mode or with outfile + cumulative := args.Mode == omode.MapClient || query.Outfile != "" + c := MaprClient{ baseClient: baseClient{ Args: args, throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()), - retry: args.Mode == omode.TailClient, + retry: retry, }, + query: query, queryStr: queryStr, - additative: args.Mode == omode.MapClient, + cumulative: cumulative, } - query, err := mapr.NewQuery(c.queryStr) - if err != nil { - logger.FatalExit(c.queryStr, "Can't parse mapr query", err) - } - - c.query = query - switch c.query.Table { case "*": c.Regex = fmt.Sprintf("\\|MAPREDUCE:\\|") @@ -73,7 +78,7 @@ func (c *MaprClient) Start(ctx context.Context) (status int) { } status = c.baseClient.Start(ctx) - if c.additative { + if c.cumulative { c.recievedFinalResult() } @@ -87,12 +92,16 @@ func (c MaprClient) makeHandler(server string) handlers.Handler { func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery)) - modeStr := "tail" - if c.additative { - modeStr = "cat" + modeStr := "cat" + if c.Mode == omode.TailClient { + modeStr = "tail" } for _, file := range strings.Split(c.What, ",") { + if c.Timeout > 0 { + commands = append(commands, fmt.Sprintf("timeout %d %s %s regex %s", c.Timeout, modeStr, file, c.Regex)) + continue + } commands = append(commands, fmt.Sprintf("%s %s regex %s", modeStr, file, c.Regex)) } @@ -133,7 +142,7 @@ func (c *MaprClient) printResults() { var err error var numLines int - if c.additative { + if c.cumulative { result, numLines, err = c.globalGroup.Result(c.query) } else { result, numLines, err = c.globalGroup.SwapOut().Result(c.query) diff --git a/internal/clients/runclient.go b/internal/clients/runclient.go index 543df15..9f8e478 100644 --- a/internal/clients/runclient.go +++ b/internal/clients/runclient.go @@ -53,8 +53,6 @@ func (c RunClient) makeCommands() (commands []string) { } commands = append(commands, fmt.Sprintf("run%s %s", c.options(), c.What)) - logger.Debug(commands) - return } diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go index 4d81fd5..15e77cc 100644 --- a/internal/clients/tailclient.go +++ b/internal/clients/tailclient.go @@ -38,5 +38,6 @@ func (c TailClient) makeCommands() (commands []string) { for _, file := range strings.Split(c.What, ",") { commands = append(commands, fmt.Sprintf("%s %s regex %s", c.Mode.String(), file, c.Regex)) } + return } |
