diff options
| author | Paul Buetow <git@mx.buetow.org> | 2020-12-28 09:49:10 +0000 |
|---|---|---|
| committer | Paul Buetow <git@mx.buetow.org> | 2020-12-28 09:49:10 +0000 |
| commit | a5a91623ee60f33dafc16e1752f3fb1f6798ee76 (patch) | |
| tree | c6433ef4a3415cc7206b5fbe733c0539d0e5a60f /internal/clients | |
| parent | ae8ffc84331ca72f0d33fff69edd85d6e03d29ae (diff) | |
| parent | 495e9f38220a6d448b15882a235e7a9c21f21d18 (diff) | |
merge
Diffstat (limited to 'internal/clients')
| -rw-r--r-- | internal/clients/args.go | 1 | ||||
| -rw-r--r-- | internal/clients/baseclient.go | 8 | ||||
| -rw-r--r-- | internal/clients/catclient.go | 3 | ||||
| -rw-r--r-- | internal/clients/client.go | 2 | ||||
| -rw-r--r-- | internal/clients/grepclient.go | 4 | ||||
| -rw-r--r-- | internal/clients/handlers/basehandler.go | 18 | ||||
| -rw-r--r-- | internal/clients/handlers/healthhandler.go | 3 | ||||
| -rw-r--r-- | internal/clients/maker.go | 3 | ||||
| -rw-r--r-- | internal/clients/maprclient.go | 11 | ||||
| -rw-r--r-- | internal/clients/stats.go | 30 | ||||
| -rw-r--r-- | internal/clients/tailclient.go | 4 |
11 files changed, 52 insertions, 35 deletions
diff --git a/internal/clients/args.go b/internal/clients/args.go index 34fcfa2..7f782f1 100644 --- a/internal/clients/args.go +++ b/internal/clients/args.go @@ -22,4 +22,5 @@ type Args struct { SSHAuthMethods []gossh.AuthMethod SSHHostKeyCallback gossh.HostKeyCallback PrivateKeyPathFile string + Quiet bool } diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index d8d4fde..f20156f 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -39,7 +39,7 @@ type baseClient struct { } func (c *baseClient) init() { - logger.Info("Initiating base client") + logger.Debug("Initiating base client") flag := regex.Default if c.Args.RegexInvert { @@ -66,11 +66,11 @@ func (c *baseClient) makeConnections(maker maker) { c.stats = newTailStats(len(c.connections)) } -func (c *baseClient) Start(ctx context.Context, statsCh <-chan struct{}) (status int) { +func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status int) { // Periodically check for unknown hosts, and ask the user whether to trust them or not. go c.hostKeyCallback.PromptAddHosts(ctx) // Print client stats every time something on statsCh is recieved. - go c.stats.Start(ctx, c.throttleCh, statsCh) + go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Quiet) // Keep count of active connections active := make(chan struct{}, len(c.connections)) @@ -127,7 +127,7 @@ func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMe } func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) { - defer logger.Info("Terminated connection") + defer logger.Debug("Terminated connection") // We want to have at least one active connection <-active diff --git a/internal/clients/catclient.go b/internal/clients/catclient.go index d8e9196..b7b6131 100644 --- a/internal/clients/catclient.go +++ b/internal/clients/catclient.go @@ -42,8 +42,9 @@ func (c CatClient) makeHandler(server string) handlers.Handler { } func (c CatClient) makeCommands() (commands []string) { + options := fmt.Sprintf("quiet=%v", c.Args.Quiet) for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize())) } return } diff --git a/internal/clients/client.go b/internal/clients/client.go index eb8452d..4a547e8 100644 --- a/internal/clients/client.go +++ b/internal/clients/client.go @@ -4,5 +4,5 @@ import "context" // Client is the interface for the end user command line client. type Client interface { - Start(ctx context.Context, statsCh <-chan struct{}) int + Start(ctx context.Context, statsCh <-chan string) int } diff --git a/internal/clients/grepclient.go b/internal/clients/grepclient.go index 4024083..652c31b 100644 --- a/internal/clients/grepclient.go +++ b/internal/clients/grepclient.go @@ -41,8 +41,10 @@ func (c GrepClient) makeHandler(server string) handlers.Handler { } func (c GrepClient) makeCommands() (commands []string) { + options := fmt.Sprintf("quiet=%v", c.Args.Quiet) for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize())) } + return } diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index b5045e2..f07fd90 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -4,7 +4,6 @@ import ( "encoding/base64" "fmt" "io" - "strconv" "strings" "time" @@ -78,6 +77,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { case <-h.Done(): return 0, io.EOF } + return } @@ -111,21 +111,5 @@ func (h *baseHandler) handleHiddenMessage(message string) { case <-h.Done(): return } - - case strings.HasPrefix(message, ".run exitstatus"): - splitted := strings.Split(strings.TrimSuffix(message, "\n"), " ") - if len(splitted) != 3 { - logger.Error("Unable to retrieve exitstatus", message) - return - } - i, err := strconv.Atoi(splitted[2]) - if err != nil { - logger.Error("Unable to retrieve exitstatus", message, err) - return - } - logger.Debug("Retrieved exitstatus", h.status) - if i > h.status { - h.status = i - } } } diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 95693ab..0440706 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -19,6 +19,7 @@ type HealthHandler struct { receive chan<- string // The remote server address server string + // The return status. status int } @@ -45,10 +46,12 @@ func (h *HealthHandler) Status() int { return h.status } +// Done returns done channel of the handler. func (h *HealthHandler) Done() <-chan struct{} { return h.done.Done() } +// Shutdown the handler. func (h *HealthHandler) Shutdown() { h.done.Shutdown() } diff --git a/internal/clients/maker.go b/internal/clients/maker.go index 1ba6482..d5ffd8b 100644 --- a/internal/clients/maker.go +++ b/internal/clients/maker.go @@ -4,6 +4,9 @@ import ( "github.com/mimecast/dtail/internal/clients/handlers" ) +// maker interface helps to re-use code in all DTail client implementations. +// All clients share the baseClient but have different connection handlers +// and send different commands to the DTail server. type maker interface { makeHandler(server string) handlers.Handler makeCommands() (commands []string) diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 149129d..1c0c2cc 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -94,12 +94,12 @@ func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (* } // Start starts the mapreduce client. -func (c *MaprClient) Start(ctx context.Context, statsCh <-chan struct{}) (status int) { +func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status int) { go c.periodicReportResults(ctx) status = c.baseClient.Start(ctx, statsCh) if c.cumulative { - logger.Info("Received final mapreduce result") + logger.Debug("Received final mapreduce result") c.reportResults() } @@ -112,6 +112,7 @@ func (c MaprClient) makeHandler(server string) handlers.Handler { func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery)) + options := fmt.Sprintf("quiet=%v", c.Args.Quiet) modeStr := "cat" if c.Mode == omode.TailClient { @@ -123,7 +124,7 @@ func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, modeStr, file, c.Regex.Serialize())) continue } - commands = append(commands, fmt.Sprintf("%s %s %s", modeStr, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, options, file, c.Regex.Serialize())) } return @@ -133,7 +134,7 @@ func (c *MaprClient) periodicReportResults(ctx context.Context) { for { select { case <-time.After(c.query.Interval): - logger.Info("Gathering interim mapreduce result") + logger.Debug("Gathering interim mapreduce result") c.reportResults() case <-ctx.Done(): return @@ -165,7 +166,7 @@ func (c *MaprClient) printResults() { } if numLines == 0 { - logger.Info("Empty result set this time...") + logger.Warn("Empty result set this time...") return } diff --git a/internal/clients/stats.go b/internal/clients/stats.go index e7eabd8..d8163d4 100644 --- a/internal/clients/stats.go +++ b/internal/clients/stats.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" ) @@ -33,16 +34,18 @@ func newTailStats(connectionsTotal int) *stats { // Start starts printing client connection stats every time a signal is recieved or // connection count has changed. -func (s *stats) Start(ctx context.Context, throttleCh, statsCh <-chan struct{}) { +func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <-chan string, quiet bool) { var connectedLast int for { var force bool + var messages []string select { - case <-statsCh: + case message := <-statsCh: + messages = append(messages, message) force = true - case <-time.After(time.Second * 2): + case <-time.After(time.Second * 3): case <-ctx.Done(): return } @@ -52,11 +55,18 @@ func (s *stats) Start(ctx context.Context, throttleCh, statsCh <-chan struct{}) newConnections := connected - connectedLast - if connected == connectedLast && !force { + if (connected == connectedLast || quiet) && !force { continue } - logger.Info(s.statsLine(connected, newConnections, throttle)) + stats := s.statsLine(connected, newConnections, throttle) + switch force { + case true: + messages = append(messages, fmt.Sprintf("Connection stats: %s", stats)) + s.printStatsDueInterrupt(messages) + default: + logger.Info(stats) + } connectedLast = connected s.mutex.Lock() @@ -65,6 +75,15 @@ func (s *stats) Start(ctx context.Context, throttleCh, statsCh <-chan struct{}) } } +func (s *stats) printStatsDueInterrupt(messages []string) { + logger.Pause() + for _, message := range messages { + fmt.Println(fmt.Sprintf(" %s", message)) + } + time.Sleep(time.Second * time.Duration(config.InterruptTimeoutS)) + logger.Resume() +} + func (s *stats) statsLine(connected, newConnections int, throttle int) string { percConnected := percentOf(float64(s.connectionsTotal), float64(connected)) @@ -88,5 +107,6 @@ func percentOf(total float64, value float64) float64 { if total == 0 || total == value { return 100 } + return value / (total / 100.0) } diff --git a/internal/clients/tailclient.go b/internal/clients/tailclient.go index 53b5ba4..cefbaa7 100644 --- a/internal/clients/tailclient.go +++ b/internal/clients/tailclient.go @@ -29,6 +29,7 @@ func NewTailClient(args Args) (*TailClient, error) { c.init() c.makeConnections(c) + return &c, nil } @@ -37,8 +38,9 @@ func (c TailClient) makeHandler(server string) handlers.Handler { } func (c TailClient) makeCommands() (commands []string) { + options := fmt.Sprintf("quiet=%v", c.Args.Quiet) for _, file := range strings.Split(c.What, ",") { - commands = append(commands, fmt.Sprintf("%s %s %s", c.Mode.String(), file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", c.Mode.String(), options, file, c.Regex.Serialize())) } logger.Debug(commands) |
