summaryrefslogtreecommitdiff
path: root/clients/maprclient.go
diff options
context:
space:
mode:
Diffstat (limited to 'clients/maprclient.go')
-rw-r--r--clients/maprclient.go153
1 files changed, 0 insertions, 153 deletions
diff --git a/clients/maprclient.go b/clients/maprclient.go
deleted file mode 100644
index ad707c9..0000000
--- a/clients/maprclient.go
+++ /dev/null
@@ -1,153 +0,0 @@
-package clients
-
-import (
- "dtail/clients/handlers"
- "dtail/clients/remote"
- "dtail/logger"
- "dtail/mapr"
- "dtail/omode"
- "dtail/ssh/client"
- "errors"
- "fmt"
- "strings"
- "sync"
- "time"
-
- gossh "golang.org/x/crypto/ssh"
-)
-
-// MaprClient is used for running mapreduce aggregations on remote files.
-type MaprClient struct {
- baseClient
- // Query string for mapr aggregations
- queryStr string
- // Global group set for merged mapr aggregation results
- globalGroup *mapr.GlobalGroupSet
- // The query object (constructed from queryStr)
- query *mapr.Query
- // Additative result or new result every run?
- additative bool
-}
-
-// NewMaprClient returns a new mapreduce client.
-func NewMaprClient(args Args, queryStr string) (*MaprClient, error) {
- if queryStr == "" {
- return nil, errors.New("No mapreduce query specified, use '-query' flag")
- }
-
- c := MaprClient{
- baseClient: baseClient{
- Args: args,
- stop: make(chan struct{}),
- stopped: make(chan struct{}),
- throttleCh: make(chan struct{}, args.MaxInitConnections),
- retry: args.Mode == omode.TailClient,
- },
- queryStr: queryStr,
- additative: args.Mode == omode.MapClient,
- }
-
- query, err := mapr.NewQuery(c.queryStr)
- if err != nil {
- logger.FatalExit(c.queryStr, "Can't parse mapr query", err)
- }
-
- c.query = query
-
- switch c.query.Table {
- case "*":
- c.Regex = fmt.Sprintf("\\|MAPREDUCE:\\|")
- case ".":
- c.Regex = "."
- default:
- c.Regex = fmt.Sprintf("\\|MAPREDUCE:%s\\|", c.query.Table)
- }
-
- c.globalGroup = mapr.NewGlobalGroupSet()
- c.baseClient.init(c)
-
- return &c, nil
-}
-
-func (c MaprClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection {
- conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback)
- conn.Handler = handlers.NewMaprHandler(conn.Server, c.query, c.globalGroup, c.PingTimeout)
-
- conn.Commands = append(conn.Commands, fmt.Sprintf("map %s", c.query.RawQuery))
- commandStr := "tail"
- if c.additative {
- commandStr = "cat"
- }
-
- for _, file := range strings.Split(c.Files, ",") {
- conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", commandStr, file, c.Regex))
- }
-
- return conn
-}
-
-// Start starts the mapreduce client.
-func (c *MaprClient) Start(wg *sync.WaitGroup) (status int) {
- defer wg.Done()
-
- if c.query.Outfile == "" {
- // Only print out periodic results if we don't write an outfile
- go c.periodicPrintResults()
- }
-
- status = c.baseClient.Start(nil)
- if c.additative {
- c.recievedFinalResult()
- }
- c.baseClient.Stop()
-
- return
-}
-
-func (c *MaprClient) recievedFinalResult() {
- logger.Info("Received final mapreduce result")
-
- if c.query.Outfile == "" {
- c.printResults()
- return
- }
-
- logger.Info(fmt.Sprintf("Writing final mapreduce result to '%s'", c.query.Outfile))
- err := c.globalGroup.WriteResult(c.query)
- if err != nil {
- logger.FatalExit(err)
- return
- }
- logger.Info(fmt.Sprintf("Wrote final mapreduce result to '%s'", c.query.Outfile))
-}
-
-func (c *MaprClient) periodicPrintResults() {
- for {
- select {
- case <-time.After(c.query.Interval):
- logger.Info("Gathering interim mapreduce result")
- c.printResults()
- case <-c.baseClient.stop:
- return
- }
- }
-}
-
-func (c *MaprClient) printResults() {
- var result string
- var err error
- var numLines int
-
- if c.additative {
- result, numLines, err = c.globalGroup.Result(c.query)
- } else {
- result, numLines, err = c.globalGroup.SwapOut().Result(c.query)
- }
- if err != nil {
- logger.FatalExit(err)
- }
- if numLines > 0 {
- logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery))
- logger.Raw(result)
- }
-}