summaryrefslogtreecommitdiff
path: root/internal/clients/handlers/maprhandler.go
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2021-10-21 21:28:49 +0300
committerPaul Buetow <pbuetow@mimecast.com>2021-10-21 21:28:49 +0300
commitf4207a55f71bfbcfdc532d5cdd3befaa3474a157 (patch)
treeea5e4a2d2a67035f645bdee496ae55a52034178a /internal/clients/handlers/maprhandler.go
parentd80d6070557e3a800e3a54967af9eced518f116b (diff)
parent739205206d63bf42f4e843b39d04d4c8cd8207c3 (diff)
merge develop
Diffstat (limited to 'internal/clients/handlers/maprhandler.go')
-rw-r--r--internal/clients/handlers/maprhandler.go56
1 files changed, 30 insertions, 26 deletions
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go
index fb71c8f..8718b35 100644
--- a/internal/clients/handlers/maprhandler.go
+++ b/internal/clients/handlers/maprhandler.go
@@ -4,21 +4,24 @@ import (
"strings"
"github.com/mimecast/dtail/internal"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/client"
+ "github.com/mimecast/dtail/internal/protocol"
)
-// MaprHandler is the handler used on the client side for running mapreduce aggregations.
+// MaprHandler is the handler used on the client side for running mapreduce
+// aggregations.
type MaprHandler struct {
baseHandler
aggregate *client.Aggregate
query *mapr.Query
- count uint64
}
// NewMaprHandler returns a new mapreduce client handler.
-func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *MaprHandler {
+func NewMaprHandler(server string, query *mapr.Query,
+ globalGroup *mapr.GlobalGroupSet) *MaprHandler {
+
return &MaprHandler{
baseHandler: baseHandler{
server: server,
@@ -35,34 +38,35 @@ func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGr
// Read data from the dtail server via Writer interface.
func (h *MaprHandler) Write(p []byte) (n int, err error) {
for _, b := range p {
- h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b)
- if b == '\n' {
- if len(h.baseHandler.receiveBuf) == 0 {
- continue
- }
- message := string(h.baseHandler.receiveBuf)
-
- if h.baseHandler.receiveBuf[0] == 'A' {
- h.handleAggregateMessage(strings.TrimSpace(message))
- h.baseHandler.receiveBuf = h.baseHandler.receiveBuf[:0]
- continue
+ switch b {
+ case '\n':
+ continue
+ case protocol.MessageDelimiter:
+ message := h.baseHandler.receiveBuf.String()
+ dlog.Client.Debug(message)
+ if message[0] == 'A' {
+ h.handleAggregateMessage(message)
+ } else {
+ h.baseHandler.handleMessage(message)
}
- h.baseHandler.handleMessageType(message)
+ h.baseHandler.receiveBuf.Reset()
+ default:
+ h.baseHandler.receiveBuf.WriteByte(b)
}
}
return len(p), nil
}
-// Handle a message received from server including mapr aggregation
-// related data.
+// Handle a message received from server including mapr aggregation related data.
func (h *MaprHandler) handleAggregateMessage(message string) {
- h.count++
- parts := strings.Split(message, "➔")
-
- // Index 0 contains 'AGGREGATE', 1 contains server host.
- // Aggregation data begins from index 2.
- logger.Debug("Received aggregate data", h.server, h.count, parts)
- h.aggregate.Aggregate(parts[2:])
- logger.Debug("Aggregated aggregate data", h.server, h.count)
+ parts := strings.SplitN(message, protocol.FieldDelimiter, 3)
+ if len(parts) != 3 {
+ dlog.Client.Error("Unable to aggregate data", h.server, message, parts,
+ len(parts), "expected 3 parts")
+ return
+ }
+ if err := h.aggregate.Aggregate(parts[2]); err != nil {
+ dlog.Client.Error("Unable to aggregate data", h.server, message, err)
+ }
}