diff options
| author | Paul Buetow <paul@buetow.org> | 2021-10-09 21:10:29 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2021-10-10 13:36:41 +0300 |
| commit | 97747ea0f3178f7f5890512d483fdccaa82846b0 (patch) | |
| tree | 9ff1335ca26afc90e55fd6de416457e252d75a35 /internal/clients | |
| parent | 7a7169791a64190e1002e38bc9c04ad0d5c1ce1f (diff) | |
vetting and linting and some code restyling
Diffstat (limited to 'internal/clients')
| -rw-r--r-- | internal/clients/baseclient.go | 7 | ||||
| -rw-r--r-- | internal/clients/catclient.go | 2 | ||||
| -rw-r--r-- | internal/clients/connectors/serverconnection.go | 48 | ||||
| -rw-r--r-- | internal/clients/connectors/serverless.go | 16 | ||||
| -rw-r--r-- | internal/clients/grepclient.go | 5 | ||||
| -rw-r--r-- | internal/clients/handlers/healthhandler.go | 3 | ||||
| -rw-r--r-- | internal/clients/handlers/maprhandler.go | 13 | ||||
| -rw-r--r-- | internal/clients/healthclient.go | 17 | ||||
| -rw-r--r-- | internal/clients/maprclient.go | 13 | ||||
| -rw-r--r-- | internal/clients/stats.go | 11 | ||||
| -rw-r--r-- | internal/clients/tailclient.go | 3 |
11 files changed, 77 insertions, 61 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index d5d7c2c..4a7bd84 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -64,7 +64,8 @@ func (c *baseClient) makeConnections(maker maker) { discoveryService := discovery.New(c.Discovery, c.ServersStr, discovery.Shuffle) for _, server := range discoveryService.ServerList() { - c.connections = append(c.connections, c.makeConnection(server, c.sshAuthMethods, c.hostKeyCallback)) + c.connections = append(c.connections, c.makeConnection(server, + c.sshAuthMethods, c.hostKeyCallback)) } c.stats = newTailStats(len(c.connections)) @@ -100,7 +101,9 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i return } -func (c *baseClient) startConnection(ctx context.Context, i int, conn connectors.Connector) (status int) { +func (c *baseClient) startConnection(ctx context.Context, i int, + conn connectors.Connector) (status int) { + for { connCtx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go index 2726e7e..bd65560 100644 --- a/internal/clients/catclient.go +++ b/internal/clients/catclient.go @@ -22,7 +22,6 @@ func NewCatClient(args config.Args) (*CatClient, error) { if args.RegexStr != "" { return nil, errors.New("Can't use regex with 'cat' operating mode") } - args.Mode = omode.CatClient c := CatClient{ @@ -35,7 +34,6 @@ func NewCatClient(args config.Args) (*CatClient, error) { c.init() c.makeConnections(c) - return &c, nil } diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go index 1666a79..2d7b45a 100644 --- a/internal/clients/connectors/serverconnection.go +++ b/internal/clients/connectors/serverconnection.go @@ -16,7 +16,8 @@ import ( "golang.org/x/crypto/ssh" ) -// ServerConnection represents a connection to a single remote dtail server via SSH protocol. +// ServerConnection represents a connection to a single remote dtail server via +// SSH protocol. type ServerConnection struct { server string port int @@ -28,9 +29,11 @@ type ServerConnection struct { } // NewServerConnection returns a new DTail SSH server connection. -func NewServerConnection(server string, userName string, authMethods []ssh.AuthMethod, hostKeyCallback client.HostKeyCallback, handler handlers.Handler, commands []string) *ServerConnection { - dlog.Client.Debug(server, "Creating new connection", server, handler, commands) +func NewServerConnection(server string, userName string, + authMethods []ssh.AuthMethod, hostKeyCallback client.HostKeyCallback, + handler handlers.Handler, commands []string) *ServerConnection { + dlog.Client.Debug(server, "Creating new connection", server, handler, commands) c := ServerConnection{ hostKeyCallback: hostKeyCallback, server: server, @@ -48,10 +51,12 @@ func NewServerConnection(server string, userName string, authMethods []ssh.AuthM return &c } +// Server returns the server hostname connected to. func (c *ServerConnection) Server() string { return c.server } +// Handler returns the handler used for the connection. func (c *ServerConnection) Handler() handlers.Handler { return c.handler } @@ -72,23 +77,29 @@ func (c *ServerConnection) initServerPort() { } } -func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) { +// Start the connection to the server. +func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc, + throttleCh, statsCh chan struct{}) { + // Throttle how many connections can be established concurrently (based on ch length) dlog.Client.Debug(c.server, "Throttling connection", len(throttleCh), cap(throttleCh)) select { case throttleCh <- struct{}{}: case <-ctx.Done(): - dlog.Client.Debug(c.server, "Not establishing connection as context is done", len(throttleCh), cap(throttleCh)) + dlog.Client.Debug(c.server, "Not establishing connection as context is done", + len(throttleCh), cap(throttleCh)) return } - dlog.Client.Debug(c.server, "Throttling says that the connection can be established", len(throttleCh), cap(throttleCh)) + dlog.Client.Debug(c.server, "Throttling says that the connection can be established", + len(throttleCh), cap(throttleCh)) go func() { defer func() { if !c.throttlingDone { - dlog.Client.Debug(c.server, "Unthrottling connection (1)", len(throttleCh), cap(throttleCh)) + dlog.Client.Debug(c.server, "Unthrottling connection (1)", + len(throttleCh), cap(throttleCh)) c.throttlingDone = true <-throttleCh } @@ -107,7 +118,9 @@ func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc, } // Dail into a new SSH connection. Close connection in case of an error. -func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) error { +func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc, + throttleCh, statsCh chan struct{}) error { + dlog.Client.Debug(c.server, "Incrementing connection stats") statsCh <- struct{}{} defer func() { @@ -128,31 +141,30 @@ func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc, } // Create the SSH session. Close the session in case of an error. -func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFunc, client *ssh.Client, throttleCh chan struct{}) error { - dlog.Client.Debug(c.server, "Creating SSH session") +func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFunc, + client *ssh.Client, throttleCh chan struct{}) error { + dlog.Client.Debug(c.server, "Creating SSH session") session, err := client.NewSession() if err != nil { return err } defer session.Close() - return c.handle(ctx, cancel, session, throttleCh) } -func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc, session *ssh.Session, throttleCh chan struct{}) error { - dlog.Client.Debug(c.server, "Creating handler for SSH session") +func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc, + session *ssh.Session, throttleCh chan struct{}) error { + dlog.Client.Debug(c.server, "Creating handler for SSH session") stdinPipe, err := session.StdinPipe() if err != nil { return err } - stdoutPipe, err := session.StdoutPipe() if err != nil { return err } - if err := session.Shell(); err != nil { return err } @@ -161,12 +173,10 @@ func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc io.Copy(stdinPipe, c.handler) cancel() }() - go func() { io.Copy(c.handler, stdoutPipe) cancel() }() - go func() { select { case <-c.handler.Done(): @@ -182,13 +192,13 @@ func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc } if !c.throttlingDone { - dlog.Client.Debug(c.server, "Unthrottling connection (2)", len(throttleCh), cap(throttleCh)) + dlog.Client.Debug(c.server, "Unthrottling connection (2)", + len(throttleCh), cap(throttleCh)) c.throttlingDone = true <-throttleCh } <-ctx.Done() c.handler.Shutdown() - return nil } diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go index 768a5ce..2ff490a 100644 --- a/internal/clients/connectors/serverless.go +++ b/internal/clients/connectors/serverless.go @@ -18,8 +18,10 @@ type Serverless struct { userName string } -// NewServerConnection returns a new connection. -func NewServerless(userName string, handler handlers.Handler, commands []string) *Serverless { +// NewServerless starts a new serverless session. +func NewServerless(userName string, handler handlers.Handler, + commands []string) *Serverless { + dlog.Client.Debug("Creating new serverless connector", handler, commands) return &Serverless{ userName: userName, @@ -28,15 +30,20 @@ func NewServerless(userName string, handler handlers.Handler, commands []string) } } +// Server returns serverless server indicator. func (s *Serverless) Server() string { return "local(serverless)" } +// Handler returns the handler used for the serverless connection. func (s *Serverless) Handler() handlers.Handler { return s.handler } -func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) { +// Start the serverless connection. +func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, + throttleCh, statsCh chan struct{}) { + dlog.Client.Debug("Starting serverless connector") go func() { defer cancel() @@ -81,13 +88,11 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro dlog.Client.Trace("io.Copy(serverHandler, s.handler) => done") terminate() }() - go func() { io.Copy(s.handler, serverHandler) dlog.Client.Trace("io.Copy(s.handler, serverHandler) => done") terminate() }() - go func() { select { case <-s.handler.Done(): @@ -107,6 +112,5 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro <-ctx.Done() dlog.Client.Trace("s.handler.Shutdown()") s.handler.Shutdown() - return nil } diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go index ae21ff2..7521c67 100644 --- a/internal/clients/grepclient.go +++ b/internal/clients/grepclient.go @@ -12,7 +12,8 @@ import ( "github.com/mimecast/dtail/internal/omode" ) -// GrepClient searches a remote file for all lines matching a regular expression. Only the matching lines are displayed. +// GrepClient searches a remote file for all lines matching a regular +// expression. Only the matching lines are displayed. type GrepClient struct { baseClient } @@ -34,7 +35,6 @@ func NewGrepClient(args config.Args) (*GrepClient, error) { c.init() c.makeConnections(c) - return &c, nil } @@ -51,6 +51,5 @@ func (c GrepClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), c.Args.SerializeOptions(), file, regex)) } - return } diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 10ba1f7..47b594e 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -8,7 +8,8 @@ import ( "github.com/mimecast/dtail/internal/protocol" ) -// HealthHandler is the handler used on the client side for running mapreduce aggregations. +// HealthHandler is the handler used on the client side for running mapreduce +// aggregations. type HealthHandler struct { baseHandler } diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index d1acfbd..8718b35 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -10,7 +10,8 @@ import ( "github.com/mimecast/dtail/internal/protocol" ) -// MaprHandler is the handler used on the client side for running mapreduce aggregations. +// MaprHandler is the handler used on the client side for running mapreduce +// aggregations. type MaprHandler struct { baseHandler aggregate *client.Aggregate @@ -18,7 +19,9 @@ type MaprHandler struct { } // NewMaprHandler returns a new mapreduce client handler. -func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *MaprHandler { +func NewMaprHandler(server string, query *mapr.Query, + globalGroup *mapr.GlobalGroupSet) *MaprHandler { + return &MaprHandler{ baseHandler: baseHandler{ server: server, @@ -55,12 +58,12 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { return len(p), nil } -// Handle a message received from server including mapr aggregation -// related data. +// Handle a message received from server including mapr aggregation related data. func (h *MaprHandler) handleAggregateMessage(message string) { parts := strings.SplitN(message, protocol.FieldDelimiter, 3) if len(parts) != 3 { - dlog.Client.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts") + dlog.Client.Error("Unable to aggregate data", h.server, message, parts, + len(parts), "expected 3 parts") return } if err := h.aggregate.Aggregate(parts[2]); err != nil { diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go index ac1dc20..1a02827 100644 --- a/internal/clients/healthclient.go +++ b/internal/clients/healthclient.go @@ -32,7 +32,6 @@ func NewHealthClient(args config.Args) (*HealthClient, error) { c.init() c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.HealthUser)) c.makeConnections(c) - return &c, nil } @@ -45,28 +44,34 @@ func (c HealthClient) makeCommands() (commands []string) { return } +// Start the health client. func (c *HealthClient) Start(ctx context.Context, statsCh <-chan string) int { status := c.baseClient.Start(ctx, statsCh) switch status { case 0: if c.Serverless { - fmt.Printf("WARNING: All seems fine but the check only run in serverless mode, please specify a remote server via --server hostname:port\n") + fmt.Printf("WARNING: All seems fine but the check only run in serverless mode" + + ", please specify a remote server via --server hostname:port\n") return 1 } fmt.Printf("OK: All fine at %s :-)\n", c.ServersStr) case 2: if c.Serverless { - fmt.Printf("CRITICAL: DTail server not operating properly (using serverless connction)!\n") + fmt.Printf("CRITICAL: DTail server not operating properly (using " + + "serverless connction)!\n") return 2 } - fmt.Printf("CRITICAL: DTail server not operating properly at %s!\n", c.ServersStr) + fmt.Printf("CRITICAL: DTail server not operating properly at %s!\n", + c.ServersStr) default: if c.Serverless { - fmt.Printf("UNKNOWN: Received unknown status code %d (using serverless connection)\n", status) + fmt.Printf("UNKNOWN: Received unknown status code %d (using serverless "+ + "connection)\n", status) return status } - fmt.Printf("UNKNOWN: Received unknown status code %d from %s!\n", status, c.ServersStr) + fmt.Printf("UNKNOWN: Received unknown status code %d from %s!\n", + status, c.ServersStr) } return status diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 412a219..04f258d 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -107,15 +107,14 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i return } -// NEXT: Make this a callback function rather trying to use polymorphism to call this. -// This applies to all clients. +// NEXT: Make this a callback function rather trying to use polymorphism to call +// this. This applies to all clients. func (c MaprClient) makeHandler(server string) handlers.Handler { return handlers.NewMaprHandler(server, c.query, c.globalGroup) } func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery)) - modeStr := "cat" if c.Mode == omode.TailClient { modeStr = "tail" @@ -134,7 +133,6 @@ func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, c.Args.SerializeOptions(), file, regex)) } - return } @@ -155,7 +153,6 @@ func (c *MaprClient) reportResults() { c.writeResultsToOutfile() return } - c.printResults() } @@ -176,7 +173,6 @@ func (c *MaprClient) printResults() { } else { result, numRows, err = c.globalGroup.SwapOut().Result(c.query, rowsLimit) } - if err != nil { dlog.Client.FatalPanic(err) } @@ -202,8 +198,8 @@ func (c *MaprClient) printResults() { dlog.Client.Raw(rawQuery) if rowsLimit > 0 && numRows > rowsLimit { - dlog.Client.Warn(fmt.Sprintf("Got %d results but limited terminal output to %d rows! Use 'limit' clause to override!", - numRows, rowsLimit)) + dlog.Client.Warn(fmt.Sprintf("Got %d results but limited terminal output "+ + "to %d rows! Use 'limit' clause to override!", numRows, rowsLimit)) } dlog.Client.Raw(result) } @@ -215,7 +211,6 @@ func (c *MaprClient) writeResultsToOutfile() { } return } - if err := c.globalGroup.SwapOut().WriteResult(c.query); err != nil { dlog.Client.FatalPanic(err) } diff --git a/internal/clients/stats.go b/internal/clients/stats.go index fbef572..1315aea 100644 --- a/internal/clients/stats.go +++ b/internal/clients/stats.go @@ -36,9 +36,10 @@ func newTailStats(servers int) *stats { // Start starts printing client connection stats every time a signal is recieved or // connection count has changed. -func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <-chan string, quiet bool) { - var connectedLast int +func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, + statsCh <-chan string, quiet bool) { + var connectedLast int for { var force bool var messages []string @@ -94,7 +95,9 @@ func (s *stats) printStatsDueInterrupt(messages []string) { dlog.Client.Resume() } -func (s *stats) statsData(connected, newConnections int, throttle int) map[string]interface{} { +func (s *stats) statsData(connected, newConnections int, + throttle int) map[string]interface{} { + percConnected := percentOf(float64(s.servers), float64(connected)) data := make(map[string]interface{}) @@ -112,7 +115,6 @@ func (s *stats) statsData(connected, newConnections int, throttle int) map[strin func (s *stats) statsLine(connected, newConnections int, throttle int) string { sb := strings.Builder{} - i := 0 for k, v := range s.statsData(connected, newConnections, throttle) { if i > 0 { @@ -123,7 +125,6 @@ func (s *stats) statsLine(connected, newConnections int, throttle int) string { sb.WriteString(fmt.Sprintf("%v", v)) i++ } - return sb.String() } diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go index d42a0e4..35c01d4 100644 --- a/internal/clients/tailclient.go +++ b/internal/clients/tailclient.go @@ -19,7 +19,6 @@ type TailClient struct { // NewTailClient returns a new TailClient. func NewTailClient(args config.Args) (*TailClient, error) { args.Mode = omode.TailClient - c := TailClient{ baseClient: baseClient{ Args: args, @@ -30,7 +29,6 @@ func NewTailClient(args config.Args) (*TailClient, error) { c.init() c.makeConnections(c) - return &c, nil } @@ -48,6 +46,5 @@ func (c TailClient) makeCommands() (commands []string) { c.Mode.String(), c.Args.SerializeOptions(), file, regex)) } dlog.Client.Debug(commands) - return } |
