summaryrefslogtreecommitdiff
path: root/internal/clients
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-10-09 21:10:29 +0300
committerPaul Buetow <paul@buetow.org>2021-10-10 13:36:41 +0300
commit97747ea0f3178f7f5890512d483fdccaa82846b0 (patch)
tree9ff1335ca26afc90e55fd6de416457e252d75a35 /internal/clients
parent7a7169791a64190e1002e38bc9c04ad0d5c1ce1f (diff)
vetting and linting and some code restyling
Diffstat (limited to 'internal/clients')
-rw-r--r--internal/clients/baseclient.go7
-rw-r--r--internal/clients/catclient.go2
-rw-r--r--internal/clients/connectors/serverconnection.go48
-rw-r--r--internal/clients/connectors/serverless.go16
-rw-r--r--internal/clients/grepclient.go5
-rw-r--r--internal/clients/handlers/healthhandler.go3
-rw-r--r--internal/clients/handlers/maprhandler.go13
-rw-r--r--internal/clients/healthclient.go17
-rw-r--r--internal/clients/maprclient.go13
-rw-r--r--internal/clients/stats.go11
-rw-r--r--internal/clients/tailclient.go3
11 files changed, 77 insertions, 61 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index d5d7c2c..4a7bd84 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -64,7 +64,8 @@ func (c *baseClient) makeConnections(maker maker) {
discoveryService := discovery.New(c.Discovery, c.ServersStr, discovery.Shuffle)
for _, server := range discoveryService.ServerList() {
- c.connections = append(c.connections, c.makeConnection(server, c.sshAuthMethods, c.hostKeyCallback))
+ c.connections = append(c.connections, c.makeConnection(server,
+ c.sshAuthMethods, c.hostKeyCallback))
}
c.stats = newTailStats(len(c.connections))
@@ -100,7 +101,9 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i
return
}
-func (c *baseClient) startConnection(ctx context.Context, i int, conn connectors.Connector) (status int) {
+func (c *baseClient) startConnection(ctx context.Context, i int,
+ conn connectors.Connector) (status int) {
+
for {
connCtx, cancel := context.WithCancel(ctx)
defer cancel()
diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go
index 2726e7e..bd65560 100644
--- a/internal/clients/catclient.go
+++ b/internal/clients/catclient.go
@@ -22,7 +22,6 @@ func NewCatClient(args config.Args) (*CatClient, error) {
if args.RegexStr != "" {
return nil, errors.New("Can't use regex with 'cat' operating mode")
}
-
args.Mode = omode.CatClient
c := CatClient{
@@ -35,7 +34,6 @@ func NewCatClient(args config.Args) (*CatClient, error) {
c.init()
c.makeConnections(c)
-
return &c, nil
}
diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go
index 1666a79..2d7b45a 100644
--- a/internal/clients/connectors/serverconnection.go
+++ b/internal/clients/connectors/serverconnection.go
@@ -16,7 +16,8 @@ import (
"golang.org/x/crypto/ssh"
)
-// ServerConnection represents a connection to a single remote dtail server via SSH protocol.
+// ServerConnection represents a connection to a single remote dtail server via
+// SSH protocol.
type ServerConnection struct {
server string
port int
@@ -28,9 +29,11 @@ type ServerConnection struct {
}
// NewServerConnection returns a new DTail SSH server connection.
-func NewServerConnection(server string, userName string, authMethods []ssh.AuthMethod, hostKeyCallback client.HostKeyCallback, handler handlers.Handler, commands []string) *ServerConnection {
- dlog.Client.Debug(server, "Creating new connection", server, handler, commands)
+func NewServerConnection(server string, userName string,
+ authMethods []ssh.AuthMethod, hostKeyCallback client.HostKeyCallback,
+ handler handlers.Handler, commands []string) *ServerConnection {
+ dlog.Client.Debug(server, "Creating new connection", server, handler, commands)
c := ServerConnection{
hostKeyCallback: hostKeyCallback,
server: server,
@@ -48,10 +51,12 @@ func NewServerConnection(server string, userName string, authMethods []ssh.AuthM
return &c
}
+// Server returns the server hostname connected to.
func (c *ServerConnection) Server() string {
return c.server
}
+// Handler returns the handler used for the connection.
func (c *ServerConnection) Handler() handlers.Handler {
return c.handler
}
@@ -72,23 +77,29 @@ func (c *ServerConnection) initServerPort() {
}
}
-func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) {
+// Start the connection to the server.
+func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc,
+ throttleCh, statsCh chan struct{}) {
+
// Throttle how many connections can be established concurrently (based on ch length)
dlog.Client.Debug(c.server, "Throttling connection", len(throttleCh), cap(throttleCh))
select {
case throttleCh <- struct{}{}:
case <-ctx.Done():
- dlog.Client.Debug(c.server, "Not establishing connection as context is done", len(throttleCh), cap(throttleCh))
+ dlog.Client.Debug(c.server, "Not establishing connection as context is done",
+ len(throttleCh), cap(throttleCh))
return
}
- dlog.Client.Debug(c.server, "Throttling says that the connection can be established", len(throttleCh), cap(throttleCh))
+ dlog.Client.Debug(c.server, "Throttling says that the connection can be established",
+ len(throttleCh), cap(throttleCh))
go func() {
defer func() {
if !c.throttlingDone {
- dlog.Client.Debug(c.server, "Unthrottling connection (1)", len(throttleCh), cap(throttleCh))
+ dlog.Client.Debug(c.server, "Unthrottling connection (1)",
+ len(throttleCh), cap(throttleCh))
c.throttlingDone = true
<-throttleCh
}
@@ -107,7 +118,9 @@ func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc,
}
// Dail into a new SSH connection. Close connection in case of an error.
-func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) error {
+func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc,
+ throttleCh, statsCh chan struct{}) error {
+
dlog.Client.Debug(c.server, "Incrementing connection stats")
statsCh <- struct{}{}
defer func() {
@@ -128,31 +141,30 @@ func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc,
}
// Create the SSH session. Close the session in case of an error.
-func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFunc, client *ssh.Client, throttleCh chan struct{}) error {
- dlog.Client.Debug(c.server, "Creating SSH session")
+func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFunc,
+ client *ssh.Client, throttleCh chan struct{}) error {
+ dlog.Client.Debug(c.server, "Creating SSH session")
session, err := client.NewSession()
if err != nil {
return err
}
defer session.Close()
-
return c.handle(ctx, cancel, session, throttleCh)
}
-func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc, session *ssh.Session, throttleCh chan struct{}) error {
- dlog.Client.Debug(c.server, "Creating handler for SSH session")
+func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc,
+ session *ssh.Session, throttleCh chan struct{}) error {
+ dlog.Client.Debug(c.server, "Creating handler for SSH session")
stdinPipe, err := session.StdinPipe()
if err != nil {
return err
}
-
stdoutPipe, err := session.StdoutPipe()
if err != nil {
return err
}
-
if err := session.Shell(); err != nil {
return err
}
@@ -161,12 +173,10 @@ func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc
io.Copy(stdinPipe, c.handler)
cancel()
}()
-
go func() {
io.Copy(c.handler, stdoutPipe)
cancel()
}()
-
go func() {
select {
case <-c.handler.Done():
@@ -182,13 +192,13 @@ func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc
}
if !c.throttlingDone {
- dlog.Client.Debug(c.server, "Unthrottling connection (2)", len(throttleCh), cap(throttleCh))
+ dlog.Client.Debug(c.server, "Unthrottling connection (2)",
+ len(throttleCh), cap(throttleCh))
c.throttlingDone = true
<-throttleCh
}
<-ctx.Done()
c.handler.Shutdown()
-
return nil
}
diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go
index 768a5ce..2ff490a 100644
--- a/internal/clients/connectors/serverless.go
+++ b/internal/clients/connectors/serverless.go
@@ -18,8 +18,10 @@ type Serverless struct {
userName string
}
-// NewServerConnection returns a new connection.
-func NewServerless(userName string, handler handlers.Handler, commands []string) *Serverless {
+// NewServerless starts a new serverless session.
+func NewServerless(userName string, handler handlers.Handler,
+ commands []string) *Serverless {
+
dlog.Client.Debug("Creating new serverless connector", handler, commands)
return &Serverless{
userName: userName,
@@ -28,15 +30,20 @@ func NewServerless(userName string, handler handlers.Handler, commands []string)
}
}
+// Server returns serverless server indicator.
func (s *Serverless) Server() string {
return "local(serverless)"
}
+// Handler returns the handler used for the serverless connection.
func (s *Serverless) Handler() handlers.Handler {
return s.handler
}
-func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) {
+// Start the serverless connection.
+func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc,
+ throttleCh, statsCh chan struct{}) {
+
dlog.Client.Debug("Starting serverless connector")
go func() {
defer cancel()
@@ -81,13 +88,11 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
dlog.Client.Trace("io.Copy(serverHandler, s.handler) => done")
terminate()
}()
-
go func() {
io.Copy(s.handler, serverHandler)
dlog.Client.Trace("io.Copy(s.handler, serverHandler) => done")
terminate()
}()
-
go func() {
select {
case <-s.handler.Done():
@@ -107,6 +112,5 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
<-ctx.Done()
dlog.Client.Trace("s.handler.Shutdown()")
s.handler.Shutdown()
-
return nil
}
diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go
index ae21ff2..7521c67 100644
--- a/internal/clients/grepclient.go
+++ b/internal/clients/grepclient.go
@@ -12,7 +12,8 @@ import (
"github.com/mimecast/dtail/internal/omode"
)
-// GrepClient searches a remote file for all lines matching a regular expression. Only the matching lines are displayed.
+// GrepClient searches a remote file for all lines matching a regular
+// expression. Only the matching lines are displayed.
type GrepClient struct {
baseClient
}
@@ -34,7 +35,6 @@ func NewGrepClient(args config.Args) (*GrepClient, error) {
c.init()
c.makeConnections(c)
-
return &c, nil
}
@@ -51,6 +51,5 @@ func (c GrepClient) makeCommands() (commands []string) {
commands = append(commands, fmt.Sprintf("%s:%s %s %s",
c.Mode.String(), c.Args.SerializeOptions(), file, regex))
}
-
return
}
diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go
index 10ba1f7..47b594e 100644
--- a/internal/clients/handlers/healthhandler.go
+++ b/internal/clients/handlers/healthhandler.go
@@ -8,7 +8,8 @@ import (
"github.com/mimecast/dtail/internal/protocol"
)
-// HealthHandler is the handler used on the client side for running mapreduce aggregations.
+// HealthHandler is the handler used on the client side for running mapreduce
+// aggregations.
type HealthHandler struct {
baseHandler
}
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go
index d1acfbd..8718b35 100644
--- a/internal/clients/handlers/maprhandler.go
+++ b/internal/clients/handlers/maprhandler.go
@@ -10,7 +10,8 @@ import (
"github.com/mimecast/dtail/internal/protocol"
)
-// MaprHandler is the handler used on the client side for running mapreduce aggregations.
+// MaprHandler is the handler used on the client side for running mapreduce
+// aggregations.
type MaprHandler struct {
baseHandler
aggregate *client.Aggregate
@@ -18,7 +19,9 @@ type MaprHandler struct {
}
// NewMaprHandler returns a new mapreduce client handler.
-func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *MaprHandler {
+func NewMaprHandler(server string, query *mapr.Query,
+ globalGroup *mapr.GlobalGroupSet) *MaprHandler {
+
return &MaprHandler{
baseHandler: baseHandler{
server: server,
@@ -55,12 +58,12 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) {
return len(p), nil
}
-// Handle a message received from server including mapr aggregation
-// related data.
+// Handle a message received from server including mapr aggregation related data.
func (h *MaprHandler) handleAggregateMessage(message string) {
parts := strings.SplitN(message, protocol.FieldDelimiter, 3)
if len(parts) != 3 {
- dlog.Client.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts")
+ dlog.Client.Error("Unable to aggregate data", h.server, message, parts,
+ len(parts), "expected 3 parts")
return
}
if err := h.aggregate.Aggregate(parts[2]); err != nil {
diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go
index ac1dc20..1a02827 100644
--- a/internal/clients/healthclient.go
+++ b/internal/clients/healthclient.go
@@ -32,7 +32,6 @@ func NewHealthClient(args config.Args) (*HealthClient, error) {
c.init()
c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.HealthUser))
c.makeConnections(c)
-
return &c, nil
}
@@ -45,28 +44,34 @@ func (c HealthClient) makeCommands() (commands []string) {
return
}
+// Start the health client.
func (c *HealthClient) Start(ctx context.Context, statsCh <-chan string) int {
status := c.baseClient.Start(ctx, statsCh)
switch status {
case 0:
if c.Serverless {
- fmt.Printf("WARNING: All seems fine but the check only run in serverless mode, please specify a remote server via --server hostname:port\n")
+ fmt.Printf("WARNING: All seems fine but the check only run in serverless mode" +
+ ", please specify a remote server via --server hostname:port\n")
return 1
}
fmt.Printf("OK: All fine at %s :-)\n", c.ServersStr)
case 2:
if c.Serverless {
- fmt.Printf("CRITICAL: DTail server not operating properly (using serverless connction)!\n")
+ fmt.Printf("CRITICAL: DTail server not operating properly (using " +
+ "serverless connction)!\n")
return 2
}
- fmt.Printf("CRITICAL: DTail server not operating properly at %s!\n", c.ServersStr)
+ fmt.Printf("CRITICAL: DTail server not operating properly at %s!\n",
+ c.ServersStr)
default:
if c.Serverless {
- fmt.Printf("UNKNOWN: Received unknown status code %d (using serverless connection)\n", status)
+ fmt.Printf("UNKNOWN: Received unknown status code %d (using serverless "+
+ "connection)\n", status)
return status
}
- fmt.Printf("UNKNOWN: Received unknown status code %d from %s!\n", status, c.ServersStr)
+ fmt.Printf("UNKNOWN: Received unknown status code %d from %s!\n",
+ status, c.ServersStr)
}
return status
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index 412a219..04f258d 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -107,15 +107,14 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i
return
}
-// NEXT: Make this a callback function rather trying to use polymorphism to call this.
-// This applies to all clients.
+// NEXT: Make this a callback function rather trying to use polymorphism to call
+// this. This applies to all clients.
func (c MaprClient) makeHandler(server string) handlers.Handler {
return handlers.NewMaprHandler(server, c.query, c.globalGroup)
}
func (c MaprClient) makeCommands() (commands []string) {
commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery))
-
modeStr := "cat"
if c.Mode == omode.TailClient {
modeStr = "tail"
@@ -134,7 +133,6 @@ func (c MaprClient) makeCommands() (commands []string) {
commands = append(commands, fmt.Sprintf("%s:%s %s %s",
modeStr, c.Args.SerializeOptions(), file, regex))
}
-
return
}
@@ -155,7 +153,6 @@ func (c *MaprClient) reportResults() {
c.writeResultsToOutfile()
return
}
-
c.printResults()
}
@@ -176,7 +173,6 @@ func (c *MaprClient) printResults() {
} else {
result, numRows, err = c.globalGroup.SwapOut().Result(c.query, rowsLimit)
}
-
if err != nil {
dlog.Client.FatalPanic(err)
}
@@ -202,8 +198,8 @@ func (c *MaprClient) printResults() {
dlog.Client.Raw(rawQuery)
if rowsLimit > 0 && numRows > rowsLimit {
- dlog.Client.Warn(fmt.Sprintf("Got %d results but limited terminal output to %d rows! Use 'limit' clause to override!",
- numRows, rowsLimit))
+ dlog.Client.Warn(fmt.Sprintf("Got %d results but limited terminal output "+
+ "to %d rows! Use 'limit' clause to override!", numRows, rowsLimit))
}
dlog.Client.Raw(result)
}
@@ -215,7 +211,6 @@ func (c *MaprClient) writeResultsToOutfile() {
}
return
}
-
if err := c.globalGroup.SwapOut().WriteResult(c.query); err != nil {
dlog.Client.FatalPanic(err)
}
diff --git a/internal/clients/stats.go b/internal/clients/stats.go
index fbef572..1315aea 100644
--- a/internal/clients/stats.go
+++ b/internal/clients/stats.go
@@ -36,9 +36,10 @@ func newTailStats(servers int) *stats {
// Start starts printing client connection stats every time a signal is recieved or
// connection count has changed.
-func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <-chan string, quiet bool) {
- var connectedLast int
+func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{},
+ statsCh <-chan string, quiet bool) {
+ var connectedLast int
for {
var force bool
var messages []string
@@ -94,7 +95,9 @@ func (s *stats) printStatsDueInterrupt(messages []string) {
dlog.Client.Resume()
}
-func (s *stats) statsData(connected, newConnections int, throttle int) map[string]interface{} {
+func (s *stats) statsData(connected, newConnections int,
+ throttle int) map[string]interface{} {
+
percConnected := percentOf(float64(s.servers), float64(connected))
data := make(map[string]interface{})
@@ -112,7 +115,6 @@ func (s *stats) statsData(connected, newConnections int, throttle int) map[strin
func (s *stats) statsLine(connected, newConnections int, throttle int) string {
sb := strings.Builder{}
-
i := 0
for k, v := range s.statsData(connected, newConnections, throttle) {
if i > 0 {
@@ -123,7 +125,6 @@ func (s *stats) statsLine(connected, newConnections int, throttle int) string {
sb.WriteString(fmt.Sprintf("%v", v))
i++
}
-
return sb.String()
}
diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go
index d42a0e4..35c01d4 100644
--- a/internal/clients/tailclient.go
+++ b/internal/clients/tailclient.go
@@ -19,7 +19,6 @@ type TailClient struct {
// NewTailClient returns a new TailClient.
func NewTailClient(args config.Args) (*TailClient, error) {
args.Mode = omode.TailClient
-
c := TailClient{
baseClient: baseClient{
Args: args,
@@ -30,7 +29,6 @@ func NewTailClient(args config.Args) (*TailClient, error) {
c.init()
c.makeConnections(c)
-
return &c, nil
}
@@ -48,6 +46,5 @@ func (c TailClient) makeCommands() (commands []string) {
c.Mode.String(), c.Args.SerializeOptions(), file, regex))
}
dlog.Client.Debug(commands)
-
return
}