From b3c161b7887d98ab7aba2fce90c9b5965991ab62 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 21 Aug 2021 14:54:24 +0300 Subject: read files bytewise for more control of whats happening - change transport protocol for more control over newlines --- internal/clients/handlers/basehandler.go | 6 +++--- internal/clients/handlers/maprhandler.go | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) (limited to 'internal/clients') diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index f07fd90..acafe0e 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -9,7 +9,7 @@ import ( "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/version" + "github.com/mimecast/dtail/internal/protocol" ) type baseHandler struct { @@ -43,7 +43,7 @@ func (h *baseHandler) SendMessage(command string) error { logger.Debug("Sending command", h.server, command, encoded) select { - case h.commands <- fmt.Sprintf("protocol %s base64 %v;", version.ProtocolCompat, encoded): + case h.commands <- fmt.Sprintf("protocol %s base64 %v;", protocol.ProtocolCompat, encoded): case <-time.After(time.Second * 5): return fmt.Errorf("Timed out sending command '%s' (base64: '%s')", command, encoded) case <-h.Done(): @@ -57,7 +57,7 @@ func (h *baseHandler) SendMessage(command string) error { func (h *baseHandler) Write(p []byte) (n int, err error) { for _, b := range p { h.receiveBuf = append(h.receiveBuf, b) - if b == '\n' { + if b == protocol.MessageDelimiter { if len(h.receiveBuf) == 0 { continue } diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index fb71c8f..7ac5895 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -7,6 +7,7 @@ import ( "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/client" + "github.com/mimecast/dtail/internal/protocol" ) // MaprHandler is the handler used on the client side for running mapreduce aggregations. @@ -58,7 +59,7 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { // related data. func (h *MaprHandler) handleAggregateMessage(message string) { h.count++ - parts := strings.Split(message, "➔") + parts := strings.Split(message, protocol.AggregateDelimiter) // Index 0 contains 'AGGREGATE', 1 contains server host. // Aggregation data begins from index 2. -- cgit v1.2.3