From c2522ffb59514443816a96386c16bb7527cbe57c Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 21 Aug 2021 14:54:24 +0300 Subject: read files bytewise for more control of whats happening - change transport protocol for more control over newlines --- internal/clients/handlers/basehandler.go | 6 +++--- internal/clients/handlers/maprhandler.go | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) (limited to 'internal/clients/handlers') diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index f07fd90..acafe0e 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -9,7 +9,7 @@ import ( "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/version" + "github.com/mimecast/dtail/internal/protocol" ) type baseHandler struct { @@ -43,7 +43,7 @@ func (h *baseHandler) SendMessage(command string) error { logger.Debug("Sending command", h.server, command, encoded) select { - case h.commands <- fmt.Sprintf("protocol %s base64 %v;", version.ProtocolCompat, encoded): + case h.commands <- fmt.Sprintf("protocol %s base64 %v;", protocol.ProtocolCompat, encoded): case <-time.After(time.Second * 5): return fmt.Errorf("Timed out sending command '%s' (base64: '%s')", command, encoded) case <-h.Done(): @@ -57,7 +57,7 @@ func (h *baseHandler) SendMessage(command string) error { func (h *baseHandler) Write(p []byte) (n int, err error) { for _, b := range p { h.receiveBuf = append(h.receiveBuf, b) - if b == '\n' { + if b == protocol.MessageDelimiter { if len(h.receiveBuf) == 0 { continue } diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index fb71c8f..7ac5895 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -7,6 +7,7 @@ import ( "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/client" + "github.com/mimecast/dtail/internal/protocol" ) // MaprHandler is the handler used on the client side for running mapreduce aggregations. @@ -58,7 +59,7 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { // related data. func (h *MaprHandler) handleAggregateMessage(message string) { h.count++ - parts := strings.Split(message, "➔") + parts := strings.Split(message, protocol.AggregateDelimiter) // Index 0 contains 'AGGREGATE', 1 contains server host. // Aggregation data begins from index 2. -- cgit v1.2.3 From 9883a190109623b64e6d311dc2b462a6eae68003 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 22 Aug 2021 10:07:00 +0300 Subject: introduces the protocol package --- internal/clients/handlers/basehandler.go | 3 ++- internal/clients/handlers/healthhandler.go | 3 ++- internal/clients/handlers/maprhandler.go | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) (limited to 'internal/clients/handlers') diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index acafe0e..fe83faa 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -56,13 +56,14 @@ func (h *baseHandler) SendMessage(command string) error { // Read data from the dtail server via Writer interface. func (h *baseHandler) Write(p []byte) (n int, err error) { for _, b := range p { - h.receiveBuf = append(h.receiveBuf, b) if b == protocol.MessageDelimiter { if len(h.receiveBuf) == 0 { continue } message := string(h.receiveBuf) h.handleMessageType(message) + } else { + h.receiveBuf = append(h.receiveBuf, b) } } diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 0440706..213748c 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -6,6 +6,7 @@ import ( "time" "github.com/mimecast/dtail/internal" + "github.com/mimecast/dtail/internal/protocol" ) // HealthHandler implements the handler required for health checks. @@ -72,7 +73,7 @@ func (h *HealthHandler) SendMessage(command string) error { func (h *HealthHandler) Write(p []byte) (n int, err error) { for _, b := range p { h.receiveBuf = append(h.receiveBuf, b) - if b == '\n' { + if b == protocol.MessageDelimiter { // '\n' { h.receive <- string(h.receiveBuf) h.receiveBuf = h.receiveBuf[:0] } diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index 7ac5895..afad507 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -37,7 +37,7 @@ func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGr func (h *MaprHandler) Write(p []byte) (n int, err error) { for _, b := range p { h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b) - if b == '\n' { + if b == protocol.MessageDelimiter { // '\n' { if len(h.baseHandler.receiveBuf) == 0 { continue } -- cgit v1.2.3 From cc89d3fb8be2465af276d7ef03ea2a8affd87b2e Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Mon, 6 Sep 2021 13:48:55 +0300 Subject: Print out client/server update notice even from dtail server 4 to dtail client 3. --- internal/clients/handlers/basehandler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'internal/clients/handlers') diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index fe83faa..63ceaac 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -56,7 +56,7 @@ func (h *baseHandler) SendMessage(command string) error { // Read data from the dtail server via Writer interface. func (h *baseHandler) Write(p []byte) (n int, err error) { for _, b := range p { - if b == protocol.MessageDelimiter { + if b == protocol.MessageDelimiter || b == '\n' { if len(h.receiveBuf) == 0 { continue } -- cgit v1.2.3 From 16dc57e1e1c28e9d762424e596223a980770e059 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 8 Sep 2021 19:10:50 +0300 Subject: mapreduce tables are in colors now too --- internal/clients/handlers/basehandler.go | 29 +++++++++++++-------- internal/clients/handlers/healthhandler.go | 11 ++++---- internal/clients/handlers/maprhandler.go | 42 +++++++++++++++--------------- 3 files changed, 45 insertions(+), 37 deletions(-) (limited to 'internal/clients/handlers') diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 63ceaac..51f33c1 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -1,6 +1,7 @@ package handlers import ( + "bytes" "encoding/base64" "fmt" "io" @@ -17,7 +18,7 @@ type baseHandler struct { server string shellStarted bool commands chan string - receiveBuf []byte + receiveBuf bytes.Buffer status int } @@ -56,14 +57,23 @@ func (h *baseHandler) SendMessage(command string) error { // Read data from the dtail server via Writer interface. func (h *baseHandler) Write(p []byte) (n int, err error) { for _, b := range p { - if b == protocol.MessageDelimiter || b == '\n' { - if len(h.receiveBuf) == 0 { + switch b { + /* + // TODO: Next DTail version make it so that '\n' gets ignored. For now + // leave it for compatibility with older DTail server + ability to display + // the protocol mismatch warn message. + case '\n' { + continue + */ + case '\n', protocol.MessageDelimiter: + message := h.receiveBuf.String() + if len(message) == 0 { continue } - message := string(h.receiveBuf) h.handleMessageType(message) - } else { - h.receiveBuf = append(h.receiveBuf, b) + h.receiveBuf.Reset() + default: + h.receiveBuf.WriteByte(b) } } @@ -78,25 +88,22 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { case <-h.Done(): return 0, io.EOF } - return } // Handle various message types. func (h *baseHandler) handleMessageType(message string) { - if len(h.receiveBuf) == 0 { + if len(message) == 0 { return } // Hidden server commands starti with a dot "." - if h.receiveBuf[0] == '.' { + if message[0] == '.' { h.handleHiddenMessage(message) - h.receiveBuf = h.receiveBuf[:0] return } logger.Raw(message) - h.receiveBuf = h.receiveBuf[:0] } // Handle messages received from server which are not meant to be displayed diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 213748c..eca0348 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -1,6 +1,7 @@ package handlers import ( + "bytes" "errors" "fmt" "time" @@ -13,7 +14,7 @@ import ( type HealthHandler struct { done *internal.Done // Buffer of incoming data from server. - receiveBuf []byte + receiveBuf bytes.Buffer // To send commands to the server. commands chan string // To receive messages from the server. @@ -72,10 +73,10 @@ func (h *HealthHandler) SendMessage(command string) error { // Server writes byte stream to client. func (h *HealthHandler) Write(p []byte) (n int, err error) { for _, b := range p { - h.receiveBuf = append(h.receiveBuf, b) - if b == protocol.MessageDelimiter { // '\n' { - h.receive <- string(h.receiveBuf) - h.receiveBuf = h.receiveBuf[:0] + h.receiveBuf.WriteByte(b) + if b == protocol.MessageDelimiter { + h.receive <- h.receiveBuf.String() + h.receiveBuf.Reset() } } diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index afad507..65b1454 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -15,7 +15,6 @@ type MaprHandler struct { baseHandler aggregate *client.Aggregate query *mapr.Query - count uint64 } // NewMaprHandler returns a new mapreduce client handler. @@ -36,19 +35,20 @@ func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGr // Read data from the dtail server via Writer interface. func (h *MaprHandler) Write(p []byte) (n int, err error) { for _, b := range p { - h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b) - if b == protocol.MessageDelimiter { // '\n' { - if len(h.baseHandler.receiveBuf) == 0 { - continue + switch b { + case '\n': + continue + case protocol.MessageDelimiter: + message := h.baseHandler.receiveBuf.String() + logger.Debug(message) + if message[0] == 'A' { + h.handleAggregateMessage(message) + } else { + h.baseHandler.handleMessageType(message) } - message := string(h.baseHandler.receiveBuf) - - if h.baseHandler.receiveBuf[0] == 'A' { - h.handleAggregateMessage(strings.TrimSpace(message)) - h.baseHandler.receiveBuf = h.baseHandler.receiveBuf[:0] - continue - } - h.baseHandler.handleMessageType(message) + h.baseHandler.receiveBuf.Reset() + default: + h.baseHandler.receiveBuf.WriteByte(b) } } @@ -58,12 +58,12 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { // Handle a message received from server including mapr aggregation // related data. func (h *MaprHandler) handleAggregateMessage(message string) { - h.count++ - parts := strings.Split(message, protocol.AggregateDelimiter) - - // Index 0 contains 'AGGREGATE', 1 contains server host. - // Aggregation data begins from index 2. - logger.Debug("Received aggregate data", h.server, h.count, parts) - h.aggregate.Aggregate(parts[2:]) - logger.Debug("Aggregated aggregate data", h.server, h.count) + parts := strings.SplitN(message, protocol.FieldDelimiter, 3) + if len(parts) != 3 { + logger.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts") + return + } + if err := h.aggregate.Aggregate(parts[2]); err != nil { + logger.Error("Unable to aggregate data", h.server, message, err) + } } -- cgit v1.2.3 From 5e717af91e8012c72ec7dc0204420dea46f187db Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 18 Sep 2021 11:57:09 +0300 Subject: new docker test cases - also change default FATAL bg color to magenta --- internal/clients/handlers/basehandler.go | 7 ------- 1 file changed, 7 deletions(-) (limited to 'internal/clients/handlers') diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 51f33c1..74559e9 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -112,12 +112,5 @@ func (h *baseHandler) handleHiddenMessage(message string) { switch { case strings.HasPrefix(message, ".syn close connection"): h.SendMessage(".ack close connection") - select { - case <-time.After(time.Second * 5): - logger.Debug("Shutting down client after timeout and sending ack to server") - h.Shutdown() - case <-h.Done(): - return - } } } -- cgit v1.2.3 From 6506e20f6c80f4acb7434eb9dd14f784a67189cd Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 18 Sep 2021 14:41:25 +0300 Subject: add spartan mode --- internal/clients/handlers/basehandler.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) (limited to 'internal/clients/handlers') diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 74559e9..0f2d1b5 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -67,9 +67,12 @@ func (h *baseHandler) Write(p []byte) (n int, err error) { */ case '\n', protocol.MessageDelimiter: message := h.receiveBuf.String() - if len(message) == 0 { - continue - } + /* + // dcat/grep should actually display empty lines. + if len(message) == 0 { + continue + } + */ h.handleMessageType(message) h.receiveBuf.Reset() default: @@ -93,12 +96,8 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { // Handle various message types. func (h *baseHandler) handleMessageType(message string) { - if len(message) == 0 { - return - } - // Hidden server commands starti with a dot "." - if message[0] == '.' { + if len(message) > 0 && message[0] == '.' { h.handleHiddenMessage(message) return } -- cgit v1.2.3 From abeac87aec44249bf67f1b0eca471a31086265ca Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 18 Sep 2021 19:27:50 +0300 Subject: fix auto reconnect --- internal/clients/handlers/basehandler.go | 10 ++++++++++ 1 file changed, 10 insertions(+) (limited to 'internal/clients/handlers') diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 0f2d1b5..af1ad62 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -22,6 +22,16 @@ type baseHandler struct { status int } +func (h *baseHandler) String() string { + return fmt.Sprintf("baseHandler(%s,server:%s,shellStarted:%v,status:%d)@%p", + h.done, + h.server, + h.shellStarted, + h.status, + h, + ) +} + func (h *baseHandler) Server() string { return h.server } -- cgit v1.2.3 From fe3e68afd99d8ea246be52893730f987e138ec24 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 19 Sep 2021 13:22:59 +0300 Subject: move args to config package logger package rewrite as dlog --- internal/clients/handlers/basehandler.go | 6 +++--- internal/clients/handlers/clienthandler.go | 4 ++-- internal/clients/handlers/maprhandler.go | 8 ++++---- 3 files changed, 9 insertions(+), 9 deletions(-) (limited to 'internal/clients/handlers') diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index af1ad62..3291b43 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -9,7 +9,7 @@ import ( "time" "github.com/mimecast/dtail/internal" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/protocol" ) @@ -51,7 +51,7 @@ func (h *baseHandler) Shutdown() { // SendMessage to the server. func (h *baseHandler) SendMessage(command string) error { encoded := base64.StdEncoding.EncodeToString([]byte(command)) - logger.Debug("Sending command", h.server, command, encoded) + dlog.Client.Debug("Sending command", h.server, command, encoded) select { case h.commands <- fmt.Sprintf("protocol %s base64 %v;", protocol.ProtocolCompat, encoded): @@ -112,7 +112,7 @@ func (h *baseHandler) handleMessageType(message string) { return } - logger.Raw(message) + dlog.Client.Raw(message) } // Handle messages received from server which are not meant to be displayed diff --git a/internal/clients/handlers/clienthandler.go b/internal/clients/handlers/clienthandler.go index 2bcb038..27ac85e 100644 --- a/internal/clients/handlers/clienthandler.go +++ b/internal/clients/handlers/clienthandler.go @@ -2,7 +2,7 @@ package handlers import ( "github.com/mimecast/dtail/internal" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" ) // ClientHandler is the basic client handler interface. @@ -12,7 +12,7 @@ type ClientHandler struct { // NewClientHandler creates a new client handler. func NewClientHandler(server string) *ClientHandler { - logger.Debug(server, "Creating new client handler") + dlog.Client.Debug(server, "Creating new client handler") return &ClientHandler{ baseHandler{ diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index 65b1454..848e7f0 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -4,7 +4,7 @@ import ( "strings" "github.com/mimecast/dtail/internal" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/client" "github.com/mimecast/dtail/internal/protocol" @@ -40,7 +40,7 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { continue case protocol.MessageDelimiter: message := h.baseHandler.receiveBuf.String() - logger.Debug(message) + dlog.Client.Debug(message) if message[0] == 'A' { h.handleAggregateMessage(message) } else { @@ -60,10 +60,10 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { func (h *MaprHandler) handleAggregateMessage(message string) { parts := strings.SplitN(message, protocol.FieldDelimiter, 3) if len(parts) != 3 { - logger.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts") + dlog.Client.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts") return } if err := h.aggregate.Aggregate(parts[2]); err != nil { - logger.Error("Unable to aggregate data", h.server, message, err) + dlog.Client.Error("Unable to aggregate data", h.server, message, err) } } -- cgit v1.2.3 From fcaa94c7453efa0d74e330128c0f5c2cde8f11b3 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 26 Sep 2021 16:42:47 +0300 Subject: refactor config reader - also looks in additional search paths for config file unless NONE is specified --- internal/clients/handlers/basehandler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'internal/clients/handlers') diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 3291b43..8acb45f 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -69,7 +69,7 @@ func (h *baseHandler) Write(p []byte) (n int, err error) { for _, b := range p { switch b { /* - // TODO: Next DTail version make it so that '\n' gets ignored. For now + // NEXT: Next DTail version make it so that '\n' gets ignored. For now // leave it for compatibility with older DTail server + ability to display // the protocol mismatch warn message. case '\n' { -- cgit v1.2.3 From f70622f307629a2542ea5eb128dea8c1043d3a40 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Tue, 5 Oct 2021 10:00:38 +0300 Subject: more on this --- internal/clients/handlers/healthhandler.go | 114 ++++++++++++----------------- 1 file changed, 48 insertions(+), 66 deletions(-) (limited to 'internal/clients/handlers') diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index eca0348..4949985 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -1,90 +1,72 @@ package handlers import ( - "bytes" - "errors" "fmt" - "time" + "strings" "github.com/mimecast/dtail/internal" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/protocol" ) -// HealthHandler implements the handler required for health checks. +// HealthHandler is the handler used on the client side for running mapreduce aggregations. type HealthHandler struct { - done *internal.Done - // Buffer of incoming data from server. - receiveBuf bytes.Buffer - // To send commands to the server. - commands chan string - // To receive messages from the server. - receive chan<- string - // The remote server address - server string - // The return status. - status int + baseHandler + HealthStatusCh chan<- int } -// NewHealthHandler returns a new health check handler. -func NewHealthHandler(server string, receive chan<- string) *HealthHandler { - h := HealthHandler{ - server: server, - receive: receive, - commands: make(chan string), - status: -1, - done: internal.NewDone(), +// NewHealthHandler returns a new health client handler. +func NewHealthHandler(server string) *HealthHandler { + dlog.Client.Debug(server, "Creating new health handler") + return &HealthHandler{ + baseHandler: baseHandler{ + server: server, + shellStarted: false, + commands: make(chan string), + status: -1, + done: internal.NewDone(), + }, + HealthStatusCh: make(chan int), } - - return &h -} - -// Server returns the remote server name. -func (h *HealthHandler) Server() string { - return h.server -} - -// Status of the handler. -func (h *HealthHandler) Status() int { - return h.status -} - -// Done returns done channel of the handler. -func (h *HealthHandler) Done() <-chan struct{} { - return h.done.Done() } -// Shutdown the handler. -func (h *HealthHandler) Shutdown() { - h.done.Shutdown() -} - -// SendMessage sends a DTail command to the server. -func (h *HealthHandler) SendMessage(command string) error { - select { - case h.commands <- fmt.Sprintf("%s;", command): - case <-time.NewTimer(time.Second * 10).C: - return errors.New("Timed out sending command " + command) - case <-h.Done(): - } - - return nil -} - -// Server writes byte stream to client. +// Read data from the dtail server via Writer interface. func (h *HealthHandler) Write(p []byte) (n int, err error) { for _, b := range p { - h.receiveBuf.WriteByte(b) - if b == protocol.MessageDelimiter { - h.receive <- h.receiveBuf.String() - h.receiveBuf.Reset() + switch b { + case '\n': + continue + case protocol.MessageDelimiter: + message := h.baseHandler.receiveBuf.String() + dlog.Client.Debug(message) + h.handleHealthMessage(message) + h.baseHandler.receiveBuf.Reset() + default: + h.baseHandler.receiveBuf.WriteByte(b) } } return len(p), nil } -// Server reads byte stream from client. -func (h *HealthHandler) Read(p []byte) (n int, err error) { - n = copy(p, []byte(<-h.commands)) - return +func (h *HealthHandler) handleHealthMessage(message string) { + s := strings.Split(message, protocol.FieldDelimiter) + message = s[len(s)-1] + status := strings.Split(message, ":") + fmt.Println(status) + /* + switch status { + case "OK": + h.HealthStatusCh <- 0 + case "WARNING": + h.HealthStatusCh <- 1 + case "CRITICAL": + h.HealthStatusCh <- 2 + case "UNKNOWN": + h.HealthStatusCh <- 3 + default: + fmt.Println("CRITICAL: Unexpected server response: '%s'") + h.HealthStatusCh <- 2 + } + */ } -- cgit v1.2.3 From 9f395a03f25941d8ed98ec43035688daa1e8877f Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Tue, 5 Oct 2021 22:39:58 +0300 Subject: more on this --- internal/clients/handlers/basehandler.go | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) (limited to 'internal/clients/handlers') diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 8acb45f..04124e7 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -40,14 +40,6 @@ func (h *baseHandler) Status() int { return h.status } -func (h *baseHandler) Done() <-chan struct{} { - return h.done.Done() -} - -func (h *baseHandler) Shutdown() { - h.done.Shutdown() -} - // SendMessage to the server. func (h *baseHandler) SendMessage(command string) error { encoded := base64.StdEncoding.EncodeToString([]byte(command)) @@ -77,12 +69,6 @@ func (h *baseHandler) Write(p []byte) (n int, err error) { */ case '\n', protocol.MessageDelimiter: message := h.receiveBuf.String() - /* - // dcat/grep should actually display empty lines. - if len(message) == 0 { - continue - } - */ h.handleMessageType(message) h.receiveBuf.Reset() default: @@ -121,5 +107,14 @@ func (h *baseHandler) handleHiddenMessage(message string) { switch { case strings.HasPrefix(message, ".syn close connection"): h.SendMessage(".ack close connection") + h.Shutdown() } } + +func (h *baseHandler) Done() <-chan struct{} { + return h.done.Done() +} + +func (h *baseHandler) Shutdown() { + h.done.Shutdown() +} -- cgit v1.2.3 From fab5dc3e70434ea0abc7a0976487a1973b662331 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 6 Oct 2021 09:50:41 +0300 Subject: enable faster shutdown - useful for dgrep/dmap and dcat commands --- internal/clients/handlers/basehandler.go | 8 +++----- internal/clients/handlers/healthhandler.go | 14 +++++++------- internal/clients/handlers/maprhandler.go | 2 +- 3 files changed, 11 insertions(+), 13 deletions(-) (limited to 'internal/clients/handlers') diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 04124e7..b520c25 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -69,7 +69,7 @@ func (h *baseHandler) Write(p []byte) (n int, err error) { */ case '\n', protocol.MessageDelimiter: message := h.receiveBuf.String() - h.handleMessageType(message) + h.handleMessage(message) h.receiveBuf.Reset() default: h.receiveBuf.WriteByte(b) @@ -90,9 +90,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { return } -// Handle various message types. -func (h *baseHandler) handleMessageType(message string) { - // Hidden server commands starti with a dot "." +func (h *baseHandler) handleMessage(message string) { if len(message) > 0 && message[0] == '.' { h.handleHiddenMessage(message) return @@ -106,7 +104,7 @@ func (h *baseHandler) handleMessageType(message string) { func (h *baseHandler) handleHiddenMessage(message string) { switch { case strings.HasPrefix(message, ".syn close connection"): - h.SendMessage(".ack close connection") + go h.SendMessage(".ack close connection") h.Shutdown() } } diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 4949985..4b16ce4 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -12,7 +12,6 @@ import ( // HealthHandler is the handler used on the client side for running mapreduce aggregations. type HealthHandler struct { baseHandler - HealthStatusCh chan<- int } // NewHealthHandler returns a new health client handler. @@ -26,7 +25,6 @@ func NewHealthHandler(server string) *HealthHandler { status: -1, done: internal.NewDone(), }, - HealthStatusCh: make(chan int), } } @@ -34,12 +32,10 @@ func NewHealthHandler(server string) *HealthHandler { func (h *HealthHandler) Write(p []byte) (n int, err error) { for _, b := range p { switch b { - case '\n': - continue - case protocol.MessageDelimiter: + case '\n', protocol.MessageDelimiter: message := h.baseHandler.receiveBuf.String() dlog.Client.Debug(message) - h.handleHealthMessage(message) + h.handleMessage(message) h.baseHandler.receiveBuf.Reset() default: h.baseHandler.receiveBuf.WriteByte(b) @@ -49,7 +45,11 @@ func (h *HealthHandler) Write(p []byte) (n int, err error) { return len(p), nil } -func (h *HealthHandler) handleHealthMessage(message string) { +func (h *HealthHandler) handleMessage(message string) { + if len(message) > 0 && message[0] == '.' { + h.baseHandler.handleHiddenMessage(message) + return + } s := strings.Split(message, protocol.FieldDelimiter) message = s[len(s)-1] status := strings.Split(message, ":") diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index 848e7f0..d1acfbd 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -44,7 +44,7 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { if message[0] == 'A' { h.handleAggregateMessage(message) } else { - h.baseHandler.handleMessageType(message) + h.baseHandler.handleMessage(message) } h.baseHandler.receiveBuf.Reset() default: -- cgit v1.2.3 From 7306afe9ab073c424ddca0ddc57950f237948118 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 6 Oct 2021 10:55:50 +0300 Subject: move health check to separate client binary --- internal/clients/handlers/healthhandler.go | 25 ++++--------------------- 1 file changed, 4 insertions(+), 21 deletions(-) (limited to 'internal/clients/handlers') diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 4b16ce4..10ba1f7 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -1,7 +1,6 @@ package handlers import ( - "fmt" "strings" "github.com/mimecast/dtail/internal" @@ -22,7 +21,7 @@ func NewHealthHandler(server string) *HealthHandler { server: server, shellStarted: false, commands: make(chan string), - status: -1, + status: 2, // Assume CRITICAL status by default. done: internal.NewDone(), }, } @@ -34,14 +33,12 @@ func (h *HealthHandler) Write(p []byte) (n int, err error) { switch b { case '\n', protocol.MessageDelimiter: message := h.baseHandler.receiveBuf.String() - dlog.Client.Debug(message) h.handleMessage(message) h.baseHandler.receiveBuf.Reset() default: h.baseHandler.receiveBuf.WriteByte(b) } } - return len(p), nil } @@ -52,21 +49,7 @@ func (h *HealthHandler) handleMessage(message string) { } s := strings.Split(message, protocol.FieldDelimiter) message = s[len(s)-1] - status := strings.Split(message, ":") - fmt.Println(status) - /* - switch status { - case "OK": - h.HealthStatusCh <- 0 - case "WARNING": - h.HealthStatusCh <- 1 - case "CRITICAL": - h.HealthStatusCh <- 2 - case "UNKNOWN": - h.HealthStatusCh <- 3 - default: - fmt.Println("CRITICAL: Unexpected server response: '%s'") - h.HealthStatusCh <- 2 - } - */ + if message == "OK" { + h.baseHandler.status = 0 + } } -- cgit v1.2.3 From 97747ea0f3178f7f5890512d483fdccaa82846b0 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 9 Oct 2021 21:10:29 +0300 Subject: vetting and linting and some code restyling --- internal/clients/handlers/healthhandler.go | 3 ++- internal/clients/handlers/maprhandler.go | 13 ++++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) (limited to 'internal/clients/handlers') diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 10ba1f7..47b594e 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -8,7 +8,8 @@ import ( "github.com/mimecast/dtail/internal/protocol" ) -// HealthHandler is the handler used on the client side for running mapreduce aggregations. +// HealthHandler is the handler used on the client side for running mapreduce +// aggregations. type HealthHandler struct { baseHandler } diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index d1acfbd..8718b35 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -10,7 +10,8 @@ import ( "github.com/mimecast/dtail/internal/protocol" ) -// MaprHandler is the handler used on the client side for running mapreduce aggregations. +// MaprHandler is the handler used on the client side for running mapreduce +// aggregations. type MaprHandler struct { baseHandler aggregate *client.Aggregate @@ -18,7 +19,9 @@ type MaprHandler struct { } // NewMaprHandler returns a new mapreduce client handler. -func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *MaprHandler { +func NewMaprHandler(server string, query *mapr.Query, + globalGroup *mapr.GlobalGroupSet) *MaprHandler { + return &MaprHandler{ baseHandler: baseHandler{ server: server, @@ -55,12 +58,12 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { return len(p), nil } -// Handle a message received from server including mapr aggregation -// related data. +// Handle a message received from server including mapr aggregation related data. func (h *MaprHandler) handleAggregateMessage(message string) { parts := strings.SplitN(message, protocol.FieldDelimiter, 3) if len(parts) != 3 { - dlog.Client.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts") + dlog.Client.Error("Unable to aggregate data", h.server, message, parts, + len(parts), "expected 3 parts") return } if err := h.aggregate.Aggregate(parts[2]); err != nil { -- cgit v1.2.3