summaryrefslogtreecommitdiff
path: root/internal/clients
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2020-12-08 14:49:41 +0000
committerPaul Buetow <pbuetow@mimecast.com>2020-12-08 14:49:41 +0000
commit799b9b69ba08b898e13026b7ecab9f9f58580a82 (patch)
tree34bc0e5e539aed99dd1f13e7489e9d3111ba050f /internal/clients
parent6b2d8539a66f1b36ffd55c56723376b9b068a5dc (diff)
merge develop
Diffstat (limited to 'internal/clients')
-rw-r--r--internal/clients/baseclient.go4
-rw-r--r--internal/clients/client.go2
-rw-r--r--internal/clients/handlers/basehandler.go23
-rw-r--r--internal/clients/handlers/clienthandler.go5
-rw-r--r--internal/clients/handlers/handler.go3
-rw-r--r--internal/clients/handlers/healthhandler.go19
-rw-r--r--internal/clients/handlers/maprhandler.go5
-rw-r--r--internal/clients/healthclient.go2
-rw-r--r--internal/clients/maprclient.go4
-rw-r--r--internal/clients/remote/connection.go7
-rw-r--r--internal/clients/stats.go42
11 files changed, 79 insertions, 37 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index 008a01e..69055a3 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -66,7 +66,7 @@ func (c *baseClient) makeConnections(maker maker) {
c.stats = newTailStats(len(c.connections))
}
-func (c *baseClient) Start(ctx context.Context, statsCh <-chan struct{}) (status int) {
+func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status int) {
// Periodically check for unknown hosts, and ask the user whether to trust them or not.
go c.hostKeyCallback.PromptAddHosts(ctx)
// Print client stats every time something on statsCh is recieved.
@@ -99,7 +99,7 @@ func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, con
defer func() { <-active }()
for {
- connCtx, cancel := conn.Handler.WithCancel(ctx)
+ connCtx, cancel := context.WithCancel(ctx)
defer cancel()
conn.Start(connCtx, cancel, c.throttleCh, c.stats.connectionsEstCh)
diff --git a/internal/clients/client.go b/internal/clients/client.go
index eb8452d..4a547e8 100644
--- a/internal/clients/client.go
+++ b/internal/clients/client.go
@@ -4,5 +4,5 @@ import "context"
// Client is the interface for the end user command line client.
type Client interface {
- Start(ctx context.Context, statsCh <-chan struct{}) int
+ Start(ctx context.Context, statsCh <-chan string) int
}
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index 65bbfd7..b5045e2 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -8,12 +8,13 @@ import (
"strings"
"time"
+ "github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/version"
)
type baseHandler struct {
- withCancel
+ done *internal.Done
server string
shellStarted bool
commands chan string
@@ -29,6 +30,14 @@ func (h *baseHandler) Status() int {
return h.status
}
+func (h *baseHandler) Done() <-chan struct{} {
+ return h.done.Done()
+}
+
+func (h *baseHandler) Shutdown() {
+ h.done.Shutdown()
+}
+
// SendMessage to the server.
func (h *baseHandler) SendMessage(command string) error {
encoded := base64.StdEncoding.EncodeToString([]byte(command))
@@ -38,7 +47,8 @@ func (h *baseHandler) SendMessage(command string) error {
case h.commands <- fmt.Sprintf("protocol %s base64 %v;", version.ProtocolCompat, encoded):
case <-time.After(time.Second * 5):
return fmt.Errorf("Timed out sending command '%s' (base64: '%s')", command, encoded)
- case <-h.ctx.Done():
+ case <-h.Done():
+ return nil
}
return nil
@@ -65,7 +75,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
select {
case command := <-h.commands:
n = copy(p, []byte(command))
- case <-h.ctx.Done():
+ case <-h.Done():
return 0, io.EOF
}
return
@@ -95,10 +105,11 @@ func (h *baseHandler) handleHiddenMessage(message string) {
case strings.HasPrefix(message, ".syn close connection"):
h.SendMessage(".ack close connection")
select {
- case <-time.After(time.Second * 1):
+ case <-time.After(time.Second * 5):
logger.Debug("Shutting down client after timeout and sending ack to server")
- h.withCancel.shutdown()
- case <-h.ctx.Done():
+ h.Shutdown()
+ case <-h.Done():
+ return
}
case strings.HasPrefix(message, ".run exitstatus"):
diff --git a/internal/clients/handlers/clienthandler.go b/internal/clients/handlers/clienthandler.go
index fcd8052..2bcb038 100644
--- a/internal/clients/handlers/clienthandler.go
+++ b/internal/clients/handlers/clienthandler.go
@@ -1,6 +1,7 @@
package handlers
import (
+ "github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/io/logger"
)
@@ -19,9 +20,7 @@ func NewClientHandler(server string) *ClientHandler {
shellStarted: false,
commands: make(chan string),
status: -1,
- withCancel: withCancel{
- done: make(chan struct{}),
- },
+ done: internal.NewDone(),
},
}
}
diff --git a/internal/clients/handlers/handler.go b/internal/clients/handlers/handler.go
index c53ca34..afa87e2 100644
--- a/internal/clients/handlers/handler.go
+++ b/internal/clients/handlers/handler.go
@@ -1,7 +1,6 @@
package handlers
import (
- "context"
"io"
)
@@ -11,6 +10,6 @@ type Handler interface {
SendMessage(command string) error
Server() string
Status() int
- WithCancel(ctx context.Context) (context.Context, context.CancelFunc)
+ Shutdown()
Done() <-chan struct{}
}
diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go
index 9051015..08ed137 100644
--- a/internal/clients/handlers/healthhandler.go
+++ b/internal/clients/handlers/healthhandler.go
@@ -4,11 +4,13 @@ import (
"errors"
"fmt"
"time"
+
+ "github.com/mimecast/dtail/internal"
)
// HealthHandler implements the handler required for health checks.
type HealthHandler struct {
- withCancel
+ done *internal.Done
// Buffer of incoming data from server.
receiveBuf []byte
// To send commands to the server.
@@ -27,9 +29,7 @@ func NewHealthHandler(server string, receive chan<- string) *HealthHandler {
receive: receive,
commands: make(chan string),
status: -1,
- withCancel: withCancel{
- done: make(chan struct{}),
- },
+ done: internal.NewDone(),
}
return &h
@@ -45,12 +45,23 @@ func (h *HealthHandler) Status() int {
return h.status
}
+// Done returns done channel of the handler.
+func (h *HealthHandler) Done() <-chan struct{} {
+ return h.done.Done()
+}
+
+// Shutdown the handler.
+func (h *HealthHandler) Shutdown() {
+ h.done.Shutdown()
+}
+
// SendMessage sends a DTail command to the server.
func (h *HealthHandler) SendMessage(command string) error {
select {
case h.commands <- fmt.Sprintf("%s;", command):
case <-time.NewTimer(time.Second * 10).C:
return errors.New("Timed out sending command " + command)
+ case <-h.Done():
}
return nil
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go
index b908f3b..fb71c8f 100644
--- a/internal/clients/handlers/maprhandler.go
+++ b/internal/clients/handlers/maprhandler.go
@@ -3,6 +3,7 @@ package handlers
import (
"strings"
+ "github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/client"
@@ -24,9 +25,7 @@ func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGr
shellStarted: false,
commands: make(chan string),
status: -1,
- withCancel: withCancel{
- done: make(chan struct{}),
- },
+ done: internal.NewDone(),
},
query: query,
aggregate: client.NewAggregate(server, query, globalGroup),
diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go
index 7313583..e93f6be 100644
--- a/internal/clients/healthclient.go
+++ b/internal/clients/healthclient.go
@@ -50,7 +50,7 @@ func (c *HealthClient) Start(ctx context.Context) (status int) {
conn.Handler = handlers.NewHealthHandler(c.server, receive)
conn.Commands = []string{c.mode.String()}
- connCtx, cancel := conn.Handler.WithCancel(ctx)
+ connCtx, cancel := context.WithCancel(ctx)
go conn.Start(connCtx, cancel, throttleCh, statsCh)
for {
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index 581db44..6aadd6b 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -94,7 +94,7 @@ func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (*
}
// Start starts the mapreduce client.
-func (c *MaprClient) Start(ctx context.Context, statsCh <-chan struct{}) (status int) {
+func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status int) {
go c.periodicReportResults(ctx)
status = c.baseClient.Start(ctx, statsCh)
@@ -123,7 +123,7 @@ func (c MaprClient) makeCommands() (commands []string) {
commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, modeStr, file, c.Regex.Serialize()))
continue
}
- commands = append(commands, fmt.Sprintf("%s %s regex %s", modeStr, file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("%s %s %s", modeStr, file, c.Regex.Serialize()))
}
return
diff --git a/internal/clients/remote/connection.go b/internal/clients/remote/connection.go
index 2d97d14..b29ffed 100644
--- a/internal/clients/remote/connection.go
+++ b/internal/clients/remote/connection.go
@@ -177,21 +177,21 @@ func (c *Connection) handle(ctx context.Context, cancel context.CancelFunc, sess
}
go func() {
- defer cancel()
io.Copy(stdinPipe, c.Handler)
+ cancel()
}()
go func() {
- defer cancel()
io.Copy(c.Handler, stdoutPipe)
+ cancel()
}()
go func() {
- defer cancel()
select {
case <-c.Handler.Done():
case <-ctx.Done():
}
+ cancel()
}()
// Send all commands to client.
@@ -207,5 +207,6 @@ func (c *Connection) handle(ctx context.Context, cancel context.CancelFunc, sess
}
<-ctx.Done()
+ c.Handler.Shutdown()
return nil
}
diff --git a/internal/clients/stats.go b/internal/clients/stats.go
index a6ac0c5..17343b5 100644
--- a/internal/clients/stats.go
+++ b/internal/clients/stats.go
@@ -4,9 +4,11 @@ import (
"context"
"fmt"
"runtime"
+ "strings"
"sync"
"time"
+ "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/logger"
)
@@ -32,16 +34,18 @@ func newTailStats(connectionsTotal int) *stats {
// Start starts printing client connection stats every time a signal is recieved or
// connection count has changed.
-func (s *stats) Start(ctx context.Context, throttleCh, statsCh <-chan struct{}) {
+func (s *stats) Start(ctx context.Context, throttleCh <-chan struct{}, statsCh <-chan string) {
var connectedLast int
for {
var force bool
+ var messages []string
select {
- case <-statsCh:
+ case message := <-statsCh:
+ messages = append(messages, message)
force = true
- case <-time.After(time.Second * 2):
+ case <-time.After(time.Second * 10):
case <-ctx.Done():
return
}
@@ -54,7 +58,15 @@ func (s *stats) Start(ctx context.Context, throttleCh, statsCh <-chan struct{})
if connected == connectedLast && !force {
continue
}
- s.log(connected, newConnections, throttle)
+
+ stats := s.statsLine(connected, newConnections, throttle)
+ switch force {
+ case true:
+ messages = append(messages, fmt.Sprintf("Connection stats: %s", stats))
+ s.printStatsOnInterrupt(messages)
+ default:
+ logger.Info(stats)
+ }
connectedLast = connected
s.mutex.Lock()
@@ -63,15 +75,25 @@ func (s *stats) Start(ctx context.Context, throttleCh, statsCh <-chan struct{})
}
}
-func (s *stats) log(connected, newConnections int, throttle int) {
+func (s *stats) printStatsOnInterrupt(messages []string) {
+ logger.Pause()
+ for _, message := range messages {
+ fmt.Println(fmt.Sprintf(" %s", message))
+ }
+ time.Sleep(time.Second * time.Duration(config.InterruptTimeoutS))
+ logger.Resume()
+}
+
+func (s *stats) statsLine(connected, newConnections int, throttle int) string {
percConnected := percentOf(float64(s.connectionsTotal), float64(connected))
- connectedStr := fmt.Sprintf("connected=%d/%d(%d%%)", connected, s.connectionsTotal, int(percConnected))
- newConnStr := fmt.Sprintf("new=%d", newConnections)
- throttleStr := fmt.Sprintf("throttle=%d", throttle)
- cpusGoroutinesStr := fmt.Sprintf("cpus/goroutines=%d/%d", runtime.NumCPU(), runtime.NumGoroutine())
+ var stats []string
+ stats = append(stats, fmt.Sprintf("connected=%d/%d(%d%%)", connected, s.connectionsTotal, int(percConnected)))
+ stats = append(stats, fmt.Sprintf("new=%d", newConnections))
+ stats = append(stats, fmt.Sprintf("throttle=%d", throttle))
+ stats = append(stats, fmt.Sprintf("cpus/goroutines=%d/%d", runtime.NumCPU(), runtime.NumGoroutine()))
- logger.Info("stats", connectedStr, newConnStr, throttleStr, cpusGoroutinesStr)
+ return strings.Join(stats, "|")
}
func (s *stats) numConnected() int {