diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2021-10-21 21:28:49 +0300 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2021-10-21 21:28:49 +0300 |
| commit | f4207a55f71bfbcfdc532d5cdd3befaa3474a157 (patch) | |
| tree | ea5e4a2d2a67035f645bdee496ae55a52034178a /internal/clients/handlers/maprhandler.go | |
| parent | d80d6070557e3a800e3a54967af9eced518f116b (diff) | |
| parent | 739205206d63bf42f4e843b39d04d4c8cd8207c3 (diff) | |
merge develop
Diffstat (limited to 'internal/clients/handlers/maprhandler.go')
| -rw-r--r-- | internal/clients/handlers/maprhandler.go | 56 |
1 files changed, 30 insertions, 26 deletions
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index fb71c8f..8718b35 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -4,21 +4,24 @@ import ( "strings" "github.com/mimecast/dtail/internal" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/client" + "github.com/mimecast/dtail/internal/protocol" ) -// MaprHandler is the handler used on the client side for running mapreduce aggregations. +// MaprHandler is the handler used on the client side for running mapreduce +// aggregations. type MaprHandler struct { baseHandler aggregate *client.Aggregate query *mapr.Query - count uint64 } // NewMaprHandler returns a new mapreduce client handler. -func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *MaprHandler { +func NewMaprHandler(server string, query *mapr.Query, + globalGroup *mapr.GlobalGroupSet) *MaprHandler { + return &MaprHandler{ baseHandler: baseHandler{ server: server, @@ -35,34 +38,35 @@ func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGr // Read data from the dtail server via Writer interface. func (h *MaprHandler) Write(p []byte) (n int, err error) { for _, b := range p { - h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b) - if b == '\n' { - if len(h.baseHandler.receiveBuf) == 0 { - continue - } - message := string(h.baseHandler.receiveBuf) - - if h.baseHandler.receiveBuf[0] == 'A' { - h.handleAggregateMessage(strings.TrimSpace(message)) - h.baseHandler.receiveBuf = h.baseHandler.receiveBuf[:0] - continue + switch b { + case '\n': + continue + case protocol.MessageDelimiter: + message := h.baseHandler.receiveBuf.String() + dlog.Client.Debug(message) + if message[0] == 'A' { + h.handleAggregateMessage(message) + } else { + h.baseHandler.handleMessage(message) } - h.baseHandler.handleMessageType(message) + h.baseHandler.receiveBuf.Reset() + default: + h.baseHandler.receiveBuf.WriteByte(b) } } return len(p), nil } -// Handle a message received from server including mapr aggregation -// related data. +// Handle a message received from server including mapr aggregation related data. func (h *MaprHandler) handleAggregateMessage(message string) { - h.count++ - parts := strings.Split(message, "➔") - - // Index 0 contains 'AGGREGATE', 1 contains server host. - // Aggregation data begins from index 2. - logger.Debug("Received aggregate data", h.server, h.count, parts) - h.aggregate.Aggregate(parts[2:]) - logger.Debug("Aggregated aggregate data", h.server, h.count) + parts := strings.SplitN(message, protocol.FieldDelimiter, 3) + if len(parts) != 3 { + dlog.Client.Error("Unable to aggregate data", h.server, message, parts, + len(parts), "expected 3 parts") + return + } + if err := h.aggregate.Aggregate(parts[2]); err != nil { + dlog.Client.Error("Unable to aggregate data", h.server, message, err) + } } |
