summaryrefslogtreecommitdiff
path: root/clients
diff options
context:
space:
mode:
authorPaul Bütow <pbuetow@mimecast.com>2020-01-20 18:41:05 +0000
committerPaul Bütow <pbuetow@mimecast.com>2020-01-21 14:35:23 +0000
commitc128865c4c7411c29a59fca9a3a2f95537686d7b (patch)
tree193bccc70d942c8b70cc93fae2670263701e43aa /clients
parent3755a9911ecb05886577095f2b8cc8b9e4066a3a (diff)
Move commands to cmd/ and move internal dependencies to internal/
Diffstat (limited to 'clients')
-rw-r--r--clients/args.go26
-rw-r--r--clients/baseclient.go139
-rw-r--r--clients/catclient.go49
-rw-r--r--clients/client.go9
-rw-r--r--clients/connectionmaker.go12
-rw-r--r--clients/grepclient.go49
-rw-r--r--clients/handlers/basehandler.go134
-rw-r--r--clients/handlers/clienthandler.go26
-rw-r--r--clients/handlers/handler.go12
-rw-r--r--clients/handlers/healthhandler.go75
-rw-r--r--clients/handlers/maprhandler.go74
-rw-r--r--clients/healthclient.go96
-rw-r--r--clients/maprclient.go153
-rw-r--r--clients/remote/connection.go230
-rw-r--r--clients/stats.go81
-rw-r--r--clients/tailclient.go44
16 files changed, 0 insertions, 1209 deletions
diff --git a/clients/args.go b/clients/args.go
deleted file mode 100644
index 4d5a029..0000000
--- a/clients/args.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package clients
-
-import (
- "dtail/omode"
-)
-
-// Args is a helper struct to summarize common client arguments.
-type Args struct {
- // The operating mode (tail, grep, ...)
- Mode omode.Mode
- // The raw server string
- ServersStr string
- // SSH user name (e.g. 'pbuetow')
- UserName string
- // The files to follow.
- Files string
- // Regex for filtering.
- Regex string
- // Trust all unknown host keys?
- TrustAllHosts bool
- // Server discovery method
- Discovery string
- MaxInitConnections int
- // Server ping timeout (0 means pings disabled)
- PingTimeout int
-}
diff --git a/clients/baseclient.go b/clients/baseclient.go
deleted file mode 100644
index 3a1b8f0..0000000
--- a/clients/baseclient.go
+++ /dev/null
@@ -1,139 +0,0 @@
-package clients
-
-import (
- "dtail/clients/remote"
- "dtail/discovery"
- "dtail/logger"
- "dtail/omode"
- "dtail/ssh/client"
- "regexp"
- "sync"
- "time"
-
- gossh "golang.org/x/crypto/ssh"
-)
-
-// This is the main client data structure.
-type baseClient struct {
- Args
- // To display client side stats
- stats *stats
- // List of remote servers to connect to.
- servers []string
- // We have one connection per remote server.
- connections []*remote.Connection
- // SSH auth methods to use to connect to the remote servers.
- sshAuthMethods []gossh.AuthMethod
- // To deal with SSH host keys
- hostKeyCallback *client.HostKeyCallback
- // To stop the client.
- stop chan struct{}
- // To indicate that the client has stopped.
- stopped chan struct{}
- // Throttle how fast we initiate SSH connections concurrently
- throttleCh chan struct{}
- // Retry connection upon failure?
- retry bool
- // Connection helper.
- maker connectionMaker
-}
-
-func (c *baseClient) init(maker connectionMaker) {
- logger.Info("Initiating base client")
-
- c.maker = maker
- //c.connections = make(map[string]*remote.Connection)
- c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods(c.TrustAllHosts, c.throttleCh)
-
- // Retrieve a shuffled list of remote dtail servers.
- shuffleServers := true
- discoveryService := discovery.New(c.Discovery, c.ServersStr, shuffleServers)
- for _, server := range discoveryService.ServerList() {
- c.connections = append(c.connections, c.maker.makeConnection(server, c.sshAuthMethods, c.hostKeyCallback))
- }
-
- if _, err := regexp.Compile(c.Regex); err != nil {
- logger.FatalExit(c.Regex, "Can't test compile regex", err)
- }
-
- // Periodically check for unknown hosts, and ask the user whether to trust them or not.
- go c.hostKeyCallback.PromptAddHosts(c.stop)
-
- // Periodically print out connection stats to the client.
- c.stats = newTailStats(len(c.connections))
- go c.stats.periodicLogStats(c.throttleCh, c.stop)
-}
-
-func (c *baseClient) Start(wg *sync.WaitGroup) (status int) {
- if wg != nil {
- defer wg.Done()
- }
- active := make(chan struct{}, len(c.connections))
-
- var wg2 sync.WaitGroup
- wg2.Add(len(c.connections))
-
- for i, conn := range c.connections {
- go func(i int, conn *remote.Connection) {
- active <- struct{}{}
- defer func() {
- logger.Debug(conn.Server, "Disconnected completely...")
- <-active
- }()
- wg2.Done()
-
- for {
- conn.Start(c.throttleCh, c.stats.connectionsEstCh)
- if !c.retry {
- return
- }
- time.Sleep(time.Second * 2)
- logger.Debug(conn.Server, "Reconencting")
- conn = c.maker.makeConnection(conn.Server, c.sshAuthMethods, c.hostKeyCallback)
- c.connections[i] = conn
- }
- }(i, conn)
- }
-
- wg2.Wait()
- c.waitUntilDone(active)
-
- return
-}
-
-func (c *baseClient) waitUntilDone(active chan struct{}) {
- defer close(c.stopped)
-
- if c.Mode != omode.TailClient {
- c.waitUntilZero(active)
- logger.Info("All connections stopped")
- return
- }
-
- <-c.stop
- logger.Info("Stopping client")
- for _, conn := range c.connections {
- conn.Stop()
- }
-
- c.waitUntilZero(active)
-}
-
-func (c *baseClient) waitUntilZero(active chan struct{}) {
- for {
- logger.Debug("Active connections", len(active))
- if len(active) == 0 {
- return
- }
- time.Sleep(time.Second)
- }
-}
-
-func (c *baseClient) Stop() {
- close(c.stop)
- <-c.WaitC()
-}
-
-func (c *baseClient) WaitC() <-chan struct{} {
- return c.stopped
-}
diff --git a/clients/catclient.go b/clients/catclient.go
deleted file mode 100644
index e3b873c..0000000
--- a/clients/catclient.go
+++ /dev/null
@@ -1,49 +0,0 @@
-package clients
-
-import (
- "dtail/clients/handlers"
- "dtail/clients/remote"
- "dtail/ssh/client"
- "errors"
- "fmt"
- "strings"
-
- gossh "golang.org/x/crypto/ssh"
-)
-
-// CatClient is a client for returning a whole file from the beginning to the end.
-type CatClient struct {
- baseClient
-}
-
-// NewCatClient returns a new cat client.
-func NewCatClient(args Args) (*CatClient, error) {
- if args.Regex != "" {
- return nil, errors.New("Can't use regex with 'cat' operating mode")
- }
-
- args.Regex = "."
-
- c := CatClient{
- baseClient: baseClient{
- Args: args,
- stop: make(chan struct{}),
- stopped: make(chan struct{}),
- throttleCh: make(chan struct{}, args.MaxInitConnections),
- retry: false,
- },
- }
-
- c.init(c)
-
- return &c, nil
-}
-
-func (c CatClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection {
- conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback)
- conn.Handler = handlers.NewClientHandler(server, c.PingTimeout)
- for _, file := range strings.Split(c.Files, ",") {
- conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", c.Mode.String(), file, c.Regex))
- }
- return conn
-}
diff --git a/clients/client.go b/clients/client.go
deleted file mode 100644
index e58f51d..0000000
--- a/clients/client.go
+++ /dev/null
@@ -1,9 +0,0 @@
-package clients
-
-import "sync"
-
-// Client is the interface for the end user command line client.
-type Client interface {
- Start(wg *sync.WaitGroup) int
- Stop()
-}
diff --git a/clients/connectionmaker.go b/clients/connectionmaker.go
deleted file mode 100644
index 9e08c2b..0000000
--- a/clients/connectionmaker.go
+++ /dev/null
@@ -1,12 +0,0 @@
-package clients
-
-import (
- "dtail/clients/remote"
- "dtail/ssh/client"
-
- gossh "golang.org/x/crypto/ssh"
-)
-
-type connectionMaker interface {
- makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection
-}
diff --git a/clients/grepclient.go b/clients/grepclient.go
deleted file mode 100644
index dbae96c..0000000
--- a/clients/grepclient.go
+++ /dev/null
@@ -1,49 +0,0 @@
-package clients
-
-import (
- "dtail/clients/handlers"
- "dtail/clients/remote"
- "dtail/ssh/client"
- "errors"
- "fmt"
- "strings"
-
- gossh "golang.org/x/crypto/ssh"
-)
-
-// GrepClient searches a remote file for all lines matching a regular expression. Only the matching lines are displayed.
-type GrepClient struct {
- baseClient
-}
-
-// NewGrepClient creates a new grep client.
-func NewGrepClient(args Args) (*GrepClient, error) {
- if args.Regex == "" {
- return nil, errors.New("No regex specified, use '-regex' flag")
- }
-
- c := GrepClient{
- baseClient: baseClient{
- Args: args,
- stop: make(chan struct{}),
- stopped: make(chan struct{}),
- throttleCh: make(chan struct{}, args.MaxInitConnections),
- retry: false,
- },
- }
-
- c.init(c)
-
- return &c, nil
-}
-
-func (c GrepClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection {
- conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback)
- conn.Handler = handlers.NewClientHandler(server, c.PingTimeout)
-
- for _, file := range strings.Split(c.Files, ",") {
- conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", c.Mode.String(), file, c.Regex))
- }
-
- return conn
-}
diff --git a/clients/handlers/basehandler.go b/clients/handlers/basehandler.go
deleted file mode 100644
index ce82aa2..0000000
--- a/clients/handlers/basehandler.go
+++ /dev/null
@@ -1,134 +0,0 @@
-package handlers
-
-import (
- "dtail/logger"
- "errors"
- "fmt"
- "io"
- "strings"
- "time"
-)
-
-type baseHandler struct {
- server string
- shellStarted bool
- commands chan string
- pong chan struct{}
- receiveBuf []byte
- stop chan struct{}
- pingTimeout int
-}
-
-func (h *baseHandler) Server() string {
- return h.server
-}
-
-// Used to determine whether server is still responding to requests or not.
-func (h *baseHandler) Ping() error {
- if h.pingTimeout == 0 {
- // Server ping disabled
- return nil
- }
-
- if err := h.SendCommand("ping"); err != nil {
- return err
- }
-
- select {
- case <-h.pong:
- return nil
- case <-time.After(time.Duration(h.pingTimeout) * time.Second):
- }
-
- return errors.New("Didn't receive any server pongs (ping replies)")
-}
-
-func (h *baseHandler) SendCommand(command string) error {
- if command == "ping" {
- logger.Trace("Sending command", h.server, command)
- } else {
- logger.Debug("Sending command", h.server, command)
- }
-
- select {
- case h.commands <- fmt.Sprintf("%s;", command):
- case <-time.After(time.Second * 5):
- return errors.New("Timed out sending command " + command)
- case <-h.stop:
- }
-
- return nil
-}
-
-// Read data from the dtail server via Writer interface.
-func (h *baseHandler) Write(p []byte) (n int, err error) {
- for _, b := range p {
- h.receiveBuf = append(h.receiveBuf, b)
- if b == '\n' {
- if len(h.receiveBuf) == 0 {
- continue
- }
- message := string(h.receiveBuf)
- h.handleMessageType(message)
- }
- }
-
- return len(p), nil
-}
-
-// Send data to the dtail server via Reader interface.
-func (h *baseHandler) Read(p []byte) (n int, err error) {
- select {
- case command := <-h.commands:
- n = copy(p, []byte(command))
- case <-h.stop:
- return 0, io.EOF
- }
- return
-}
-
-// Handle various message types.
-func (h *baseHandler) handleMessageType(message string) {
- if len(h.receiveBuf) == 0 {
- return
- }
- // Hidden server commands starti with a dot "."
- if h.receiveBuf[0] == '.' {
- h.handleHiddenMessage(message)
- h.receiveBuf = h.receiveBuf[:0]
- return
- }
-
- // Silent mode will only print out remote logs but not remote server
- // commands. But remote server commands will be still logged to ./log/.
- if logger.Mode == logger.SilentMode {
- if h.receiveBuf[0] == 'R' {
- logger.Raw(message)
- }
- h.receiveBuf = h.receiveBuf[:0]
- return
- }
- logger.Raw(message)
- h.receiveBuf = h.receiveBuf[:0]
-}
-
-// Handle messages received from server which are not meant to be displayed
-// to the end user.
-func (h *baseHandler) handleHiddenMessage(message string) {
- switch {
- case strings.HasPrefix(message, ".pong"):
- h.pong <- struct{}{}
- case strings.HasPrefix(message, ".syn close connection"):
- h.SendCommand("ack close connection")
- }
-}
-
-// Stop the handler.
-func (h *baseHandler) Stop() {
- select {
- case <-h.stop:
- default:
- logger.Debug("Stopping base handler", h.server)
- close(h.stop)
- }
-}
diff --git a/clients/handlers/clienthandler.go b/clients/handlers/clienthandler.go
deleted file mode 100644
index e818b52..0000000
--- a/clients/handlers/clienthandler.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package handlers
-
-import (
- "dtail/logger"
-)
-
-// ClientHandler is the basic client handler interface.
-type ClientHandler struct {
- baseHandler
-}
-
-// NewClientHandler creates a new client handler.
-func NewClientHandler(server string, pingTimeout int) *ClientHandler {
- logger.Debug(server, "Creating new client handler")
-
- return &ClientHandler{
- baseHandler{
- server: server,
- shellStarted: false,
- commands: make(chan string),
- pong: make(chan struct{}, 1),
- stop: make(chan struct{}),
- pingTimeout: pingTimeout,
- },
- }
-}
diff --git a/clients/handlers/handler.go b/clients/handlers/handler.go
deleted file mode 100644
index 2013be0..0000000
--- a/clients/handlers/handler.go
+++ /dev/null
@@ -1,12 +0,0 @@
-package handlers
-
-import "io"
-
-// Handler provides all methods which can be run on any client handler.
-type Handler interface {
- io.ReadWriter
- Ping() error
- Stop()
- SendCommand(command string) error
- Server() string
-}
diff --git a/clients/handlers/healthhandler.go b/clients/handlers/healthhandler.go
deleted file mode 100644
index 4051e2c..0000000
--- a/clients/handlers/healthhandler.go
+++ /dev/null
@@ -1,75 +0,0 @@
-package handlers
-
-import (
- "errors"
- "fmt"
- "time"
-)
-
-// HealthHandler implements the handler required for health checks.
-type HealthHandler struct {
- // Buffer of incoming data from server.
- receiveBuf []byte
- // To send commands to the server.
- commands chan string
- // To receive messages from the server.
- receive chan<- string
- // The remote server address
- server string
-}
-
-// NewHealthHandler returns a new health check handler.
-func NewHealthHandler(server string, receive chan<- string) *HealthHandler {
- h := HealthHandler{
- server: server,
- receive: receive,
- commands: make(chan string),
- }
-
- return &h
-}
-
-// Server returns the remote server name.
-func (h *HealthHandler) Server() string {
- return h.server
-}
-
-// Stop is not of use for health check handler.
-func (h *HealthHandler) Stop() {
- // Nothing done here.
-}
-
-// Ping is not of use for health check handler.
-func (h *HealthHandler) Ping() error {
- return nil
-}
-
-// SendCommand send a DTail command to the server.
-func (h *HealthHandler) SendCommand(command string) error {
- select {
- case h.commands <- fmt.Sprintf("%s;", command):
- case <-time.NewTimer(time.Second * 10).C:
- return errors.New("Timed out sending command " + command)
- }
-
- return nil
-}
-
-// Server writes byte stream to client.
-func (h *HealthHandler) Write(p []byte) (n int, err error) {
- for _, b := range p {
- h.receiveBuf = append(h.receiveBuf, b)
- if b == '\n' {
- h.receive <- string(h.receiveBuf)
- h.receiveBuf = h.receiveBuf[:0]
- }
- }
-
- return len(p), nil
-}
-
-// Server reads byte stream from client.
-func (h *HealthHandler) Read(p []byte) (n int, err error) {
- n = copy(p, []byte(<-h.commands))
- return
-}
diff --git a/clients/handlers/maprhandler.go b/clients/handlers/maprhandler.go
deleted file mode 100644
index 830a142..0000000
--- a/clients/handlers/maprhandler.go
+++ /dev/null
@@ -1,74 +0,0 @@
-package handlers
-
-import (
- "dtail/logger"
- "dtail/mapr"
- "dtail/mapr/client"
- "strings"
-)
-
-// MaprHandler is the handler used on the client side for running mapreduce aggregations.
-type MaprHandler struct {
- baseHandler
- aggregate *client.Aggregate
- query *mapr.Query
- count uint64
-}
-
-// NewMaprHandler returns a new mapreduce client handler.
-func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet, pingTimeout int) *MaprHandler {
- return &MaprHandler{
- baseHandler: baseHandler{
- server: server,
- shellStarted: false,
- commands: make(chan string),
- pong: make(chan struct{}, 1),
- stop: make(chan struct{}),
- pingTimeout: pingTimeout,
- },
- query: query,
- aggregate: client.NewAggregate(server, query, globalGroup),
- }
-}
-
-// Read data from the dtail server via Writer interface.
-func (h *MaprHandler) Write(p []byte) (n int, err error) {
- for _, b := range p {
- h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b)
- if b == '\n' {
- if len(h.baseHandler.receiveBuf) == 0 {
- continue
- }
- message := string(h.baseHandler.receiveBuf)
-
- if h.baseHandler.receiveBuf[0] == 'A' {
- h.handleAggregateMessage(strings.TrimSpace(message))
- h.baseHandler.receiveBuf = h.baseHandler.receiveBuf[:0]
- continue
- }
- h.baseHandler.handleMessageType(message)
- }
- }
-
- return len(p), nil
-}
-
-// Handle a message received from server including mapr aggregation
-// related data.
-func (h *MaprHandler) handleAggregateMessage(message string) {
- h.count++
- parts := strings.Split(message, "|")
-
- // Index 0 contains 'AGGREGATE', 1 contains server host.
- // Aggregation data begins from index 2.
- logger.Debug("Received aggregate data", h.server, h.count)
- h.aggregate.Aggregate(parts[2:])
- logger.Debug("Aggregated aggregate data", h.server, h.count)
-}
-
-// Stop stops the mapreduce client handler.
-func (h *MaprHandler) Stop() {
- logger.Debug("Stopping mapreduce handler", h.server)
- h.aggregate.Stop()
- h.baseHandler.Stop()
-}
diff --git a/clients/healthclient.go b/clients/healthclient.go
deleted file mode 100644
index 1fae99c..0000000
--- a/clients/healthclient.go
+++ /dev/null
@@ -1,96 +0,0 @@
-package clients
-
-import (
- "dtail/clients/handlers"
- "dtail/clients/remote"
- "dtail/config"
- "dtail/omode"
- "fmt"
- "runtime"
- "strings"
- "sync"
- "time"
-
- gossh "golang.org/x/crypto/ssh"
-)
-
-// HealthClient is used for health checking (e.g. via Nagios)
-type HealthClient struct {
- // Client operating mode
- mode omode.Mode
- // The remote server address
- server string
- // SSH user name
- userName string
- // SSH auth methods to use to connect to the remote servers.
- sshAuthMethods []gossh.AuthMethod
-}
-
-// NewHealthClient returns a new healh client.
-func NewHealthClient(mode omode.Mode) (*HealthClient, error) {
- c := HealthClient{
- mode: mode,
- server: fmt.Sprintf("%s:%d", config.Server.SSHBindAddress, config.Common.SSHPort),
- userName: config.ControlUser,
- }
- c.initSSHAuthMethods()
-
- return &c, nil
-}
-
-// Start the health client.
-func (c *HealthClient) Start(wg *sync.WaitGroup) (status int) {
- defer wg.Done()
- receive := make(chan string)
-
- throttleCh := make(chan struct{}, runtime.NumCPU())
- statsCh := make(chan struct{}, 1)
-
- conn := remote.NewOneOffConnection(c.server, c.userName, c.sshAuthMethods)
- conn.Handler = handlers.NewHealthHandler(c.server, receive)
- conn.Commands = []string{c.mode.String()}
-
- go conn.Start(throttleCh, statsCh)
- defer conn.Stop()
-
- for {
- select {
- case data := <-receive:
- // Parse recieved data.
- s := strings.Split(data, "|")
- message := s[len(s)-1]
- if strings.HasPrefix(message, "done;") {
- return
- }
-
- // Set severity.
- s = strings.Split(message, ":")
- switch s[0] {
- case "OK":
- case "WARNING":
- if status < 1 {
- status = 1
- }
- case "CRITICAL":
- status = 2
- case "UNKNOWN":
- status = 3
- default:
- fmt.Printf("CRITICAL: Unexpected server response: '%s'\n", message)
- status = 2
- return
- }
- fmt.Print(message)
-
- case <-time.After(time.Second * 2):
- status = 2
- fmt.Println("CRITICAL: Could not communicate with DTail server")
- return
- }
- }
-}
-
-// Initialize SSH auth methods.
-func (c *HealthClient) initSSHAuthMethods() {
- c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.ControlUser))
-}
diff --git a/clients/maprclient.go b/clients/maprclient.go
deleted file mode 100644
index ad707c9..0000000
--- a/clients/maprclient.go
+++ /dev/null
@@ -1,153 +0,0 @@
-package clients
-
-import (
- "dtail/clients/handlers"
- "dtail/clients/remote"
- "dtail/logger"
- "dtail/mapr"
- "dtail/omode"
- "dtail/ssh/client"
- "errors"
- "fmt"
- "strings"
- "sync"
- "time"
-
- gossh "golang.org/x/crypto/ssh"
-)
-
-// MaprClient is used for running mapreduce aggregations on remote files.
-type MaprClient struct {
- baseClient
- // Query string for mapr aggregations
- queryStr string
- // Global group set for merged mapr aggregation results
- globalGroup *mapr.GlobalGroupSet
- // The query object (constructed from queryStr)
- query *mapr.Query
- // Additative result or new result every run?
- additative bool
-}
-
-// NewMaprClient returns a new mapreduce client.
-func NewMaprClient(args Args, queryStr string) (*MaprClient, error) {
- if queryStr == "" {
- return nil, errors.New("No mapreduce query specified, use '-query' flag")
- }
-
- c := MaprClient{
- baseClient: baseClient{
- Args: args,
- stop: make(chan struct{}),
- stopped: make(chan struct{}),
- throttleCh: make(chan struct{}, args.MaxInitConnections),
- retry: args.Mode == omode.TailClient,
- },
- queryStr: queryStr,
- additative: args.Mode == omode.MapClient,
- }
-
- query, err := mapr.NewQuery(c.queryStr)
- if err != nil {
- logger.FatalExit(c.queryStr, "Can't parse mapr query", err)
- }
-
- c.query = query
-
- switch c.query.Table {
- case "*":
- c.Regex = fmt.Sprintf("\\|MAPREDUCE:\\|")
- case ".":
- c.Regex = "."
- default:
- c.Regex = fmt.Sprintf("\\|MAPREDUCE:%s\\|", c.query.Table)
- }
-
- c.globalGroup = mapr.NewGlobalGroupSet()
- c.baseClient.init(c)
-
- return &c, nil
-}
-
-func (c MaprClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection {
- conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback)
- conn.Handler = handlers.NewMaprHandler(conn.Server, c.query, c.globalGroup, c.PingTimeout)
-
- conn.Commands = append(conn.Commands, fmt.Sprintf("map %s", c.query.RawQuery))
- commandStr := "tail"
- if c.additative {
- commandStr = "cat"
- }
-
- for _, file := range strings.Split(c.Files, ",") {
- conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", commandStr, file, c.Regex))
- }
-
- return conn
-}
-
-// Start starts the mapreduce client.
-func (c *MaprClient) Start(wg *sync.WaitGroup) (status int) {
- defer wg.Done()
-
- if c.query.Outfile == "" {
- // Only print out periodic results if we don't write an outfile
- go c.periodicPrintResults()
- }
-
- status = c.baseClient.Start(nil)
- if c.additative {
- c.recievedFinalResult()
- }
- c.baseClient.Stop()
-
- return
-}
-
-func (c *MaprClient) recievedFinalResult() {
- logger.Info("Received final mapreduce result")
-
- if c.query.Outfile == "" {
- c.printResults()
- return
- }
-
- logger.Info(fmt.Sprintf("Writing final mapreduce result to '%s'", c.query.Outfile))
- err := c.globalGroup.WriteResult(c.query)
- if err != nil {
- logger.FatalExit(err)
- return
- }
- logger.Info(fmt.Sprintf("Wrote final mapreduce result to '%s'", c.query.Outfile))
-}
-
-func (c *MaprClient) periodicPrintResults() {
- for {
- select {
- case <-time.After(c.query.Interval):
- logger.Info("Gathering interim mapreduce result")
- c.printResults()
- case <-c.baseClient.stop:
- return
- }
- }
-}
-
-func (c *MaprClient) printResults() {
- var result string
- var err error
- var numLines int
-
- if c.additative {
- result, numLines, err = c.globalGroup.Result(c.query)
- } else {
- result, numLines, err = c.globalGroup.SwapOut().Result(c.query)
- }
- if err != nil {
- logger.FatalExit(err)
- }
- if numLines > 0 {
- logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery))
- logger.Raw(result)
- }
-}
diff --git a/clients/remote/connection.go b/clients/remote/connection.go
deleted file mode 100644
index bd93239..0000000
--- a/clients/remote/connection.go
+++ /dev/null
@@ -1,230 +0,0 @@
-package remote
-
-import (
- "dtail/clients/handlers"
- "dtail/config"
- "dtail/logger"
- "dtail/ssh/client"
- "fmt"
- "io"
- "strconv"
- "strings"
- "time"
-
- "golang.org/x/crypto/ssh"
-)
-
-// Connection represents a client connection connection to a single server.
-type Connection struct {
- // The remote server's hostname connected to.
- Server string
- // The remote server's port connected to.
- port int
- // The SSH client configuration used.
- config *ssh.ClientConfig
- // The SSH client handler to use.
- Handler handlers.Handler
- // DTail commands sent from client to server. When client loses
- // connection to the server it re-connects automatically and sends the
- // same commands again.
- Commands []string
- // Is it a persistent connection or a one-off?
- isOneOff bool
- // Used to stop the connection
- stop chan struct{}
- // To deal with SSH server host keys
- hostKeyCallback *client.HostKeyCallback
-}
-
-// NewConnection returns a new connection.
-func NewConnection(server string, userName string, authMethods []ssh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *Connection {
- logger.Debug(server, "Creating new connection")
-
- c := Connection{
- hostKeyCallback: hostKeyCallback,
- config: &ssh.ClientConfig{
- User: userName,
- Auth: authMethods,
- HostKeyCallback: hostKeyCallback.Wrap(),
- Timeout: time.Second * 3,
- },
- stop: make(chan struct{}),
- }
-
- c.initServerPort(server)
-
- return &c
-}
-
-// NewOneOffConnection creates new one-off connection (only for sending a series of commands and then quit).
-func NewOneOffConnection(server string, userName string, authMethods []ssh.AuthMethod) *Connection {
- c := Connection{
- config: &ssh.ClientConfig{
- User: userName,
- Auth: authMethods,
- HostKeyCallback: ssh.InsecureIgnoreHostKey(),
- },
- stop: make(chan struct{}),
- isOneOff: true,
- }
-
- c.initServerPort(server)
-
- return &c
-}
-
-// Attempt to parse the server port address from the provided server FQDN.
-func (c *Connection) initServerPort(server string) {
- c.Server = server
- c.port = config.Common.SSHPort
- parts := strings.Split(server, ":")
-
- if len(parts) == 2 {
- logger.Debug("Parsing port from hostname", parts)
- port, err := strconv.Atoi(parts[1])
- if err != nil {
- logger.FatalExit("Unable to parse client port", server, parts, err)
- }
- c.Server = parts[0]
- c.port = port
- }
-}
-
-// Start the server connection. Build up SSH session and send some DTail commandc.
-func (c *Connection) Start(throttleCh, statsCh chan struct{}) {
- select {
- case <-c.stop:
- logger.Info(c.Server, c.port, "Disconnecting client")
- return
- default:
- }
-
- // Wait for SSH connection throttler
- throttleCh <- struct{}{}
-
- // Wait until connection has been initiated or an error occured
- // during initialization.
- throttleStopCh := make(chan struct{}, 2)
- go func() {
- <-throttleStopCh
- <-throttleCh
- }()
-
- if err := c.dial(c.Server, c.port, throttleStopCh, statsCh); err != nil {
- logger.Warn(c.Server, c.port, err)
- throttleStopCh <- struct{}{}
-
- if c.hostKeyCallback.Untrusted(fmt.Sprintf("%s:%d", c.Server, c.port)) {
- logger.Debug("Not trusting host, not trying to re-connect", c.Server, c.port)
- return
- }
- }
-}
-
-// Dail into a new SSH connection. Close connection in case of an error.
-func (c *Connection) dial(host string, port int, throttleStopCh, statsCh chan struct{}) error {
- statsCh <- struct{}{}
- defer func() { <-statsCh }()
-
- logger.Debug(host, "dial")
- address := fmt.Sprintf("%s:%d", host, port)
-
- client, err := ssh.Dial("tcp", address, c.config)
- if err != nil {
- return err
- }
- defer client.Close()
-
- return c.session(client, throttleStopCh)
-}
-
-// Create the SSH session. Close the session in case of an error.
-func (c *Connection) session(client *ssh.Client, throttleStopCh chan<- struct{}) error {
- logger.Debug(c.Server, "session")
-
- session, err := client.NewSession()
- if err != nil {
- return err
- }
- defer session.Close()
-
- return c.handle(session, throttleStopCh)
-}
-
-// Handle the SSH session. Also send periodic pings to the server in order
-// to determine that session is still intact.
-func (c *Connection) handle(session *ssh.Session, throttleStopCh chan<- struct{}) error {
- defer c.Handler.Stop()
-
- logger.Debug(c.Server, "handle")
-
- stdinPipe, err := session.StdinPipe()
- if err != nil {
- return err
- }
-
- stdoutPipe, err := session.StdoutPipe()
- if err != nil {
- return err
- }
-
- if err := session.Shell(); err != nil {
- return err
- }
-
- // Establish Bi-directional pipe between SSH session and client handler.
- brokenStdinPipe := make(chan struct{})
- go func() {
- defer close(brokenStdinPipe)
- io.Copy(stdinPipe, c.Handler)
- }()
-
- brokenStdoutPipe := make(chan struct{})
- go func() {
- defer close(brokenStdoutPipe)
- io.Copy(c.Handler, stdoutPipe)
- }()
-
- // SSH session established, other goroutine can initiate session now.
- throttleStopCh <- struct{}{}
-
- // Send all commands to client.
- for _, command := range c.Commands {
- logger.Debug(command)
- c.Handler.SendCommand(command)
- }
-
- if !c.isOneOff {
- return c.periodicAliveCheck(brokenStdinPipe, brokenStdoutPipe)
- }
-
- <-c.stop
-
- // Normal shutdown, all fine
- return nil
-}
-
-// Periodically check whether connection is still alive or not.
-func (c *Connection) periodicAliveCheck(brokenStdinPipe, brokenStdoutPipe <-chan struct{}) error {
- for {
- select {
- case <-time.After(time.Second * 3):
- if err := c.Handler.Ping(); err != nil {
- return err
- }
- case <-brokenStdinPipe:
- logger.Debug("Broken stdin pipe", c.Server, c.port)
- return nil
- case <-brokenStdoutPipe:
- logger.Debug("Broken stdout pipe", c.Server, c.port)
- return nil
- case <-c.stop:
- return nil
- }
- }
-}
-
-// Stop the connection.
-func (c *Connection) Stop() {
- close(c.stop)
-}
diff --git a/clients/stats.go b/clients/stats.go
deleted file mode 100644
index e5b9bed..0000000
--- a/clients/stats.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package clients
-
-import (
- "dtail/logger"
- "fmt"
- "runtime"
- "sync"
- "time"
-)
-
-// Used to collect and display various client stats.
-type stats struct {
- // Total amount servers to connect to.
- connectionsTotal int
- // To keep track of what connected and disconnected
- connectionsEstCh chan struct{}
- // Amount of servers connections are established.
- connected int
- // To synchronize concurrent access.
- mutex sync.Mutex
-}
-
-func newTailStats(connectionsTotal int) *stats {
- return &stats{
- connectionsTotal: connectionsTotal,
- connectionsEstCh: make(chan struct{}, connectionsTotal),
- connected: 0,
- }
-}
-
-func (s *stats) periodicLogStats(throttleCh chan struct{}, stop <-chan struct{}) {
- connectedLast := 0
- statsInterval := 5
-
- for {
- select {
- case <-time.After(time.Second * time.Duration(statsInterval)):
- case <-stop:
- return
- }
-
- connected := len(s.connectionsEstCh)
- throttle := len(throttleCh)
-
- newConnections := connected - connectedLast
- connectionsPerSecond := float64(newConnections) / float64(statsInterval)
- s.log(connected, newConnections, connectionsPerSecond, throttle)
-
- connectedLast = connected
-
- s.mutex.Lock()
- s.connected = connected
- s.mutex.Unlock()
- }
-}
-
-func (s *stats) numConnected() int {
- s.mutex.Lock()
- defer s.mutex.Unlock()
-
- return s.connected
-}
-
-func (s *stats) log(connected, newConnections int, connectionsPerSecond float64, throttle int) {
- percConnected := percentOf(float64(s.connectionsTotal), float64(connected))
-
- connectedStr := fmt.Sprintf("connected=%d/%d(%d%%)", connected, s.connectionsTotal, int(percConnected))
- newConnStr := fmt.Sprintf("new=%d", newConnections)
- rateStr := fmt.Sprintf("rate=%2.2f/s", connectionsPerSecond)
- throttleStr := fmt.Sprintf("throttle=%d", throttle)
- cpusGoroutinesStr := fmt.Sprintf("cpus/goroutines=%d/%d", runtime.NumCPU(), runtime.NumGoroutine())
-
- logger.Info("stats", connectedStr, newConnStr, rateStr, throttleStr, cpusGoroutinesStr)
-}
-
-func percentOf(total float64, value float64) float64 {
- if total == 0 || total == value {
- return 100
- }
- return value / (total / 100.0)
-}
diff --git a/clients/tailclient.go b/clients/tailclient.go
deleted file mode 100644
index cb93258..0000000
--- a/clients/tailclient.go
+++ /dev/null
@@ -1,44 +0,0 @@
-package clients
-
-import (
- "dtail/clients/handlers"
- "dtail/clients/remote"
- "dtail/ssh/client"
- "fmt"
- "strings"
-
- gossh "golang.org/x/crypto/ssh"
-)
-
-// TailClient is used for tailing remote log files (opening, seeking to the end and returning only new incoming lines).
-type TailClient struct {
- baseClient
-}
-
-// NewTailClient returns a new TailClient.
-func NewTailClient(args Args) (*TailClient, error) {
- c := TailClient{
- baseClient: baseClient{
- Args: args,
- stop: make(chan struct{}),
- stopped: make(chan struct{}),
- throttleCh: make(chan struct{}, args.MaxInitConnections),
- retry: true,
- },
- }
-
- c.init(c)
-
- return &c, nil
-}
-
-func (c TailClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback *client.HostKeyCallback) *remote.Connection {
- conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback)
- conn.Handler = handlers.NewClientHandler(server, c.PingTimeout)
-
- for _, file := range strings.Split(c.Files, ",") {
- conn.Commands = append(conn.Commands, fmt.Sprintf("%s %s regex %s", c.Mode.String(), file, c.Regex))
- }
-
- return conn
-}