summaryrefslogtreecommitdiff
path: root/internal/clients/handlers
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2021-10-21 21:28:49 +0300
committerPaul Buetow <pbuetow@mimecast.com>2021-10-21 21:28:49 +0300
commitf4207a55f71bfbcfdc532d5cdd3befaa3474a157 (patch)
treeea5e4a2d2a67035f645bdee496ae55a52034178a /internal/clients/handlers
parentd80d6070557e3a800e3a54967af9eced518f116b (diff)
parent739205206d63bf42f4e843b39d04d4c8cd8207c3 (diff)
merge develop
Diffstat (limited to 'internal/clients/handlers')
-rw-r--r--internal/clients/handlers/basehandler.go77
-rw-r--r--internal/clients/handlers/clienthandler.go4
-rw-r--r--internal/clients/handlers/healthhandler.go106
-rw-r--r--internal/clients/handlers/maprhandler.go56
4 files changed, 113 insertions, 130 deletions
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index 602a7ac..b520c25 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -1,6 +1,7 @@
package handlers
import (
+ "bytes"
"encoding/base64"
"fmt"
"io"
@@ -8,8 +9,8 @@ import (
"time"
"github.com/mimecast/dtail/internal"
- "github.com/mimecast/dtail/internal/io/logger"
- "github.com/mimecast/dtail/internal/version"
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/protocol"
)
type baseHandler struct {
@@ -17,10 +18,20 @@ type baseHandler struct {
server string
shellStarted bool
commands chan string
- receiveBuf []byte
+ receiveBuf bytes.Buffer
status int
}
+func (h *baseHandler) String() string {
+ return fmt.Sprintf("baseHandler(%s,server:%s,shellStarted:%v,status:%d)@%p",
+ h.done,
+ h.server,
+ h.shellStarted,
+ h.status,
+ h,
+ )
+}
+
func (h *baseHandler) Server() string {
return h.server
}
@@ -29,21 +40,13 @@ func (h *baseHandler) Status() int {
return h.status
}
-func (h *baseHandler) Done() <-chan struct{} {
- return h.done.Done()
-}
-
-func (h *baseHandler) Shutdown() {
- h.done.Shutdown()
-}
-
// SendMessage to the server.
func (h *baseHandler) SendMessage(command string) error {
encoded := base64.StdEncoding.EncodeToString([]byte(command))
- logger.Debug("Sending command", h.server, command, encoded)
+ dlog.Client.Debug("Sending command", h.server, command, encoded)
select {
- case h.commands <- fmt.Sprintf("protocol %s base64 %v;", version.ProtocolCompat, encoded):
+ case h.commands <- fmt.Sprintf("protocol %s base64 %v;", protocol.ProtocolCompat, encoded):
case <-time.After(time.Second * 5):
return fmt.Errorf("Timed out sending command '%s' (base64: '%s')", command, encoded)
case <-h.Done():
@@ -56,13 +59,20 @@ func (h *baseHandler) SendMessage(command string) error {
// Read data from the dtail server via Writer interface.
func (h *baseHandler) Write(p []byte) (n int, err error) {
for _, b := range p {
- h.receiveBuf = append(h.receiveBuf, b)
- if b == '\n' {
- if len(h.receiveBuf) == 0 {
+ switch b {
+ /*
+ // NEXT: Next DTail version make it so that '\n' gets ignored. For now
+ // leave it for compatibility with older DTail server + ability to display
+ // the protocol mismatch warn message.
+ case '\n' {
continue
- }
- message := string(h.receiveBuf)
- h.handleMessageType(message)
+ */
+ case '\n', protocol.MessageDelimiter:
+ message := h.receiveBuf.String()
+ h.handleMessage(message)
+ h.receiveBuf.Reset()
+ default:
+ h.receiveBuf.WriteByte(b)
}
}
@@ -77,31 +87,32 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
case <-h.Done():
return 0, io.EOF
}
-
return
}
-// Handle various message types.
-func (h *baseHandler) handleMessageType(message string) {
- if len(h.receiveBuf) == 0 {
- return
- }
-
- // Hidden server commands starti with a dot "."
- if h.receiveBuf[0] == '.' {
+func (h *baseHandler) handleMessage(message string) {
+ if len(message) > 0 && message[0] == '.' {
h.handleHiddenMessage(message)
- h.receiveBuf = h.receiveBuf[:0]
return
}
- logger.Raw(message)
- h.receiveBuf = h.receiveBuf[:0]
+ dlog.Client.Raw(message)
}
// Handle messages received from server which are not meant to be displayed
// to the end user.
func (h *baseHandler) handleHiddenMessage(message string) {
- if strings.HasPrefix(message, ".syn close connection") {
- h.SendMessage(".ack close connection")
+ switch {
+ case strings.HasPrefix(message, ".syn close connection"):
+ go h.SendMessage(".ack close connection")
+ h.Shutdown()
}
}
+
+func (h *baseHandler) Done() <-chan struct{} {
+ return h.done.Done()
+}
+
+func (h *baseHandler) Shutdown() {
+ h.done.Shutdown()
+}
diff --git a/internal/clients/handlers/clienthandler.go b/internal/clients/handlers/clienthandler.go
index 2bcb038..27ac85e 100644
--- a/internal/clients/handlers/clienthandler.go
+++ b/internal/clients/handlers/clienthandler.go
@@ -2,7 +2,7 @@ package handlers
import (
"github.com/mimecast/dtail/internal"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
)
// ClientHandler is the basic client handler interface.
@@ -12,7 +12,7 @@ type ClientHandler struct {
// NewClientHandler creates a new client handler.
func NewClientHandler(server string) *ClientHandler {
- logger.Debug(server, "Creating new client handler")
+ dlog.Client.Debug(server, "Creating new client handler")
return &ClientHandler{
baseHandler{
diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go
index 0440706..47b594e 100644
--- a/internal/clients/handlers/healthhandler.go
+++ b/internal/clients/handlers/healthhandler.go
@@ -1,88 +1,56 @@
package handlers
import (
- "errors"
- "fmt"
- "time"
+ "strings"
"github.com/mimecast/dtail/internal"
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/protocol"
)
-// HealthHandler implements the handler required for health checks.
+// HealthHandler is the handler used on the client side for running mapreduce
+// aggregations.
type HealthHandler struct {
- done *internal.Done
- // Buffer of incoming data from server.
- receiveBuf []byte
- // To send commands to the server.
- commands chan string
- // To receive messages from the server.
- receive chan<- string
- // The remote server address
- server string
- // The return status.
- status int
-}
-
-// NewHealthHandler returns a new health check handler.
-func NewHealthHandler(server string, receive chan<- string) *HealthHandler {
- h := HealthHandler{
- server: server,
- receive: receive,
- commands: make(chan string),
- status: -1,
- done: internal.NewDone(),
- }
-
- return &h
-}
-
-// Server returns the remote server name.
-func (h *HealthHandler) Server() string {
- return h.server
-}
-
-// Status of the handler.
-func (h *HealthHandler) Status() int {
- return h.status
-}
-
-// Done returns done channel of the handler.
-func (h *HealthHandler) Done() <-chan struct{} {
- return h.done.Done()
-}
-
-// Shutdown the handler.
-func (h *HealthHandler) Shutdown() {
- h.done.Shutdown()
-}
-
-// SendMessage sends a DTail command to the server.
-func (h *HealthHandler) SendMessage(command string) error {
- select {
- case h.commands <- fmt.Sprintf("%s;", command):
- case <-time.NewTimer(time.Second * 10).C:
- return errors.New("Timed out sending command " + command)
- case <-h.Done():
+ baseHandler
+}
+
+// NewHealthHandler returns a new health client handler.
+func NewHealthHandler(server string) *HealthHandler {
+ dlog.Client.Debug(server, "Creating new health handler")
+ return &HealthHandler{
+ baseHandler: baseHandler{
+ server: server,
+ shellStarted: false,
+ commands: make(chan string),
+ status: 2, // Assume CRITICAL status by default.
+ done: internal.NewDone(),
+ },
}
-
- return nil
}
-// Server writes byte stream to client.
+// Read data from the dtail server via Writer interface.
func (h *HealthHandler) Write(p []byte) (n int, err error) {
for _, b := range p {
- h.receiveBuf = append(h.receiveBuf, b)
- if b == '\n' {
- h.receive <- string(h.receiveBuf)
- h.receiveBuf = h.receiveBuf[:0]
+ switch b {
+ case '\n', protocol.MessageDelimiter:
+ message := h.baseHandler.receiveBuf.String()
+ h.handleMessage(message)
+ h.baseHandler.receiveBuf.Reset()
+ default:
+ h.baseHandler.receiveBuf.WriteByte(b)
}
}
-
return len(p), nil
}
-// Server reads byte stream from client.
-func (h *HealthHandler) Read(p []byte) (n int, err error) {
- n = copy(p, []byte(<-h.commands))
- return
+func (h *HealthHandler) handleMessage(message string) {
+ if len(message) > 0 && message[0] == '.' {
+ h.baseHandler.handleHiddenMessage(message)
+ return
+ }
+ s := strings.Split(message, protocol.FieldDelimiter)
+ message = s[len(s)-1]
+ if message == "OK" {
+ h.baseHandler.status = 0
+ }
}
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go
index fb71c8f..8718b35 100644
--- a/internal/clients/handlers/maprhandler.go
+++ b/internal/clients/handlers/maprhandler.go
@@ -4,21 +4,24 @@ import (
"strings"
"github.com/mimecast/dtail/internal"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/client"
+ "github.com/mimecast/dtail/internal/protocol"
)
-// MaprHandler is the handler used on the client side for running mapreduce aggregations.
+// MaprHandler is the handler used on the client side for running mapreduce
+// aggregations.
type MaprHandler struct {
baseHandler
aggregate *client.Aggregate
query *mapr.Query
- count uint64
}
// NewMaprHandler returns a new mapreduce client handler.
-func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *MaprHandler {
+func NewMaprHandler(server string, query *mapr.Query,
+ globalGroup *mapr.GlobalGroupSet) *MaprHandler {
+
return &MaprHandler{
baseHandler: baseHandler{
server: server,
@@ -35,34 +38,35 @@ func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGr
// Read data from the dtail server via Writer interface.
func (h *MaprHandler) Write(p []byte) (n int, err error) {
for _, b := range p {
- h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b)
- if b == '\n' {
- if len(h.baseHandler.receiveBuf) == 0 {
- continue
- }
- message := string(h.baseHandler.receiveBuf)
-
- if h.baseHandler.receiveBuf[0] == 'A' {
- h.handleAggregateMessage(strings.TrimSpace(message))
- h.baseHandler.receiveBuf = h.baseHandler.receiveBuf[:0]
- continue
+ switch b {
+ case '\n':
+ continue
+ case protocol.MessageDelimiter:
+ message := h.baseHandler.receiveBuf.String()
+ dlog.Client.Debug(message)
+ if message[0] == 'A' {
+ h.handleAggregateMessage(message)
+ } else {
+ h.baseHandler.handleMessage(message)
}
- h.baseHandler.handleMessageType(message)
+ h.baseHandler.receiveBuf.Reset()
+ default:
+ h.baseHandler.receiveBuf.WriteByte(b)
}
}
return len(p), nil
}
-// Handle a message received from server including mapr aggregation
-// related data.
+// Handle a message received from server including mapr aggregation related data.
func (h *MaprHandler) handleAggregateMessage(message string) {
- h.count++
- parts := strings.Split(message, "➔")
-
- // Index 0 contains 'AGGREGATE', 1 contains server host.
- // Aggregation data begins from index 2.
- logger.Debug("Received aggregate data", h.server, h.count, parts)
- h.aggregate.Aggregate(parts[2:])
- logger.Debug("Aggregated aggregate data", h.server, h.count)
+ parts := strings.SplitN(message, protocol.FieldDelimiter, 3)
+ if len(parts) != 3 {
+ dlog.Client.Error("Unable to aggregate data", h.server, message, parts,
+ len(parts), "expected 3 parts")
+ return
+ }
+ if err := h.aggregate.Aggregate(parts[2]); err != nil {
+ dlog.Client.Error("Unable to aggregate data", h.server, message, err)
+ }
}