diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2021-10-21 21:28:49 +0300 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2021-10-21 21:28:49 +0300 |
| commit | f4207a55f71bfbcfdc532d5cdd3befaa3474a157 (patch) | |
| tree | ea5e4a2d2a67035f645bdee496ae55a52034178a /internal/clients/handlers | |
| parent | d80d6070557e3a800e3a54967af9eced518f116b (diff) | |
| parent | 739205206d63bf42f4e843b39d04d4c8cd8207c3 (diff) | |
merge develop
Diffstat (limited to 'internal/clients/handlers')
| -rw-r--r-- | internal/clients/handlers/basehandler.go | 77 | ||||
| -rw-r--r-- | internal/clients/handlers/clienthandler.go | 4 | ||||
| -rw-r--r-- | internal/clients/handlers/healthhandler.go | 106 | ||||
| -rw-r--r-- | internal/clients/handlers/maprhandler.go | 56 |
4 files changed, 113 insertions, 130 deletions
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 602a7ac..b520c25 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -1,6 +1,7 @@ package handlers import ( + "bytes" "encoding/base64" "fmt" "io" @@ -8,8 +9,8 @@ import ( "time" "github.com/mimecast/dtail/internal" - "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/version" + "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/protocol" ) type baseHandler struct { @@ -17,10 +18,20 @@ type baseHandler struct { server string shellStarted bool commands chan string - receiveBuf []byte + receiveBuf bytes.Buffer status int } +func (h *baseHandler) String() string { + return fmt.Sprintf("baseHandler(%s,server:%s,shellStarted:%v,status:%d)@%p", + h.done, + h.server, + h.shellStarted, + h.status, + h, + ) +} + func (h *baseHandler) Server() string { return h.server } @@ -29,21 +40,13 @@ func (h *baseHandler) Status() int { return h.status } -func (h *baseHandler) Done() <-chan struct{} { - return h.done.Done() -} - -func (h *baseHandler) Shutdown() { - h.done.Shutdown() -} - // SendMessage to the server. func (h *baseHandler) SendMessage(command string) error { encoded := base64.StdEncoding.EncodeToString([]byte(command)) - logger.Debug("Sending command", h.server, command, encoded) + dlog.Client.Debug("Sending command", h.server, command, encoded) select { - case h.commands <- fmt.Sprintf("protocol %s base64 %v;", version.ProtocolCompat, encoded): + case h.commands <- fmt.Sprintf("protocol %s base64 %v;", protocol.ProtocolCompat, encoded): case <-time.After(time.Second * 5): return fmt.Errorf("Timed out sending command '%s' (base64: '%s')", command, encoded) case <-h.Done(): @@ -56,13 +59,20 @@ func (h *baseHandler) SendMessage(command string) error { // Read data from the dtail server via Writer interface. func (h *baseHandler) Write(p []byte) (n int, err error) { for _, b := range p { - h.receiveBuf = append(h.receiveBuf, b) - if b == '\n' { - if len(h.receiveBuf) == 0 { + switch b { + /* + // NEXT: Next DTail version make it so that '\n' gets ignored. For now + // leave it for compatibility with older DTail server + ability to display + // the protocol mismatch warn message. + case '\n' { continue - } - message := string(h.receiveBuf) - h.handleMessageType(message) + */ + case '\n', protocol.MessageDelimiter: + message := h.receiveBuf.String() + h.handleMessage(message) + h.receiveBuf.Reset() + default: + h.receiveBuf.WriteByte(b) } } @@ -77,31 +87,32 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { case <-h.Done(): return 0, io.EOF } - return } -// Handle various message types. -func (h *baseHandler) handleMessageType(message string) { - if len(h.receiveBuf) == 0 { - return - } - - // Hidden server commands starti with a dot "." - if h.receiveBuf[0] == '.' { +func (h *baseHandler) handleMessage(message string) { + if len(message) > 0 && message[0] == '.' { h.handleHiddenMessage(message) - h.receiveBuf = h.receiveBuf[:0] return } - logger.Raw(message) - h.receiveBuf = h.receiveBuf[:0] + dlog.Client.Raw(message) } // Handle messages received from server which are not meant to be displayed // to the end user. func (h *baseHandler) handleHiddenMessage(message string) { - if strings.HasPrefix(message, ".syn close connection") { - h.SendMessage(".ack close connection") + switch { + case strings.HasPrefix(message, ".syn close connection"): + go h.SendMessage(".ack close connection") + h.Shutdown() } } + +func (h *baseHandler) Done() <-chan struct{} { + return h.done.Done() +} + +func (h *baseHandler) Shutdown() { + h.done.Shutdown() +} diff --git a/internal/clients/handlers/clienthandler.go b/internal/clients/handlers/clienthandler.go index 2bcb038..27ac85e 100644 --- a/internal/clients/handlers/clienthandler.go +++ b/internal/clients/handlers/clienthandler.go @@ -2,7 +2,7 @@ package handlers import ( "github.com/mimecast/dtail/internal" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" ) // ClientHandler is the basic client handler interface. @@ -12,7 +12,7 @@ type ClientHandler struct { // NewClientHandler creates a new client handler. func NewClientHandler(server string) *ClientHandler { - logger.Debug(server, "Creating new client handler") + dlog.Client.Debug(server, "Creating new client handler") return &ClientHandler{ baseHandler{ diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 0440706..47b594e 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -1,88 +1,56 @@ package handlers import ( - "errors" - "fmt" - "time" + "strings" "github.com/mimecast/dtail/internal" + "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/protocol" ) -// HealthHandler implements the handler required for health checks. +// HealthHandler is the handler used on the client side for running mapreduce +// aggregations. type HealthHandler struct { - done *internal.Done - // Buffer of incoming data from server. - receiveBuf []byte - // To send commands to the server. - commands chan string - // To receive messages from the server. - receive chan<- string - // The remote server address - server string - // The return status. - status int -} - -// NewHealthHandler returns a new health check handler. -func NewHealthHandler(server string, receive chan<- string) *HealthHandler { - h := HealthHandler{ - server: server, - receive: receive, - commands: make(chan string), - status: -1, - done: internal.NewDone(), - } - - return &h -} - -// Server returns the remote server name. -func (h *HealthHandler) Server() string { - return h.server -} - -// Status of the handler. -func (h *HealthHandler) Status() int { - return h.status -} - -// Done returns done channel of the handler. -func (h *HealthHandler) Done() <-chan struct{} { - return h.done.Done() -} - -// Shutdown the handler. -func (h *HealthHandler) Shutdown() { - h.done.Shutdown() -} - -// SendMessage sends a DTail command to the server. -func (h *HealthHandler) SendMessage(command string) error { - select { - case h.commands <- fmt.Sprintf("%s;", command): - case <-time.NewTimer(time.Second * 10).C: - return errors.New("Timed out sending command " + command) - case <-h.Done(): + baseHandler +} + +// NewHealthHandler returns a new health client handler. +func NewHealthHandler(server string) *HealthHandler { + dlog.Client.Debug(server, "Creating new health handler") + return &HealthHandler{ + baseHandler: baseHandler{ + server: server, + shellStarted: false, + commands: make(chan string), + status: 2, // Assume CRITICAL status by default. + done: internal.NewDone(), + }, } - - return nil } -// Server writes byte stream to client. +// Read data from the dtail server via Writer interface. func (h *HealthHandler) Write(p []byte) (n int, err error) { for _, b := range p { - h.receiveBuf = append(h.receiveBuf, b) - if b == '\n' { - h.receive <- string(h.receiveBuf) - h.receiveBuf = h.receiveBuf[:0] + switch b { + case '\n', protocol.MessageDelimiter: + message := h.baseHandler.receiveBuf.String() + h.handleMessage(message) + h.baseHandler.receiveBuf.Reset() + default: + h.baseHandler.receiveBuf.WriteByte(b) } } - return len(p), nil } -// Server reads byte stream from client. -func (h *HealthHandler) Read(p []byte) (n int, err error) { - n = copy(p, []byte(<-h.commands)) - return +func (h *HealthHandler) handleMessage(message string) { + if len(message) > 0 && message[0] == '.' { + h.baseHandler.handleHiddenMessage(message) + return + } + s := strings.Split(message, protocol.FieldDelimiter) + message = s[len(s)-1] + if message == "OK" { + h.baseHandler.status = 0 + } } diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index fb71c8f..8718b35 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -4,21 +4,24 @@ import ( "strings" "github.com/mimecast/dtail/internal" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/client" + "github.com/mimecast/dtail/internal/protocol" ) -// MaprHandler is the handler used on the client side for running mapreduce aggregations. +// MaprHandler is the handler used on the client side for running mapreduce +// aggregations. type MaprHandler struct { baseHandler aggregate *client.Aggregate query *mapr.Query - count uint64 } // NewMaprHandler returns a new mapreduce client handler. -func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *MaprHandler { +func NewMaprHandler(server string, query *mapr.Query, + globalGroup *mapr.GlobalGroupSet) *MaprHandler { + return &MaprHandler{ baseHandler: baseHandler{ server: server, @@ -35,34 +38,35 @@ func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGr // Read data from the dtail server via Writer interface. func (h *MaprHandler) Write(p []byte) (n int, err error) { for _, b := range p { - h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b) - if b == '\n' { - if len(h.baseHandler.receiveBuf) == 0 { - continue - } - message := string(h.baseHandler.receiveBuf) - - if h.baseHandler.receiveBuf[0] == 'A' { - h.handleAggregateMessage(strings.TrimSpace(message)) - h.baseHandler.receiveBuf = h.baseHandler.receiveBuf[:0] - continue + switch b { + case '\n': + continue + case protocol.MessageDelimiter: + message := h.baseHandler.receiveBuf.String() + dlog.Client.Debug(message) + if message[0] == 'A' { + h.handleAggregateMessage(message) + } else { + h.baseHandler.handleMessage(message) } - h.baseHandler.handleMessageType(message) + h.baseHandler.receiveBuf.Reset() + default: + h.baseHandler.receiveBuf.WriteByte(b) } } return len(p), nil } -// Handle a message received from server including mapr aggregation -// related data. +// Handle a message received from server including mapr aggregation related data. func (h *MaprHandler) handleAggregateMessage(message string) { - h.count++ - parts := strings.Split(message, "➔") - - // Index 0 contains 'AGGREGATE', 1 contains server host. - // Aggregation data begins from index 2. - logger.Debug("Received aggregate data", h.server, h.count, parts) - h.aggregate.Aggregate(parts[2:]) - logger.Debug("Aggregated aggregate data", h.server, h.count) + parts := strings.SplitN(message, protocol.FieldDelimiter, 3) + if len(parts) != 3 { + dlog.Client.Error("Unable to aggregate data", h.server, message, parts, + len(parts), "expected 3 parts") + return + } + if err := h.aggregate.Aggregate(parts[2]); err != nil { + dlog.Client.Error("Unable to aggregate data", h.server, message, err) + } } |
