diff options
| author | Paul Bütow <pbuetow@mimecast.com> | 2020-01-20 18:41:05 +0000 |
|---|---|---|
| committer | Paul Bütow <pbuetow@mimecast.com> | 2020-01-21 14:35:23 +0000 |
| commit | c128865c4c7411c29a59fca9a3a2f95537686d7b (patch) | |
| tree | 193bccc70d942c8b70cc93fae2670263701e43aa /clients/maprclient.go | |
| parent | 3755a9911ecb05886577095f2b8cc8b9e4066a3a (diff) | |
Move commands to cmd/ and move internal dependencies to internal/
Diffstat (limited to 'clients/maprclient.go')
| -rw-r--r-- | clients/maprclient.go | 153 |
1 files changed, 0 insertions, 153 deletions
diff --git a/clients/maprclient.go b/clients/maprclient.go deleted file mode 100644 index ad707c9..0000000 --- a/clients/maprclient.go +++ /dev/null @@ -1,153 +0,0 @@ -package clients - -import ( - "dtail/clients/handlers" - "dtail/clients/remote" - "dtail/logger" - "dtail/mapr" - "dtail/omode" - "dtail/ssh/client" - "errors" - "fmt" - "strings" - "sync" - "time" - - gossh "golang.org/x/crypto/ssh" -) - -// MaprClient is used for running mapreduce aggregations on remote files. -type MaprClient struct { - baseClient - // Query string for mapr aggregations - queryStr string - // Global group set for merged mapr aggregation results - globalGroup *mapr.GlobalGroupSet - // The query object (constructed from queryStr) - query *mapr.Query - // Additative result or new result every run? - additative bool -} - -// NewMaprClient returns a new mapreduce client. -func NewMaprClient(args Args, queryStr string) (*MaprClient, error) { - if queryStr == "" { - return nil, errors.New("No mapreduce query specified, use '-query' flag") - } - - c := MaprClient{ - baseClient: baseClient{ - Args: args, - stop: make(chan struct{}), - stopped: make(chan struct{}), - throttleCh: make(chan struct{}, args.MaxInitConnections), - retry: args.Mode == omode.TailClient, - }, - queryStr: queryStr, - additative: args.Mode == omode.MapClient, - } - - query, err := mapr.NewQuery(c.queryStr) - if err != nil { - logger.FatalExit(c.queryStr, "Can't parse mapr query", err) - } - - c.query = query - - switch c.query.Table { - case "*": - c.Regex = fmt.Sprintf("\\|MAPREDUCE:\\|") - case ".": - c.Regex = "." - default: - c.Regex = fmt.Sprintf("\\|MAPREDUCE:%s\\|", c.query.Table) - } - - c.globalGroup = mapr.NewGlobalGroupSet() - c.baseClient.init(c) - - return &c, nil -} - -func (c MaprClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection { - conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback) - conn.Handler = handlers.NewMaprHandler(conn.Server, c.query, c.globalGroup, c.PingTimeout) - - conn.Commands = append(conn.Commands, fmt.Sprintf("map %s", c.query.RawQuery)) - commandStr := "tail" - if c.additative { - commandStr = "cat" - } - - for _, file := range strings.Split(c.Files, ",") { - conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", commandStr, file, c.Regex)) - } - - return conn -} - -// Start starts the mapreduce client. -func (c *MaprClient) Start(wg *sync.WaitGroup) (status int) { - defer wg.Done() - - if c.query.Outfile == "" { - // Only print out periodic results if we don't write an outfile - go c.periodicPrintResults() - } - - status = c.baseClient.Start(nil) - if c.additative { - c.recievedFinalResult() - } - c.baseClient.Stop() - - return -} - -func (c *MaprClient) recievedFinalResult() { - logger.Info("Received final mapreduce result") - - if c.query.Outfile == "" { - c.printResults() - return - } - - logger.Info(fmt.Sprintf("Writing final mapreduce result to '%s'", c.query.Outfile)) - err := c.globalGroup.WriteResult(c.query) - if err != nil { - logger.FatalExit(err) - return - } - logger.Info(fmt.Sprintf("Wrote final mapreduce result to '%s'", c.query.Outfile)) -} - -func (c *MaprClient) periodicPrintResults() { - for { - select { - case <-time.After(c.query.Interval): - logger.Info("Gathering interim mapreduce result") - c.printResults() - case <-c.baseClient.stop: - return - } - } -} - -func (c *MaprClient) printResults() { - var result string - var err error - var numLines int - - if c.additative { - result, numLines, err = c.globalGroup.Result(c.query) - } else { - result, numLines, err = c.globalGroup.SwapOut().Result(c.query) - } - if err != nil { - logger.FatalExit(err) - } - if numLines > 0 { - logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery)) - logger.Raw(result) - } -} |
