diff options
Diffstat (limited to 'internal/clients')
| -rw-r--r-- | internal/clients/args.go | 1 | ||||
| -rw-r--r-- | internal/clients/baseclient.go | 6 | ||||
| -rw-r--r-- | internal/clients/catclient.go | 3 | ||||
| -rw-r--r-- | internal/clients/grepclient.go | 4 | ||||
| -rw-r--r-- | internal/clients/handlers/basehandler.go | 18 | ||||
| -rw-r--r-- | internal/clients/handlers/healthhandler.go | 1 | ||||
| -rw-r--r-- | internal/clients/maker.go | 3 | ||||
| -rw-r--r-- | internal/clients/maprclient.go | 9 | ||||
| -rw-r--r-- | internal/clients/runclient.go | 87 | ||||
| -rw-r--r-- | internal/clients/stats.go | 11 | ||||
| -rw-r--r-- | internal/clients/tailclient.go | 4 |
11 files changed, 28 insertions, 119 deletions
diff --git a/internal/clients/args.go b/internal/clients/args.go index 34fcfa2..7f782f1 100644 --- a/internal/clients/args.go +++ b/internal/clients/args.go @@ -22,4 +22,5 @@ type Args struct { SSHAuthMethods []gossh.AuthMethod SSHHostKeyCallback gossh.HostKeyCallback PrivateKeyPathFile string + Quiet bool } diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index 69055a3..f20156f 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -39,7 +39,7 @@ type baseClient struct { } func (c *baseClient) init() { - logger.Info("Initiating base client") + logger.Debug("Initiating base client") flag := regex.Default if c.Args.RegexInvert { @@ -70,7 +70,7 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i // Periodically check for unknown hosts, and ask the user whether to trust them or not. go c.hostKeyCallback.PromptAddHosts(ctx) // Print client stats every time something on statsCh is recieved. - go c.stats.Start(ctx, c.throttleCh, statsCh) + go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Quiet) // Keep count of active connections active := make(chan struct{}, len(c.connections)) @@ -127,7 +127,7 @@ func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMe } func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) { - defer logger.Info("Terminated connection") + defer logger.Debug("Terminated connection") // We want to have at least one active connection <-active diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go index d8e9196..b7b6131 100644 --- a/internal/clients/catclient.go +++ b/internal/clients/catclient.go @@ -42,8 +42,9 @@ func (c CatClient) makeHandler(server string) handlers.Handler { } func (c CatClient) makeCommands() (commands []string) { + options := fmt.Sprintf("quiet=%v", c.Args.Quiet) for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize())) } return } diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go index 4024083..652c31b 100644 --- a/internal/clients/grepclient.go +++ b/internal/clients/grepclient.go @@ -41,8 +41,10 @@ func (c GrepClient) makeHandler(server string) handlers.Handler { } func (c GrepClient) makeCommands() (commands []string) { + options := fmt.Sprintf("quiet=%v", c.Args.Quiet) for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize())) } + return } diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index b5045e2..f07fd90 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -4,7 +4,6 @@ import ( "encoding/base64" "fmt" "io" - "strconv" "strings" "time" @@ -78,6 +77,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { case <-h.Done(): return 0, io.EOF } + return } @@ -111,21 +111,5 @@ func (h *baseHandler) handleHiddenMessage(message string) { case <-h.Done(): return } - - case strings.HasPrefix(message, ".run exitstatus"): - splitted := strings.Split(strings.TrimSuffix(message, "\n"), " ") - if len(splitted) != 3 { - logger.Error("Unable to retrieve exitstatus", message) - return - } - i, err := strconv.Atoi(splitted[2]) - if err != nil { - logger.Error("Unable to retrieve exitstatus", message, err) - return - } - logger.Debug("Retrieved exitstatus", h.status) - if i > h.status { - h.status = i - } } } diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 08ed137..0440706 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -19,6 +19,7 @@ type HealthHandler struct { receive chan<- string // The remote server address server string + // The return status. status int } diff --git a/internal/clients/maker.go b/internal/clients/maker.go index 1ba6482..d5ffd8b 100644 --- a/internal/clients/maker.go +++ b/internal/clients/maker.go @@ -4,6 +4,9 @@ import ( "github.com/mimecast/dtail/internal/clients/handlers" ) +// maker interface helps to re-use code in all DTail client implementations. +// All clients share the baseClient but have different connection handlers +// and send different commands to the DTail server. type maker interface { makeHandler(server string) handlers.Handler makeCommands() (commands []string) diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 6aadd6b..1c0c2cc 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -99,7 +99,7 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i status = c.baseClient.Start(ctx, statsCh) if c.cumulative { - logger.Info("Received final mapreduce result") + logger.Debug("Received final mapreduce result") c.reportResults() } @@ -112,6 +112,7 @@ func (c MaprClient) makeHandler(server string) handlers.Handler { func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery)) + options := fmt.Sprintf("quiet=%v", c.Args.Quiet) modeStr := "cat" if c.Mode == omode.TailClient { @@ -123,7 +124,7 @@ func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, modeStr, file, c.Regex.Serialize())) continue } - commands = append(commands, fmt.Sprintf("%s %s %s", modeStr, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, options, file, c.Regex.Serialize())) } return @@ -133,7 +134,7 @@ func (c *MaprClient) periodicReportResults(ctx context.Context) { for { select { case <-time.After(c.query.Interval): - logger.Info("Gathering interim mapreduce result") + logger.Debug("Gathering interim mapreduce result") c.reportResults() case <-ctx.Done(): return @@ -165,7 +166,7 @@ func (c *MaprClient) printResults() { } if numLines == 0 { - logger.Info("Empty result set this time...") + logger.Warn("Empty result set this time...") return } diff --git a/internal/clients/runclient.go b/internal/clients/runclient.go deleted file mode 100644 index 5464d54..0000000 --- a/internal/clients/runclient.go +++ /dev/null @@ -1,87 +0,0 @@ -package clients - -import ( - "crypto/sha256" - "encoding/base64" - "encoding/hex" - "fmt" - "runtime" - "strings" - - "github.com/mimecast/dtail/internal/clients/handlers" - "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/omode" -) - -// RunClient is a client to run various commands on the server. -type RunClient struct { - baseClient - jobName string - background string -} - -// NewRunClient returns a new run client to execute commands on the remote server. -func NewRunClient(args Args, background, jobName string) (*RunClient, error) { - args.Mode = omode.RunClient - - if jobName == "" { - jobName = hash(strings.Join(args.Arguments, " ")) - } - - c := RunClient{ - baseClient: baseClient{ - Args: args, - throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()), - retry: false, - }, - jobName: jobName, - background: background, - } - - c.init() - c.makeConnections(c) - - return &c, nil -} - -func (c RunClient) makeHandler(server string) handlers.Handler { - return handlers.NewClientHandler(server) -} - -func (c RunClient) makeCommands() (commands []string) { - if c.Timeout > 0 { - commands = append(commands, fmt.Sprintf("timeout %d run%s %s", c.Timeout, c.options(), c.What)) - return - } - - commands = append(commands, fmt.Sprintf("run%s %s", c.options(), c.What)) - return -} - -func (c RunClient) options() string { - var sb strings.Builder - - logger.Debug("options", fmt.Sprintf(":background=%s", c.background)) - sb.WriteString(fmt.Sprintf(":background=%s", c.background)) - - logger.Debug("options", fmt.Sprintf(":jobName=%s", c.jobName)) - sb.WriteString(fmt.Sprintf(":jobName=%s", c.jobName)) - - if len(c.Arguments) > 0 { - logger.Debug("options", fmt.Sprintf(":outerArgs=base64%%%s", strings.Join(c.Arguments, " "))) - sb.WriteString(fmt.Sprintf(":outerArgs=base64%%%s", encode64(strings.Join(c.Arguments, " ")))) - } - - return sb.String() -} - -func encode64(str string) string { - return base64.StdEncoding.EncodeToString([]byte(str)) -} - -func hash(str string) string { - h := sha256.New() - h.Write([]byte(str)) - - return hex.EncodeToString(h.Sum(nil)) -} diff --git a/internal/clients/stats.go b/internal/clients/stats.go index 17343b5..d8163d4 100644 --- a/internal/clients/stats.go +++ b/internal/clients/stats.go @@ -34,7 +34,7 @@ func newTailStats(connectionsTotal int) *stats { // Start starts printing client connection stats every time a signal is recieved or // connection count has changed. -func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <-chan string) { +func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <-chan string, quiet bool) { var connectedLast int for { @@ -45,7 +45,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh < case message := <-statsCh: messages = append(messages, message) force = true - case <-time.After(time.Second * 10): + case <-time.After(time.Second * 3): case <-ctx.Done(): return } @@ -55,7 +55,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh < newConnections := connected - connectedLast - if connected == connectedLast && !force { + if (connected == connectedLast || quiet) && !force { continue } @@ -63,7 +63,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh < switch force { case true: messages = append(messages, fmt.Sprintf("Connection stats: %s", stats)) - s.printStatsOnInterrupt(messages) + s.printStatsDueInterrupt(messages) default: logger.Info(stats) } @@ -75,7 +75,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh < } } -func (s *stats) printStatsOnInterrupt(messages []string) { +func (s *stats) printStatsDueInterrupt(messages []string) { logger.Pause() for _, message := range messages { fmt.Println(fmt.Sprintf(" %s", message)) @@ -107,5 +107,6 @@ func percentOf(total float64, value float64) float64 { if total == 0 || total == value { return 100 } + return value / (total / 100.0) } diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go index 53b5ba4..cefbaa7 100644 --- a/internal/clients/tailclient.go +++ b/internal/clients/tailclient.go @@ -29,6 +29,7 @@ func NewTailClient(args Args) (*TailClient, error) { c.init() c.makeConnections(c) + return &c, nil } @@ -37,8 +38,9 @@ func (c TailClient) makeHandler(server string) handlers.Handler { } func (c TailClient) makeCommands() (commands []string) { + options := fmt.Sprintf("quiet=%v", c.Args.Quiet) for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize())) } logger.Debug(commands) |
