summaryrefslogtreecommitdiff
path: root/internal/clients
diff options
context:
space:
mode:
Diffstat (limited to 'internal/clients')
-rw-r--r--internal/clients/args.go1
-rw-r--r--internal/clients/baseclient.go6
-rw-r--r--internal/clients/catclient.go3
-rw-r--r--internal/clients/grepclient.go4
-rw-r--r--internal/clients/handlers/basehandler.go18
-rw-r--r--internal/clients/handlers/healthhandler.go1
-rw-r--r--internal/clients/maker.go3
-rw-r--r--internal/clients/maprclient.go9
-rw-r--r--internal/clients/runclient.go87
-rw-r--r--internal/clients/stats.go11
-rw-r--r--internal/clients/tailclient.go4
11 files changed, 28 insertions, 119 deletions
diff --git a/internal/clients/args.go b/internal/clients/args.go
index 34fcfa2..7f782f1 100644
--- a/internal/clients/args.go
+++ b/internal/clients/args.go
@@ -22,4 +22,5 @@ type Args struct {
SSHAuthMethods []gossh.AuthMethod
SSHHostKeyCallback gossh.HostKeyCallback
PrivateKeyPathFile string
+ Quiet bool
}
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index 69055a3..f20156f 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -39,7 +39,7 @@ type baseClient struct {
}
func (c *baseClient) init() {
- logger.Info("Initiating base client")
+ logger.Debug("Initiating base client")
flag := regex.Default
if c.Args.RegexInvert {
@@ -70,7 +70,7 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i
// Periodically check for unknown hosts, and ask the user whether to trust them or not.
go c.hostKeyCallback.PromptAddHosts(ctx)
// Print client stats every time something on statsCh is recieved.
- go c.stats.Start(ctx, c.throttleCh, statsCh)
+ go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Quiet)
// Keep count of active connections
active := make(chan struct{}, len(c.connections))
@@ -127,7 +127,7 @@ func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMe
}
func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) {
- defer logger.Info("Terminated connection")
+ defer logger.Debug("Terminated connection")
// We want to have at least one active connection
<-active
diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go
index d8e9196..b7b6131 100644
--- a/internal/clients/catclient.go
+++ b/internal/clients/catclient.go
@@ -42,8 +42,9 @@ func (c CatClient) makeHandler(server string) handlers.Handler {
}
func (c CatClient) makeCommands() (commands []string) {
+ options := fmt.Sprintf("quiet=%v", c.Args.Quiet)
for _, file := range strings.Split(c.What, ",") {
- commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize()))
}
return
}
diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go
index 4024083..652c31b 100644
--- a/internal/clients/grepclient.go
+++ b/internal/clients/grepclient.go
@@ -41,8 +41,10 @@ func (c GrepClient) makeHandler(server string) handlers.Handler {
}
func (c GrepClient) makeCommands() (commands []string) {
+ options := fmt.Sprintf("quiet=%v", c.Args.Quiet)
for _, file := range strings.Split(c.What, ",") {
- commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize()))
}
+
return
}
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index b5045e2..f07fd90 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -4,7 +4,6 @@ import (
"encoding/base64"
"fmt"
"io"
- "strconv"
"strings"
"time"
@@ -78,6 +77,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
case <-h.Done():
return 0, io.EOF
}
+
return
}
@@ -111,21 +111,5 @@ func (h *baseHandler) handleHiddenMessage(message string) {
case <-h.Done():
return
}
-
- case strings.HasPrefix(message, ".run exitstatus"):
- splitted := strings.Split(strings.TrimSuffix(message, "\n"), " ")
- if len(splitted) != 3 {
- logger.Error("Unable to retrieve exitstatus", message)
- return
- }
- i, err := strconv.Atoi(splitted[2])
- if err != nil {
- logger.Error("Unable to retrieve exitstatus", message, err)
- return
- }
- logger.Debug("Retrieved exitstatus", h.status)
- if i > h.status {
- h.status = i
- }
}
}
diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go
index 08ed137..0440706 100644
--- a/internal/clients/handlers/healthhandler.go
+++ b/internal/clients/handlers/healthhandler.go
@@ -19,6 +19,7 @@ type HealthHandler struct {
receive chan<- string
// The remote server address
server string
+ // The return status.
status int
}
diff --git a/internal/clients/maker.go b/internal/clients/maker.go
index 1ba6482..d5ffd8b 100644
--- a/internal/clients/maker.go
+++ b/internal/clients/maker.go
@@ -4,6 +4,9 @@ import (
"github.com/mimecast/dtail/internal/clients/handlers"
)
+// maker interface helps to re-use code in all DTail client implementations.
+// All clients share the baseClient but have different connection handlers
+// and send different commands to the DTail server.
type maker interface {
makeHandler(server string) handlers.Handler
makeCommands() (commands []string)
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index 6aadd6b..1c0c2cc 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -99,7 +99,7 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i
status = c.baseClient.Start(ctx, statsCh)
if c.cumulative {
- logger.Info("Received final mapreduce result")
+ logger.Debug("Received final mapreduce result")
c.reportResults()
}
@@ -112,6 +112,7 @@ func (c MaprClient) makeHandler(server string) handlers.Handler {
func (c MaprClient) makeCommands() (commands []string) {
commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery))
+ options := fmt.Sprintf("quiet=%v", c.Args.Quiet)
modeStr := "cat"
if c.Mode == omode.TailClient {
@@ -123,7 +124,7 @@ func (c MaprClient) makeCommands() (commands []string) {
commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, modeStr, file, c.Regex.Serialize()))
continue
}
- commands = append(commands, fmt.Sprintf("%s %s %s", modeStr, file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, options, file, c.Regex.Serialize()))
}
return
@@ -133,7 +134,7 @@ func (c *MaprClient) periodicReportResults(ctx context.Context) {
for {
select {
case <-time.After(c.query.Interval):
- logger.Info("Gathering interim mapreduce result")
+ logger.Debug("Gathering interim mapreduce result")
c.reportResults()
case <-ctx.Done():
return
@@ -165,7 +166,7 @@ func (c *MaprClient) printResults() {
}
if numLines == 0 {
- logger.Info("Empty result set this time...")
+ logger.Warn("Empty result set this time...")
return
}
diff --git a/internal/clients/runclient.go b/internal/clients/runclient.go
deleted file mode 100644
index 5464d54..0000000
--- a/internal/clients/runclient.go
+++ /dev/null
@@ -1,87 +0,0 @@
-package clients
-
-import (
- "crypto/sha256"
- "encoding/base64"
- "encoding/hex"
- "fmt"
- "runtime"
- "strings"
-
- "github.com/mimecast/dtail/internal/clients/handlers"
- "github.com/mimecast/dtail/internal/io/logger"
- "github.com/mimecast/dtail/internal/omode"
-)
-
-// RunClient is a client to run various commands on the server.
-type RunClient struct {
- baseClient
- jobName string
- background string
-}
-
-// NewRunClient returns a new run client to execute commands on the remote server.
-func NewRunClient(args Args, background, jobName string) (*RunClient, error) {
- args.Mode = omode.RunClient
-
- if jobName == "" {
- jobName = hash(strings.Join(args.Arguments, " "))
- }
-
- c := RunClient{
- baseClient: baseClient{
- Args: args,
- throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()),
- retry: false,
- },
- jobName: jobName,
- background: background,
- }
-
- c.init()
- c.makeConnections(c)
-
- return &c, nil
-}
-
-func (c RunClient) makeHandler(server string) handlers.Handler {
- return handlers.NewClientHandler(server)
-}
-
-func (c RunClient) makeCommands() (commands []string) {
- if c.Timeout > 0 {
- commands = append(commands, fmt.Sprintf("timeout %d run%s %s", c.Timeout, c.options(), c.What))
- return
- }
-
- commands = append(commands, fmt.Sprintf("run%s %s", c.options(), c.What))
- return
-}
-
-func (c RunClient) options() string {
- var sb strings.Builder
-
- logger.Debug("options", fmt.Sprintf(":background=%s", c.background))
- sb.WriteString(fmt.Sprintf(":background=%s", c.background))
-
- logger.Debug("options", fmt.Sprintf(":jobName=%s", c.jobName))
- sb.WriteString(fmt.Sprintf(":jobName=%s", c.jobName))
-
- if len(c.Arguments) > 0 {
- logger.Debug("options", fmt.Sprintf(":outerArgs=base64%%%s", strings.Join(c.Arguments, " ")))
- sb.WriteString(fmt.Sprintf(":outerArgs=base64%%%s", encode64(strings.Join(c.Arguments, " "))))
- }
-
- return sb.String()
-}
-
-func encode64(str string) string {
- return base64.StdEncoding.EncodeToString([]byte(str))
-}
-
-func hash(str string) string {
- h := sha256.New()
- h.Write([]byte(str))
-
- return hex.EncodeToString(h.Sum(nil))
-}
diff --git a/internal/clients/stats.go b/internal/clients/stats.go
index 17343b5..d8163d4 100644
--- a/internal/clients/stats.go
+++ b/internal/clients/stats.go
@@ -34,7 +34,7 @@ func newTailStats(connectionsTotal int) *stats {
// Start starts printing client connection stats every time a signal is recieved or
// connection count has changed.
-func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <-chan string) {
+func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <-chan string, quiet bool) {
var connectedLast int
for {
@@ -45,7 +45,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <
case message := <-statsCh:
messages = append(messages, message)
force = true
- case <-time.After(time.Second * 10):
+ case <-time.After(time.Second * 3):
case <-ctx.Done():
return
}
@@ -55,7 +55,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <
newConnections := connected - connectedLast
- if connected == connectedLast && !force {
+ if (connected == connectedLast || quiet) && !force {
continue
}
@@ -63,7 +63,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <
switch force {
case true:
messages = append(messages, fmt.Sprintf("Connection stats: %s", stats))
- s.printStatsOnInterrupt(messages)
+ s.printStatsDueInterrupt(messages)
default:
logger.Info(stats)
}
@@ -75,7 +75,7 @@ func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <
}
}
-func (s *stats) printStatsOnInterrupt(messages []string) {
+func (s *stats) printStatsDueInterrupt(messages []string) {
logger.Pause()
for _, message := range messages {
fmt.Println(fmt.Sprintf(" %s", message))
@@ -107,5 +107,6 @@ func percentOf(total float64, value float64) float64 {
if total == 0 || total == value {
return 100
}
+
return value / (total / 100.0)
}
diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go
index 53b5ba4..cefbaa7 100644
--- a/internal/clients/tailclient.go
+++ b/internal/clients/tailclient.go
@@ -29,6 +29,7 @@ func NewTailClient(args Args) (*TailClient, error) {
c.init()
c.makeConnections(c)
+
return &c, nil
}
@@ -37,8 +38,9 @@ func (c TailClient) makeHandler(server string) handlers.Handler {
}
func (c TailClient) makeCommands() (commands []string) {
+ options := fmt.Sprintf("quiet=%v", c.Args.Quiet)
for _, file := range strings.Split(c.What, ",") {
- commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize()))
}
logger.Debug(commands)