diff options
| author | Paul Bütow <pbuetow@mimecast.com> | 2020-01-26 11:26:53 +0000 |
|---|---|---|
| committer | Paul Bütow <pbuetow@mimecast.com> | 2020-02-07 13:31:15 +0000 |
| commit | 0945da8dfefcbb723eecea0e5f4eafff63398253 (patch) | |
| tree | f06dab4d2bf21d25d176b23d5baeca588d27f5d7 /internal/clients/handlers | |
| parent | 2a8e5de265a0e0a31a5834909d6879f5c9941467 (diff) | |
Introduce drun command, refactor code to use context package
Diffstat (limited to 'internal/clients/handlers')
| -rw-r--r-- | internal/clients/handlers/basehandler.go | 84 | ||||
| -rw-r--r-- | internal/clients/handlers/clienthandler.go | 11 | ||||
| -rw-r--r-- | internal/clients/handlers/handler.go | 12 | ||||
| -rw-r--r-- | internal/clients/handlers/healthhandler.go | 21 | ||||
| -rw-r--r-- | internal/clients/handlers/maprhandler.go | 21 | ||||
| -rw-r--r-- | internal/clients/handlers/withcancel.go | 24 |
6 files changed, 96 insertions, 77 deletions
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 19246f9..68b8ddc 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -1,60 +1,44 @@ package handlers import ( - "github.com/mimecast/dtail/internal/logger" - "errors" + "encoding/base64" "fmt" "io" + "strconv" "strings" "time" + + "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/version" ) type baseHandler struct { + withCancel server string shellStarted bool commands chan string - pong chan struct{} receiveBuf []byte - stop chan struct{} - pingTimeout int + status int } func (h *baseHandler) Server() string { return h.server } -// Used to determine whether server is still responding to requests or not. -func (h *baseHandler) Ping() error { - if h.pingTimeout == 0 { - // Server ping disabled - return nil - } - - if err := h.SendCommand("ping"); err != nil { - return err - } - - select { - case <-h.pong: - return nil - case <-time.After(time.Duration(h.pingTimeout) * time.Second): - } - - return errors.New("Didn't receive any server pongs (ping replies)") +func (h *baseHandler) Status() int { + return h.status } -func (h *baseHandler) SendCommand(command string) error { - if command == "ping" { - logger.Trace("Sending command", h.server, command) - } else { - logger.Debug("Sending command", h.server, command) - } +// SendMessage to the server. +func (h *baseHandler) SendMessage(command string) error { + encoded := base64.StdEncoding.EncodeToString([]byte(command)) + logger.Debug("Sending command", h.server, command, encoded) select { - case h.commands <- fmt.Sprintf("%s;", command): + case h.commands <- fmt.Sprintf("protocol %s base64 %v;", version.ProtocolCompat, encoded): case <-time.After(time.Second * 5): - return errors.New("Timed out sending command " + command) - case <-h.stop: + return fmt.Errorf("Timed out sending command '%s' (base64: '%s')", command, encoded) + case <-h.ctx.Done(): } return nil @@ -81,7 +65,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { select { case command := <-h.commands: n = copy(p, []byte(command)) - case <-h.stop: + case <-h.ctx.Done(): return 0, io.EOF } return @@ -92,6 +76,7 @@ func (h *baseHandler) handleMessageType(message string) { if len(h.receiveBuf) == 0 { return } + // Hidden server commands starti with a dot "." if h.receiveBuf[0] == '.' { h.handleHiddenMessage(message) @@ -108,6 +93,7 @@ func (h *baseHandler) handleMessageType(message string) { h.receiveBuf = h.receiveBuf[:0] return } + logger.Raw(message) h.receiveBuf = h.receiveBuf[:0] } @@ -116,19 +102,27 @@ func (h *baseHandler) handleMessageType(message string) { // to the end user. func (h *baseHandler) handleHiddenMessage(message string) { switch { - case strings.HasPrefix(message, ".pong"): - h.pong <- struct{}{} case strings.HasPrefix(message, ".syn close connection"): - h.SendCommand("ack close connection") - } -} + h.SendMessage(".ack close connection") + select { + case <-time.After(time.Second * 1): + logger.Debug("Shutting down client after timeout and sending ack to server") + h.withCancel.shutdown() + case <-h.ctx.Done(): + } -// Stop the handler. -func (h *baseHandler) Stop() { - select { - case <-h.stop: - default: - logger.Debug("Stopping base handler", h.server) - close(h.stop) + case strings.HasPrefix(message, ".run exitstatus"): + splitted := strings.Split(strings.TrimSuffix(message, "\n"), " ") + if len(splitted) != 3 { + logger.Error("Unable to retrieve exitstatus", message) + return + } + i, err := strconv.Atoi(splitted[2]) + if err != nil { + logger.Error("Unable to retrieve exitstatus", message, err) + return + } + h.status = i + logger.Debug("Retrieved exitstatus", h.status) } } diff --git a/internal/clients/handlers/clienthandler.go b/internal/clients/handlers/clienthandler.go index 4738cd3..fcd8052 100644 --- a/internal/clients/handlers/clienthandler.go +++ b/internal/clients/handlers/clienthandler.go @@ -1,7 +1,7 @@ package handlers import ( - "github.com/mimecast/dtail/internal/logger" + "github.com/mimecast/dtail/internal/io/logger" ) // ClientHandler is the basic client handler interface. @@ -10,7 +10,7 @@ type ClientHandler struct { } // NewClientHandler creates a new client handler. -func NewClientHandler(server string, pingTimeout int) *ClientHandler { +func NewClientHandler(server string) *ClientHandler { logger.Debug(server, "Creating new client handler") return &ClientHandler{ @@ -18,9 +18,10 @@ func NewClientHandler(server string, pingTimeout int) *ClientHandler { server: server, shellStarted: false, commands: make(chan string), - pong: make(chan struct{}, 1), - stop: make(chan struct{}), - pingTimeout: pingTimeout, + status: -1, + withCancel: withCancel{ + done: make(chan struct{}), + }, }, } } diff --git a/internal/clients/handlers/handler.go b/internal/clients/handlers/handler.go index 2013be0..c53ca34 100644 --- a/internal/clients/handlers/handler.go +++ b/internal/clients/handlers/handler.go @@ -1,12 +1,16 @@ package handlers -import "io" +import ( + "context" + "io" +) // Handler provides all methods which can be run on any client handler. type Handler interface { io.ReadWriter - Ping() error - Stop() - SendCommand(command string) error + SendMessage(command string) error Server() string + Status() int + WithCancel(ctx context.Context) (context.Context, context.CancelFunc) + Done() <-chan struct{} } diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 4051e2c..9051015 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -8,6 +8,7 @@ import ( // HealthHandler implements the handler required for health checks. type HealthHandler struct { + withCancel // Buffer of incoming data from server. receiveBuf []byte // To send commands to the server. @@ -16,6 +17,7 @@ type HealthHandler struct { receive chan<- string // The remote server address server string + status int } // NewHealthHandler returns a new health check handler. @@ -24,6 +26,10 @@ func NewHealthHandler(server string, receive chan<- string) *HealthHandler { server: server, receive: receive, commands: make(chan string), + status: -1, + withCancel: withCancel{ + done: make(chan struct{}), + }, } return &h @@ -34,18 +40,13 @@ func (h *HealthHandler) Server() string { return h.server } -// Stop is not of use for health check handler. -func (h *HealthHandler) Stop() { - // Nothing done here. +// Status of the handler. +func (h *HealthHandler) Status() int { + return h.status } -// Ping is not of use for health check handler. -func (h *HealthHandler) Ping() error { - return nil -} - -// SendCommand send a DTail command to the server. -func (h *HealthHandler) SendCommand(command string) error { +// SendMessage sends a DTail command to the server. +func (h *HealthHandler) SendMessage(command string) error { select { case h.commands <- fmt.Sprintf("%s;", command): case <-time.NewTimer(time.Second * 10).C: diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index d76cdfd..874bb7d 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -1,10 +1,11 @@ package handlers import ( - "github.com/mimecast/dtail/internal/logger" + "strings" + + "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/client" - "strings" ) // MaprHandler is the handler used on the client side for running mapreduce aggregations. @@ -16,15 +17,16 @@ type MaprHandler struct { } // NewMaprHandler returns a new mapreduce client handler. -func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet, pingTimeout int) *MaprHandler { +func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *MaprHandler { return &MaprHandler{ baseHandler: baseHandler{ server: server, shellStarted: false, commands: make(chan string), - pong: make(chan struct{}, 1), - stop: make(chan struct{}), - pingTimeout: pingTimeout, + status: -1, + withCancel: withCancel{ + done: make(chan struct{}), + }, }, query: query, aggregate: client.NewAggregate(server, query, globalGroup), @@ -65,10 +67,3 @@ func (h *MaprHandler) handleAggregateMessage(message string) { h.aggregate.Aggregate(parts[2:]) logger.Debug("Aggregated aggregate data", h.server, h.count) } - -// Stop stops the mapreduce client handler. -func (h *MaprHandler) Stop() { - logger.Debug("Stopping mapreduce handler", h.server) - h.aggregate.Stop() - h.baseHandler.Stop() -} diff --git a/internal/clients/handlers/withcancel.go b/internal/clients/handlers/withcancel.go new file mode 100644 index 0000000..7c9cf4e --- /dev/null +++ b/internal/clients/handlers/withcancel.go @@ -0,0 +1,24 @@ +package handlers + +import "context" + +type withCancel struct { + ctx context.Context + done chan struct{} +} + +// WithCancel sets and returns the context used. +func (w *withCancel) WithCancel(ctx context.Context) (context.Context, context.CancelFunc) { + cancelCtx, cancel := context.WithCancel(ctx) + w.ctx = cancelCtx + + return cancelCtx, cancel +} + +func (w *withCancel) Done() <-chan struct{} { + return w.done +} + +func (w *withCancel) shutdown() { + close(w.done) +} |
