diff options
| author | Paul Bütow <pbuetow@mimecast.com> | 2020-01-20 18:41:05 +0000 |
|---|---|---|
| committer | Paul Bütow <pbuetow@mimecast.com> | 2020-01-21 14:35:23 +0000 |
| commit | c128865c4c7411c29a59fca9a3a2f95537686d7b (patch) | |
| tree | 193bccc70d942c8b70cc93fae2670263701e43aa /clients | |
| parent | 3755a9911ecb05886577095f2b8cc8b9e4066a3a (diff) | |
Move commands to cmd/ and move internal dependencies to internal/
Diffstat (limited to 'clients')
| -rw-r--r-- | clients/args.go | 26 | ||||
| -rw-r--r-- | clients/baseclient.go | 139 | ||||
| -rw-r--r-- | clients/catclient.go | 49 | ||||
| -rw-r--r-- | clients/client.go | 9 | ||||
| -rw-r--r-- | clients/connectionmaker.go | 12 | ||||
| -rw-r--r-- | clients/grepclient.go | 49 | ||||
| -rw-r--r-- | clients/handlers/basehandler.go | 134 | ||||
| -rw-r--r-- | clients/handlers/clienthandler.go | 26 | ||||
| -rw-r--r-- | clients/handlers/handler.go | 12 | ||||
| -rw-r--r-- | clients/handlers/healthhandler.go | 75 | ||||
| -rw-r--r-- | clients/handlers/maprhandler.go | 74 | ||||
| -rw-r--r-- | clients/healthclient.go | 96 | ||||
| -rw-r--r-- | clients/maprclient.go | 153 | ||||
| -rw-r--r-- | clients/remote/connection.go | 230 | ||||
| -rw-r--r-- | clients/stats.go | 81 | ||||
| -rw-r--r-- | clients/tailclient.go | 44 |
16 files changed, 0 insertions, 1209 deletions
diff --git a/clients/args.go b/clients/args.go deleted file mode 100644 index 4d5a029..0000000 --- a/clients/args.go +++ /dev/null @@ -1,26 +0,0 @@ -package clients - -import ( - "dtail/omode" -) - -// Args is a helper struct to summarize common client arguments. -type Args struct { - // The operating mode (tail, grep, ...) - Mode omode.Mode - // The raw server string - ServersStr string - // SSH user name (e.g. 'pbuetow') - UserName string - // The files to follow. - Files string - // Regex for filtering. - Regex string - // Trust all unknown host keys? - TrustAllHosts bool - // Server discovery method - Discovery string - MaxInitConnections int - // Server ping timeout (0 means pings disabled) - PingTimeout int -} diff --git a/clients/baseclient.go b/clients/baseclient.go deleted file mode 100644 index 3a1b8f0..0000000 --- a/clients/baseclient.go +++ /dev/null @@ -1,139 +0,0 @@ -package clients - -import ( - "dtail/clients/remote" - "dtail/discovery" - "dtail/logger" - "dtail/omode" - "dtail/ssh/client" - "regexp" - "sync" - "time" - - gossh "golang.org/x/crypto/ssh" -) - -// This is the main client data structure. -type baseClient struct { - Args - // To display client side stats - stats *stats - // List of remote servers to connect to. - servers []string - // We have one connection per remote server. - connections []*remote.Connection - // SSH auth methods to use to connect to the remote servers. - sshAuthMethods []gossh.AuthMethod - // To deal with SSH host keys - hostKeyCallback *client.HostKeyCallback - // To stop the client. - stop chan struct{} - // To indicate that the client has stopped. - stopped chan struct{} - // Throttle how fast we initiate SSH connections concurrently - throttleCh chan struct{} - // Retry connection upon failure? - retry bool - // Connection helper. - maker connectionMaker -} - -func (c *baseClient) init(maker connectionMaker) { - logger.Info("Initiating base client") - - c.maker = maker - //c.connections = make(map[string]*remote.Connection) - c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods(c.TrustAllHosts, c.throttleCh) - - // Retrieve a shuffled list of remote dtail servers. - shuffleServers := true - discoveryService := discovery.New(c.Discovery, c.ServersStr, shuffleServers) - for _, server := range discoveryService.ServerList() { - c.connections = append(c.connections, c.maker.makeConnection(server, c.sshAuthMethods, c.hostKeyCallback)) - } - - if _, err := regexp.Compile(c.Regex); err != nil { - logger.FatalExit(c.Regex, "Can't test compile regex", err) - } - - // Periodically check for unknown hosts, and ask the user whether to trust them or not. - go c.hostKeyCallback.PromptAddHosts(c.stop) - - // Periodically print out connection stats to the client. - c.stats = newTailStats(len(c.connections)) - go c.stats.periodicLogStats(c.throttleCh, c.stop) -} - -func (c *baseClient) Start(wg *sync.WaitGroup) (status int) { - if wg != nil { - defer wg.Done() - } - active := make(chan struct{}, len(c.connections)) - - var wg2 sync.WaitGroup - wg2.Add(len(c.connections)) - - for i, conn := range c.connections { - go func(i int, conn *remote.Connection) { - active <- struct{}{} - defer func() { - logger.Debug(conn.Server, "Disconnected completely...") - <-active - }() - wg2.Done() - - for { - conn.Start(c.throttleCh, c.stats.connectionsEstCh) - if !c.retry { - return - } - time.Sleep(time.Second * 2) - logger.Debug(conn.Server, "Reconencting") - conn = c.maker.makeConnection(conn.Server, c.sshAuthMethods, c.hostKeyCallback) - c.connections[i] = conn - } - }(i, conn) - } - - wg2.Wait() - c.waitUntilDone(active) - - return -} - -func (c *baseClient) waitUntilDone(active chan struct{}) { - defer close(c.stopped) - - if c.Mode != omode.TailClient { - c.waitUntilZero(active) - logger.Info("All connections stopped") - return - } - - <-c.stop - logger.Info("Stopping client") - for _, conn := range c.connections { - conn.Stop() - } - - c.waitUntilZero(active) -} - -func (c *baseClient) waitUntilZero(active chan struct{}) { - for { - logger.Debug("Active connections", len(active)) - if len(active) == 0 { - return - } - time.Sleep(time.Second) - } -} - -func (c *baseClient) Stop() { - close(c.stop) - <-c.WaitC() -} - -func (c *baseClient) WaitC() <-chan struct{} { - return c.stopped -} diff --git a/clients/catclient.go b/clients/catclient.go deleted file mode 100644 index e3b873c..0000000 --- a/clients/catclient.go +++ /dev/null @@ -1,49 +0,0 @@ -package clients - -import ( - "dtail/clients/handlers" - "dtail/clients/remote" - "dtail/ssh/client" - "errors" - "fmt" - "strings" - - gossh "golang.org/x/crypto/ssh" -) - -// CatClient is a client for returning a whole file from the beginning to the end. -type CatClient struct { - baseClient -} - -// NewCatClient returns a new cat client. -func NewCatClient(args Args) (*CatClient, error) { - if args.Regex != "" { - return nil, errors.New("Can't use regex with 'cat' operating mode") - } - - args.Regex = "." - - c := CatClient{ - baseClient: baseClient{ - Args: args, - stop: make(chan struct{}), - stopped: make(chan struct{}), - throttleCh: make(chan struct{}, args.MaxInitConnections), - retry: false, - }, - } - - c.init(c) - - return &c, nil -} - -func (c CatClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection { - conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback) - conn.Handler = handlers.NewClientHandler(server, c.PingTimeout) - for _, file := range strings.Split(c.Files, ",") { - conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", c.Mode.String(), file, c.Regex)) - } - return conn -} diff --git a/clients/client.go b/clients/client.go deleted file mode 100644 index e58f51d..0000000 --- a/clients/client.go +++ /dev/null @@ -1,9 +0,0 @@ -package clients - -import "sync" - -// Client is the interface for the end user command line client. -type Client interface { - Start(wg *sync.WaitGroup) int - Stop() -} diff --git a/clients/connectionmaker.go b/clients/connectionmaker.go deleted file mode 100644 index 9e08c2b..0000000 --- a/clients/connectionmaker.go +++ /dev/null @@ -1,12 +0,0 @@ -package clients - -import ( - "dtail/clients/remote" - "dtail/ssh/client" - - gossh "golang.org/x/crypto/ssh" -) - -type connectionMaker interface { - makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection -} diff --git a/clients/grepclient.go b/clients/grepclient.go deleted file mode 100644 index dbae96c..0000000 --- a/clients/grepclient.go +++ /dev/null @@ -1,49 +0,0 @@ -package clients - -import ( - "dtail/clients/handlers" - "dtail/clients/remote" - "dtail/ssh/client" - "errors" - "fmt" - "strings" - - gossh "golang.org/x/crypto/ssh" -) - -// GrepClient searches a remote file for all lines matching a regular expression. Only the matching lines are displayed. -type GrepClient struct { - baseClient -} - -// NewGrepClient creates a new grep client. -func NewGrepClient(args Args) (*GrepClient, error) { - if args.Regex == "" { - return nil, errors.New("No regex specified, use '-regex' flag") - } - - c := GrepClient{ - baseClient: baseClient{ - Args: args, - stop: make(chan struct{}), - stopped: make(chan struct{}), - throttleCh: make(chan struct{}, args.MaxInitConnections), - retry: false, - }, - } - - c.init(c) - - return &c, nil -} - -func (c GrepClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection { - conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback) - conn.Handler = handlers.NewClientHandler(server, c.PingTimeout) - - for _, file := range strings.Split(c.Files, ",") { - conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", c.Mode.String(), file, c.Regex)) - } - - return conn -} diff --git a/clients/handlers/basehandler.go b/clients/handlers/basehandler.go deleted file mode 100644 index ce82aa2..0000000 --- a/clients/handlers/basehandler.go +++ /dev/null @@ -1,134 +0,0 @@ -package handlers - -import ( - "dtail/logger" - "errors" - "fmt" - "io" - "strings" - "time" -) - -type baseHandler struct { - server string - shellStarted bool - commands chan string - pong chan struct{} - receiveBuf []byte - stop chan struct{} - pingTimeout int -} - -func (h *baseHandler) Server() string { - return h.server -} - -// Used to determine whether server is still responding to requests or not. -func (h *baseHandler) Ping() error { - if h.pingTimeout == 0 { - // Server ping disabled - return nil - } - - if err := h.SendCommand("ping"); err != nil { - return err - } - - select { - case <-h.pong: - return nil - case <-time.After(time.Duration(h.pingTimeout) * time.Second): - } - - return errors.New("Didn't receive any server pongs (ping replies)") -} - -func (h *baseHandler) SendCommand(command string) error { - if command == "ping" { - logger.Trace("Sending command", h.server, command) - } else { - logger.Debug("Sending command", h.server, command) - } - - select { - case h.commands <- fmt.Sprintf("%s;", command): - case <-time.After(time.Second * 5): - return errors.New("Timed out sending command " + command) - case <-h.stop: - } - - return nil -} - -// Read data from the dtail server via Writer interface. -func (h *baseHandler) Write(p []byte) (n int, err error) { - for _, b := range p { - h.receiveBuf = append(h.receiveBuf, b) - if b == '\n' { - if len(h.receiveBuf) == 0 { - continue - } - message := string(h.receiveBuf) - h.handleMessageType(message) - } - } - - return len(p), nil -} - -// Send data to the dtail server via Reader interface. -func (h *baseHandler) Read(p []byte) (n int, err error) { - select { - case command := <-h.commands: - n = copy(p, []byte(command)) - case <-h.stop: - return 0, io.EOF - } - return -} - -// Handle various message types. -func (h *baseHandler) handleMessageType(message string) { - if len(h.receiveBuf) == 0 { - return - } - // Hidden server commands starti with a dot "." - if h.receiveBuf[0] == '.' { - h.handleHiddenMessage(message) - h.receiveBuf = h.receiveBuf[:0] - return - } - - // Silent mode will only print out remote logs but not remote server - // commands. But remote server commands will be still logged to ./log/. - if logger.Mode == logger.SilentMode { - if h.receiveBuf[0] == 'R' { - logger.Raw(message) - } - h.receiveBuf = h.receiveBuf[:0] - return - } - logger.Raw(message) - h.receiveBuf = h.receiveBuf[:0] -} - -// Handle messages received from server which are not meant to be displayed -// to the end user. -func (h *baseHandler) handleHiddenMessage(message string) { - switch { - case strings.HasPrefix(message, ".pong"): - h.pong <- struct{}{} - case strings.HasPrefix(message, ".syn close connection"): - h.SendCommand("ack close connection") - } -} - -// Stop the handler. -func (h *baseHandler) Stop() { - select { - case <-h.stop: - default: - logger.Debug("Stopping base handler", h.server) - close(h.stop) - } -} diff --git a/clients/handlers/clienthandler.go b/clients/handlers/clienthandler.go deleted file mode 100644 index e818b52..0000000 --- a/clients/handlers/clienthandler.go +++ /dev/null @@ -1,26 +0,0 @@ -package handlers - -import ( - "dtail/logger" -) - -// ClientHandler is the basic client handler interface. -type ClientHandler struct { - baseHandler -} - -// NewClientHandler creates a new client handler. -func NewClientHandler(server string, pingTimeout int) *ClientHandler { - logger.Debug(server, "Creating new client handler") - - return &ClientHandler{ - baseHandler{ - server: server, - shellStarted: false, - commands: make(chan string), - pong: make(chan struct{}, 1), - stop: make(chan struct{}), - pingTimeout: pingTimeout, - }, - } -} diff --git a/clients/handlers/handler.go b/clients/handlers/handler.go deleted file mode 100644 index 2013be0..0000000 --- a/clients/handlers/handler.go +++ /dev/null @@ -1,12 +0,0 @@ -package handlers - -import "io" - -// Handler provides all methods which can be run on any client handler. -type Handler interface { - io.ReadWriter - Ping() error - Stop() - SendCommand(command string) error - Server() string -} diff --git a/clients/handlers/healthhandler.go b/clients/handlers/healthhandler.go deleted file mode 100644 index 4051e2c..0000000 --- a/clients/handlers/healthhandler.go +++ /dev/null @@ -1,75 +0,0 @@ -package handlers - -import ( - "errors" - "fmt" - "time" -) - -// HealthHandler implements the handler required for health checks. -type HealthHandler struct { - // Buffer of incoming data from server. - receiveBuf []byte - // To send commands to the server. - commands chan string - // To receive messages from the server. - receive chan<- string - // The remote server address - server string -} - -// NewHealthHandler returns a new health check handler. -func NewHealthHandler(server string, receive chan<- string) *HealthHandler { - h := HealthHandler{ - server: server, - receive: receive, - commands: make(chan string), - } - - return &h -} - -// Server returns the remote server name. -func (h *HealthHandler) Server() string { - return h.server -} - -// Stop is not of use for health check handler. -func (h *HealthHandler) Stop() { - // Nothing done here. -} - -// Ping is not of use for health check handler. -func (h *HealthHandler) Ping() error { - return nil -} - -// SendCommand send a DTail command to the server. -func (h *HealthHandler) SendCommand(command string) error { - select { - case h.commands <- fmt.Sprintf("%s;", command): - case <-time.NewTimer(time.Second * 10).C: - return errors.New("Timed out sending command " + command) - } - - return nil -} - -// Server writes byte stream to client. -func (h *HealthHandler) Write(p []byte) (n int, err error) { - for _, b := range p { - h.receiveBuf = append(h.receiveBuf, b) - if b == '\n' { - h.receive <- string(h.receiveBuf) - h.receiveBuf = h.receiveBuf[:0] - } - } - - return len(p), nil -} - -// Server reads byte stream from client. -func (h *HealthHandler) Read(p []byte) (n int, err error) { - n = copy(p, []byte(<-h.commands)) - return -} diff --git a/clients/handlers/maprhandler.go b/clients/handlers/maprhandler.go deleted file mode 100644 index 830a142..0000000 --- a/clients/handlers/maprhandler.go +++ /dev/null @@ -1,74 +0,0 @@ -package handlers - -import ( - "dtail/logger" - "dtail/mapr" - "dtail/mapr/client" - "strings" -) - -// MaprHandler is the handler used on the client side for running mapreduce aggregations. -type MaprHandler struct { - baseHandler - aggregate *client.Aggregate - query *mapr.Query - count uint64 -} - -// NewMaprHandler returns a new mapreduce client handler. -func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet, pingTimeout int) *MaprHandler { - return &MaprHandler{ - baseHandler: baseHandler{ - server: server, - shellStarted: false, - commands: make(chan string), - pong: make(chan struct{}, 1), - stop: make(chan struct{}), - pingTimeout: pingTimeout, - }, - query: query, - aggregate: client.NewAggregate(server, query, globalGroup), - } -} - -// Read data from the dtail server via Writer interface. -func (h *MaprHandler) Write(p []byte) (n int, err error) { - for _, b := range p { - h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b) - if b == '\n' { - if len(h.baseHandler.receiveBuf) == 0 { - continue - } - message := string(h.baseHandler.receiveBuf) - - if h.baseHandler.receiveBuf[0] == 'A' { - h.handleAggregateMessage(strings.TrimSpace(message)) - h.baseHandler.receiveBuf = h.baseHandler.receiveBuf[:0] - continue - } - h.baseHandler.handleMessageType(message) - } - } - - return len(p), nil -} - -// Handle a message received from server including mapr aggregation -// related data. -func (h *MaprHandler) handleAggregateMessage(message string) { - h.count++ - parts := strings.Split(message, "|") - - // Index 0 contains 'AGGREGATE', 1 contains server host. - // Aggregation data begins from index 2. - logger.Debug("Received aggregate data", h.server, h.count) - h.aggregate.Aggregate(parts[2:]) - logger.Debug("Aggregated aggregate data", h.server, h.count) -} - -// Stop stops the mapreduce client handler. -func (h *MaprHandler) Stop() { - logger.Debug("Stopping mapreduce handler", h.server) - h.aggregate.Stop() - h.baseHandler.Stop() -} diff --git a/clients/healthclient.go b/clients/healthclient.go deleted file mode 100644 index 1fae99c..0000000 --- a/clients/healthclient.go +++ /dev/null @@ -1,96 +0,0 @@ -package clients - -import ( - "dtail/clients/handlers" - "dtail/clients/remote" - "dtail/config" - "dtail/omode" - "fmt" - "runtime" - "strings" - "sync" - "time" - - gossh "golang.org/x/crypto/ssh" -) - -// HealthClient is used for health checking (e.g. via Nagios) -type HealthClient struct { - // Client operating mode - mode omode.Mode - // The remote server address - server string - // SSH user name - userName string - // SSH auth methods to use to connect to the remote servers. - sshAuthMethods []gossh.AuthMethod -} - -// NewHealthClient returns a new healh client. -func NewHealthClient(mode omode.Mode) (*HealthClient, error) { - c := HealthClient{ - mode: mode, - server: fmt.Sprintf("%s:%d", config.Server.SSHBindAddress, config.Common.SSHPort), - userName: config.ControlUser, - } - c.initSSHAuthMethods() - - return &c, nil -} - -// Start the health client. -func (c *HealthClient) Start(wg *sync.WaitGroup) (status int) { - defer wg.Done() - receive := make(chan string) - - throttleCh := make(chan struct{}, runtime.NumCPU()) - statsCh := make(chan struct{}, 1) - - conn := remote.NewOneOffConnection(c.server, c.userName, c.sshAuthMethods) - conn.Handler = handlers.NewHealthHandler(c.server, receive) - conn.Commands = []string{c.mode.String()} - - go conn.Start(throttleCh, statsCh) - defer conn.Stop() - - for { - select { - case data := <-receive: - // Parse recieved data. - s := strings.Split(data, "|") - message := s[len(s)-1] - if strings.HasPrefix(message, "done;") { - return - } - - // Set severity. - s = strings.Split(message, ":") - switch s[0] { - case "OK": - case "WARNING": - if status < 1 { - status = 1 - } - case "CRITICAL": - status = 2 - case "UNKNOWN": - status = 3 - default: - fmt.Printf("CRITICAL: Unexpected server response: '%s'\n", message) - status = 2 - return - } - fmt.Print(message) - - case <-time.After(time.Second * 2): - status = 2 - fmt.Println("CRITICAL: Could not communicate with DTail server") - return - } - } -} - -// Initialize SSH auth methods. -func (c *HealthClient) initSSHAuthMethods() { - c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.ControlUser)) -} diff --git a/clients/maprclient.go b/clients/maprclient.go deleted file mode 100644 index ad707c9..0000000 --- a/clients/maprclient.go +++ /dev/null @@ -1,153 +0,0 @@ -package clients - -import ( - "dtail/clients/handlers" - "dtail/clients/remote" - "dtail/logger" - "dtail/mapr" - "dtail/omode" - "dtail/ssh/client" - "errors" - "fmt" - "strings" - "sync" - "time" - - gossh "golang.org/x/crypto/ssh" -) - -// MaprClient is used for running mapreduce aggregations on remote files. -type MaprClient struct { - baseClient - // Query string for mapr aggregations - queryStr string - // Global group set for merged mapr aggregation results - globalGroup *mapr.GlobalGroupSet - // The query object (constructed from queryStr) - query *mapr.Query - // Additative result or new result every run? - additative bool -} - -// NewMaprClient returns a new mapreduce client. -func NewMaprClient(args Args, queryStr string) (*MaprClient, error) { - if queryStr == "" { - return nil, errors.New("No mapreduce query specified, use '-query' flag") - } - - c := MaprClient{ - baseClient: baseClient{ - Args: args, - stop: make(chan struct{}), - stopped: make(chan struct{}), - throttleCh: make(chan struct{}, args.MaxInitConnections), - retry: args.Mode == omode.TailClient, - }, - queryStr: queryStr, - additative: args.Mode == omode.MapClient, - } - - query, err := mapr.NewQuery(c.queryStr) - if err != nil { - logger.FatalExit(c.queryStr, "Can't parse mapr query", err) - } - - c.query = query - - switch c.query.Table { - case "*": - c.Regex = fmt.Sprintf("\\|MAPREDUCE:\\|") - case ".": - c.Regex = "." - default: - c.Regex = fmt.Sprintf("\\|MAPREDUCE:%s\\|", c.query.Table) - } - - c.globalGroup = mapr.NewGlobalGroupSet() - c.baseClient.init(c) - - return &c, nil -} - -func (c MaprClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection { - conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback) - conn.Handler = handlers.NewMaprHandler(conn.Server, c.query, c.globalGroup, c.PingTimeout) - - conn.Commands = append(conn.Commands, fmt.Sprintf("map %s", c.query.RawQuery)) - commandStr := "tail" - if c.additative { - commandStr = "cat" - } - - for _, file := range strings.Split(c.Files, ",") { - conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", commandStr, file, c.Regex)) - } - - return conn -} - -// Start starts the mapreduce client. -func (c *MaprClient) Start(wg *sync.WaitGroup) (status int) { - defer wg.Done() - - if c.query.Outfile == "" { - // Only print out periodic results if we don't write an outfile - go c.periodicPrintResults() - } - - status = c.baseClient.Start(nil) - if c.additative { - c.recievedFinalResult() - } - c.baseClient.Stop() - - return -} - -func (c *MaprClient) recievedFinalResult() { - logger.Info("Received final mapreduce result") - - if c.query.Outfile == "" { - c.printResults() - return - } - - logger.Info(fmt.Sprintf("Writing final mapreduce result to '%s'", c.query.Outfile)) - err := c.globalGroup.WriteResult(c.query) - if err != nil { - logger.FatalExit(err) - return - } - logger.Info(fmt.Sprintf("Wrote final mapreduce result to '%s'", c.query.Outfile)) -} - -func (c *MaprClient) periodicPrintResults() { - for { - select { - case <-time.After(c.query.Interval): - logger.Info("Gathering interim mapreduce result") - c.printResults() - case <-c.baseClient.stop: - return - } - } -} - -func (c *MaprClient) printResults() { - var result string - var err error - var numLines int - - if c.additative { - result, numLines, err = c.globalGroup.Result(c.query) - } else { - result, numLines, err = c.globalGroup.SwapOut().Result(c.query) - } - if err != nil { - logger.FatalExit(err) - } - if numLines > 0 { - logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery)) - logger.Raw(result) - } -} diff --git a/clients/remote/connection.go b/clients/remote/connection.go deleted file mode 100644 index bd93239..0000000 --- a/clients/remote/connection.go +++ /dev/null @@ -1,230 +0,0 @@ -package remote - -import ( - "dtail/clients/handlers" - "dtail/config" - "dtail/logger" - "dtail/ssh/client" - "fmt" - "io" - "strconv" - "strings" - "time" - - "golang.org/x/crypto/ssh" -) - -// Connection represents a client connection connection to a single server. -type Connection struct { - // The remote server's hostname connected to. - Server string - // The remote server's port connected to. - port int - // The SSH client configuration used. - config *ssh.ClientConfig - // The SSH client handler to use. - Handler handlers.Handler - // DTail commands sent from client to server. When client loses - // connection to the server it re-connects automatically and sends the - // same commands again. - Commands []string - // Is it a persistent connection or a one-off? - isOneOff bool - // Used to stop the connection - stop chan struct{} - // To deal with SSH server host keys - hostKeyCallback *client.HostKeyCallback -} - -// NewConnection returns a new connection. -func NewConnection(server string, userName string, authMethods []ssh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *Connection { - logger.Debug(server, "Creating new connection") - - c := Connection{ - hostKeyCallback: hostKeyCallback, - config: &ssh.ClientConfig{ - User: userName, - Auth: authMethods, - HostKeyCallback: hostKeyCallback.Wrap(), - Timeout: time.Second * 3, - }, - stop: make(chan struct{}), - } - - c.initServerPort(server) - - return &c -} - -// NewOneOffConnection creates new one-off connection (only for sending a series of commands and then quit). -func NewOneOffConnection(server string, userName string, authMethods []ssh.AuthMethod) *Connection { - c := Connection{ - config: &ssh.ClientConfig{ - User: userName, - Auth: authMethods, - HostKeyCallback: ssh.InsecureIgnoreHostKey(), - }, - stop: make(chan struct{}), - isOneOff: true, - } - - c.initServerPort(server) - - return &c -} - -// Attempt to parse the server port address from the provided server FQDN. -func (c *Connection) initServerPort(server string) { - c.Server = server - c.port = config.Common.SSHPort - parts := strings.Split(server, ":") - - if len(parts) == 2 { - logger.Debug("Parsing port from hostname", parts) - port, err := strconv.Atoi(parts[1]) - if err != nil { - logger.FatalExit("Unable to parse client port", server, parts, err) - } - c.Server = parts[0] - c.port = port - } -} - -// Start the server connection. Build up SSH session and send some DTail commandc. -func (c *Connection) Start(throttleCh, statsCh chan struct{}) { - select { - case <-c.stop: - logger.Info(c.Server, c.port, "Disconnecting client") - return - default: - } - - // Wait for SSH connection throttler - throttleCh <- struct{}{} - - // Wait until connection has been initiated or an error occured - // during initialization. - throttleStopCh := make(chan struct{}, 2) - go func() { - <-throttleStopCh - <-throttleCh - }() - - if err := c.dial(c.Server, c.port, throttleStopCh, statsCh); err != nil { - logger.Warn(c.Server, c.port, err) - throttleStopCh <- struct{}{} - - if c.hostKeyCallback.Untrusted(fmt.Sprintf("%s:%d", c.Server, c.port)) { - logger.Debug("Not trusting host, not trying to re-connect", c.Server, c.port) - return - } - } -} - -// Dail into a new SSH connection. Close connection in case of an error. -func (c *Connection) dial(host string, port int, throttleStopCh, statsCh chan struct{}) error { - statsCh <- struct{}{} - defer func() { <-statsCh }() - - logger.Debug(host, "dial") - address := fmt.Sprintf("%s:%d", host, port) - - client, err := ssh.Dial("tcp", address, c.config) - if err != nil { - return err - } - defer client.Close() - - return c.session(client, throttleStopCh) -} - -// Create the SSH session. Close the session in case of an error. -func (c *Connection) session(client *ssh.Client, throttleStopCh chan<- struct{}) error { - logger.Debug(c.Server, "session") - - session, err := client.NewSession() - if err != nil { - return err - } - defer session.Close() - - return c.handle(session, throttleStopCh) -} - -// Handle the SSH session. Also send periodic pings to the server in order -// to determine that session is still intact. -func (c *Connection) handle(session *ssh.Session, throttleStopCh chan<- struct{}) error { - defer c.Handler.Stop() - - logger.Debug(c.Server, "handle") - - stdinPipe, err := session.StdinPipe() - if err != nil { - return err - } - - stdoutPipe, err := session.StdoutPipe() - if err != nil { - return err - } - - if err := session.Shell(); err != nil { - return err - } - - // Establish Bi-directional pipe between SSH session and client handler. - brokenStdinPipe := make(chan struct{}) - go func() { - defer close(brokenStdinPipe) - io.Copy(stdinPipe, c.Handler) - }() - - brokenStdoutPipe := make(chan struct{}) - go func() { - defer close(brokenStdoutPipe) - io.Copy(c.Handler, stdoutPipe) - }() - - // SSH session established, other goroutine can initiate session now. - throttleStopCh <- struct{}{} - - // Send all commands to client. - for _, command := range c.Commands { - logger.Debug(command) - c.Handler.SendCommand(command) - } - - if !c.isOneOff { - return c.periodicAliveCheck(brokenStdinPipe, brokenStdoutPipe) - } - - <-c.stop - - // Normal shutdown, all fine - return nil -} - -// Periodically check whether connection is still alive or not. -func (c *Connection) periodicAliveCheck(brokenStdinPipe, brokenStdoutPipe <-chan struct{}) error { - for { - select { - case <-time.After(time.Second * 3): - if err := c.Handler.Ping(); err != nil { - return err - } - case <-brokenStdinPipe: - logger.Debug("Broken stdin pipe", c.Server, c.port) - return nil - case <-brokenStdoutPipe: - logger.Debug("Broken stdout pipe", c.Server, c.port) - return nil - case <-c.stop: - return nil - } - } -} - -// Stop the connection. -func (c *Connection) Stop() { - close(c.stop) -} diff --git a/clients/stats.go b/clients/stats.go deleted file mode 100644 index e5b9bed..0000000 --- a/clients/stats.go +++ /dev/null @@ -1,81 +0,0 @@ -package clients - -import ( - "dtail/logger" - "fmt" - "runtime" - "sync" - "time" -) - -// Used to collect and display various client stats. -type stats struct { - // Total amount servers to connect to. - connectionsTotal int - // To keep track of what connected and disconnected - connectionsEstCh chan struct{} - // Amount of servers connections are established. - connected int - // To synchronize concurrent access. - mutex sync.Mutex -} - -func newTailStats(connectionsTotal int) *stats { - return &stats{ - connectionsTotal: connectionsTotal, - connectionsEstCh: make(chan struct{}, connectionsTotal), - connected: 0, - } -} - -func (s *stats) periodicLogStats(throttleCh chan struct{}, stop <-chan struct{}) { - connectedLast := 0 - statsInterval := 5 - - for { - select { - case <-time.After(time.Second * time.Duration(statsInterval)): - case <-stop: - return - } - - connected := len(s.connectionsEstCh) - throttle := len(throttleCh) - - newConnections := connected - connectedLast - connectionsPerSecond := float64(newConnections) / float64(statsInterval) - s.log(connected, newConnections, connectionsPerSecond, throttle) - - connectedLast = connected - - s.mutex.Lock() - s.connected = connected - s.mutex.Unlock() - } -} - -func (s *stats) numConnected() int { - s.mutex.Lock() - defer s.mutex.Unlock() - - return s.connected -} - -func (s *stats) log(connected, newConnections int, connectionsPerSecond float64, throttle int) { - percConnected := percentOf(float64(s.connectionsTotal), float64(connected)) - - connectedStr := fmt.Sprintf("connected=%d/%d(%d%%)", connected, s.connectionsTotal, int(percConnected)) - newConnStr := fmt.Sprintf("new=%d", newConnections) - rateStr := fmt.Sprintf("rate=%2.2f/s", connectionsPerSecond) - throttleStr := fmt.Sprintf("throttle=%d", throttle) - cpusGoroutinesStr := fmt.Sprintf("cpus/goroutines=%d/%d", runtime.NumCPU(), runtime.NumGoroutine()) - - logger.Info("stats", connectedStr, newConnStr, rateStr, throttleStr, cpusGoroutinesStr) -} - -func percentOf(total float64, value float64) float64 { - if total == 0 || total == value { - return 100 - } - return value / (total / 100.0) -} diff --git a/clients/tailclient.go b/clients/tailclient.go deleted file mode 100644 index cb93258..0000000 --- a/clients/tailclient.go +++ /dev/null @@ -1,44 +0,0 @@ -package clients - -import ( - "dtail/clients/handlers" - "dtail/clients/remote" - "dtail/ssh/client" - "fmt" - "strings" - - gossh "golang.org/x/crypto/ssh" -) - -// TailClient is used for tailing remote log files (opening, seeking to the end and returning only new incoming lines). -type TailClient struct { - baseClient -} - -// NewTailClient returns a new TailClient. -func NewTailClient(args Args) (*TailClient, error) { - c := TailClient{ - baseClient: baseClient{ - Args: args, - stop: make(chan struct{}), - stopped: make(chan struct{}), - throttleCh: make(chan struct{}, args.MaxInitConnections), - retry: true, - }, - } - - c.init(c) - - return &c, nil -} - -func (c TailClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection { - conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback) - conn.Handler = handlers.NewClientHandler(server, c.PingTimeout) - - for _, file := range strings.Split(c.Files, ",") { - conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", c.Mode.String(), file, c.Regex)) - } - - return conn -} |
