summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/clients/handlers/maprhandler.go10
1 files changed, 8 insertions, 2 deletions
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go
index 8718b35..4c11470 100644
--- a/internal/clients/handlers/maprhandler.go
+++ b/internal/clients/handlers/maprhandler.go
@@ -16,6 +16,7 @@ type MaprHandler struct {
baseHandler
aggregate *client.Aggregate
query *mapr.Query
+ removedNl bool
}
// NewMaprHandler returns a new mapreduce client handler.
@@ -40,16 +41,21 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) {
for _, b := range p {
switch b {
case '\n':
- continue
+ h.removedNl = true
case protocol.MessageDelimiter:
message := h.baseHandler.receiveBuf.String()
dlog.Client.Debug(message)
if message[0] == 'A' {
h.handleAggregateMessage(message)
} else {
- h.baseHandler.handleMessage(message)
+ if h.removedNl {
+ h.baseHandler.handleMessage(message + "\n")
+ } else {
+ h.baseHandler.handleMessage(message)
+ }
}
h.baseHandler.receiveBuf.Reset()
+ h.removedNl = false
default:
h.baseHandler.receiveBuf.WriteByte(b)
}