summaryrefslogtreecommitdiff
path: root/internal/clients
diff options
context:
space:
mode:
authorPaul Buetow <git@mx.buetow.org>2020-12-28 09:49:10 +0000
committerPaul Buetow <git@mx.buetow.org>2020-12-28 09:49:10 +0000
commita5a91623ee60f33dafc16e1752f3fb1f6798ee76 (patch)
treec6433ef4a3415cc7206b5fbe733c0539d0e5a60f /internal/clients
parentae8ffc84331ca72f0d33fff69edd85d6e03d29ae (diff)
parent495e9f38220a6d448b15882a235e7a9c21f21d18 (diff)
merge
Diffstat (limited to 'internal/clients')
-rw-r--r--internal/clients/args.go1
-rw-r--r--internal/clients/baseclient.go8
-rw-r--r--internal/clients/catclient.go3
-rw-r--r--internal/clients/client.go2
-rw-r--r--internal/clients/grepclient.go4
-rw-r--r--internal/clients/handlers/basehandler.go18
-rw-r--r--internal/clients/handlers/healthhandler.go3
-rw-r--r--internal/clients/maker.go3
-rw-r--r--internal/clients/maprclient.go11
-rw-r--r--internal/clients/stats.go30
-rw-r--r--internal/clients/tailclient.go4
11 files changed, 52 insertions, 35 deletions
diff --git a/internal/clients/args.go b/internal/clients/args.go
index 34fcfa2..7f782f1 100644
--- a/internal/clients/args.go
+++ b/internal/clients/args.go
@@ -22,4 +22,5 @@ type Args struct {
SSHAuthMethods []gossh.AuthMethod
SSHHostKeyCallback gossh.HostKeyCallback
PrivateKeyPathFile string
+ Quiet bool
}
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index d8d4fde..f20156f 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -39,7 +39,7 @@ type baseClient struct {
}
func (c *baseClient) init() {
- logger.Info("Initiating base client")
+ logger.Debug("Initiating base client")
flag := regex.Default
if c.Args.RegexInvert {
@@ -66,11 +66,11 @@ func (c *baseClient) makeConnections(maker maker) {
c.stats = newTailStats(len(c.connections))
}
-func (c *baseClient) Start(ctx context.Context, statsCh <-chan struct{}) (status int) {
+func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status int) {
// Periodically check for unknown hosts, and ask the user whether to trust them or not.
go c.hostKeyCallback.PromptAddHosts(ctx)
// Print client stats every time something on statsCh is recieved.
- go c.stats.Start(ctx, c.throttleCh, statsCh)
+ go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Quiet)
// Keep count of active connections
active := make(chan struct{}, len(c.connections))
@@ -127,7 +127,7 @@ func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMe
}
func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) {
- defer logger.Info("Terminated connection")
+ defer logger.Debug("Terminated connection")
// We want to have at least one active connection
<-active
diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go
index d8e9196..b7b6131 100644
--- a/internal/clients/catclient.go
+++ b/internal/clients/catclient.go
@@ -42,8 +42,9 @@ func (c CatClient) makeHandler(server string) handlers.Handler {
}
func (c CatClient) makeCommands() (commands []string) {
+ options := fmt.Sprintf("quiet=%v", c.Args.Quiet)
for _, file := range strings.Split(c.What, ",") {
- commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize()))
}
return
}
diff --git a/internal/clients/client.go b/internal/clients/client.go
index eb8452d..4a547e8 100644
--- a/internal/clients/client.go
+++ b/internal/clients/client.go
@@ -4,5 +4,5 @@ import "context"
// Client is the interface for the end user command line client.
type Client interface {
- Start(ctx context.Context, statsCh <-chan struct{}) int
+ Start(ctx context.Context, statsCh <-chan string) int
}
diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go
index 4024083..652c31b 100644
--- a/internal/clients/grepclient.go
+++ b/internal/clients/grepclient.go
@@ -41,8 +41,10 @@ func (c GrepClient) makeHandler(server string) handlers.Handler {
}
func (c GrepClient) makeCommands() (commands []string) {
+ options := fmt.Sprintf("quiet=%v", c.Args.Quiet)
for _, file := range strings.Split(c.What, ",") {
- commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize()))
}
+
return
}
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index b5045e2..f07fd90 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -4,7 +4,6 @@ import (
"encoding/base64"
"fmt"
"io"
- "strconv"
"strings"
"time"
@@ -78,6 +77,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
case <-h.Done():
return 0, io.EOF
}
+
return
}
@@ -111,21 +111,5 @@ func (h *baseHandler) handleHiddenMessage(message string) {
case <-h.Done():
return
}
-
- case strings.HasPrefix(message, ".run exitstatus"):
- splitted := strings.Split(strings.TrimSuffix(message, "\n"), " ")
- if len(splitted) != 3 {
- logger.Error("Unable to retrieve exitstatus", message)
- return
- }
- i, err := strconv.Atoi(splitted[2])
- if err != nil {
- logger.Error("Unable to retrieve exitstatus", message, err)
- return
- }
- logger.Debug("Retrieved exitstatus", h.status)
- if i > h.status {
- h.status = i
- }
}
}
diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go
index 95693ab..0440706 100644
--- a/internal/clients/handlers/healthhandler.go
+++ b/internal/clients/handlers/healthhandler.go
@@ -19,6 +19,7 @@ type HealthHandler struct {
receive chan<- string
// The remote server address
server string
+ // The return status.
status int
}
@@ -45,10 +46,12 @@ func (h *HealthHandler) Status() int {
return h.status
}
+// Done returns done channel of the handler.
func (h *HealthHandler) Done() <-chan struct{} {
return h.done.Done()
}
+// Shutdown the handler.
func (h *HealthHandler) Shutdown() {
h.done.Shutdown()
}
diff --git a/internal/clients/maker.go b/internal/clients/maker.go
index 1ba6482..d5ffd8b 100644
--- a/internal/clients/maker.go
+++ b/internal/clients/maker.go
@@ -4,6 +4,9 @@ import (
"github.com/mimecast/dtail/internal/clients/handlers"
)
+// maker interface helps to re-use code in all DTail client implementations.
+// All clients share the baseClient but have different connection handlers
+// and send different commands to the DTail server.
type maker interface {
makeHandler(server string) handlers.Handler
makeCommands() (commands []string)
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index 149129d..1c0c2cc 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -94,12 +94,12 @@ func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (*
}
// Start starts the mapreduce client.
-func (c *MaprClient) Start(ctx context.Context, statsCh <-chan struct{}) (status int) {
+func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status int) {
go c.periodicReportResults(ctx)
status = c.baseClient.Start(ctx, statsCh)
if c.cumulative {
- logger.Info("Received final mapreduce result")
+ logger.Debug("Received final mapreduce result")
c.reportResults()
}
@@ -112,6 +112,7 @@ func (c MaprClient) makeHandler(server string) handlers.Handler {
func (c MaprClient) makeCommands() (commands []string) {
commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery))
+ options := fmt.Sprintf("quiet=%v", c.Args.Quiet)
modeStr := "cat"
if c.Mode == omode.TailClient {
@@ -123,7 +124,7 @@ func (c MaprClient) makeCommands() (commands []string) {
commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, modeStr, file, c.Regex.Serialize()))
continue
}
- commands = append(commands, fmt.Sprintf("%s %s %s", modeStr, file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, options, file, c.Regex.Serialize()))
}
return
@@ -133,7 +134,7 @@ func (c *MaprClient) periodicReportResults(ctx context.Context) {
for {
select {
case <-time.After(c.query.Interval):
- logger.Info("Gathering interim mapreduce result")
+ logger.Debug("Gathering interim mapreduce result")
c.reportResults()
case <-ctx.Done():
return
@@ -165,7 +166,7 @@ func (c *MaprClient) printResults() {
}
if numLines == 0 {
- logger.Info("Empty result set this time...")
+ logger.Warn("Empty result set this time...")
return
}
diff --git a/internal/clients/stats.go b/internal/clients/stats.go
index e7eabd8..d8163d4 100644
--- a/internal/clients/stats.go
+++ b/internal/clients/stats.go
@@ -8,6 +8,7 @@ import (
"sync"
"time"
+ "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/logger"
)
@@ -33,16 +34,18 @@ func newTailStats(connectionsTotal int) *stats {
// Start starts printing client connection stats every time a signal is recieved or
// connection count has changed.
-func (s *stats) Start(ctx context.Context, throttleCh, statsCh <-chan struct{}) {
+func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <-chan string, quiet bool) {
var connectedLast int
for {
var force bool
+ var messages []string
select {
- case <-statsCh:
+ case message := <-statsCh:
+ messages = append(messages, message)
force = true
- case <-time.After(time.Second * 2):
+ case <-time.After(time.Second * 3):
case <-ctx.Done():
return
}
@@ -52,11 +55,18 @@ func (s *stats) Start(ctx context.Context, throttleCh, statsCh <-chan struct{})
newConnections := connected - connectedLast
- if connected == connectedLast && !force {
+ if (connected == connectedLast || quiet) && !force {
continue
}
- logger.Info(s.statsLine(connected, newConnections, throttle))
+ stats := s.statsLine(connected, newConnections, throttle)
+ switch force {
+ case true:
+ messages = append(messages, fmt.Sprintf("Connection stats: %s", stats))
+ s.printStatsDueInterrupt(messages)
+ default:
+ logger.Info(stats)
+ }
connectedLast = connected
s.mutex.Lock()
@@ -65,6 +75,15 @@ func (s *stats) Start(ctx context.Context, throttleCh, statsCh <-chan struct{})
}
}
+func (s *stats) printStatsDueInterrupt(messages []string) {
+ logger.Pause()
+ for _, message := range messages {
+ fmt.Println(fmt.Sprintf(" %s", message))
+ }
+ time.Sleep(time.Second * time.Duration(config.InterruptTimeoutS))
+ logger.Resume()
+}
+
func (s *stats) statsLine(connected, newConnections int, throttle int) string {
percConnected := percentOf(float64(s.connectionsTotal), float64(connected))
@@ -88,5 +107,6 @@ func percentOf(total float64, value float64) float64 {
if total == 0 || total == value {
return 100
}
+
return value / (total / 100.0)
}
diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go
index 53b5ba4..cefbaa7 100644
--- a/internal/clients/tailclient.go
+++ b/internal/clients/tailclient.go
@@ -29,6 +29,7 @@ func NewTailClient(args Args) (*TailClient, error) {
c.init()
c.makeConnections(c)
+
return &c, nil
}
@@ -37,8 +38,9 @@ func (c TailClient) makeHandler(server string) handlers.Handler {
}
func (c TailClient) makeCommands() (commands []string) {
+ options := fmt.Sprintf("quiet=%v", c.Args.Quiet)
for _, file := range strings.Split(c.What, ",") {
- commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize()))
}
logger.Debug(commands)