summaryrefslogtreecommitdiff
path: root/internal/clients/handlers
diff options
context:
space:
mode:
authorPaul Bütow <pbuetow@mimecast.com>2020-01-26 11:26:53 +0000
committerPaul Bütow <pbuetow@mimecast.com>2020-02-07 13:31:15 +0000
commit0945da8dfefcbb723eecea0e5f4eafff63398253 (patch)
treef06dab4d2bf21d25d176b23d5baeca588d27f5d7 /internal/clients/handlers
parent2a8e5de265a0e0a31a5834909d6879f5c9941467 (diff)
Introduce drun command, refactor code to use context package
Diffstat (limited to 'internal/clients/handlers')
-rw-r--r--internal/clients/handlers/basehandler.go84
-rw-r--r--internal/clients/handlers/clienthandler.go11
-rw-r--r--internal/clients/handlers/handler.go12
-rw-r--r--internal/clients/handlers/healthhandler.go21
-rw-r--r--internal/clients/handlers/maprhandler.go21
-rw-r--r--internal/clients/handlers/withcancel.go24
6 files changed, 96 insertions, 77 deletions
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index 19246f9..68b8ddc 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -1,60 +1,44 @@
package handlers
import (
- "github.com/mimecast/dtail/internal/logger"
- "errors"
+ "encoding/base64"
"fmt"
"io"
+ "strconv"
"strings"
"time"
+
+ "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/version"
)
type baseHandler struct {
+ withCancel
server string
shellStarted bool
commands chan string
- pong chan struct{}
receiveBuf []byte
- stop chan struct{}
- pingTimeout int
+ status int
}
func (h *baseHandler) Server() string {
return h.server
}
-// Used to determine whether server is still responding to requests or not.
-func (h *baseHandler) Ping() error {
- if h.pingTimeout == 0 {
- // Server ping disabled
- return nil
- }
-
- if err := h.SendCommand("ping"); err != nil {
- return err
- }
-
- select {
- case <-h.pong:
- return nil
- case <-time.After(time.Duration(h.pingTimeout) * time.Second):
- }
-
- return errors.New("Didn't receive any server pongs (ping replies)")
+func (h *baseHandler) Status() int {
+ return h.status
}
-func (h *baseHandler) SendCommand(command string) error {
- if command == "ping" {
- logger.Trace("Sending command", h.server, command)
- } else {
- logger.Debug("Sending command", h.server, command)
- }
+// SendMessage to the server.
+func (h *baseHandler) SendMessage(command string) error {
+ encoded := base64.StdEncoding.EncodeToString([]byte(command))
+ logger.Debug("Sending command", h.server, command, encoded)
select {
- case h.commands <- fmt.Sprintf("%s;", command):
+ case h.commands <- fmt.Sprintf("protocol %s base64 %v;", version.ProtocolCompat, encoded):
case <-time.After(time.Second * 5):
- return errors.New("Timed out sending command " + command)
- case <-h.stop:
+ return fmt.Errorf("Timed out sending command '%s' (base64: '%s')", command, encoded)
+ case <-h.ctx.Done():
}
return nil
@@ -81,7 +65,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
select {
case command := <-h.commands:
n = copy(p, []byte(command))
- case <-h.stop:
+ case <-h.ctx.Done():
return 0, io.EOF
}
return
@@ -92,6 +76,7 @@ func (h *baseHandler) handleMessageType(message string) {
if len(h.receiveBuf) == 0 {
return
}
+
// Hidden server commands starti with a dot "."
if h.receiveBuf[0] == '.' {
h.handleHiddenMessage(message)
@@ -108,6 +93,7 @@ func (h *baseHandler) handleMessageType(message string) {
h.receiveBuf = h.receiveBuf[:0]
return
}
+
logger.Raw(message)
h.receiveBuf = h.receiveBuf[:0]
}
@@ -116,19 +102,27 @@ func (h *baseHandler) handleMessageType(message string) {
// to the end user.
func (h *baseHandler) handleHiddenMessage(message string) {
switch {
- case strings.HasPrefix(message, ".pong"):
- h.pong <- struct{}{}
case strings.HasPrefix(message, ".syn close connection"):
- h.SendCommand("ack close connection")
- }
-}
+ h.SendMessage(".ack close connection")
+ select {
+ case <-time.After(time.Second * 1):
+ logger.Debug("Shutting down client after timeout and sending ack to server")
+ h.withCancel.shutdown()
+ case <-h.ctx.Done():
+ }
-// Stop the handler.
-func (h *baseHandler) Stop() {
- select {
- case <-h.stop:
- default:
- logger.Debug("Stopping base handler", h.server)
- close(h.stop)
+ case strings.HasPrefix(message, ".run exitstatus"):
+ splitted := strings.Split(strings.TrimSuffix(message, "\n"), " ")
+ if len(splitted) != 3 {
+ logger.Error("Unable to retrieve exitstatus", message)
+ return
+ }
+ i, err := strconv.Atoi(splitted[2])
+ if err != nil {
+ logger.Error("Unable to retrieve exitstatus", message, err)
+ return
+ }
+ h.status = i
+ logger.Debug("Retrieved exitstatus", h.status)
}
}
diff --git a/internal/clients/handlers/clienthandler.go b/internal/clients/handlers/clienthandler.go
index 4738cd3..fcd8052 100644
--- a/internal/clients/handlers/clienthandler.go
+++ b/internal/clients/handlers/clienthandler.go
@@ -1,7 +1,7 @@
package handlers
import (
- "github.com/mimecast/dtail/internal/logger"
+ "github.com/mimecast/dtail/internal/io/logger"
)
// ClientHandler is the basic client handler interface.
@@ -10,7 +10,7 @@ type ClientHandler struct {
}
// NewClientHandler creates a new client handler.
-func NewClientHandler(server string, pingTimeout int) *ClientHandler {
+func NewClientHandler(server string) *ClientHandler {
logger.Debug(server, "Creating new client handler")
return &ClientHandler{
@@ -18,9 +18,10 @@ func NewClientHandler(server string, pingTimeout int) *ClientHandler {
server: server,
shellStarted: false,
commands: make(chan string),
- pong: make(chan struct{}, 1),
- stop: make(chan struct{}),
- pingTimeout: pingTimeout,
+ status: -1,
+ withCancel: withCancel{
+ done: make(chan struct{}),
+ },
},
}
}
diff --git a/internal/clients/handlers/handler.go b/internal/clients/handlers/handler.go
index 2013be0..c53ca34 100644
--- a/internal/clients/handlers/handler.go
+++ b/internal/clients/handlers/handler.go
@@ -1,12 +1,16 @@
package handlers
-import "io"
+import (
+ "context"
+ "io"
+)
// Handler provides all methods which can be run on any client handler.
type Handler interface {
io.ReadWriter
- Ping() error
- Stop()
- SendCommand(command string) error
+ SendMessage(command string) error
Server() string
+ Status() int
+ WithCancel(ctx context.Context) (context.Context, context.CancelFunc)
+ Done() <-chan struct{}
}
diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go
index 4051e2c..9051015 100644
--- a/internal/clients/handlers/healthhandler.go
+++ b/internal/clients/handlers/healthhandler.go
@@ -8,6 +8,7 @@ import (
// HealthHandler implements the handler required for health checks.
type HealthHandler struct {
+ withCancel
// Buffer of incoming data from server.
receiveBuf []byte
// To send commands to the server.
@@ -16,6 +17,7 @@ type HealthHandler struct {
receive chan<- string
// The remote server address
server string
+ status int
}
// NewHealthHandler returns a new health check handler.
@@ -24,6 +26,10 @@ func NewHealthHandler(server string, receive chan<- string) *HealthHandler {
server: server,
receive: receive,
commands: make(chan string),
+ status: -1,
+ withCancel: withCancel{
+ done: make(chan struct{}),
+ },
}
return &h
@@ -34,18 +40,13 @@ func (h *HealthHandler) Server() string {
return h.server
}
-// Stop is not of use for health check handler.
-func (h *HealthHandler) Stop() {
- // Nothing done here.
+// Status of the handler.
+func (h *HealthHandler) Status() int {
+ return h.status
}
-// Ping is not of use for health check handler.
-func (h *HealthHandler) Ping() error {
- return nil
-}
-
-// SendCommand send a DTail command to the server.
-func (h *HealthHandler) SendCommand(command string) error {
+// SendMessage sends a DTail command to the server.
+func (h *HealthHandler) SendMessage(command string) error {
select {
case h.commands <- fmt.Sprintf("%s;", command):
case <-time.NewTimer(time.Second * 10).C:
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go
index d76cdfd..874bb7d 100644
--- a/internal/clients/handlers/maprhandler.go
+++ b/internal/clients/handlers/maprhandler.go
@@ -1,10 +1,11 @@
package handlers
import (
- "github.com/mimecast/dtail/internal/logger"
+ "strings"
+
+ "github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/client"
- "strings"
)
// MaprHandler is the handler used on the client side for running mapreduce aggregations.
@@ -16,15 +17,16 @@ type MaprHandler struct {
}
// NewMaprHandler returns a new mapreduce client handler.
-func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet, pingTimeout int) *MaprHandler {
+func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *MaprHandler {
return &MaprHandler{
baseHandler: baseHandler{
server: server,
shellStarted: false,
commands: make(chan string),
- pong: make(chan struct{}, 1),
- stop: make(chan struct{}),
- pingTimeout: pingTimeout,
+ status: -1,
+ withCancel: withCancel{
+ done: make(chan struct{}),
+ },
},
query: query,
aggregate: client.NewAggregate(server, query, globalGroup),
@@ -65,10 +67,3 @@ func (h *MaprHandler) handleAggregateMessage(message string) {
h.aggregate.Aggregate(parts[2:])
logger.Debug("Aggregated aggregate data", h.server, h.count)
}
-
-// Stop stops the mapreduce client handler.
-func (h *MaprHandler) Stop() {
- logger.Debug("Stopping mapreduce handler", h.server)
- h.aggregate.Stop()
- h.baseHandler.Stop()
-}
diff --git a/internal/clients/handlers/withcancel.go b/internal/clients/handlers/withcancel.go
new file mode 100644
index 0000000..7c9cf4e
--- /dev/null
+++ b/internal/clients/handlers/withcancel.go
@@ -0,0 +1,24 @@
+package handlers
+
+import "context"
+
+type withCancel struct {
+ ctx context.Context
+ done chan struct{}
+}
+
+// WithCancel sets and returns the context used.
+func (w *withCancel) WithCancel(ctx context.Context) (context.Context, context.CancelFunc) {
+ cancelCtx, cancel := context.WithCancel(ctx)
+ w.ctx = cancelCtx
+
+ return cancelCtx, cancel
+}
+
+func (w *withCancel) Done() <-chan struct{} {
+ return w.done
+}
+
+func (w *withCancel) shutdown() {
+ close(w.done)
+}