diff options
| author | Paul Bütow <pbuetow@mimecast.com> | 2020-01-09 20:30:15 +0000 |
|---|---|---|
| committer | Paul Bütow <pbuetow@mimecast.com> | 2020-01-09 20:30:15 +0000 |
| commit | 3755a9911ecb05886577095f2b8cc8b9e4066a3a (patch) | |
| tree | 86e24bc466986cb5c9c6d167a918e6064defeafc /clients/handlers | |
Release of DTail v1.0.0v1.0.0
Diffstat (limited to 'clients/handlers')
| -rw-r--r-- | clients/handlers/basehandler.go | 134 | ||||
| -rw-r--r-- | clients/handlers/clienthandler.go | 26 | ||||
| -rw-r--r-- | clients/handlers/handler.go | 12 | ||||
| -rw-r--r-- | clients/handlers/healthhandler.go | 75 | ||||
| -rw-r--r-- | clients/handlers/maprhandler.go | 74 |
5 files changed, 321 insertions, 0 deletions
diff --git a/clients/handlers/basehandler.go b/clients/handlers/basehandler.go new file mode 100644 index 0000000..ce82aa2 --- /dev/null +++ b/clients/handlers/basehandler.go @@ -0,0 +1,134 @@ +package handlers + +import ( + "dtail/logger" + "errors" + "fmt" + "io" + "strings" + "time" +) + +type baseHandler struct { + server string + shellStarted bool + commands chan string + pong chan struct{} + receiveBuf []byte + stop chan struct{} + pingTimeout int +} + +func (h *baseHandler) Server() string { + return h.server +} + +// Used to determine whether server is still responding to requests or not. +func (h *baseHandler) Ping() error { + if h.pingTimeout == 0 { + // Server ping disabled + return nil + } + + if err := h.SendCommand("ping"); err != nil { + return err + } + + select { + case <-h.pong: + return nil + case <-time.After(time.Duration(h.pingTimeout) * time.Second): + } + + return errors.New("Didn't receive any server pongs (ping replies)") +} + +func (h *baseHandler) SendCommand(command string) error { + if command == "ping" { + logger.Trace("Sending command", h.server, command) + } else { + logger.Debug("Sending command", h.server, command) + } + + select { + case h.commands <- fmt.Sprintf("%s;", command): + case <-time.After(time.Second * 5): + return errors.New("Timed out sending command " + command) + case <-h.stop: + } + + return nil +} + +// Read data from the dtail server via Writer interface. +func (h *baseHandler) Write(p []byte) (n int, err error) { + for _, b := range p { + h.receiveBuf = append(h.receiveBuf, b) + if b == '\n' { + if len(h.receiveBuf) == 0 { + continue + } + message := string(h.receiveBuf) + h.handleMessageType(message) + } + } + + return len(p), nil +} + +// Send data to the dtail server via Reader interface. +func (h *baseHandler) Read(p []byte) (n int, err error) { + select { + case command := <-h.commands: + n = copy(p, []byte(command)) + case <-h.stop: + return 0, io.EOF + } + return +} + +// Handle various message types. +func (h *baseHandler) handleMessageType(message string) { + if len(h.receiveBuf) == 0 { + return + } + // Hidden server commands starti with a dot "." + if h.receiveBuf[0] == '.' { + h.handleHiddenMessage(message) + h.receiveBuf = h.receiveBuf[:0] + return + } + + // Silent mode will only print out remote logs but not remote server + // commands. But remote server commands will be still logged to ./log/. + if logger.Mode == logger.SilentMode { + if h.receiveBuf[0] == 'R' { + logger.Raw(message) + } + h.receiveBuf = h.receiveBuf[:0] + return + } + logger.Raw(message) + h.receiveBuf = h.receiveBuf[:0] +} + +// Handle messages received from server which are not meant to be displayed +// to the end user. +func (h *baseHandler) handleHiddenMessage(message string) { + switch { + case strings.HasPrefix(message, ".pong"): + h.pong <- struct{}{} + case strings.HasPrefix(message, ".syn close connection"): + h.SendCommand("ack close connection") + } +} + +// Stop the handler. +func (h *baseHandler) Stop() { + select { + case <-h.stop: + default: + logger.Debug("Stopping base handler", h.server) + close(h.stop) + } +} diff --git a/clients/handlers/clienthandler.go b/clients/handlers/clienthandler.go new file mode 100644 index 0000000..e818b52 --- /dev/null +++ b/clients/handlers/clienthandler.go @@ -0,0 +1,26 @@ +package handlers + +import ( + "dtail/logger" +) + +// ClientHandler is the basic client handler interface. +type ClientHandler struct { + baseHandler +} + +// NewClientHandler creates a new client handler. +func NewClientHandler(server string, pingTimeout int) *ClientHandler { + logger.Debug(server, "Creating new client handler") + + return &ClientHandler{ + baseHandler{ + server: server, + shellStarted: false, + commands: make(chan string), + pong: make(chan struct{}, 1), + stop: make(chan struct{}), + pingTimeout: pingTimeout, + }, + } +} diff --git a/clients/handlers/handler.go b/clients/handlers/handler.go new file mode 100644 index 0000000..2013be0 --- /dev/null +++ b/clients/handlers/handler.go @@ -0,0 +1,12 @@ +package handlers + +import "io" + +// Handler provides all methods which can be run on any client handler. +type Handler interface { + io.ReadWriter + Ping() error + Stop() + SendCommand(command string) error + Server() string +} diff --git a/clients/handlers/healthhandler.go b/clients/handlers/healthhandler.go new file mode 100644 index 0000000..4051e2c --- /dev/null +++ b/clients/handlers/healthhandler.go @@ -0,0 +1,75 @@ +package handlers + +import ( + "errors" + "fmt" + "time" +) + +// HealthHandler implements the handler required for health checks. +type HealthHandler struct { + // Buffer of incoming data from server. + receiveBuf []byte + // To send commands to the server. + commands chan string + // To receive messages from the server. + receive chan<- string + // The remote server address + server string +} + +// NewHealthHandler returns a new health check handler. +func NewHealthHandler(server string, receive chan<- string) *HealthHandler { + h := HealthHandler{ + server: server, + receive: receive, + commands: make(chan string), + } + + return &h +} + +// Server returns the remote server name. +func (h *HealthHandler) Server() string { + return h.server +} + +// Stop is not of use for health check handler. +func (h *HealthHandler) Stop() { + // Nothing done here. +} + +// Ping is not of use for health check handler. +func (h *HealthHandler) Ping() error { + return nil +} + +// SendCommand send a DTail command to the server. +func (h *HealthHandler) SendCommand(command string) error { + select { + case h.commands <- fmt.Sprintf("%s;", command): + case <-time.NewTimer(time.Second * 10).C: + return errors.New("Timed out sending command " + command) + } + + return nil +} + +// Server writes byte stream to client. +func (h *HealthHandler) Write(p []byte) (n int, err error) { + for _, b := range p { + h.receiveBuf = append(h.receiveBuf, b) + if b == '\n' { + h.receive <- string(h.receiveBuf) + h.receiveBuf = h.receiveBuf[:0] + } + } + + return len(p), nil +} + +// Server reads byte stream from client. +func (h *HealthHandler) Read(p []byte) (n int, err error) { + n = copy(p, []byte(<-h.commands)) + return +} diff --git a/clients/handlers/maprhandler.go b/clients/handlers/maprhandler.go new file mode 100644 index 0000000..830a142 --- /dev/null +++ b/clients/handlers/maprhandler.go @@ -0,0 +1,74 @@ +package handlers + +import ( + "dtail/logger" + "dtail/mapr" + "dtail/mapr/client" + "strings" +) + +// MaprHandler is the handler used on the client side for running mapreduce aggregations. +type MaprHandler struct { + baseHandler + aggregate *client.Aggregate + query *mapr.Query + count uint64 +} + +// NewMaprHandler returns a new mapreduce client handler. +func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet, pingTimeout int) *MaprHandler { + return &MaprHandler{ + baseHandler: baseHandler{ + server: server, + shellStarted: false, + commands: make(chan string), + pong: make(chan struct{}, 1), + stop: make(chan struct{}), + pingTimeout: pingTimeout, + }, + query: query, + aggregate: client.NewAggregate(server, query, globalGroup), + } +} + +// Read data from the dtail server via Writer interface. +func (h *MaprHandler) Write(p []byte) (n int, err error) { + for _, b := range p { + h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b) + if b == '\n' { + if len(h.baseHandler.receiveBuf) == 0 { + continue + } + message := string(h.baseHandler.receiveBuf) + + if h.baseHandler.receiveBuf[0] == 'A' { + h.handleAggregateMessage(strings.TrimSpace(message)) + h.baseHandler.receiveBuf = h.baseHandler.receiveBuf[:0] + continue + } + h.baseHandler.handleMessageType(message) + } + } + + return len(p), nil +} + +// Handle a message received from server including mapr aggregation +// related data. +func (h *MaprHandler) handleAggregateMessage(message string) { + h.count++ + parts := strings.Split(message, "|") + + // Index 0 contains 'AGGREGATE', 1 contains server host. + // Aggregation data begins from index 2. + logger.Debug("Received aggregate data", h.server, h.count) + h.aggregate.Aggregate(parts[2:]) + logger.Debug("Aggregated aggregate data", h.server, h.count) +} + +// Stop stops the mapreduce client handler. +func (h *MaprHandler) Stop() { + logger.Debug("Stopping mapreduce handler", h.server) + h.aggregate.Stop() + h.baseHandler.Stop() +} |
