summaryrefslogtreecommitdiff
path: root/internal/clients
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-08-21 14:54:24 +0300
committerPaul Buetow <paul@buetow.org>2021-08-21 14:54:24 +0300
commitb3c161b7887d98ab7aba2fce90c9b5965991ab62 (patch)
tree6e6fb065e14b92e362f66103cfed2cbdc51ceccf /internal/clients
parent00c3b6be33448d0389e17e45582c7c41d5d61fc2 (diff)
read files bytewise for more control of whats happening - change transport protocol for more control over newlines
Diffstat (limited to 'internal/clients')
-rw-r--r--internal/clients/handlers/basehandler.go6
-rw-r--r--internal/clients/handlers/maprhandler.go3
2 files changed, 5 insertions, 4 deletions
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index f07fd90..acafe0e 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -9,7 +9,7 @@ import (
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/io/logger"
- "github.com/mimecast/dtail/internal/version"
+ "github.com/mimecast/dtail/internal/protocol"
)
type baseHandler struct {
@@ -43,7 +43,7 @@ func (h *baseHandler) SendMessage(command string) error {
logger.Debug("Sending command", h.server, command, encoded)
select {
- case h.commands <- fmt.Sprintf("protocol %s base64 %v;", version.ProtocolCompat, encoded):
+ case h.commands <- fmt.Sprintf("protocol %s base64 %v;", protocol.ProtocolCompat, encoded):
case <-time.After(time.Second * 5):
return fmt.Errorf("Timed out sending command '%s' (base64: '%s')", command, encoded)
case <-h.Done():
@@ -57,7 +57,7 @@ func (h *baseHandler) SendMessage(command string) error {
func (h *baseHandler) Write(p []byte) (n int, err error) {
for _, b := range p {
h.receiveBuf = append(h.receiveBuf, b)
- if b == '\n' {
+ if b == protocol.MessageDelimiter {
if len(h.receiveBuf) == 0 {
continue
}
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go
index fb71c8f..7ac5895 100644
--- a/internal/clients/handlers/maprhandler.go
+++ b/internal/clients/handlers/maprhandler.go
@@ -7,6 +7,7 @@ import (
"github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/client"
+ "github.com/mimecast/dtail/internal/protocol"
)
// MaprHandler is the handler used on the client side for running mapreduce aggregations.
@@ -58,7 +59,7 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) {
// related data.
func (h *MaprHandler) handleAggregateMessage(message string) {
h.count++
- parts := strings.Split(message, "➔")
+ parts := strings.Split(message, protocol.AggregateDelimiter)
// Index 0 contains 'AGGREGATE', 1 contains server host.
// Aggregation data begins from index 2.