diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2020-12-08 14:49:41 +0000 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2020-12-08 14:49:41 +0000 |
| commit | 799b9b69ba08b898e13026b7ecab9f9f58580a82 (patch) | |
| tree | 34bc0e5e539aed99dd1f13e7489e9d3111ba050f /internal/clients | |
| parent | 6b2d8539a66f1b36ffd55c56723376b9b068a5dc (diff) | |
merge develop
Diffstat (limited to 'internal/clients')
| -rw-r--r-- | internal/clients/baseclient.go | 4 | ||||
| -rw-r--r-- | internal/clients/client.go | 2 | ||||
| -rw-r--r-- | internal/clients/handlers/basehandler.go | 23 | ||||
| -rw-r--r-- | internal/clients/handlers/clienthandler.go | 5 | ||||
| -rw-r--r-- | internal/clients/handlers/handler.go | 3 | ||||
| -rw-r--r-- | internal/clients/handlers/healthhandler.go | 19 | ||||
| -rw-r--r-- | internal/clients/handlers/maprhandler.go | 5 | ||||
| -rw-r--r-- | internal/clients/healthclient.go | 2 | ||||
| -rw-r--r-- | internal/clients/maprclient.go | 4 | ||||
| -rw-r--r-- | internal/clients/remote/connection.go | 7 | ||||
| -rw-r--r-- | internal/clients/stats.go | 42 |
11 files changed, 79 insertions, 37 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index 008a01e..69055a3 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -66,7 +66,7 @@ func (c *baseClient) makeConnections(maker maker) { c.stats = newTailStats(len(c.connections)) } -func (c *baseClient) Start(ctx context.Context, statsCh <-chan struct{}) (status int) { +func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status int) { // Periodically check for unknown hosts, and ask the user whether to trust them or not. go c.hostKeyCallback.PromptAddHosts(ctx) // Print client stats every time something on statsCh is recieved. @@ -99,7 +99,7 @@ func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, con defer func() { <-active }() for { - connCtx, cancel := conn.Handler.WithCancel(ctx) + connCtx, cancel := context.WithCancel(ctx) defer cancel() conn.Start(connCtx, cancel, c.throttleCh, c.stats.connectionsEstCh) diff --git a/internal/clients/client.go b/internal/clients/client.go index eb8452d..4a547e8 100644 --- a/internal/clients/client.go +++ b/internal/clients/client.go @@ -4,5 +4,5 @@ import "context" // Client is the interface for the end user command line client. type Client interface { - Start(ctx context.Context, statsCh <-chan struct{}) int + Start(ctx context.Context, statsCh <-chan string) int } diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 65bbfd7..b5045e2 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -8,12 +8,13 @@ import ( "strings" "time" + "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/version" ) type baseHandler struct { - withCancel + done *internal.Done server string shellStarted bool commands chan string @@ -29,6 +30,14 @@ func (h *baseHandler) Status() int { return h.status } +func (h *baseHandler) Done() <-chan struct{} { + return h.done.Done() +} + +func (h *baseHandler) Shutdown() { + h.done.Shutdown() +} + // SendMessage to the server. func (h *baseHandler) SendMessage(command string) error { encoded := base64.StdEncoding.EncodeToString([]byte(command)) @@ -38,7 +47,8 @@ func (h *baseHandler) SendMessage(command string) error { case h.commands <- fmt.Sprintf("protocol %s base64 %v;", version.ProtocolCompat, encoded): case <-time.After(time.Second * 5): return fmt.Errorf("Timed out sending command '%s' (base64: '%s')", command, encoded) - case <-h.ctx.Done(): + case <-h.Done(): + return nil } return nil @@ -65,7 +75,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { select { case command := <-h.commands: n = copy(p, []byte(command)) - case <-h.ctx.Done(): + case <-h.Done(): return 0, io.EOF } return @@ -95,10 +105,11 @@ func (h *baseHandler) handleHiddenMessage(message string) { case strings.HasPrefix(message, ".syn close connection"): h.SendMessage(".ack close connection") select { - case <-time.After(time.Second * 1): + case <-time.After(time.Second * 5): logger.Debug("Shutting down client after timeout and sending ack to server") - h.withCancel.shutdown() - case <-h.ctx.Done(): + h.Shutdown() + case <-h.Done(): + return } case strings.HasPrefix(message, ".run exitstatus"): diff --git a/internal/clients/handlers/clienthandler.go b/internal/clients/handlers/clienthandler.go index fcd8052..2bcb038 100644 --- a/internal/clients/handlers/clienthandler.go +++ b/internal/clients/handlers/clienthandler.go @@ -1,6 +1,7 @@ package handlers import ( + "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/logger" ) @@ -19,9 +20,7 @@ func NewClientHandler(server string) *ClientHandler { shellStarted: false, commands: make(chan string), status: -1, - withCancel: withCancel{ - done: make(chan struct{}), - }, + done: internal.NewDone(), }, } } diff --git a/internal/clients/handlers/handler.go b/internal/clients/handlers/handler.go index c53ca34..afa87e2 100644 --- a/internal/clients/handlers/handler.go +++ b/internal/clients/handlers/handler.go @@ -1,7 +1,6 @@ package handlers import ( - "context" "io" ) @@ -11,6 +10,6 @@ type Handler interface { SendMessage(command string) error Server() string Status() int - WithCancel(ctx context.Context) (context.Context, context.CancelFunc) + Shutdown() Done() <-chan struct{} } diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 9051015..08ed137 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -4,11 +4,13 @@ import ( "errors" "fmt" "time" + + "github.com/mimecast/dtail/internal" ) // HealthHandler implements the handler required for health checks. type HealthHandler struct { - withCancel + done *internal.Done // Buffer of incoming data from server. receiveBuf []byte // To send commands to the server. @@ -27,9 +29,7 @@ func NewHealthHandler(server string, receive chan<- string) *HealthHandler { receive: receive, commands: make(chan string), status: -1, - withCancel: withCancel{ - done: make(chan struct{}), - }, + done: internal.NewDone(), } return &h @@ -45,12 +45,23 @@ func (h *HealthHandler) Status() int { return h.status } +// Done returns done channel of the handler. +func (h *HealthHandler) Done() <-chan struct{} { + return h.done.Done() +} + +// Shutdown the handler. +func (h *HealthHandler) Shutdown() { + h.done.Shutdown() +} + // SendMessage sends a DTail command to the server. func (h *HealthHandler) SendMessage(command string) error { select { case h.commands <- fmt.Sprintf("%s;", command): case <-time.NewTimer(time.Second * 10).C: return errors.New("Timed out sending command " + command) + case <-h.Done(): } return nil diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index b908f3b..fb71c8f 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -3,6 +3,7 @@ package handlers import ( "strings" + "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/client" @@ -24,9 +25,7 @@ func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGr shellStarted: false, commands: make(chan string), status: -1, - withCancel: withCancel{ - done: make(chan struct{}), - }, + done: internal.NewDone(), }, query: query, aggregate: client.NewAggregate(server, query, globalGroup), diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go index 7313583..e93f6be 100644 --- a/internal/clients/healthclient.go +++ b/internal/clients/healthclient.go @@ -50,7 +50,7 @@ func (c *HealthClient) Start(ctx context.Context) (status int) { conn.Handler = handlers.NewHealthHandler(c.server, receive) conn.Commands = []string{c.mode.String()} - connCtx, cancel := conn.Handler.WithCancel(ctx) + connCtx, cancel := context.WithCancel(ctx) go conn.Start(connCtx, cancel, throttleCh, statsCh) for { diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 581db44..6aadd6b 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -94,7 +94,7 @@ func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (* } // Start starts the mapreduce client. -func (c *MaprClient) Start(ctx context.Context, statsCh <-chan struct{}) (status int) { +func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status int) { go c.periodicReportResults(ctx) status = c.baseClient.Start(ctx, statsCh) @@ -123,7 +123,7 @@ func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, modeStr, file, c.Regex.Serialize())) continue } - commands = append(commands, fmt.Sprintf("%s %s regex %s", modeStr, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s %s %s", modeStr, file, c.Regex.Serialize())) } return diff --git a/internal/clients/remote/connection.go b/internal/clients/remote/connection.go index 2d97d14..b29ffed 100644 --- a/internal/clients/remote/connection.go +++ b/internal/clients/remote/connection.go @@ -177,21 +177,21 @@ func (c *Connection) handle(ctx context.Context, cancel context.CancelFunc, sess } go func() { - defer cancel() io.Copy(stdinPipe, c.Handler) + cancel() }() go func() { - defer cancel() io.Copy(c.Handler, stdoutPipe) + cancel() }() go func() { - defer cancel() select { case <-c.Handler.Done(): case <-ctx.Done(): } + cancel() }() // Send all commands to client. @@ -207,5 +207,6 @@ func (c *Connection) handle(ctx context.Context, cancel context.CancelFunc, sess } <-ctx.Done() + c.Handler.Shutdown() return nil } diff --git a/internal/clients/stats.go b/internal/clients/stats.go index a6ac0c5..17343b5 100644 --- a/internal/clients/stats.go +++ b/internal/clients/stats.go @@ -4,9 +4,11 @@ import ( "context" "fmt" "runtime" + "strings" "sync" "time" + "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" ) @@ -32,16 +34,18 @@ func newTailStats(connectionsTotal int) *stats { // Start starts printing client connection stats every time a signal is recieved or // connection count has changed. -func (s *stats) Start(ctx context.Context, throttleCh, statsCh <-chan struct{}) { +func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <-chan string) { var connectedLast int for { var force bool + var messages []string select { - case <-statsCh: + case message := <-statsCh: + messages = append(messages, message) force = true - case <-time.After(time.Second * 2): + case <-time.After(time.Second * 10): case <-ctx.Done(): return } @@ -54,7 +58,15 @@ func (s *stats) Start(ctx context.Context, throttleCh, statsCh <-chan struct{}) if connected == connectedLast && !force { continue } - s.log(connected, newConnections, throttle) + + stats := s.statsLine(connected, newConnections, throttle) + switch force { + case true: + messages = append(messages, fmt.Sprintf("Connection stats: %s", stats)) + s.printStatsOnInterrupt(messages) + default: + logger.Info(stats) + } connectedLast = connected s.mutex.Lock() @@ -63,15 +75,25 @@ func (s *stats) Start(ctx context.Context, throttleCh, statsCh <-chan struct{}) } } -func (s *stats) log(connected, newConnections int, throttle int) { +func (s *stats) printStatsOnInterrupt(messages []string) { + logger.Pause() + for _, message := range messages { + fmt.Println(fmt.Sprintf(" %s", message)) + } + time.Sleep(time.Second * time.Duration(config.InterruptTimeoutS)) + logger.Resume() +} + +func (s *stats) statsLine(connected, newConnections int, throttle int) string { percConnected := percentOf(float64(s.connectionsTotal), float64(connected)) - connectedStr := fmt.Sprintf("connected=%d/%d(%d%%)", connected, s.connectionsTotal, int(percConnected)) - newConnStr := fmt.Sprintf("new=%d", newConnections) - throttleStr := fmt.Sprintf("throttle=%d", throttle) - cpusGoroutinesStr := fmt.Sprintf("cpus/goroutines=%d/%d", runtime.NumCPU(), runtime.NumGoroutine()) + var stats []string + stats = append(stats, fmt.Sprintf("connected=%d/%d(%d%%)", connected, s.connectionsTotal, int(percConnected))) + stats = append(stats, fmt.Sprintf("new=%d", newConnections)) + stats = append(stats, fmt.Sprintf("throttle=%d", throttle)) + stats = append(stats, fmt.Sprintf("cpus/goroutines=%d/%d", runtime.NumCPU(), runtime.NumGoroutine())) - logger.Info("stats", connectedStr, newConnStr, throttleStr, cpusGoroutinesStr) + return strings.Join(stats, "|") } func (s *stats) numConnected() int { |
