summaryrefslogtreecommitdiff
path: root/internal/clients
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2021-10-21 21:28:49 +0300
committerPaul Buetow <pbuetow@mimecast.com>2021-10-21 21:28:49 +0300
commitf4207a55f71bfbcfdc532d5cdd3befaa3474a157 (patch)
treeea5e4a2d2a67035f645bdee496ae55a52034178a /internal/clients
parentd80d6070557e3a800e3a54967af9eced518f116b (diff)
parent739205206d63bf42f4e843b39d04d4c8cd8207c3 (diff)
merge develop
Diffstat (limited to 'internal/clients')
-rw-r--r--internal/clients/baseclient.go131
-rw-r--r--internal/clients/catclient.go16
-rw-r--r--internal/clients/connectors/connector.go17
-rw-r--r--internal/clients/connectors/serverconnection.go206
-rw-r--r--internal/clients/connectors/serverless.go116
-rw-r--r--internal/clients/grepclient.go19
-rw-r--r--internal/clients/handlers/basehandler.go77
-rw-r--r--internal/clients/handlers/clienthandler.go4
-rw-r--r--internal/clients/handlers/healthhandler.go106
-rw-r--r--internal/clients/handlers/maprhandler.go56
-rw-r--r--internal/clients/healthclient.go114
-rw-r--r--internal/clients/maker.go2
-rw-r--r--internal/clients/maprclient.go89
-rw-r--r--internal/clients/remote/connection.go212
-rw-r--r--internal/clients/stats.go76
-rw-r--r--internal/clients/tailclient.go20
16 files changed, 686 insertions, 575 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index f83fcfd..4a7bd84 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -2,15 +2,13 @@ package clients
import (
"context"
- "fmt"
- "strings"
"sync"
"time"
- "github.com/mimecast/dtail/internal/clients/remote"
+ "github.com/mimecast/dtail/internal/clients/connectors"
+ "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/discovery"
- "github.com/mimecast/dtail/internal/io/logger"
- "github.com/mimecast/dtail/internal/omode"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/regex"
"github.com/mimecast/dtail/internal/ssh/client"
@@ -19,13 +17,13 @@ import (
// This is the main client data structure.
type baseClient struct {
- Args
+ config.Args
// To display client side stats
stats *stats
// List of remote servers to connect to.
servers []string
// We have one connection per remote server.
- connections []*remote.Connection
+ connections []connectors.Connector
// SSH auth methods to use to connect to the remote servers.
sshAuthMethods []gossh.AuthMethod
// To deal with SSH host keys
@@ -41,7 +39,7 @@ type baseClient struct {
}
func (c *baseClient) init() {
- logger.Debug("Initiating base client")
+ dlog.Client.Debug("Initiating base client", c.Args.String())
flag := regex.Default
if c.Args.RegexInvert {
@@ -49,12 +47,16 @@ func (c *baseClient) init() {
}
regex, err := regex.New(c.Args.RegexStr, flag)
if err != nil {
- logger.FatalExit(c.Regex, "invalid regex!", err, regex)
+ dlog.Client.FatalPanic(c.Regex, "Invalid regex!", err, regex)
}
c.Regex = regex
- logger.Debug("Regex", c.Regex)
- c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods(c.Args.SSHAuthMethods, c.Args.SSHHostKeyCallback, c.Args.TrustAllHosts, c.throttleCh, c.Args.PrivateKeyPathFile)
+ if c.Args.Serverless {
+ return
+ }
+ c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods(
+ c.Args.SSHAuthMethods, c.Args.SSHHostKeyCallback, c.Args.TrustAllHosts,
+ c.throttleCh, c.Args.PrivateKeyPathFile)
}
func (c *baseClient) makeConnections(maker maker) {
@@ -62,26 +64,31 @@ func (c *baseClient) makeConnections(maker maker) {
discoveryService := discovery.New(c.Discovery, c.ServersStr, discovery.Shuffle)
for _, server := range discoveryService.ServerList() {
- c.connections = append(c.connections, c.makeConnection(server, c.sshAuthMethods, c.hostKeyCallback))
+ c.connections = append(c.connections, c.makeConnection(server,
+ c.sshAuthMethods, c.hostKeyCallback))
}
c.stats = newTailStats(len(c.connections))
}
func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status int) {
- // Periodically check for unknown hosts, and ask the user whether to trust them or not.
- go c.hostKeyCallback.PromptAddHosts(ctx)
+ dlog.Client.Trace("Starting base client")
+ // Can be nil when serverless.
+ if c.hostKeyCallback != nil {
+ // Periodically check for unknown hosts, and ask the user whether to trust them or not.
+ go c.hostKeyCallback.PromptAddHosts(ctx)
+ }
// Print client stats every time something on statsCh is recieved.
go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Quiet)
- // Keep count of active connections
- active := make(chan struct{}, len(c.connections))
+ var wg sync.WaitGroup
+ wg.Add(len(c.connections))
var mutex sync.Mutex
- for i, conn := range c.connections {
- go func(i int, conn *remote.Connection) {
- connStatus := c.start(ctx, active, i, conn)
- // Update global status.
+ for i, conn := range c.connections {
+ go func(i int, conn connectors.Connector) {
+ defer wg.Done()
+ connStatus := c.startConnection(ctx, i, conn)
mutex.Lock()
defer mutex.Unlock()
if connStatus > status {
@@ -90,15 +97,12 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i
}(i, conn)
}
- c.waitUntilDone(ctx, active)
+ wg.Wait()
return
}
-func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, conn *remote.Connection) (status int) {
- // Increment connection count
- active <- struct{}{}
- // Derement connection count
- defer func() { <-active }()
+func (c *baseClient) startConnection(ctx context.Context, i int,
+ conn connectors.Connector) (status int) {
for {
connCtx, cancel := context.WithCancel(ctx)
@@ -106,80 +110,25 @@ func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, con
conn.Start(connCtx, cancel, c.throttleCh, c.stats.connectionsEstCh)
// Retrieve status code from handler (dtail client will exit with that status)
- status = conn.Handler.Status()
+ status = conn.Handler().Status()
if !c.retry {
return
}
time.Sleep(time.Second * 2)
- logger.Debug(conn.Server, "Reconnecting")
-
- conn = c.makeConnection(conn.Server, c.sshAuthMethods, c.hostKeyCallback)
+ dlog.Client.Debug(conn.Server(), "Reconnecting")
+ conn = c.makeConnection(conn.Server(), c.sshAuthMethods, c.hostKeyCallback)
c.connections[i] = conn
}
}
-func (c *baseClient) makeCommandOptions() map[string]string {
- options := make(map[string]string)
-
- if c.Args.Quiet {
- options["quiet"] = fmt.Sprintf("%v", c.Args.Quiet)
- }
- if c.Args.LContext.MaxCount != 0 {
- options["max"] = fmt.Sprintf("%d", c.Args.LContext.MaxCount)
- }
- if c.Args.LContext.BeforeContext != 0 {
- options["before"] = fmt.Sprintf("%d", c.Args.LContext.BeforeContext)
- }
- if c.Args.LContext.AfterContext != 0 {
- options["after"] = fmt.Sprintf("%d", c.Args.LContext.AfterContext)
- }
-
- return options
-}
-
-func (c *baseClient) commandOptionsToString(options map[string]string) string {
- var sb strings.Builder
-
- count := 0
- for k, v := range options {
- if count > 0 {
- sb.WriteString(":")
- }
- sb.WriteString(fmt.Sprintf("%s=%s", k, v))
- count++
- }
-
- return sb.String()
-}
-
-func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback client.HostKeyCallback) *remote.Connection {
- conn := remote.NewConnection(server, c.UserName, sshAuthMethods, hostKeyCallback)
- conn.Handler = c.maker.makeHandler(server)
- conn.Commands = c.maker.makeCommands(c.makeCommandOptions())
-
- return conn
-}
-
-func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) {
- defer logger.Debug("Terminated connection")
-
- // We want to have at least one active connection
- <-active
- // Put it back on the channel
- active <- struct{}{}
-
- if c.Mode == omode.TailClient && c.retry {
- <-ctx.Done()
- }
-
- for {
- numActive := len(active)
- if numActive == 0 {
- return
- }
- logger.Debug("Active connections", numActive)
- time.Sleep(time.Second)
+func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod,
+ hostKeyCallback client.HostKeyCallback) connectors.Connector {
+ if c.Args.Serverless {
+ return connectors.NewServerless(c.UserName, c.maker.makeHandler(server),
+ c.maker.makeCommands())
}
+ return connectors.NewServerConnection(server, c.UserName, sshAuthMethods,
+ hostKeyCallback, c.maker.makeHandler(server), c.maker.makeCommands())
}
diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go
index db892f1..bd65560 100644
--- a/internal/clients/catclient.go
+++ b/internal/clients/catclient.go
@@ -7,6 +7,8 @@ import (
"strings"
"github.com/mimecast/dtail/internal/clients/handlers"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/omode"
)
@@ -16,11 +18,10 @@ type CatClient struct {
}
// NewCatClient returns a new cat client.
-func NewCatClient(args Args) (*CatClient, error) {
+func NewCatClient(args config.Args) (*CatClient, error) {
if args.RegexStr != "" {
return nil, errors.New("Can't use regex with 'cat' operating mode")
}
-
args.Mode = omode.CatClient
c := CatClient{
@@ -33,7 +34,6 @@ func NewCatClient(args Args) (*CatClient, error) {
c.init()
c.makeConnections(c)
-
return &c, nil
}
@@ -41,10 +41,14 @@ func (c CatClient) makeHandler(server string) handlers.Handler {
return handlers.NewClientHandler(server)
}
-func (c CatClient) makeCommands(options map[string]string) (commands []string) {
- optionsStr := c.commandOptionsToString(options)
+func (c CatClient) makeCommands() (commands []string) {
+ regex, err := c.Regex.Serialize()
+ if err != nil {
+ dlog.Client.FatalPanic(err)
+ }
for _, file := range strings.Split(c.What, ",") {
- commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), optionsStr, file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("%s:%s %s %s",
+ c.Mode.String(), c.Args.SerializeOptions(), file, regex))
}
return
}
diff --git a/internal/clients/connectors/connector.go b/internal/clients/connectors/connector.go
new file mode 100644
index 0000000..3ab6a08
--- /dev/null
+++ b/internal/clients/connectors/connector.go
@@ -0,0 +1,17 @@
+package connectors
+
+import (
+ "context"
+
+ "github.com/mimecast/dtail/internal/clients/handlers"
+)
+
+// Connector interface.
+type Connector interface {
+ // Start the connection.
+ Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{})
+ // Server hostname.
+ Server() string
+ // Handler for the connection.
+ Handler() handlers.Handler
+}
diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go
new file mode 100644
index 0000000..aeb2a41
--- /dev/null
+++ b/internal/clients/connectors/serverconnection.go
@@ -0,0 +1,206 @@
+package connectors
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/mimecast/dtail/internal/clients/handlers"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/ssh/client"
+
+ "golang.org/x/crypto/ssh"
+)
+
+// ServerConnection represents a connection to a single remote dtail server via
+// SSH protocol.
+type ServerConnection struct {
+ // The full server string as received from the server discovery (can be with port number)
+ server string
+ // Only the hostname or FQDN (without the port number)
+ hostname string
+ // Only the port number.
+ port int
+ config *ssh.ClientConfig
+ handler handlers.Handler
+ commands []string
+ hostKeyCallback client.HostKeyCallback
+ throttlingDone bool
+}
+
+// NewServerConnection returns a new DTail SSH server connection.
+func NewServerConnection(server string, userName string,
+ authMethods []ssh.AuthMethod, hostKeyCallback client.HostKeyCallback,
+ handler handlers.Handler, commands []string) *ServerConnection {
+
+ dlog.Client.Debug(server, "Creating new connection", server, handler, commands)
+ c := ServerConnection{
+ hostKeyCallback: hostKeyCallback,
+ server: server,
+ handler: handler,
+ commands: commands,
+ config: &ssh.ClientConfig{
+ User: userName,
+ Auth: authMethods,
+ HostKeyCallback: hostKeyCallback.Wrap(),
+ Timeout: time.Second * 2,
+ },
+ }
+
+ c.initServerPort()
+ return &c
+}
+
+// Server returns the server hostname connected to.
+func (c *ServerConnection) Server() string { return c.server }
+
+// Handler returns the handler used for the connection.
+func (c *ServerConnection) Handler() handlers.Handler { return c.handler }
+
+// Attempt to parse the server port address from the provided server FQDN.
+func (c *ServerConnection) initServerPort() {
+ parts := strings.Split(c.server, ":")
+ if len(parts) == 1 {
+ c.hostname = c.server
+ c.port = config.Common.SSHPort
+ return
+ }
+
+ dlog.Client.Debug("Parsing port from hostname", parts)
+ port, err := strconv.Atoi(parts[1])
+ if err != nil {
+ dlog.Client.FatalPanic("Unable to parse client port", c.server, parts, err)
+ }
+ c.hostname = parts[0]
+ c.port = port
+}
+
+// Start the connection to the server.
+func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc,
+ throttleCh, statsCh chan struct{}) {
+
+ // Throttle how many connections can be established concurrently (based on ch length)
+ dlog.Client.Debug(c.server, "Throttling connection", len(throttleCh), cap(throttleCh))
+
+ select {
+ case throttleCh <- struct{}{}:
+ case <-ctx.Done():
+ dlog.Client.Debug(c.server, "Not establishing connection as context is done",
+ len(throttleCh), cap(throttleCh))
+ return
+ }
+
+ dlog.Client.Debug(c.server, "Throttling says that the connection can be established",
+ len(throttleCh), cap(throttleCh))
+
+ go func() {
+ defer func() {
+ if !c.throttlingDone {
+ dlog.Client.Debug(c.server, "Unthrottling connection (1)",
+ len(throttleCh), cap(throttleCh))
+ c.throttlingDone = true
+ <-throttleCh
+ }
+ cancel()
+ }()
+
+ if err := c.dial(ctx, cancel, throttleCh, statsCh); err != nil {
+ dlog.Client.Warn(c.server, err)
+ if c.hostKeyCallback.Untrusted(c.server) {
+ dlog.Client.Debug(c.server, "Not trusting host")
+ }
+ }
+ }()
+
+ <-ctx.Done()
+}
+
+// Dail into a new SSH connection. Close connection in case of an error.
+func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc,
+ throttleCh, statsCh chan struct{}) error {
+
+ dlog.Client.Debug(c.server, "Incrementing connection stats")
+ statsCh <- struct{}{}
+ defer func() {
+ dlog.Client.Debug(c.server, "Decrementing connection stats")
+ <-statsCh
+ }()
+
+ address := fmt.Sprintf("%s:%d", c.hostname, c.port)
+ dlog.Client.Debug(c.server, "Dialing into the connection", address)
+
+ client, err := ssh.Dial("tcp", address, c.config)
+ if err != nil {
+ return err
+ }
+ defer client.Close()
+
+ return c.session(ctx, cancel, client, throttleCh)
+}
+
+// Create the SSH session. Close the session in case of an error.
+func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFunc,
+ client *ssh.Client, throttleCh chan struct{}) error {
+
+ dlog.Client.Debug(c.server, "Creating SSH session")
+ session, err := client.NewSession()
+ if err != nil {
+ return err
+ }
+ defer session.Close()
+ return c.handle(ctx, cancel, session, throttleCh)
+}
+
+func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc,
+ session *ssh.Session, throttleCh chan struct{}) error {
+
+ dlog.Client.Debug(c.server, "Creating handler for SSH session")
+ stdinPipe, err := session.StdinPipe()
+ if err != nil {
+ return err
+ }
+ stdoutPipe, err := session.StdoutPipe()
+ if err != nil {
+ return err
+ }
+ if err := session.Shell(); err != nil {
+ return err
+ }
+
+ go func() {
+ io.Copy(stdinPipe, c.handler)
+ cancel()
+ }()
+ go func() {
+ io.Copy(c.handler, stdoutPipe)
+ cancel()
+ }()
+ go func() {
+ select {
+ case <-c.handler.Done():
+ case <-ctx.Done():
+ }
+ cancel()
+ }()
+
+ // Send all commands to client.
+ for _, command := range c.commands {
+ dlog.Client.Debug(command)
+ c.handler.SendMessage(command)
+ }
+
+ if !c.throttlingDone {
+ dlog.Client.Debug(c.server, "Unthrottling connection (2)",
+ len(throttleCh), cap(throttleCh))
+ c.throttlingDone = true
+ <-throttleCh
+ }
+
+ <-ctx.Done()
+ c.handler.Shutdown()
+ return nil
+}
diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go
new file mode 100644
index 0000000..2ff490a
--- /dev/null
+++ b/internal/clients/connectors/serverless.go
@@ -0,0 +1,116 @@
+package connectors
+
+import (
+ "context"
+ "io"
+
+ "github.com/mimecast/dtail/internal/clients/handlers"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
+ serverHandlers "github.com/mimecast/dtail/internal/server/handlers"
+ user "github.com/mimecast/dtail/internal/user/server"
+)
+
+// Serverless creates a server object directly without TCP.
+type Serverless struct {
+ handler handlers.Handler
+ commands []string
+ userName string
+}
+
+// NewServerless starts a new serverless session.
+func NewServerless(userName string, handler handlers.Handler,
+ commands []string) *Serverless {
+
+ dlog.Client.Debug("Creating new serverless connector", handler, commands)
+ return &Serverless{
+ userName: userName,
+ handler: handler,
+ commands: commands,
+ }
+}
+
+// Server returns serverless server indicator.
+func (s *Serverless) Server() string {
+ return "local(serverless)"
+}
+
+// Handler returns the handler used for the serverless connection.
+func (s *Serverless) Handler() handlers.Handler {
+ return s.handler
+}
+
+// Start the serverless connection.
+func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc,
+ throttleCh, statsCh chan struct{}) {
+
+ dlog.Client.Debug("Starting serverless connector")
+ go func() {
+ defer cancel()
+
+ if err := s.handle(ctx, cancel); err != nil {
+ dlog.Client.Warn(err)
+ }
+ }()
+ <-ctx.Done()
+}
+
+func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) error {
+ dlog.Client.Debug("Creating server handler for a serverless session")
+
+ user, err := user.New(s.userName, s.Server())
+ if err != nil {
+ return err
+ }
+
+ var serverHandler serverHandlers.Handler
+ switch s.userName {
+ case config.HealthUser:
+ dlog.Client.Debug("Creating serverless health handler")
+ serverHandler = serverHandlers.NewHealthHandler(user)
+ default:
+ dlog.Client.Debug("Creating serverless server handler")
+ serverHandler = serverHandlers.NewServerHandler(
+ user,
+ make(chan struct{}, config.Server.MaxConcurrentCats),
+ make(chan struct{}, config.Server.MaxConcurrentTails),
+ )
+ }
+
+ terminate := func() {
+ dlog.Client.Debug("Terminating serverless connection")
+ serverHandler.Shutdown()
+ cancel()
+ }
+
+ go func() {
+ io.Copy(serverHandler, s.handler)
+ dlog.Client.Trace("io.Copy(serverHandler, s.handler) => done")
+ terminate()
+ }()
+ go func() {
+ io.Copy(s.handler, serverHandler)
+ dlog.Client.Trace("io.Copy(s.handler, serverHandler) => done")
+ terminate()
+ }()
+ go func() {
+ select {
+ case <-s.handler.Done():
+ dlog.Client.Trace("<-s.handler.Done()")
+ case <-ctx.Done():
+ dlog.Client.Trace("<-ctx.Done()")
+ }
+ terminate()
+ }()
+
+ // Send all commands to client.
+ for _, command := range s.commands {
+ dlog.Client.Debug("Sending command to serverless server", command)
+ s.handler.SendMessage(command)
+ }
+
+ <-ctx.Done()
+ dlog.Client.Trace("s.handler.Shutdown()")
+ s.handler.Shutdown()
+ return nil
+}
diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go
index 567193a..7521c67 100644
--- a/internal/clients/grepclient.go
+++ b/internal/clients/grepclient.go
@@ -7,16 +7,19 @@ import (
"strings"
"github.com/mimecast/dtail/internal/clients/handlers"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/omode"
)
-// GrepClient searches a remote file for all lines matching a regular expression. Only the matching lines are displayed.
+// GrepClient searches a remote file for all lines matching a regular
+// expression. Only the matching lines are displayed.
type GrepClient struct {
baseClient
}
// NewGrepClient creates a new grep client.
-func NewGrepClient(args Args) (*GrepClient, error) {
+func NewGrepClient(args config.Args) (*GrepClient, error) {
if args.RegexStr == "" {
return nil, errors.New("No regex specified, use '-regex' flag")
}
@@ -32,7 +35,6 @@ func NewGrepClient(args Args) (*GrepClient, error) {
c.init()
c.makeConnections(c)
-
return &c, nil
}
@@ -40,11 +42,14 @@ func (c GrepClient) makeHandler(server string) handlers.Handler {
return handlers.NewClientHandler(server)
}
-func (c GrepClient) makeCommands(options map[string]string) (commands []string) {
- optionsStr := c.commandOptionsToString(options)
+func (c GrepClient) makeCommands() (commands []string) {
+ regex, err := c.Regex.Serialize()
+ if err != nil {
+ dlog.Client.FatalPanic(err)
+ }
for _, file := range strings.Split(c.What, ",") {
- commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), optionsStr, file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("%s:%s %s %s",
+ c.Mode.String(), c.Args.SerializeOptions(), file, regex))
}
-
return
}
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index 602a7ac..b520c25 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -1,6 +1,7 @@
package handlers
import (
+ "bytes"
"encoding/base64"
"fmt"
"io"
@@ -8,8 +9,8 @@ import (
"time"
"github.com/mimecast/dtail/internal"
- "github.com/mimecast/dtail/internal/io/logger"
- "github.com/mimecast/dtail/internal/version"
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/protocol"
)
type baseHandler struct {
@@ -17,10 +18,20 @@ type baseHandler struct {
server string
shellStarted bool
commands chan string
- receiveBuf []byte
+ receiveBuf bytes.Buffer
status int
}
+func (h *baseHandler) String() string {
+ return fmt.Sprintf("baseHandler(%s,server:%s,shellStarted:%v,status:%d)@%p",
+ h.done,
+ h.server,
+ h.shellStarted,
+ h.status,
+ h,
+ )
+}
+
func (h *baseHandler) Server() string {
return h.server
}
@@ -29,21 +40,13 @@ func (h *baseHandler) Status() int {
return h.status
}
-func (h *baseHandler) Done() <-chan struct{} {
- return h.done.Done()
-}
-
-func (h *baseHandler) Shutdown() {
- h.done.Shutdown()
-}
-
// SendMessage to the server.
func (h *baseHandler) SendMessage(command string) error {
encoded := base64.StdEncoding.EncodeToString([]byte(command))
- logger.Debug("Sending command", h.server, command, encoded)
+ dlog.Client.Debug("Sending command", h.server, command, encoded)
select {
- case h.commands <- fmt.Sprintf("protocol %s base64 %v;", version.ProtocolCompat, encoded):
+ case h.commands <- fmt.Sprintf("protocol %s base64 %v;", protocol.ProtocolCompat, encoded):
case <-time.After(time.Second * 5):
return fmt.Errorf("Timed out sending command '%s' (base64: '%s')", command, encoded)
case <-h.Done():
@@ -56,13 +59,20 @@ func (h *baseHandler) SendMessage(command string) error {
// Read data from the dtail server via Writer interface.
func (h *baseHandler) Write(p []byte) (n int, err error) {
for _, b := range p {
- h.receiveBuf = append(h.receiveBuf, b)
- if b == '\n' {
- if len(h.receiveBuf) == 0 {
+ switch b {
+ /*
+ // NEXT: Next DTail version make it so that '\n' gets ignored. For now
+ // leave it for compatibility with older DTail server + ability to display
+ // the protocol mismatch warn message.
+ case '\n' {
continue
- }
- message := string(h.receiveBuf)
- h.handleMessageType(message)
+ */
+ case '\n', protocol.MessageDelimiter:
+ message := h.receiveBuf.String()
+ h.handleMessage(message)
+ h.receiveBuf.Reset()
+ default:
+ h.receiveBuf.WriteByte(b)
}
}
@@ -77,31 +87,32 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
case <-h.Done():
return 0, io.EOF
}
-
return
}
-// Handle various message types.
-func (h *baseHandler) handleMessageType(message string) {
- if len(h.receiveBuf) == 0 {
- return
- }
-
- // Hidden server commands starti with a dot "."
- if h.receiveBuf[0] == '.' {
+func (h *baseHandler) handleMessage(message string) {
+ if len(message) > 0 && message[0] == '.' {
h.handleHiddenMessage(message)
- h.receiveBuf = h.receiveBuf[:0]
return
}
- logger.Raw(message)
- h.receiveBuf = h.receiveBuf[:0]
+ dlog.Client.Raw(message)
}
// Handle messages received from server which are not meant to be displayed
// to the end user.
func (h *baseHandler) handleHiddenMessage(message string) {
- if strings.HasPrefix(message, ".syn close connection") {
- h.SendMessage(".ack close connection")
+ switch {
+ case strings.HasPrefix(message, ".syn close connection"):
+ go h.SendMessage(".ack close connection")
+ h.Shutdown()
}
}
+
+func (h *baseHandler) Done() <-chan struct{} {
+ return h.done.Done()
+}
+
+func (h *baseHandler) Shutdown() {
+ h.done.Shutdown()
+}
diff --git a/internal/clients/handlers/clienthandler.go b/internal/clients/handlers/clienthandler.go
index 2bcb038..27ac85e 100644
--- a/internal/clients/handlers/clienthandler.go
+++ b/internal/clients/handlers/clienthandler.go
@@ -2,7 +2,7 @@ package handlers
import (
"github.com/mimecast/dtail/internal"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
)
// ClientHandler is the basic client handler interface.
@@ -12,7 +12,7 @@ type ClientHandler struct {
// NewClientHandler creates a new client handler.
func NewClientHandler(server string) *ClientHandler {
- logger.Debug(server, "Creating new client handler")
+ dlog.Client.Debug(server, "Creating new client handler")
return &ClientHandler{
baseHandler{
diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go
index 0440706..47b594e 100644
--- a/internal/clients/handlers/healthhandler.go
+++ b/internal/clients/handlers/healthhandler.go
@@ -1,88 +1,56 @@
package handlers
import (
- "errors"
- "fmt"
- "time"
+ "strings"
"github.com/mimecast/dtail/internal"
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/protocol"
)
-// HealthHandler implements the handler required for health checks.
+// HealthHandler is the handler used on the client side for running mapreduce
+// aggregations.
type HealthHandler struct {
- done *internal.Done
- // Buffer of incoming data from server.
- receiveBuf []byte
- // To send commands to the server.
- commands chan string
- // To receive messages from the server.
- receive chan<- string
- // The remote server address
- server string
- // The return status.
- status int
-}
-
-// NewHealthHandler returns a new health check handler.
-func NewHealthHandler(server string, receive chan<- string) *HealthHandler {
- h := HealthHandler{
- server: server,
- receive: receive,
- commands: make(chan string),
- status: -1,
- done: internal.NewDone(),
- }
-
- return &h
-}
-
-// Server returns the remote server name.
-func (h *HealthHandler) Server() string {
- return h.server
-}
-
-// Status of the handler.
-func (h *HealthHandler) Status() int {
- return h.status
-}
-
-// Done returns done channel of the handler.
-func (h *HealthHandler) Done() <-chan struct{} {
- return h.done.Done()
-}
-
-// Shutdown the handler.
-func (h *HealthHandler) Shutdown() {
- h.done.Shutdown()
-}
-
-// SendMessage sends a DTail command to the server.
-func (h *HealthHandler) SendMessage(command string) error {
- select {
- case h.commands <- fmt.Sprintf("%s;", command):
- case <-time.NewTimer(time.Second * 10).C:
- return errors.New("Timed out sending command " + command)
- case <-h.Done():
+ baseHandler
+}
+
+// NewHealthHandler returns a new health client handler.
+func NewHealthHandler(server string) *HealthHandler {
+ dlog.Client.Debug(server, "Creating new health handler")
+ return &HealthHandler{
+ baseHandler: baseHandler{
+ server: server,
+ shellStarted: false,
+ commands: make(chan string),
+ status: 2, // Assume CRITICAL status by default.
+ done: internal.NewDone(),
+ },
}
-
- return nil
}
-// Server writes byte stream to client.
+// Read data from the dtail server via Writer interface.
func (h *HealthHandler) Write(p []byte) (n int, err error) {
for _, b := range p {
- h.receiveBuf = append(h.receiveBuf, b)
- if b == '\n' {
- h.receive <- string(h.receiveBuf)
- h.receiveBuf = h.receiveBuf[:0]
+ switch b {
+ case '\n', protocol.MessageDelimiter:
+ message := h.baseHandler.receiveBuf.String()
+ h.handleMessage(message)
+ h.baseHandler.receiveBuf.Reset()
+ default:
+ h.baseHandler.receiveBuf.WriteByte(b)
}
}
-
return len(p), nil
}
-// Server reads byte stream from client.
-func (h *HealthHandler) Read(p []byte) (n int, err error) {
- n = copy(p, []byte(<-h.commands))
- return
+func (h *HealthHandler) handleMessage(message string) {
+ if len(message) > 0 && message[0] == '.' {
+ h.baseHandler.handleHiddenMessage(message)
+ return
+ }
+ s := strings.Split(message, protocol.FieldDelimiter)
+ message = s[len(s)-1]
+ if message == "OK" {
+ h.baseHandler.status = 0
+ }
}
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go
index fb71c8f..8718b35 100644
--- a/internal/clients/handlers/maprhandler.go
+++ b/internal/clients/handlers/maprhandler.go
@@ -4,21 +4,24 @@ import (
"strings"
"github.com/mimecast/dtail/internal"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/client"
+ "github.com/mimecast/dtail/internal/protocol"
)
-// MaprHandler is the handler used on the client side for running mapreduce aggregations.
+// MaprHandler is the handler used on the client side for running mapreduce
+// aggregations.
type MaprHandler struct {
baseHandler
aggregate *client.Aggregate
query *mapr.Query
- count uint64
}
// NewMaprHandler returns a new mapreduce client handler.
-func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *MaprHandler {
+func NewMaprHandler(server string, query *mapr.Query,
+ globalGroup *mapr.GlobalGroupSet) *MaprHandler {
+
return &MaprHandler{
baseHandler: baseHandler{
server: server,
@@ -35,34 +38,35 @@ func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGr
// Read data from the dtail server via Writer interface.
func (h *MaprHandler) Write(p []byte) (n int, err error) {
for _, b := range p {
- h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b)
- if b == '\n' {
- if len(h.baseHandler.receiveBuf) == 0 {
- continue
- }
- message := string(h.baseHandler.receiveBuf)
-
- if h.baseHandler.receiveBuf[0] == 'A' {
- h.handleAggregateMessage(strings.TrimSpace(message))
- h.baseHandler.receiveBuf = h.baseHandler.receiveBuf[:0]
- continue
+ switch b {
+ case '\n':
+ continue
+ case protocol.MessageDelimiter:
+ message := h.baseHandler.receiveBuf.String()
+ dlog.Client.Debug(message)
+ if message[0] == 'A' {
+ h.handleAggregateMessage(message)
+ } else {
+ h.baseHandler.handleMessage(message)
}
- h.baseHandler.handleMessageType(message)
+ h.baseHandler.receiveBuf.Reset()
+ default:
+ h.baseHandler.receiveBuf.WriteByte(b)
}
}
return len(p), nil
}
-// Handle a message received from server including mapr aggregation
-// related data.
+// Handle a message received from server including mapr aggregation related data.
func (h *MaprHandler) handleAggregateMessage(message string) {
- h.count++
- parts := strings.Split(message, "➔")
-
- // Index 0 contains 'AGGREGATE', 1 contains server host.
- // Aggregation data begins from index 2.
- logger.Debug("Received aggregate data", h.server, h.count, parts)
- h.aggregate.Aggregate(parts[2:])
- logger.Debug("Aggregated aggregate data", h.server, h.count)
+ parts := strings.SplitN(message, protocol.FieldDelimiter, 3)
+ if len(parts) != 3 {
+ dlog.Client.Error("Unable to aggregate data", h.server, message, parts,
+ len(parts), "expected 3 parts")
+ return
+ }
+ if err := h.aggregate.Aggregate(parts[2]); err != nil {
+ dlog.Client.Error("Unable to aggregate data", h.server, message, err)
+ }
}
diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go
index e93f6be..1a02827 100644
--- a/internal/clients/healthclient.go
+++ b/internal/clients/healthclient.go
@@ -4,93 +4,75 @@ import (
"context"
"fmt"
"runtime"
- "strings"
- "time"
"github.com/mimecast/dtail/internal/clients/handlers"
- "github.com/mimecast/dtail/internal/clients/remote"
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/omode"
gossh "golang.org/x/crypto/ssh"
)
-// HealthClient is used for health checking (e.g. via Nagios)
+// HealthClient is used to perform a basic server health check.
type HealthClient struct {
- // Client operating mode
- mode omode.Mode
- // The remote server address
- server string
- // SSH user name
- userName string
- // SSH auth methods to use to connect to the remote servers.
- sshAuthMethods []gossh.AuthMethod
+ baseClient
}
-// NewHealthClient returns a new healh client.
-func NewHealthClient(mode omode.Mode) (*HealthClient, error) {
+// NewHealthClient returns a new health client.
+func NewHealthClient(args config.Args) (*HealthClient, error) {
+ args.Mode = omode.HealthClient
+ args.UserName = config.HealthUser
c := HealthClient{
- mode: mode,
- server: fmt.Sprintf("%s:%d", config.Server.SSHBindAddress, config.Common.SSHPort),
- userName: config.ControlUser,
+ baseClient: baseClient{
+ Args: args,
+ throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()),
+ retry: false,
+ },
}
- c.initSSHAuthMethods()
+ c.init()
+ c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.HealthUser))
+ c.makeConnections(c)
return &c, nil
}
-// Start the health client.
-func (c *HealthClient) Start(ctx context.Context) (status int) {
- receive := make(chan string)
-
- throttleCh := make(chan struct{}, runtime.NumCPU())
- statsCh := make(chan struct{}, 1)
-
- conn := remote.NewOneOffConnection(c.server, c.userName, c.sshAuthMethods)
- conn.Handler = handlers.NewHealthHandler(c.server, receive)
- conn.Commands = []string{c.mode.String()}
-
- connCtx, cancel := context.WithCancel(ctx)
- go conn.Start(connCtx, cancel, throttleCh, statsCh)
+func (c HealthClient) makeHandler(server string) handlers.Handler {
+ return handlers.NewHealthHandler(server)
+}
- for {
- select {
- case data := <-receive:
- // Parse recieved data.
- s := strings.Split(data, "|")
- message := s[len(s)-1]
- if strings.HasPrefix(message, "done;") {
- return
- }
+func (c HealthClient) makeCommands() (commands []string) {
+ commands = append(commands, "health")
+ return
+}
- // Set severity.
- s = strings.Split(message, ":")
- switch s[0] {
- case "OK":
- case "WARNING":
- if status < 1 {
- status = 1
- }
- case "CRITICAL":
- status = 2
- case "UNKNOWN":
- status = 3
- default:
- fmt.Printf("CRITICAL: Unexpected server response: '%s'\n", message)
- status = 2
- return
- }
- fmt.Print(message)
+// Start the health client.
+func (c *HealthClient) Start(ctx context.Context, statsCh <-chan string) int {
+ status := c.baseClient.Start(ctx, statsCh)
- case <-time.After(time.Second * 2):
- status = 2
- fmt.Println("CRITICAL: Could not communicate with DTail server")
- return
+ switch status {
+ case 0:
+ if c.Serverless {
+ fmt.Printf("WARNING: All seems fine but the check only run in serverless mode" +
+ ", please specify a remote server via --server hostname:port\n")
+ return 1
+ }
+ fmt.Printf("OK: All fine at %s :-)\n", c.ServersStr)
+ case 2:
+ if c.Serverless {
+ fmt.Printf("CRITICAL: DTail server not operating properly (using " +
+ "serverless connction)!\n")
+ return 2
}
+ fmt.Printf("CRITICAL: DTail server not operating properly at %s!\n",
+ c.ServersStr)
+ default:
+ if c.Serverless {
+ fmt.Printf("UNKNOWN: Received unknown status code %d (using serverless "+
+ "connection)\n", status)
+ return status
+ }
+ fmt.Printf("UNKNOWN: Received unknown status code %d from %s!\n",
+ status, c.ServersStr)
}
-}
-// Initialize SSH auth methods.
-func (c *HealthClient) initSSHAuthMethods() {
- c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.ControlUser))
+ return status
}
diff --git a/internal/clients/maker.go b/internal/clients/maker.go
index a1d6864..d5ffd8b 100644
--- a/internal/clients/maker.go
+++ b/internal/clients/maker.go
@@ -9,5 +9,5 @@ import (
// and send different commands to the DTail server.
type maker interface {
makeHandler(server string) handlers.Handler
- makeCommands(options map[string]string) (commands []string)
+ makeCommands() (commands []string)
}
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index feb7e47..246946f 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -9,7 +9,9 @@ import (
"time"
"github.com/mimecast/dtail/internal/clients/handlers"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/color"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/omode"
)
@@ -29,25 +31,25 @@ const (
// MaprClient is used for running mapreduce aggregations on remote files.
type MaprClient struct {
baseClient
- // Query string for mapr aggregations
- queryStr string
// Global group set for merged mapr aggregation results
globalGroup *mapr.GlobalGroupSet
// The query object (constructed from queryStr)
query *mapr.Query
// Additative result or new result every interval run?
cumulative bool
+ // The last result string received
+ lastResult string
}
// NewMaprClient returns a new mapreduce client.
-func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (*MaprClient, error) {
- if queryStr == "" {
+func NewMaprClient(args config.Args, maprClientMode MaprClientMode) (*MaprClient, error) {
+ if args.QueryStr == "" {
return nil, errors.New("No mapreduce query specified, use '-query' flag")
}
- query, err := mapr.NewQuery(queryStr)
+ query, err := mapr.NewQuery(args.QueryStr)
if err != nil {
- logger.FatalExit(queryStr, "Can't parse mapr query", err)
+ dlog.Client.FatalPanic(args.QueryStr, "Can't parse mapr query", err)
}
// Don't retry connection if in tail mode and no outfile specified.
@@ -64,7 +66,7 @@ func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (*
cumulative = args.Mode == omode.MapClient || query.HasOutfile()
}
- logger.Debug("Cumulative mapreduce mode?", cumulative)
+ dlog.Client.Debug("Cumulative mapreduce mode?", cumulative)
c := MaprClient{
baseClient: baseClient{
@@ -73,7 +75,6 @@ func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (*
retry: retry,
},
query: query,
- queryStr: queryStr,
cumulative: cumulative,
}
@@ -99,46 +100,51 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i
status = c.baseClient.Start(ctx, statsCh)
if c.cumulative {
- logger.Debug("Received final mapreduce result")
+ dlog.Client.Debug("Received final mapreduce result")
c.reportResults()
}
return
}
+// NEXT: Make this a callback function rather trying to use polymorphism to call
+// this. This applies to all clients. It will make the code easier to read.
func (c MaprClient) makeHandler(server string) handlers.Handler {
return handlers.NewMaprHandler(server, c.query, c.globalGroup)
}
-func (c MaprClient) makeCommands(options map[string]string) (commands []string) {
+func (c MaprClient) makeCommands() (commands []string) {
commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery))
-
modeStr := "cat"
if c.Mode == omode.TailClient {
modeStr = "tail"
}
- optionsStr := c.commandOptionsToString(options)
for _, file := range strings.Split(c.What, ",") {
+ regex, err := c.Regex.Serialize()
+ if err != nil {
+ dlog.Client.FatalPanic(err)
+ }
if c.Timeout > 0 {
- commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, modeStr, file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout,
+ modeStr, file, regex))
continue
}
- commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, optionsStr, file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("%s:%s %s %s",
+ modeStr, c.Args.SerializeOptions(), file, regex))
}
-
return
}
func (c *MaprClient) periodicReportResults(ctx context.Context) {
rampUpSleep := c.query.Interval / 2
- logger.Debug("Ramp up sleeping before processing mapreduce results", rampUpSleep)
+ dlog.Client.Debug("Ramp up sleeping before processing mapreduce results", rampUpSleep)
time.Sleep(rampUpSleep)
for {
select {
case <-time.After(c.query.Interval):
- logger.Debug("Gathering interim mapreduce result")
+ dlog.Client.Debug("Gathering interim mapreduce result")
c.reportResults()
case <-ctx.Done():
return
@@ -151,42 +157,65 @@ func (c *MaprClient) reportResults() {
c.writeResultsToOutfile()
return
}
-
c.printResults()
}
func (c *MaprClient) printResults() {
var result string
var err error
- var numLines int
+ var numRows int
+ rowsLimit := -1
+
+ if c.query.Limit == -1 {
+ // Limit output to 10 rows when the result is printed to stdout.
+ // This can be overriden with the limit clause though.
+ rowsLimit = 10
+ }
if c.cumulative {
- result, numLines, err = c.globalGroup.Result(c.query)
+ result, numRows, err = c.globalGroup.Result(c.query, rowsLimit)
} else {
- result, numLines, err = c.globalGroup.SwapOut().Result(c.query)
+ result, numRows, err = c.globalGroup.SwapOut().Result(c.query, rowsLimit)
}
if err != nil {
- logger.FatalExit(err)
+ dlog.Client.FatalPanic(err)
}
- if numLines == 0 {
- logger.Warn("Empty result set this time...")
+ if result == c.lastResult {
+ dlog.Client.Debug("Result hasn't changed compared to last time...")
return
}
+ c.lastResult = result
- logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery))
- logger.Raw(result)
+ if numRows == 0 {
+ dlog.Client.Debug("Empty result set this time...")
+ return
+ }
+
+ rawQuery := c.query.RawQuery
+ if config.Client.TermColorsEnable {
+ rawQuery = color.PaintStrWithAttr(rawQuery,
+ config.Client.TermColors.MaprTable.RawQueryFg,
+ config.Client.TermColors.MaprTable.RawQueryBg,
+ config.Client.TermColors.MaprTable.RawQueryAttr)
+ }
+ dlog.Client.Raw(rawQuery)
+
+ if rowsLimit > 0 && numRows > rowsLimit {
+ dlog.Client.Warn(fmt.Sprintf("Got %d results but limited terminal output "+
+ "to %d rows! Use 'limit' clause to override!", numRows, rowsLimit))
+ }
+ dlog.Client.Raw(result)
}
func (c *MaprClient) writeResultsToOutfile() {
if c.cumulative {
if err := c.globalGroup.WriteResult(c.query); err != nil {
- logger.FatalExit(err)
+ dlog.Client.FatalPanic(err)
}
return
}
-
if err := c.globalGroup.SwapOut().WriteResult(c.query); err != nil {
- logger.FatalExit(err)
+ dlog.Client.FatalPanic(err)
}
}
diff --git a/internal/clients/remote/connection.go b/internal/clients/remote/connection.go
deleted file mode 100644
index b29ffed..0000000
--- a/internal/clients/remote/connection.go
+++ /dev/null
@@ -1,212 +0,0 @@
-package remote
-
-import (
- "context"
- "fmt"
- "io"
- "strconv"
- "strings"
- "time"
-
- "github.com/mimecast/dtail/internal/clients/handlers"
- "github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
- "github.com/mimecast/dtail/internal/ssh/client"
-
- "golang.org/x/crypto/ssh"
-)
-
-// Connection represents a client connection connection to a single server.
-type Connection struct {
- // The remote server's hostname connected to.
- Server string
- // The remote server's port connected to.
- port int
- // The SSH client configuration used.
- config *ssh.ClientConfig
- // The SSH client handler to use.
- Handler handlers.Handler
- // DTail commands sent from client to server. When client loses
- // connection to the server it re-connects automatically and sends the
- // same commands again.
- Commands []string
- // Is it a persistent connection or a one-off?
- isOneOff bool
- // To deal with SSH server host keys
- hostKeyCallback client.HostKeyCallback
- // To determine if connection throttling has finished or not
- throttlingDone bool
-}
-
-// NewConnection returns a new connection.
-func NewConnection(server string, userName string, authMethods []ssh.AuthMethod, hostKeyCallback client.HostKeyCallback) *Connection {
- logger.Debug(server, "Creating new connection")
-
- c := Connection{
- hostKeyCallback: hostKeyCallback,
- config: &ssh.ClientConfig{
- User: userName,
- Auth: authMethods,
- HostKeyCallback: hostKeyCallback.Wrap(),
- Timeout: time.Second * 3,
- },
- }
-
- c.initServerPort(server)
-
- return &c
-}
-
-// NewOneOffConnection creates new one-off connection (only for sending a series of commands and then quit).
-func NewOneOffConnection(server string, userName string, authMethods []ssh.AuthMethod) *Connection {
- c := Connection{
- config: &ssh.ClientConfig{
- User: userName,
- Auth: authMethods,
- HostKeyCallback: ssh.InsecureIgnoreHostKey(),
- },
- isOneOff: true,
- }
-
- c.initServerPort(server)
-
- return &c
-}
-
-// Attempt to parse the server port address from the provided server FQDN.
-func (c *Connection) initServerPort(server string) {
- c.Server = server
- c.port = config.Common.SSHPort
- parts := strings.Split(server, ":")
-
- if len(parts) == 2 {
- logger.Debug("Parsing port from hostname", parts)
- port, err := strconv.Atoi(parts[1])
- if err != nil {
- logger.FatalExit("Unable to parse client port", server, parts, err)
- }
- c.Server = parts[0]
- c.port = port
- }
-}
-
-// Start the server connection. Build up SSH session and send some DTail commands.
-func (c *Connection) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) {
- // Throttle how many connections can be established concurrently (based on ch length)
- logger.Debug(c.Server, "Throttling connection", len(throttleCh), cap(throttleCh))
-
- select {
- case throttleCh <- struct{}{}:
- case <-ctx.Done():
- logger.Debug(c.Server, "Not establishing connection as context is done", len(throttleCh), cap(throttleCh))
- return
- }
-
- logger.Debug(c.Server, "Throttling says that the connection can be established", len(throttleCh), cap(throttleCh))
-
- go func() {
- defer func() {
- if !c.throttlingDone {
- logger.Debug(c.Server, "Unthrottling connection (1)", len(throttleCh), cap(throttleCh))
- c.throttlingDone = true
- <-throttleCh
- }
- cancel()
- }()
-
- if err := c.dial(ctx, cancel, throttleCh, statsCh); err != nil {
- logger.Warn(c.Server, c.port, err)
- if c.hostKeyCallback.Untrusted(fmt.Sprintf("%s:%d", c.Server, c.port)) {
- logger.Debug(c.Server, "Not trusting host")
- }
- }
- }()
-
- <-ctx.Done()
-}
-
-// Dail into a new SSH connection. Close connection in case of an error.
-func (c *Connection) dial(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) error {
- logger.Debug(c.Server, "Incrementing connection stats")
- statsCh <- struct{}{}
- defer func() {
- logger.Debug(c.Server, "Decrementing connection stats")
- <-statsCh
- }()
-
- logger.Debug(c.Server, "Dialing into the connection")
- address := fmt.Sprintf("%s:%d", c.Server, c.port)
-
- client, err := ssh.Dial("tcp", address, c.config)
- if err != nil {
- return err
- }
- defer client.Close()
-
- return c.session(ctx, cancel, client, throttleCh)
-}
-
-// Create the SSH session. Close the session in case of an error.
-func (c *Connection) session(ctx context.Context, cancel context.CancelFunc, client *ssh.Client, throttleCh chan struct{}) error {
- logger.Debug(c.Server, "session")
-
- session, err := client.NewSession()
- if err != nil {
- return err
- }
- defer session.Close()
-
- return c.handle(ctx, cancel, session, throttleCh)
-}
-
-func (c *Connection) handle(ctx context.Context, cancel context.CancelFunc, session *ssh.Session, throttleCh chan struct{}) error {
- logger.Debug(c.Server, "handle")
-
- stdinPipe, err := session.StdinPipe()
- if err != nil {
- return err
- }
-
- stdoutPipe, err := session.StdoutPipe()
- if err != nil {
- return err
- }
-
- if err := session.Shell(); err != nil {
- return err
- }
-
- go func() {
- io.Copy(stdinPipe, c.Handler)
- cancel()
- }()
-
- go func() {
- io.Copy(c.Handler, stdoutPipe)
- cancel()
- }()
-
- go func() {
- select {
- case <-c.Handler.Done():
- case <-ctx.Done():
- }
- cancel()
- }()
-
- // Send all commands to client.
- for _, command := range c.Commands {
- logger.Debug(command)
- c.Handler.SendMessage(command)
- }
-
- if !c.throttlingDone {
- logger.Debug(c.Server, "Unthrottling connection (2)", len(throttleCh), cap(throttleCh))
- c.throttlingDone = true
- <-throttleCh
- }
-
- <-ctx.Done()
- c.Handler.Shutdown()
- return nil
-}
diff --git a/internal/clients/stats.go b/internal/clients/stats.go
index d8163d4..1315aea 100644
--- a/internal/clients/stats.go
+++ b/internal/clients/stats.go
@@ -8,14 +8,16 @@ import (
"sync"
"time"
+ "github.com/mimecast/dtail/internal/color"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/protocol"
)
// Used to collect and display various client stats.
type stats struct {
// Total amount servers to connect to.
- connectionsTotal int
+ servers int
// To keep track of what connected and disconnected
connectionsEstCh chan struct{}
// Amount of servers connections are established.
@@ -24,19 +26,20 @@ type stats struct {
mutex sync.Mutex
}
-func newTailStats(connectionsTotal int) *stats {
+func newTailStats(servers int) *stats {
return &stats{
- connectionsTotal: connectionsTotal,
- connectionsEstCh: make(chan struct{}, connectionsTotal),
+ servers: servers,
+ connectionsEstCh: make(chan struct{}, servers),
connected: 0,
}
}
// Start starts printing client connection stats every time a signal is recieved or
// connection count has changed.
-func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <-chan string, quiet bool) {
- var connectedLast int
+func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{},
+ statsCh <-chan string, quiet bool) {
+ var connectedLast int
for {
var force bool
var messages []string
@@ -54,18 +57,18 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <
throttle := len(throttleCh)
newConnections := connected - connectedLast
-
if (connected == connectedLast || quiet) && !force {
continue
}
- stats := s.statsLine(connected, newConnections, throttle)
switch force {
case true:
+ stats := s.statsLine(connected, newConnections, throttle)
messages = append(messages, fmt.Sprintf("Connection stats: %s", stats))
s.printStatsDueInterrupt(messages)
default:
- logger.Info(stats)
+ data := s.statsData(connected, newConnections, throttle)
+ dlog.Client.Mapreduce("STATS", data)
}
connectedLast = connected
@@ -76,30 +79,58 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <
}
func (s *stats) printStatsDueInterrupt(messages []string) {
- logger.Pause()
- for _, message := range messages {
+ dlog.Client.Pause()
+ for i, message := range messages {
+ if i > 0 && config.Client.TermColorsEnable {
+ fmt.Println(color.PaintStrWithAttr(message,
+ config.Client.TermColors.Client.ClientFg,
+ config.Client.TermColors.Client.ClientBg,
+ config.Client.TermColors.Client.ClientAttr,
+ ))
+ continue
+ }
fmt.Println(fmt.Sprintf(" %s", message))
}
time.Sleep(time.Second * time.Duration(config.InterruptTimeoutS))
- logger.Resume()
+ dlog.Client.Resume()
}
-func (s *stats) statsLine(connected, newConnections int, throttle int) string {
- percConnected := percentOf(float64(s.connectionsTotal), float64(connected))
+func (s *stats) statsData(connected, newConnections int,
+ throttle int) map[string]interface{} {
+
+ percConnected := percentOf(float64(s.servers), float64(connected))
- var stats []string
- stats = append(stats, fmt.Sprintf("connected=%d/%d(%d%%)", connected, s.connectionsTotal, int(percConnected)))
- stats = append(stats, fmt.Sprintf("new=%d", newConnections))
- stats = append(stats, fmt.Sprintf("throttle=%d", throttle))
- stats = append(stats, fmt.Sprintf("cpus/goroutines=%d/%d", runtime.NumCPU(), runtime.NumGoroutine()))
+ data := make(map[string]interface{})
+ data["connected"] = connected
+ data["servers"] = s.servers
+ data["connected%"] = int(percConnected)
+ data["new"] = newConnections
+ data["throttle"] = throttle
+ data["goroutines"] = runtime.NumGoroutine()
+ data["cgocalls"] = runtime.NumCgoCall()
+ data["cpu"] = runtime.NumCPU()
- return strings.Join(stats, "|")
+ return data
+}
+
+func (s *stats) statsLine(connected, newConnections int, throttle int) string {
+ sb := strings.Builder{}
+ i := 0
+ for k, v := range s.statsData(connected, newConnections, throttle) {
+ if i > 0 {
+ sb.WriteString(protocol.FieldDelimiter)
+ }
+ sb.WriteString(k)
+ sb.WriteByte('=')
+ sb.WriteString(fmt.Sprintf("%v", v))
+ i++
+ }
+ return sb.String()
}
func (s *stats) numConnected() int {
s.mutex.Lock()
defer s.mutex.Unlock()
-
return s.connected
}
@@ -107,6 +138,5 @@ func percentOf(total float64, value float64) float64 {
if total == 0 || total == value {
return 100
}
-
return value / (total / 100.0)
}
diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go
index 853ef1d..35c01d4 100644
--- a/internal/clients/tailclient.go
+++ b/internal/clients/tailclient.go
@@ -6,7 +6,8 @@ import (
"strings"
"github.com/mimecast/dtail/internal/clients/handlers"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/omode"
)
@@ -16,9 +17,8 @@ type TailClient struct {
}
// NewTailClient returns a new TailClient.
-func NewTailClient(args Args) (*TailClient, error) {
+func NewTailClient(args config.Args) (*TailClient, error) {
args.Mode = omode.TailClient
-
c := TailClient{
baseClient: baseClient{
Args: args,
@@ -29,7 +29,6 @@ func NewTailClient(args Args) (*TailClient, error) {
c.init()
c.makeConnections(c)
-
return &c, nil
}
@@ -37,12 +36,15 @@ func (c TailClient) makeHandler(server string) handlers.Handler {
return handlers.NewClientHandler(server)
}
-func (c TailClient) makeCommands(options map[string]string) (commands []string) {
- optionsStr := c.commandOptionsToString(options)
+func (c TailClient) makeCommands() (commands []string) {
+ regex, err := c.Regex.Serialize()
+ if err != nil {
+ dlog.Client.FatalPanic(err)
+ }
for _, file := range strings.Split(c.What, ",") {
- commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), optionsStr, file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("%s:%s %s %s",
+ c.Mode.String(), c.Args.SerializeOptions(), file, regex))
}
- logger.Debug(commands)
-
+ dlog.Client.Debug(commands)
return
}