From c2522ffb59514443816a96386c16bb7527cbe57c Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 21 Aug 2021 14:54:24 +0300 Subject: read files bytewise for more control of whats happening - change transport protocol for more control over newlines --- internal/clients/handlers/maprhandler.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'internal/clients/handlers/maprhandler.go') diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index fb71c8f..7ac5895 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -7,6 +7,7 @@ import ( "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/client" + "github.com/mimecast/dtail/internal/protocol" ) // MaprHandler is the handler used on the client side for running mapreduce aggregations. @@ -58,7 +59,7 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { // related data. func (h *MaprHandler) handleAggregateMessage(message string) { h.count++ - parts := strings.Split(message, "➔") + parts := strings.Split(message, protocol.AggregateDelimiter) // Index 0 contains 'AGGREGATE', 1 contains server host. // Aggregation data begins from index 2. -- cgit v1.2.3 From 9883a190109623b64e6d311dc2b462a6eae68003 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 22 Aug 2021 10:07:00 +0300 Subject: introduces the protocol package --- internal/clients/handlers/maprhandler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'internal/clients/handlers/maprhandler.go') diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index 7ac5895..afad507 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -37,7 +37,7 @@ func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGr func (h *MaprHandler) Write(p []byte) (n int, err error) { for _, b := range p { h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b) - if b == '\n' { + if b == protocol.MessageDelimiter { // '\n' { if len(h.baseHandler.receiveBuf) == 0 { continue } -- cgit v1.2.3 From 16dc57e1e1c28e9d762424e596223a980770e059 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 8 Sep 2021 19:10:50 +0300 Subject: mapreduce tables are in colors now too --- internal/clients/handlers/maprhandler.go | 42 ++++++++++++++++---------------- 1 file changed, 21 insertions(+), 21 deletions(-) (limited to 'internal/clients/handlers/maprhandler.go') diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index afad507..65b1454 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -15,7 +15,6 @@ type MaprHandler struct { baseHandler aggregate *client.Aggregate query *mapr.Query - count uint64 } // NewMaprHandler returns a new mapreduce client handler. @@ -36,19 +35,20 @@ func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGr // Read data from the dtail server via Writer interface. func (h *MaprHandler) Write(p []byte) (n int, err error) { for _, b := range p { - h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b) - if b == protocol.MessageDelimiter { // '\n' { - if len(h.baseHandler.receiveBuf) == 0 { - continue + switch b { + case '\n': + continue + case protocol.MessageDelimiter: + message := h.baseHandler.receiveBuf.String() + logger.Debug(message) + if message[0] == 'A' { + h.handleAggregateMessage(message) + } else { + h.baseHandler.handleMessageType(message) } - message := string(h.baseHandler.receiveBuf) - - if h.baseHandler.receiveBuf[0] == 'A' { - h.handleAggregateMessage(strings.TrimSpace(message)) - h.baseHandler.receiveBuf = h.baseHandler.receiveBuf[:0] - continue - } - h.baseHandler.handleMessageType(message) + h.baseHandler.receiveBuf.Reset() + default: + h.baseHandler.receiveBuf.WriteByte(b) } } @@ -58,12 +58,12 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { // Handle a message received from server including mapr aggregation // related data. func (h *MaprHandler) handleAggregateMessage(message string) { - h.count++ - parts := strings.Split(message, protocol.AggregateDelimiter) - - // Index 0 contains 'AGGREGATE', 1 contains server host. - // Aggregation data begins from index 2. - logger.Debug("Received aggregate data", h.server, h.count, parts) - h.aggregate.Aggregate(parts[2:]) - logger.Debug("Aggregated aggregate data", h.server, h.count) + parts := strings.SplitN(message, protocol.FieldDelimiter, 3) + if len(parts) != 3 { + logger.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts") + return + } + if err := h.aggregate.Aggregate(parts[2]); err != nil { + logger.Error("Unable to aggregate data", h.server, message, err) + } } -- cgit v1.2.3 From fe3e68afd99d8ea246be52893730f987e138ec24 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 19 Sep 2021 13:22:59 +0300 Subject: move args to config package logger package rewrite as dlog --- internal/clients/handlers/maprhandler.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'internal/clients/handlers/maprhandler.go') diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index 65b1454..848e7f0 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -4,7 +4,7 @@ import ( "strings" "github.com/mimecast/dtail/internal" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/client" "github.com/mimecast/dtail/internal/protocol" @@ -40,7 +40,7 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { continue case protocol.MessageDelimiter: message := h.baseHandler.receiveBuf.String() - logger.Debug(message) + dlog.Client.Debug(message) if message[0] == 'A' { h.handleAggregateMessage(message) } else { @@ -60,10 +60,10 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { func (h *MaprHandler) handleAggregateMessage(message string) { parts := strings.SplitN(message, protocol.FieldDelimiter, 3) if len(parts) != 3 { - logger.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts") + dlog.Client.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts") return } if err := h.aggregate.Aggregate(parts[2]); err != nil { - logger.Error("Unable to aggregate data", h.server, message, err) + dlog.Client.Error("Unable to aggregate data", h.server, message, err) } } -- cgit v1.2.3 From fab5dc3e70434ea0abc7a0976487a1973b662331 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 6 Oct 2021 09:50:41 +0300 Subject: enable faster shutdown - useful for dgrep/dmap and dcat commands --- internal/clients/handlers/maprhandler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'internal/clients/handlers/maprhandler.go') diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index 848e7f0..d1acfbd 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -44,7 +44,7 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { if message[0] == 'A' { h.handleAggregateMessage(message) } else { - h.baseHandler.handleMessageType(message) + h.baseHandler.handleMessage(message) } h.baseHandler.receiveBuf.Reset() default: -- cgit v1.2.3 From 97747ea0f3178f7f5890512d483fdccaa82846b0 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 9 Oct 2021 21:10:29 +0300 Subject: vetting and linting and some code restyling --- internal/clients/handlers/maprhandler.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) (limited to 'internal/clients/handlers/maprhandler.go') diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index d1acfbd..8718b35 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -10,7 +10,8 @@ import ( "github.com/mimecast/dtail/internal/protocol" ) -// MaprHandler is the handler used on the client side for running mapreduce aggregations. +// MaprHandler is the handler used on the client side for running mapreduce +// aggregations. type MaprHandler struct { baseHandler aggregate *client.Aggregate @@ -18,7 +19,9 @@ type MaprHandler struct { } // NewMaprHandler returns a new mapreduce client handler. -func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *MaprHandler { +func NewMaprHandler(server string, query *mapr.Query, + globalGroup *mapr.GlobalGroupSet) *MaprHandler { + return &MaprHandler{ baseHandler: baseHandler{ server: server, @@ -55,12 +58,12 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { return len(p), nil } -// Handle a message received from server including mapr aggregation -// related data. +// Handle a message received from server including mapr aggregation related data. func (h *MaprHandler) handleAggregateMessage(message string) { parts := strings.SplitN(message, protocol.FieldDelimiter, 3) if len(parts) != 3 { - dlog.Client.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts") + dlog.Client.Error("Unable to aggregate data", h.server, message, parts, + len(parts), "expected 3 parts") return } if err := h.aggregate.Aggregate(parts[2]); err != nil { -- cgit v1.2.3