summaryrefslogtreecommitdiff
path: root/clients/handlers
diff options
context:
space:
mode:
authorPaul Bütow <pbuetow@mimecast.com>2020-01-09 20:30:15 +0000
committerPaul Bütow <pbuetow@mimecast.com>2020-01-09 20:30:15 +0000
commit3755a9911ecb05886577095f2b8cc8b9e4066a3a (patch)
tree86e24bc466986cb5c9c6d167a918e6064defeafc /clients/handlers
Release of DTail v1.0.0v1.0.0
Diffstat (limited to 'clients/handlers')
-rw-r--r--clients/handlers/basehandler.go134
-rw-r--r--clients/handlers/clienthandler.go26
-rw-r--r--clients/handlers/handler.go12
-rw-r--r--clients/handlers/healthhandler.go75
-rw-r--r--clients/handlers/maprhandler.go74
5 files changed, 321 insertions, 0 deletions
diff --git a/clients/handlers/basehandler.go b/clients/handlers/basehandler.go
new file mode 100644
index 0000000..ce82aa2
--- /dev/null
+++ b/clients/handlers/basehandler.go
@@ -0,0 +1,134 @@
+package handlers
+
+import (
+ "dtail/logger"
+ "errors"
+ "fmt"
+ "io"
+ "strings"
+ "time"
+)
+
+type baseHandler struct {
+ server string
+ shellStarted bool
+ commands chan string
+ pong chan struct{}
+ receiveBuf []byte
+ stop chan struct{}
+ pingTimeout int
+}
+
+func (h *baseHandler) Server() string {
+ return h.server
+}
+
+// Used to determine whether server is still responding to requests or not.
+func (h *baseHandler) Ping() error {
+ if h.pingTimeout == 0 {
+ // Server ping disabled
+ return nil
+ }
+
+ if err := h.SendCommand("ping"); err != nil {
+ return err
+ }
+
+ select {
+ case <-h.pong:
+ return nil
+ case <-time.After(time.Duration(h.pingTimeout) * time.Second):
+ }
+
+ return errors.New("Didn't receive any server pongs (ping replies)")
+}
+
+func (h *baseHandler) SendCommand(command string) error {
+ if command == "ping" {
+ logger.Trace("Sending command", h.server, command)
+ } else {
+ logger.Debug("Sending command", h.server, command)
+ }
+
+ select {
+ case h.commands <- fmt.Sprintf("%s;", command):
+ case <-time.After(time.Second * 5):
+ return errors.New("Timed out sending command " + command)
+ case <-h.stop:
+ }
+
+ return nil
+}
+
+// Read data from the dtail server via Writer interface.
+func (h *baseHandler) Write(p []byte) (n int, err error) {
+ for _, b := range p {
+ h.receiveBuf = append(h.receiveBuf, b)
+ if b == '\n' {
+ if len(h.receiveBuf) == 0 {
+ continue
+ }
+ message := string(h.receiveBuf)
+ h.handleMessageType(message)
+ }
+ }
+
+ return len(p), nil
+}
+
+// Send data to the dtail server via Reader interface.
+func (h *baseHandler) Read(p []byte) (n int, err error) {
+ select {
+ case command := <-h.commands:
+ n = copy(p, []byte(command))
+ case <-h.stop:
+ return 0, io.EOF
+ }
+ return
+}
+
+// Handle various message types.
+func (h *baseHandler) handleMessageType(message string) {
+ if len(h.receiveBuf) == 0 {
+ return
+ }
+ // Hidden server commands starti with a dot "."
+ if h.receiveBuf[0] == '.' {
+ h.handleHiddenMessage(message)
+ h.receiveBuf = h.receiveBuf[:0]
+ return
+ }
+
+ // Silent mode will only print out remote logs but not remote server
+ // commands. But remote server commands will be still logged to ./log/.
+ if logger.Mode == logger.SilentMode {
+ if h.receiveBuf[0] == 'R' {
+ logger.Raw(message)
+ }
+ h.receiveBuf = h.receiveBuf[:0]
+ return
+ }
+ logger.Raw(message)
+ h.receiveBuf = h.receiveBuf[:0]
+}
+
+// Handle messages received from server which are not meant to be displayed
+// to the end user.
+func (h *baseHandler) handleHiddenMessage(message string) {
+ switch {
+ case strings.HasPrefix(message, ".pong"):
+ h.pong <- struct{}{}
+ case strings.HasPrefix(message, ".syn close connection"):
+ h.SendCommand("ack close connection")
+ }
+}
+
+// Stop the handler.
+func (h *baseHandler) Stop() {
+ select {
+ case <-h.stop:
+ default:
+ logger.Debug("Stopping base handler", h.server)
+ close(h.stop)
+ }
+}
diff --git a/clients/handlers/clienthandler.go b/clients/handlers/clienthandler.go
new file mode 100644
index 0000000..e818b52
--- /dev/null
+++ b/clients/handlers/clienthandler.go
@@ -0,0 +1,26 @@
+package handlers
+
+import (
+ "dtail/logger"
+)
+
+// ClientHandler is the basic client handler interface.
+type ClientHandler struct {
+ baseHandler
+}
+
+// NewClientHandler creates a new client handler.
+func NewClientHandler(server string, pingTimeout int) *ClientHandler {
+ logger.Debug(server, "Creating new client handler")
+
+ return &ClientHandler{
+ baseHandler{
+ server: server,
+ shellStarted: false,
+ commands: make(chan string),
+ pong: make(chan struct{}, 1),
+ stop: make(chan struct{}),
+ pingTimeout: pingTimeout,
+ },
+ }
+}
diff --git a/clients/handlers/handler.go b/clients/handlers/handler.go
new file mode 100644
index 0000000..2013be0
--- /dev/null
+++ b/clients/handlers/handler.go
@@ -0,0 +1,12 @@
+package handlers
+
+import "io"
+
+// Handler provides all methods which can be run on any client handler.
+type Handler interface {
+ io.ReadWriter
+ Ping() error
+ Stop()
+ SendCommand(command string) error
+ Server() string
+}
diff --git a/clients/handlers/healthhandler.go b/clients/handlers/healthhandler.go
new file mode 100644
index 0000000..4051e2c
--- /dev/null
+++ b/clients/handlers/healthhandler.go
@@ -0,0 +1,75 @@
+package handlers
+
+import (
+ "errors"
+ "fmt"
+ "time"
+)
+
+// HealthHandler implements the handler required for health checks.
+type HealthHandler struct {
+ // Buffer of incoming data from server.
+ receiveBuf []byte
+ // To send commands to the server.
+ commands chan string
+ // To receive messages from the server.
+ receive chan<- string
+ // The remote server address
+ server string
+}
+
+// NewHealthHandler returns a new health check handler.
+func NewHealthHandler(server string, receive chan<- string) *HealthHandler {
+ h := HealthHandler{
+ server: server,
+ receive: receive,
+ commands: make(chan string),
+ }
+
+ return &h
+}
+
+// Server returns the remote server name.
+func (h *HealthHandler) Server() string {
+ return h.server
+}
+
+// Stop is not of use for health check handler.
+func (h *HealthHandler) Stop() {
+ // Nothing done here.
+}
+
+// Ping is not of use for health check handler.
+func (h *HealthHandler) Ping() error {
+ return nil
+}
+
+// SendCommand send a DTail command to the server.
+func (h *HealthHandler) SendCommand(command string) error {
+ select {
+ case h.commands <- fmt.Sprintf("%s;", command):
+ case <-time.NewTimer(time.Second * 10).C:
+ return errors.New("Timed out sending command " + command)
+ }
+
+ return nil
+}
+
+// Server writes byte stream to client.
+func (h *HealthHandler) Write(p []byte) (n int, err error) {
+ for _, b := range p {
+ h.receiveBuf = append(h.receiveBuf, b)
+ if b == '\n' {
+ h.receive <- string(h.receiveBuf)
+ h.receiveBuf = h.receiveBuf[:0]
+ }
+ }
+
+ return len(p), nil
+}
+
+// Server reads byte stream from client.
+func (h *HealthHandler) Read(p []byte) (n int, err error) {
+ n = copy(p, []byte(<-h.commands))
+ return
+}
diff --git a/clients/handlers/maprhandler.go b/clients/handlers/maprhandler.go
new file mode 100644
index 0000000..830a142
--- /dev/null
+++ b/clients/handlers/maprhandler.go
@@ -0,0 +1,74 @@
+package handlers
+
+import (
+ "dtail/logger"
+ "dtail/mapr"
+ "dtail/mapr/client"
+ "strings"
+)
+
+// MaprHandler is the handler used on the client side for running mapreduce aggregations.
+type MaprHandler struct {
+ baseHandler
+ aggregate *client.Aggregate
+ query *mapr.Query
+ count uint64
+}
+
+// NewMaprHandler returns a new mapreduce client handler.
+func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet, pingTimeout int) *MaprHandler {
+ return &MaprHandler{
+ baseHandler: baseHandler{
+ server: server,
+ shellStarted: false,
+ commands: make(chan string),
+ pong: make(chan struct{}, 1),
+ stop: make(chan struct{}),
+ pingTimeout: pingTimeout,
+ },
+ query: query,
+ aggregate: client.NewAggregate(server, query, globalGroup),
+ }
+}
+
+// Read data from the dtail server via Writer interface.
+func (h *MaprHandler) Write(p []byte) (n int, err error) {
+ for _, b := range p {
+ h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b)
+ if b == '\n' {
+ if len(h.baseHandler.receiveBuf) == 0 {
+ continue
+ }
+ message := string(h.baseHandler.receiveBuf)
+
+ if h.baseHandler.receiveBuf[0] == 'A' {
+ h.handleAggregateMessage(strings.TrimSpace(message))
+ h.baseHandler.receiveBuf = h.baseHandler.receiveBuf[:0]
+ continue
+ }
+ h.baseHandler.handleMessageType(message)
+ }
+ }
+
+ return len(p), nil
+}
+
+// Handle a message received from server including mapr aggregation
+// related data.
+func (h *MaprHandler) handleAggregateMessage(message string) {
+ h.count++
+ parts := strings.Split(message, "|")
+
+ // Index 0 contains 'AGGREGATE', 1 contains server host.
+ // Aggregation data begins from index 2.
+ logger.Debug("Received aggregate data", h.server, h.count)
+ h.aggregate.Aggregate(parts[2:])
+ logger.Debug("Aggregated aggregate data", h.server, h.count)
+}
+
+// Stop stops the mapreduce client handler.
+func (h *MaprHandler) Stop() {
+ logger.Debug("Stopping mapreduce handler", h.server)
+ h.aggregate.Stop()
+ h.baseHandler.Stop()
+}