diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2021-10-21 21:28:49 +0300 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2021-10-21 21:28:49 +0300 |
| commit | f4207a55f71bfbcfdc532d5cdd3befaa3474a157 (patch) | |
| tree | ea5e4a2d2a67035f645bdee496ae55a52034178a /internal/clients | |
| parent | d80d6070557e3a800e3a54967af9eced518f116b (diff) | |
| parent | 739205206d63bf42f4e843b39d04d4c8cd8207c3 (diff) | |
merge develop
Diffstat (limited to 'internal/clients')
| -rw-r--r-- | internal/clients/baseclient.go | 131 | ||||
| -rw-r--r-- | internal/clients/catclient.go | 16 | ||||
| -rw-r--r-- | internal/clients/connectors/connector.go | 17 | ||||
| -rw-r--r-- | internal/clients/connectors/serverconnection.go | 206 | ||||
| -rw-r--r-- | internal/clients/connectors/serverless.go | 116 | ||||
| -rw-r--r-- | internal/clients/grepclient.go | 19 | ||||
| -rw-r--r-- | internal/clients/handlers/basehandler.go | 77 | ||||
| -rw-r--r-- | internal/clients/handlers/clienthandler.go | 4 | ||||
| -rw-r--r-- | internal/clients/handlers/healthhandler.go | 106 | ||||
| -rw-r--r-- | internal/clients/handlers/maprhandler.go | 56 | ||||
| -rw-r--r-- | internal/clients/healthclient.go | 114 | ||||
| -rw-r--r-- | internal/clients/maker.go | 2 | ||||
| -rw-r--r-- | internal/clients/maprclient.go | 89 | ||||
| -rw-r--r-- | internal/clients/remote/connection.go | 212 | ||||
| -rw-r--r-- | internal/clients/stats.go | 76 | ||||
| -rw-r--r-- | internal/clients/tailclient.go | 20 |
16 files changed, 686 insertions, 575 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index f83fcfd..4a7bd84 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -2,15 +2,13 @@ package clients import ( "context" - "fmt" - "strings" "sync" "time" - "github.com/mimecast/dtail/internal/clients/remote" + "github.com/mimecast/dtail/internal/clients/connectors" + "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/discovery" - "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/omode" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/regex" "github.com/mimecast/dtail/internal/ssh/client" @@ -19,13 +17,13 @@ import ( // This is the main client data structure. type baseClient struct { - Args + config.Args // To display client side stats stats *stats // List of remote servers to connect to. servers []string // We have one connection per remote server. - connections []*remote.Connection + connections []connectors.Connector // SSH auth methods to use to connect to the remote servers. sshAuthMethods []gossh.AuthMethod // To deal with SSH host keys @@ -41,7 +39,7 @@ type baseClient struct { } func (c *baseClient) init() { - logger.Debug("Initiating base client") + dlog.Client.Debug("Initiating base client", c.Args.String()) flag := regex.Default if c.Args.RegexInvert { @@ -49,12 +47,16 @@ func (c *baseClient) init() { } regex, err := regex.New(c.Args.RegexStr, flag) if err != nil { - logger.FatalExit(c.Regex, "invalid regex!", err, regex) + dlog.Client.FatalPanic(c.Regex, "Invalid regex!", err, regex) } c.Regex = regex - logger.Debug("Regex", c.Regex) - c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods(c.Args.SSHAuthMethods, c.Args.SSHHostKeyCallback, c.Args.TrustAllHosts, c.throttleCh, c.Args.PrivateKeyPathFile) + if c.Args.Serverless { + return + } + c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods( + c.Args.SSHAuthMethods, c.Args.SSHHostKeyCallback, c.Args.TrustAllHosts, + c.throttleCh, c.Args.PrivateKeyPathFile) } func (c *baseClient) makeConnections(maker maker) { @@ -62,26 +64,31 @@ func (c *baseClient) makeConnections(maker maker) { discoveryService := discovery.New(c.Discovery, c.ServersStr, discovery.Shuffle) for _, server := range discoveryService.ServerList() { - c.connections = append(c.connections, c.makeConnection(server, c.sshAuthMethods, c.hostKeyCallback)) + c.connections = append(c.connections, c.makeConnection(server, + c.sshAuthMethods, c.hostKeyCallback)) } c.stats = newTailStats(len(c.connections)) } func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status int) { - // Periodically check for unknown hosts, and ask the user whether to trust them or not. - go c.hostKeyCallback.PromptAddHosts(ctx) + dlog.Client.Trace("Starting base client") + // Can be nil when serverless. + if c.hostKeyCallback != nil { + // Periodically check for unknown hosts, and ask the user whether to trust them or not. + go c.hostKeyCallback.PromptAddHosts(ctx) + } // Print client stats every time something on statsCh is recieved. go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Quiet) - // Keep count of active connections - active := make(chan struct{}, len(c.connections)) + var wg sync.WaitGroup + wg.Add(len(c.connections)) var mutex sync.Mutex - for i, conn := range c.connections { - go func(i int, conn *remote.Connection) { - connStatus := c.start(ctx, active, i, conn) - // Update global status. + for i, conn := range c.connections { + go func(i int, conn connectors.Connector) { + defer wg.Done() + connStatus := c.startConnection(ctx, i, conn) mutex.Lock() defer mutex.Unlock() if connStatus > status { @@ -90,15 +97,12 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i }(i, conn) } - c.waitUntilDone(ctx, active) + wg.Wait() return } -func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, conn *remote.Connection) (status int) { - // Increment connection count - active <- struct{}{} - // Derement connection count - defer func() { <-active }() +func (c *baseClient) startConnection(ctx context.Context, i int, + conn connectors.Connector) (status int) { for { connCtx, cancel := context.WithCancel(ctx) @@ -106,80 +110,25 @@ func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, con conn.Start(connCtx, cancel, c.throttleCh, c.stats.connectionsEstCh) // Retrieve status code from handler (dtail client will exit with that status) - status = conn.Handler.Status() + status = conn.Handler().Status() if !c.retry { return } time.Sleep(time.Second * 2) - logger.Debug(conn.Server, "Reconnecting") - - conn = c.makeConnection(conn.Server, c.sshAuthMethods, c.hostKeyCallback) + dlog.Client.Debug(conn.Server(), "Reconnecting") + conn = c.makeConnection(conn.Server(), c.sshAuthMethods, c.hostKeyCallback) c.connections[i] = conn } } -func (c *baseClient) makeCommandOptions() map[string]string { - options := make(map[string]string) - - if c.Args.Quiet { - options["quiet"] = fmt.Sprintf("%v", c.Args.Quiet) - } - if c.Args.LContext.MaxCount != 0 { - options["max"] = fmt.Sprintf("%d", c.Args.LContext.MaxCount) - } - if c.Args.LContext.BeforeContext != 0 { - options["before"] = fmt.Sprintf("%d", c.Args.LContext.BeforeContext) - } - if c.Args.LContext.AfterContext != 0 { - options["after"] = fmt.Sprintf("%d", c.Args.LContext.AfterContext) - } - - return options -} - -func (c *baseClient) commandOptionsToString(options map[string]string) string { - var sb strings.Builder - - count := 0 - for k, v := range options { - if count > 0 { - sb.WriteString(":") - } - sb.WriteString(fmt.Sprintf("%s=%s", k, v)) - count++ - } - - return sb.String() -} - -func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback client.HostKeyCallback) *remote.Connection { - conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback) - conn.Handler = c.maker.makeHandler(server) - conn.Commands = c.maker.makeCommands(c.makeCommandOptions()) - - return conn -} - -func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) { - defer logger.Debug("Terminated connection") - - // We want to have at least one active connection - <-active - // Put it back on the channel - active <- struct{}{} - - if c.Mode == omode.TailClient && c.retry { - <-ctx.Done() - } - - for { - numActive := len(active) - if numActive == 0 { - return - } - logger.Debug("Active connections", numActive) - time.Sleep(time.Second) +func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, + hostKeyCallback client.HostKeyCallback) connectors.Connector { + if c.Args.Serverless { + return connectors.NewServerless(c.UserName, c.maker.makeHandler(server), + c.maker.makeCommands()) } + return connectors.NewServerConnection(server, c.UserName, sshAuthMethods, + hostKeyCallback, c.maker.makeHandler(server), c.maker.makeCommands()) } diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go index db892f1..bd65560 100644 --- a/internal/clients/catclient.go +++ b/internal/clients/catclient.go @@ -7,6 +7,8 @@ import ( "strings" "github.com/mimecast/dtail/internal/clients/handlers" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/omode" ) @@ -16,11 +18,10 @@ type CatClient struct { } // NewCatClient returns a new cat client. -func NewCatClient(args Args) (*CatClient, error) { +func NewCatClient(args config.Args) (*CatClient, error) { if args.RegexStr != "" { return nil, errors.New("Can't use regex with 'cat' operating mode") } - args.Mode = omode.CatClient c := CatClient{ @@ -33,7 +34,6 @@ func NewCatClient(args Args) (*CatClient, error) { c.init() c.makeConnections(c) - return &c, nil } @@ -41,10 +41,14 @@ func (c CatClient) makeHandler(server string) handlers.Handler { return handlers.NewClientHandler(server) } -func (c CatClient) makeCommands(options map[string]string) (commands []string) { - optionsStr := c.commandOptionsToString(options) +func (c CatClient) makeCommands() (commands []string) { + regex, err := c.Regex.Serialize() + if err != nil { + dlog.Client.FatalPanic(err) + } for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), optionsStr, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", + c.Mode.String(), c.Args.SerializeOptions(), file, regex)) } return } diff --git a/internal/clients/connectors/connector.go b/internal/clients/connectors/connector.go new file mode 100644 index 0000000..3ab6a08 --- /dev/null +++ b/internal/clients/connectors/connector.go @@ -0,0 +1,17 @@ +package connectors + +import ( + "context" + + "github.com/mimecast/dtail/internal/clients/handlers" +) + +// Connector interface. +type Connector interface { + // Start the connection. + Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) + // Server hostname. + Server() string + // Handler for the connection. + Handler() handlers.Handler +} diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go new file mode 100644 index 0000000..aeb2a41 --- /dev/null +++ b/internal/clients/connectors/serverconnection.go @@ -0,0 +1,206 @@ +package connectors + +import ( + "context" + "fmt" + "io" + "strconv" + "strings" + "time" + + "github.com/mimecast/dtail/internal/clients/handlers" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/ssh/client" + + "golang.org/x/crypto/ssh" +) + +// ServerConnection represents a connection to a single remote dtail server via +// SSH protocol. +type ServerConnection struct { + // The full server string as received from the server discovery (can be with port number) + server string + // Only the hostname or FQDN (without the port number) + hostname string + // Only the port number. + port int + config *ssh.ClientConfig + handler handlers.Handler + commands []string + hostKeyCallback client.HostKeyCallback + throttlingDone bool +} + +// NewServerConnection returns a new DTail SSH server connection. +func NewServerConnection(server string, userName string, + authMethods []ssh.AuthMethod, hostKeyCallback client.HostKeyCallback, + handler handlers.Handler, commands []string) *ServerConnection { + + dlog.Client.Debug(server, "Creating new connection", server, handler, commands) + c := ServerConnection{ + hostKeyCallback: hostKeyCallback, + server: server, + handler: handler, + commands: commands, + config: &ssh.ClientConfig{ + User: userName, + Auth: authMethods, + HostKeyCallback: hostKeyCallback.Wrap(), + Timeout: time.Second * 2, + }, + } + + c.initServerPort() + return &c +} + +// Server returns the server hostname connected to. +func (c *ServerConnection) Server() string { return c.server } + +// Handler returns the handler used for the connection. +func (c *ServerConnection) Handler() handlers.Handler { return c.handler } + +// Attempt to parse the server port address from the provided server FQDN. +func (c *ServerConnection) initServerPort() { + parts := strings.Split(c.server, ":") + if len(parts) == 1 { + c.hostname = c.server + c.port = config.Common.SSHPort + return + } + + dlog.Client.Debug("Parsing port from hostname", parts) + port, err := strconv.Atoi(parts[1]) + if err != nil { + dlog.Client.FatalPanic("Unable to parse client port", c.server, parts, err) + } + c.hostname = parts[0] + c.port = port +} + +// Start the connection to the server. +func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc, + throttleCh, statsCh chan struct{}) { + + // Throttle how many connections can be established concurrently (based on ch length) + dlog.Client.Debug(c.server, "Throttling connection", len(throttleCh), cap(throttleCh)) + + select { + case throttleCh <- struct{}{}: + case <-ctx.Done(): + dlog.Client.Debug(c.server, "Not establishing connection as context is done", + len(throttleCh), cap(throttleCh)) + return + } + + dlog.Client.Debug(c.server, "Throttling says that the connection can be established", + len(throttleCh), cap(throttleCh)) + + go func() { + defer func() { + if !c.throttlingDone { + dlog.Client.Debug(c.server, "Unthrottling connection (1)", + len(throttleCh), cap(throttleCh)) + c.throttlingDone = true + <-throttleCh + } + cancel() + }() + + if err := c.dial(ctx, cancel, throttleCh, statsCh); err != nil { + dlog.Client.Warn(c.server, err) + if c.hostKeyCallback.Untrusted(c.server) { + dlog.Client.Debug(c.server, "Not trusting host") + } + } + }() + + <-ctx.Done() +} + +// Dail into a new SSH connection. Close connection in case of an error. +func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc, + throttleCh, statsCh chan struct{}) error { + + dlog.Client.Debug(c.server, "Incrementing connection stats") + statsCh <- struct{}{} + defer func() { + dlog.Client.Debug(c.server, "Decrementing connection stats") + <-statsCh + }() + + address := fmt.Sprintf("%s:%d", c.hostname, c.port) + dlog.Client.Debug(c.server, "Dialing into the connection", address) + + client, err := ssh.Dial("tcp", address, c.config) + if err != nil { + return err + } + defer client.Close() + + return c.session(ctx, cancel, client, throttleCh) +} + +// Create the SSH session. Close the session in case of an error. +func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFunc, + client *ssh.Client, throttleCh chan struct{}) error { + + dlog.Client.Debug(c.server, "Creating SSH session") + session, err := client.NewSession() + if err != nil { + return err + } + defer session.Close() + return c.handle(ctx, cancel, session, throttleCh) +} + +func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc, + session *ssh.Session, throttleCh chan struct{}) error { + + dlog.Client.Debug(c.server, "Creating handler for SSH session") + stdinPipe, err := session.StdinPipe() + if err != nil { + return err + } + stdoutPipe, err := session.StdoutPipe() + if err != nil { + return err + } + if err := session.Shell(); err != nil { + return err + } + + go func() { + io.Copy(stdinPipe, c.handler) + cancel() + }() + go func() { + io.Copy(c.handler, stdoutPipe) + cancel() + }() + go func() { + select { + case <-c.handler.Done(): + case <-ctx.Done(): + } + cancel() + }() + + // Send all commands to client. + for _, command := range c.commands { + dlog.Client.Debug(command) + c.handler.SendMessage(command) + } + + if !c.throttlingDone { + dlog.Client.Debug(c.server, "Unthrottling connection (2)", + len(throttleCh), cap(throttleCh)) + c.throttlingDone = true + <-throttleCh + } + + <-ctx.Done() + c.handler.Shutdown() + return nil +} diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go new file mode 100644 index 0000000..2ff490a --- /dev/null +++ b/internal/clients/connectors/serverless.go @@ -0,0 +1,116 @@ +package connectors + +import ( + "context" + "io" + + "github.com/mimecast/dtail/internal/clients/handlers" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" + serverHandlers "github.com/mimecast/dtail/internal/server/handlers" + user "github.com/mimecast/dtail/internal/user/server" +) + +// Serverless creates a server object directly without TCP. +type Serverless struct { + handler handlers.Handler + commands []string + userName string +} + +// NewServerless starts a new serverless session. +func NewServerless(userName string, handler handlers.Handler, + commands []string) *Serverless { + + dlog.Client.Debug("Creating new serverless connector", handler, commands) + return &Serverless{ + userName: userName, + handler: handler, + commands: commands, + } +} + +// Server returns serverless server indicator. +func (s *Serverless) Server() string { + return "local(serverless)" +} + +// Handler returns the handler used for the serverless connection. +func (s *Serverless) Handler() handlers.Handler { + return s.handler +} + +// Start the serverless connection. +func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, + throttleCh, statsCh chan struct{}) { + + dlog.Client.Debug("Starting serverless connector") + go func() { + defer cancel() + + if err := s.handle(ctx, cancel); err != nil { + dlog.Client.Warn(err) + } + }() + <-ctx.Done() +} + +func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) error { + dlog.Client.Debug("Creating server handler for a serverless session") + + user, err := user.New(s.userName, s.Server()) + if err != nil { + return err + } + + var serverHandler serverHandlers.Handler + switch s.userName { + case config.HealthUser: + dlog.Client.Debug("Creating serverless health handler") + serverHandler = serverHandlers.NewHealthHandler(user) + default: + dlog.Client.Debug("Creating serverless server handler") + serverHandler = serverHandlers.NewServerHandler( + user, + make(chan struct{}, config.Server.MaxConcurrentCats), + make(chan struct{}, config.Server.MaxConcurrentTails), + ) + } + + terminate := func() { + dlog.Client.Debug("Terminating serverless connection") + serverHandler.Shutdown() + cancel() + } + + go func() { + io.Copy(serverHandler, s.handler) + dlog.Client.Trace("io.Copy(serverHandler, s.handler) => done") + terminate() + }() + go func() { + io.Copy(s.handler, serverHandler) + dlog.Client.Trace("io.Copy(s.handler, serverHandler) => done") + terminate() + }() + go func() { + select { + case <-s.handler.Done(): + dlog.Client.Trace("<-s.handler.Done()") + case <-ctx.Done(): + dlog.Client.Trace("<-ctx.Done()") + } + terminate() + }() + + // Send all commands to client. + for _, command := range s.commands { + dlog.Client.Debug("Sending command to serverless server", command) + s.handler.SendMessage(command) + } + + <-ctx.Done() + dlog.Client.Trace("s.handler.Shutdown()") + s.handler.Shutdown() + return nil +} diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go index 567193a..7521c67 100644 --- a/internal/clients/grepclient.go +++ b/internal/clients/grepclient.go @@ -7,16 +7,19 @@ import ( "strings" "github.com/mimecast/dtail/internal/clients/handlers" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/omode" ) -// GrepClient searches a remote file for all lines matching a regular expression. Only the matching lines are displayed. +// GrepClient searches a remote file for all lines matching a regular +// expression. Only the matching lines are displayed. type GrepClient struct { baseClient } // NewGrepClient creates a new grep client. -func NewGrepClient(args Args) (*GrepClient, error) { +func NewGrepClient(args config.Args) (*GrepClient, error) { if args.RegexStr == "" { return nil, errors.New("No regex specified, use '-regex' flag") } @@ -32,7 +35,6 @@ func NewGrepClient(args Args) (*GrepClient, error) { c.init() c.makeConnections(c) - return &c, nil } @@ -40,11 +42,14 @@ func (c GrepClient) makeHandler(server string) handlers.Handler { return handlers.NewClientHandler(server) } -func (c GrepClient) makeCommands(options map[string]string) (commands []string) { - optionsStr := c.commandOptionsToString(options) +func (c GrepClient) makeCommands() (commands []string) { + regex, err := c.Regex.Serialize() + if err != nil { + dlog.Client.FatalPanic(err) + } for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), optionsStr, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", + c.Mode.String(), c.Args.SerializeOptions(), file, regex)) } - return } diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 602a7ac..b520c25 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -1,6 +1,7 @@ package handlers import ( + "bytes" "encoding/base64" "fmt" "io" @@ -8,8 +9,8 @@ import ( "time" "github.com/mimecast/dtail/internal" - "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/version" + "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/protocol" ) type baseHandler struct { @@ -17,10 +18,20 @@ type baseHandler struct { server string shellStarted bool commands chan string - receiveBuf []byte + receiveBuf bytes.Buffer status int } +func (h *baseHandler) String() string { + return fmt.Sprintf("baseHandler(%s,server:%s,shellStarted:%v,status:%d)@%p", + h.done, + h.server, + h.shellStarted, + h.status, + h, + ) +} + func (h *baseHandler) Server() string { return h.server } @@ -29,21 +40,13 @@ func (h *baseHandler) Status() int { return h.status } -func (h *baseHandler) Done() <-chan struct{} { - return h.done.Done() -} - -func (h *baseHandler) Shutdown() { - h.done.Shutdown() -} - // SendMessage to the server. func (h *baseHandler) SendMessage(command string) error { encoded := base64.StdEncoding.EncodeToString([]byte(command)) - logger.Debug("Sending command", h.server, command, encoded) + dlog.Client.Debug("Sending command", h.server, command, encoded) select { - case h.commands <- fmt.Sprintf("protocol %s base64 %v;", version.ProtocolCompat, encoded): + case h.commands <- fmt.Sprintf("protocol %s base64 %v;", protocol.ProtocolCompat, encoded): case <-time.After(time.Second * 5): return fmt.Errorf("Timed out sending command '%s' (base64: '%s')", command, encoded) case <-h.Done(): @@ -56,13 +59,20 @@ func (h *baseHandler) SendMessage(command string) error { // Read data from the dtail server via Writer interface. func (h *baseHandler) Write(p []byte) (n int, err error) { for _, b := range p { - h.receiveBuf = append(h.receiveBuf, b) - if b == '\n' { - if len(h.receiveBuf) == 0 { + switch b { + /* + // NEXT: Next DTail version make it so that '\n' gets ignored. For now + // leave it for compatibility with older DTail server + ability to display + // the protocol mismatch warn message. + case '\n' { continue - } - message := string(h.receiveBuf) - h.handleMessageType(message) + */ + case '\n', protocol.MessageDelimiter: + message := h.receiveBuf.String() + h.handleMessage(message) + h.receiveBuf.Reset() + default: + h.receiveBuf.WriteByte(b) } } @@ -77,31 +87,32 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { case <-h.Done(): return 0, io.EOF } - return } -// Handle various message types. -func (h *baseHandler) handleMessageType(message string) { - if len(h.receiveBuf) == 0 { - return - } - - // Hidden server commands starti with a dot "." - if h.receiveBuf[0] == '.' { +func (h *baseHandler) handleMessage(message string) { + if len(message) > 0 && message[0] == '.' { h.handleHiddenMessage(message) - h.receiveBuf = h.receiveBuf[:0] return } - logger.Raw(message) - h.receiveBuf = h.receiveBuf[:0] + dlog.Client.Raw(message) } // Handle messages received from server which are not meant to be displayed // to the end user. func (h *baseHandler) handleHiddenMessage(message string) { - if strings.HasPrefix(message, ".syn close connection") { - h.SendMessage(".ack close connection") + switch { + case strings.HasPrefix(message, ".syn close connection"): + go h.SendMessage(".ack close connection") + h.Shutdown() } } + +func (h *baseHandler) Done() <-chan struct{} { + return h.done.Done() +} + +func (h *baseHandler) Shutdown() { + h.done.Shutdown() +} diff --git a/internal/clients/handlers/clienthandler.go b/internal/clients/handlers/clienthandler.go index 2bcb038..27ac85e 100644 --- a/internal/clients/handlers/clienthandler.go +++ b/internal/clients/handlers/clienthandler.go @@ -2,7 +2,7 @@ package handlers import ( "github.com/mimecast/dtail/internal" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" ) // ClientHandler is the basic client handler interface. @@ -12,7 +12,7 @@ type ClientHandler struct { // NewClientHandler creates a new client handler. func NewClientHandler(server string) *ClientHandler { - logger.Debug(server, "Creating new client handler") + dlog.Client.Debug(server, "Creating new client handler") return &ClientHandler{ baseHandler{ diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 0440706..47b594e 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -1,88 +1,56 @@ package handlers import ( - "errors" - "fmt" - "time" + "strings" "github.com/mimecast/dtail/internal" + "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/protocol" ) -// HealthHandler implements the handler required for health checks. +// HealthHandler is the handler used on the client side for running mapreduce +// aggregations. type HealthHandler struct { - done *internal.Done - // Buffer of incoming data from server. - receiveBuf []byte - // To send commands to the server. - commands chan string - // To receive messages from the server. - receive chan<- string - // The remote server address - server string - // The return status. - status int -} - -// NewHealthHandler returns a new health check handler. -func NewHealthHandler(server string, receive chan<- string) *HealthHandler { - h := HealthHandler{ - server: server, - receive: receive, - commands: make(chan string), - status: -1, - done: internal.NewDone(), - } - - return &h -} - -// Server returns the remote server name. -func (h *HealthHandler) Server() string { - return h.server -} - -// Status of the handler. -func (h *HealthHandler) Status() int { - return h.status -} - -// Done returns done channel of the handler. -func (h *HealthHandler) Done() <-chan struct{} { - return h.done.Done() -} - -// Shutdown the handler. -func (h *HealthHandler) Shutdown() { - h.done.Shutdown() -} - -// SendMessage sends a DTail command to the server. -func (h *HealthHandler) SendMessage(command string) error { - select { - case h.commands <- fmt.Sprintf("%s;", command): - case <-time.NewTimer(time.Second * 10).C: - return errors.New("Timed out sending command " + command) - case <-h.Done(): + baseHandler +} + +// NewHealthHandler returns a new health client handler. +func NewHealthHandler(server string) *HealthHandler { + dlog.Client.Debug(server, "Creating new health handler") + return &HealthHandler{ + baseHandler: baseHandler{ + server: server, + shellStarted: false, + commands: make(chan string), + status: 2, // Assume CRITICAL status by default. + done: internal.NewDone(), + }, } - - return nil } -// Server writes byte stream to client. +// Read data from the dtail server via Writer interface. func (h *HealthHandler) Write(p []byte) (n int, err error) { for _, b := range p { - h.receiveBuf = append(h.receiveBuf, b) - if b == '\n' { - h.receive <- string(h.receiveBuf) - h.receiveBuf = h.receiveBuf[:0] + switch b { + case '\n', protocol.MessageDelimiter: + message := h.baseHandler.receiveBuf.String() + h.handleMessage(message) + h.baseHandler.receiveBuf.Reset() + default: + h.baseHandler.receiveBuf.WriteByte(b) } } - return len(p), nil } -// Server reads byte stream from client. -func (h *HealthHandler) Read(p []byte) (n int, err error) { - n = copy(p, []byte(<-h.commands)) - return +func (h *HealthHandler) handleMessage(message string) { + if len(message) > 0 && message[0] == '.' { + h.baseHandler.handleHiddenMessage(message) + return + } + s := strings.Split(message, protocol.FieldDelimiter) + message = s[len(s)-1] + if message == "OK" { + h.baseHandler.status = 0 + } } diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index fb71c8f..8718b35 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -4,21 +4,24 @@ import ( "strings" "github.com/mimecast/dtail/internal" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/client" + "github.com/mimecast/dtail/internal/protocol" ) -// MaprHandler is the handler used on the client side for running mapreduce aggregations. +// MaprHandler is the handler used on the client side for running mapreduce +// aggregations. type MaprHandler struct { baseHandler aggregate *client.Aggregate query *mapr.Query - count uint64 } // NewMaprHandler returns a new mapreduce client handler. -func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *MaprHandler { +func NewMaprHandler(server string, query *mapr.Query, + globalGroup *mapr.GlobalGroupSet) *MaprHandler { + return &MaprHandler{ baseHandler: baseHandler{ server: server, @@ -35,34 +38,35 @@ func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGr // Read data from the dtail server via Writer interface. func (h *MaprHandler) Write(p []byte) (n int, err error) { for _, b := range p { - h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b) - if b == '\n' { - if len(h.baseHandler.receiveBuf) == 0 { - continue - } - message := string(h.baseHandler.receiveBuf) - - if h.baseHandler.receiveBuf[0] == 'A' { - h.handleAggregateMessage(strings.TrimSpace(message)) - h.baseHandler.receiveBuf = h.baseHandler.receiveBuf[:0] - continue + switch b { + case '\n': + continue + case protocol.MessageDelimiter: + message := h.baseHandler.receiveBuf.String() + dlog.Client.Debug(message) + if message[0] == 'A' { + h.handleAggregateMessage(message) + } else { + h.baseHandler.handleMessage(message) } - h.baseHandler.handleMessageType(message) + h.baseHandler.receiveBuf.Reset() + default: + h.baseHandler.receiveBuf.WriteByte(b) } } return len(p), nil } -// Handle a message received from server including mapr aggregation -// related data. +// Handle a message received from server including mapr aggregation related data. func (h *MaprHandler) handleAggregateMessage(message string) { - h.count++ - parts := strings.Split(message, "➔") - - // Index 0 contains 'AGGREGATE', 1 contains server host. - // Aggregation data begins from index 2. - logger.Debug("Received aggregate data", h.server, h.count, parts) - h.aggregate.Aggregate(parts[2:]) - logger.Debug("Aggregated aggregate data", h.server, h.count) + parts := strings.SplitN(message, protocol.FieldDelimiter, 3) + if len(parts) != 3 { + dlog.Client.Error("Unable to aggregate data", h.server, message, parts, + len(parts), "expected 3 parts") + return + } + if err := h.aggregate.Aggregate(parts[2]); err != nil { + dlog.Client.Error("Unable to aggregate data", h.server, message, err) + } } diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go index e93f6be..1a02827 100644 --- a/internal/clients/healthclient.go +++ b/internal/clients/healthclient.go @@ -4,93 +4,75 @@ import ( "context" "fmt" "runtime" - "strings" - "time" "github.com/mimecast/dtail/internal/clients/handlers" - "github.com/mimecast/dtail/internal/clients/remote" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/omode" gossh "golang.org/x/crypto/ssh" ) -// HealthClient is used for health checking (e.g. via Nagios) +// HealthClient is used to perform a basic server health check. type HealthClient struct { - // Client operating mode - mode omode.Mode - // The remote server address - server string - // SSH user name - userName string - // SSH auth methods to use to connect to the remote servers. - sshAuthMethods []gossh.AuthMethod + baseClient } -// NewHealthClient returns a new healh client. -func NewHealthClient(mode omode.Mode) (*HealthClient, error) { +// NewHealthClient returns a new health client. +func NewHealthClient(args config.Args) (*HealthClient, error) { + args.Mode = omode.HealthClient + args.UserName = config.HealthUser c := HealthClient{ - mode: mode, - server: fmt.Sprintf("%s:%d", config.Server.SSHBindAddress, config.Common.SSHPort), - userName: config.ControlUser, + baseClient: baseClient{ + Args: args, + throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()), + retry: false, + }, } - c.initSSHAuthMethods() + c.init() + c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.HealthUser)) + c.makeConnections(c) return &c, nil } -// Start the health client. -func (c *HealthClient) Start(ctx context.Context) (status int) { - receive := make(chan string) - - throttleCh := make(chan struct{}, runtime.NumCPU()) - statsCh := make(chan struct{}, 1) - - conn := remote.NewOneOffConnection(c.server, c.userName, c.sshAuthMethods) - conn.Handler = handlers.NewHealthHandler(c.server, receive) - conn.Commands = []string{c.mode.String()} - - connCtx, cancel := context.WithCancel(ctx) - go conn.Start(connCtx, cancel, throttleCh, statsCh) +func (c HealthClient) makeHandler(server string) handlers.Handler { + return handlers.NewHealthHandler(server) +} - for { - select { - case data := <-receive: - // Parse recieved data. - s := strings.Split(data, "|") - message := s[len(s)-1] - if strings.HasPrefix(message, "done;") { - return - } +func (c HealthClient) makeCommands() (commands []string) { + commands = append(commands, "health") + return +} - // Set severity. - s = strings.Split(message, ":") - switch s[0] { - case "OK": - case "WARNING": - if status < 1 { - status = 1 - } - case "CRITICAL": - status = 2 - case "UNKNOWN": - status = 3 - default: - fmt.Printf("CRITICAL: Unexpected server response: '%s'\n", message) - status = 2 - return - } - fmt.Print(message) +// Start the health client. +func (c *HealthClient) Start(ctx context.Context, statsCh <-chan string) int { + status := c.baseClient.Start(ctx, statsCh) - case <-time.After(time.Second * 2): - status = 2 - fmt.Println("CRITICAL: Could not communicate with DTail server") - return + switch status { + case 0: + if c.Serverless { + fmt.Printf("WARNING: All seems fine but the check only run in serverless mode" + + ", please specify a remote server via --server hostname:port\n") + return 1 + } + fmt.Printf("OK: All fine at %s :-)\n", c.ServersStr) + case 2: + if c.Serverless { + fmt.Printf("CRITICAL: DTail server not operating properly (using " + + "serverless connction)!\n") + return 2 } + fmt.Printf("CRITICAL: DTail server not operating properly at %s!\n", + c.ServersStr) + default: + if c.Serverless { + fmt.Printf("UNKNOWN: Received unknown status code %d (using serverless "+ + "connection)\n", status) + return status + } + fmt.Printf("UNKNOWN: Received unknown status code %d from %s!\n", + status, c.ServersStr) } -} -// Initialize SSH auth methods. -func (c *HealthClient) initSSHAuthMethods() { - c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.ControlUser)) + return status } diff --git a/internal/clients/maker.go b/internal/clients/maker.go index a1d6864..d5ffd8b 100644 --- a/internal/clients/maker.go +++ b/internal/clients/maker.go @@ -9,5 +9,5 @@ import ( // and send different commands to the DTail server. type maker interface { makeHandler(server string) handlers.Handler - makeCommands(options map[string]string) (commands []string) + makeCommands() (commands []string) } diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index feb7e47..246946f 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -9,7 +9,9 @@ import ( "time" "github.com/mimecast/dtail/internal/clients/handlers" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/color" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/omode" ) @@ -29,25 +31,25 @@ const ( // MaprClient is used for running mapreduce aggregations on remote files. type MaprClient struct { baseClient - // Query string for mapr aggregations - queryStr string // Global group set for merged mapr aggregation results globalGroup *mapr.GlobalGroupSet // The query object (constructed from queryStr) query *mapr.Query // Additative result or new result every interval run? cumulative bool + // The last result string received + lastResult string } // NewMaprClient returns a new mapreduce client. -func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (*MaprClient, error) { - if queryStr == "" { +func NewMaprClient(args config.Args, maprClientMode MaprClientMode) (*MaprClient, error) { + if args.QueryStr == "" { return nil, errors.New("No mapreduce query specified, use '-query' flag") } - query, err := mapr.NewQuery(queryStr) + query, err := mapr.NewQuery(args.QueryStr) if err != nil { - logger.FatalExit(queryStr, "Can't parse mapr query", err) + dlog.Client.FatalPanic(args.QueryStr, "Can't parse mapr query", err) } // Don't retry connection if in tail mode and no outfile specified. @@ -64,7 +66,7 @@ func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (* cumulative = args.Mode == omode.MapClient || query.HasOutfile() } - logger.Debug("Cumulative mapreduce mode?", cumulative) + dlog.Client.Debug("Cumulative mapreduce mode?", cumulative) c := MaprClient{ baseClient: baseClient{ @@ -73,7 +75,6 @@ func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (* retry: retry, }, query: query, - queryStr: queryStr, cumulative: cumulative, } @@ -99,46 +100,51 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i status = c.baseClient.Start(ctx, statsCh) if c.cumulative { - logger.Debug("Received final mapreduce result") + dlog.Client.Debug("Received final mapreduce result") c.reportResults() } return } +// NEXT: Make this a callback function rather trying to use polymorphism to call +// this. This applies to all clients. It will make the code easier to read. func (c MaprClient) makeHandler(server string) handlers.Handler { return handlers.NewMaprHandler(server, c.query, c.globalGroup) } -func (c MaprClient) makeCommands(options map[string]string) (commands []string) { +func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery)) - modeStr := "cat" if c.Mode == omode.TailClient { modeStr = "tail" } - optionsStr := c.commandOptionsToString(options) for _, file := range strings.Split(c.What, ",") { + regex, err := c.Regex.Serialize() + if err != nil { + dlog.Client.FatalPanic(err) + } if c.Timeout > 0 { - commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, modeStr, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, + modeStr, file, regex)) continue } - commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, optionsStr, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", + modeStr, c.Args.SerializeOptions(), file, regex)) } - return } func (c *MaprClient) periodicReportResults(ctx context.Context) { rampUpSleep := c.query.Interval / 2 - logger.Debug("Ramp up sleeping before processing mapreduce results", rampUpSleep) + dlog.Client.Debug("Ramp up sleeping before processing mapreduce results", rampUpSleep) time.Sleep(rampUpSleep) for { select { case <-time.After(c.query.Interval): - logger.Debug("Gathering interim mapreduce result") + dlog.Client.Debug("Gathering interim mapreduce result") c.reportResults() case <-ctx.Done(): return @@ -151,42 +157,65 @@ func (c *MaprClient) reportResults() { c.writeResultsToOutfile() return } - c.printResults() } func (c *MaprClient) printResults() { var result string var err error - var numLines int + var numRows int + rowsLimit := -1 + + if c.query.Limit == -1 { + // Limit output to 10 rows when the result is printed to stdout. + // This can be overriden with the limit clause though. + rowsLimit = 10 + } if c.cumulative { - result, numLines, err = c.globalGroup.Result(c.query) + result, numRows, err = c.globalGroup.Result(c.query, rowsLimit) } else { - result, numLines, err = c.globalGroup.SwapOut().Result(c.query) + result, numRows, err = c.globalGroup.SwapOut().Result(c.query, rowsLimit) } if err != nil { - logger.FatalExit(err) + dlog.Client.FatalPanic(err) } - if numLines == 0 { - logger.Warn("Empty result set this time...") + if result == c.lastResult { + dlog.Client.Debug("Result hasn't changed compared to last time...") return } + c.lastResult = result - logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery)) - logger.Raw(result) + if numRows == 0 { + dlog.Client.Debug("Empty result set this time...") + return + } + + rawQuery := c.query.RawQuery + if config.Client.TermColorsEnable { + rawQuery = color.PaintStrWithAttr(rawQuery, + config.Client.TermColors.MaprTable.RawQueryFg, + config.Client.TermColors.MaprTable.RawQueryBg, + config.Client.TermColors.MaprTable.RawQueryAttr) + } + dlog.Client.Raw(rawQuery) + + if rowsLimit > 0 && numRows > rowsLimit { + dlog.Client.Warn(fmt.Sprintf("Got %d results but limited terminal output "+ + "to %d rows! Use 'limit' clause to override!", numRows, rowsLimit)) + } + dlog.Client.Raw(result) } func (c *MaprClient) writeResultsToOutfile() { if c.cumulative { if err := c.globalGroup.WriteResult(c.query); err != nil { - logger.FatalExit(err) + dlog.Client.FatalPanic(err) } return } - if err := c.globalGroup.SwapOut().WriteResult(c.query); err != nil { - logger.FatalExit(err) + dlog.Client.FatalPanic(err) } } diff --git a/internal/clients/remote/connection.go b/internal/clients/remote/connection.go deleted file mode 100644 index b29ffed..0000000 --- a/internal/clients/remote/connection.go +++ /dev/null @@ -1,212 +0,0 @@ -package remote - -import ( - "context" - "fmt" - "io" - "strconv" - "strings" - "time" - - "github.com/mimecast/dtail/internal/clients/handlers" - "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/ssh/client" - - "golang.org/x/crypto/ssh" -) - -// Connection represents a client connection connection to a single server. -type Connection struct { - // The remote server's hostname connected to. - Server string - // The remote server's port connected to. - port int - // The SSH client configuration used. - config *ssh.ClientConfig - // The SSH client handler to use. - Handler handlers.Handler - // DTail commands sent from client to server. When client loses - // connection to the server it re-connects automatically and sends the - // same commands again. - Commands []string - // Is it a persistent connection or a one-off? - isOneOff bool - // To deal with SSH server host keys - hostKeyCallback client.HostKeyCallback - // To determine if connection throttling has finished or not - throttlingDone bool -} - -// NewConnection returns a new connection. -func NewConnection(server string, userName string, authMethods []ssh.AuthMethod, hostKeyCallback client.HostKeyCallback) *Connection { - logger.Debug(server, "Creating new connection") - - c := Connection{ - hostKeyCallback: hostKeyCallback, - config: &ssh.ClientConfig{ - User: userName, - Auth: authMethods, - HostKeyCallback: hostKeyCallback.Wrap(), - Timeout: time.Second * 3, - }, - } - - c.initServerPort(server) - - return &c -} - -// NewOneOffConnection creates new one-off connection (only for sending a series of commands and then quit). -func NewOneOffConnection(server string, userName string, authMethods []ssh.AuthMethod) *Connection { - c := Connection{ - config: &ssh.ClientConfig{ - User: userName, - Auth: authMethods, - HostKeyCallback: ssh.InsecureIgnoreHostKey(), - }, - isOneOff: true, - } - - c.initServerPort(server) - - return &c -} - -// Attempt to parse the server port address from the provided server FQDN. -func (c *Connection) initServerPort(server string) { - c.Server = server - c.port = config.Common.SSHPort - parts := strings.Split(server, ":") - - if len(parts) == 2 { - logger.Debug("Parsing port from hostname", parts) - port, err := strconv.Atoi(parts[1]) - if err != nil { - logger.FatalExit("Unable to parse client port", server, parts, err) - } - c.Server = parts[0] - c.port = port - } -} - -// Start the server connection. Build up SSH session and send some DTail commands. -func (c *Connection) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) { - // Throttle how many connections can be established concurrently (based on ch length) - logger.Debug(c.Server, "Throttling connection", len(throttleCh), cap(throttleCh)) - - select { - case throttleCh <- struct{}{}: - case <-ctx.Done(): - logger.Debug(c.Server, "Not establishing connection as context is done", len(throttleCh), cap(throttleCh)) - return - } - - logger.Debug(c.Server, "Throttling says that the connection can be established", len(throttleCh), cap(throttleCh)) - - go func() { - defer func() { - if !c.throttlingDone { - logger.Debug(c.Server, "Unthrottling connection (1)", len(throttleCh), cap(throttleCh)) - c.throttlingDone = true - <-throttleCh - } - cancel() - }() - - if err := c.dial(ctx, cancel, throttleCh, statsCh); err != nil { - logger.Warn(c.Server, c.port, err) - if c.hostKeyCallback.Untrusted(fmt.Sprintf("%s:%d", c.Server, c.port)) { - logger.Debug(c.Server, "Not trusting host") - } - } - }() - - <-ctx.Done() -} - -// Dail into a new SSH connection. Close connection in case of an error. -func (c *Connection) dial(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) error { - logger.Debug(c.Server, "Incrementing connection stats") - statsCh <- struct{}{} - defer func() { - logger.Debug(c.Server, "Decrementing connection stats") - <-statsCh - }() - - logger.Debug(c.Server, "Dialing into the connection") - address := fmt.Sprintf("%s:%d", c.Server, c.port) - - client, err := ssh.Dial("tcp", address, c.config) - if err != nil { - return err - } - defer client.Close() - - return c.session(ctx, cancel, client, throttleCh) -} - -// Create the SSH session. Close the session in case of an error. -func (c *Connection) session(ctx context.Context, cancel context.CancelFunc, client *ssh.Client, throttleCh chan struct{}) error { - logger.Debug(c.Server, "session") - - session, err := client.NewSession() - if err != nil { - return err - } - defer session.Close() - - return c.handle(ctx, cancel, session, throttleCh) -} - -func (c *Connection) handle(ctx context.Context, cancel context.CancelFunc, session *ssh.Session, throttleCh chan struct{}) error { - logger.Debug(c.Server, "handle") - - stdinPipe, err := session.StdinPipe() - if err != nil { - return err - } - - stdoutPipe, err := session.StdoutPipe() - if err != nil { - return err - } - - if err := session.Shell(); err != nil { - return err - } - - go func() { - io.Copy(stdinPipe, c.Handler) - cancel() - }() - - go func() { - io.Copy(c.Handler, stdoutPipe) - cancel() - }() - - go func() { - select { - case <-c.Handler.Done(): - case <-ctx.Done(): - } - cancel() - }() - - // Send all commands to client. - for _, command := range c.Commands { - logger.Debug(command) - c.Handler.SendMessage(command) - } - - if !c.throttlingDone { - logger.Debug(c.Server, "Unthrottling connection (2)", len(throttleCh), cap(throttleCh)) - c.throttlingDone = true - <-throttleCh - } - - <-ctx.Done() - c.Handler.Shutdown() - return nil -} diff --git a/internal/clients/stats.go b/internal/clients/stats.go index d8163d4..1315aea 100644 --- a/internal/clients/stats.go +++ b/internal/clients/stats.go @@ -8,14 +8,16 @@ import ( "sync" "time" + "github.com/mimecast/dtail/internal/color" "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/protocol" ) // Used to collect and display various client stats. type stats struct { // Total amount servers to connect to. - connectionsTotal int + servers int // To keep track of what connected and disconnected connectionsEstCh chan struct{} // Amount of servers connections are established. @@ -24,19 +26,20 @@ type stats struct { mutex sync.Mutex } -func newTailStats(connectionsTotal int) *stats { +func newTailStats(servers int) *stats { return &stats{ - connectionsTotal: connectionsTotal, - connectionsEstCh: make(chan struct{}, connectionsTotal), + servers: servers, + connectionsEstCh: make(chan struct{}, servers), connected: 0, } } // Start starts printing client connection stats every time a signal is recieved or // connection count has changed. -func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <-chan string, quiet bool) { - var connectedLast int +func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, + statsCh <-chan string, quiet bool) { + var connectedLast int for { var force bool var messages []string @@ -54,18 +57,18 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh < throttle := len(throttleCh) newConnections := connected - connectedLast - if (connected == connectedLast || quiet) && !force { continue } - stats := s.statsLine(connected, newConnections, throttle) switch force { case true: + stats := s.statsLine(connected, newConnections, throttle) messages = append(messages, fmt.Sprintf("Connection stats: %s", stats)) s.printStatsDueInterrupt(messages) default: - logger.Info(stats) + data := s.statsData(connected, newConnections, throttle) + dlog.Client.Mapreduce("STATS", data) } connectedLast = connected @@ -76,30 +79,58 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh < } func (s *stats) printStatsDueInterrupt(messages []string) { - logger.Pause() - for _, message := range messages { + dlog.Client.Pause() + for i, message := range messages { + if i > 0 && config.Client.TermColorsEnable { + fmt.Println(color.PaintStrWithAttr(message, + config.Client.TermColors.Client.ClientFg, + config.Client.TermColors.Client.ClientBg, + config.Client.TermColors.Client.ClientAttr, + )) + continue + } fmt.Println(fmt.Sprintf(" %s", message)) } time.Sleep(time.Second * time.Duration(config.InterruptTimeoutS)) - logger.Resume() + dlog.Client.Resume() } -func (s *stats) statsLine(connected, newConnections int, throttle int) string { - percConnected := percentOf(float64(s.connectionsTotal), float64(connected)) +func (s *stats) statsData(connected, newConnections int, + throttle int) map[string]interface{} { + + percConnected := percentOf(float64(s.servers), float64(connected)) - var stats []string - stats = append(stats, fmt.Sprintf("connected=%d/%d(%d%%)", connected, s.connectionsTotal, int(percConnected))) - stats = append(stats, fmt.Sprintf("new=%d", newConnections)) - stats = append(stats, fmt.Sprintf("throttle=%d", throttle)) - stats = append(stats, fmt.Sprintf("cpus/goroutines=%d/%d", runtime.NumCPU(), runtime.NumGoroutine())) + data := make(map[string]interface{}) + data["connected"] = connected + data["servers"] = s.servers + data["connected%"] = int(percConnected) + data["new"] = newConnections + data["throttle"] = throttle + data["goroutines"] = runtime.NumGoroutine() + data["cgocalls"] = runtime.NumCgoCall() + data["cpu"] = runtime.NumCPU() - return strings.Join(stats, "|") + return data +} + +func (s *stats) statsLine(connected, newConnections int, throttle int) string { + sb := strings.Builder{} + i := 0 + for k, v := range s.statsData(connected, newConnections, throttle) { + if i > 0 { + sb.WriteString(protocol.FieldDelimiter) + } + sb.WriteString(k) + sb.WriteByte('=') + sb.WriteString(fmt.Sprintf("%v", v)) + i++ + } + return sb.String() } func (s *stats) numConnected() int { s.mutex.Lock() defer s.mutex.Unlock() - return s.connected } @@ -107,6 +138,5 @@ func percentOf(total float64, value float64) float64 { if total == 0 || total == value { return 100 } - return value / (total / 100.0) } diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go index 853ef1d..35c01d4 100644 --- a/internal/clients/tailclient.go +++ b/internal/clients/tailclient.go @@ -6,7 +6,8 @@ import ( "strings" "github.com/mimecast/dtail/internal/clients/handlers" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/omode" ) @@ -16,9 +17,8 @@ type TailClient struct { } // NewTailClient returns a new TailClient. -func NewTailClient(args Args) (*TailClient, error) { +func NewTailClient(args config.Args) (*TailClient, error) { args.Mode = omode.TailClient - c := TailClient{ baseClient: baseClient{ Args: args, @@ -29,7 +29,6 @@ func NewTailClient(args Args) (*TailClient, error) { c.init() c.makeConnections(c) - return &c, nil } @@ -37,12 +36,15 @@ func (c TailClient) makeHandler(server string) handlers.Handler { return handlers.NewClientHandler(server) } -func (c TailClient) makeCommands(options map[string]string) (commands []string) { - optionsStr := c.commandOptionsToString(options) +func (c TailClient) makeCommands() (commands []string) { + regex, err := c.Regex.Serialize() + if err != nil { + dlog.Client.FatalPanic(err) + } for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), optionsStr, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", + c.Mode.String(), c.Args.SerializeOptions(), file, regex)) } - logger.Debug(commands) - + dlog.Client.Debug(commands) return } |
