summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-08-21 14:54:24 +0300
committerPaul Buetow <paul@buetow.org>2021-08-21 14:54:24 +0300
commitc2522ffb59514443816a96386c16bb7527cbe57c (patch)
tree6e6fb065e14b92e362f66103cfed2cbdc51ceccf
parent70cc67e78278fcf103acc57dfe513bd6f5f258c9 (diff)
read files bytewise for more control of whats happening - change transport protocol for more control over newlines
-rw-r--r--internal/clients/handlers/basehandler.go6
-rw-r--r--internal/clients/handlers/maprhandler.go3
-rw-r--r--internal/io/fs/readfile.go71
-rw-r--r--internal/io/logger/logger.go7
-rw-r--r--internal/mapr/aggregateset.go10
-rw-r--r--internal/protocol/protocol.go10
-rw-r--r--internal/server/handlers/serverhandler.go15
-rw-r--r--internal/version/version.go7
8 files changed, 66 insertions, 63 deletions
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index f07fd90..acafe0e 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -9,7 +9,7 @@ import (
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/io/logger"
- "github.com/mimecast/dtail/internal/version"
+ "github.com/mimecast/dtail/internal/protocol"
)
type baseHandler struct {
@@ -43,7 +43,7 @@ func (h *baseHandler) SendMessage(command string) error {
logger.Debug("Sending command", h.server, command, encoded)
select {
- case h.commands <- fmt.Sprintf("protocol %s base64 %v;", version.ProtocolCompat, encoded):
+ case h.commands <- fmt.Sprintf("protocol %s base64 %v;", protocol.ProtocolCompat, encoded):
case <-time.After(time.Second * 5):
return fmt.Errorf("Timed out sending command '%s' (base64: '%s')", command, encoded)
case <-h.Done():
@@ -57,7 +57,7 @@ func (h *baseHandler) SendMessage(command string) error {
func (h *baseHandler) Write(p []byte) (n int, err error) {
for _, b := range p {
h.receiveBuf = append(h.receiveBuf, b)
- if b == '\n' {
+ if b == protocol.MessageDelimiter {
if len(h.receiveBuf) == 0 {
continue
}
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go
index fb71c8f..7ac5895 100644
--- a/internal/clients/handlers/maprhandler.go
+++ b/internal/clients/handlers/maprhandler.go
@@ -7,6 +7,7 @@ import (
"github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/client"
+ "github.com/mimecast/dtail/internal/protocol"
)
// MaprHandler is the handler used on the client side for running mapreduce aggregations.
@@ -58,7 +59,7 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) {
// related data.
func (h *MaprHandler) handleAggregateMessage(message string) {
h.count++
- parts := strings.Split(message, "➔")
+ parts := strings.Split(message, protocol.AggregateDelimiter)
// Index 0 contains 'AGGREGATE', 1 contains server host.
// Aggregation data begins from index 2.
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index 6757bd6..8a365a1 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -14,6 +14,7 @@ import (
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/protocol"
"github.com/mimecast/dtail/internal/regex"
"github.com/DataDog/zstd"
@@ -148,80 +149,64 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
if err != nil {
return err
}
- rawLine := make([]byte, 0, 512)
lineLengthThreshold := 1024 * 1024 // 1mb
- longLineWarning := false
+ warnedAboutLongLine := false
+ message := make([]byte, 0, 512)
for {
select {
case <-ctx.Done():
return nil
- default:
- }
-
- select {
case <-truncate:
if isTruncated, err := f.truncated(fd); isTruncated {
return err
}
- logger.Info(f.filePath, "Current offset", offset)
default:
}
- // Read some bytes (max 4k at once as of go 1.12). isPrefix will
- // be set if line does not fit into 4k buffer.
- bytes, isPrefix, err := reader.ReadLine()
+ b, err := reader.ReadByte()
if err != nil {
- // If EOF, sleep a couple of ms and return with nil error.
- // If other error, return with non-nil error.
if err != io.EOF {
return err
}
if !f.seekEOF {
- logger.Debug(f.FilePath(), "End of file reached")
+ logger.Info(f.FilePath(), "End of file reached")
return nil
}
time.Sleep(time.Millisecond * 100)
continue
}
+ offset++
- rawLine = append(rawLine, bytes...)
- offset += uint64(len(bytes))
-
- if !isPrefix {
- // last LineRead call returned contend until end of line.
- rawLine = append(rawLine, '\n')
- select {
- case rawLines <- rawLine:
- case <-ctx.Done():
- return nil
+ switch b {
+ case '\n':
+ if len(message) == 0 {
+ time.Sleep(time.Millisecond * 100)
+ continue
}
- rawLine = make([]byte, 0, 512)
- if longLineWarning {
- longLineWarning = false
- }
- continue
- }
-
- // Last LineRead call could not read content until end of line, buffer
- // was too small. Determine whether we exceed the max line length we
- // want dtail to send to the client at once. Possibly split up log line
- // into multiple log lines.
- if len(rawLine) >= lineLengthThreshold {
- if !longLineWarning {
- f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines")
- // Only print out one warning per long log line.
- longLineWarning = true
- }
- rawLine = append(rawLine, '\n')
select {
- case rawLines <- rawLine:
+ case rawLines <- append(message, protocol.MessageDelimiter):
+ message = make([]byte, 0, 512)
+ warnedAboutLongLine = false
case <-ctx.Done():
return nil
}
- rawLine = make([]byte, 0, 512)
+ default:
+ if len(message) >= lineLengthThreshold {
+ if !warnedAboutLongLine {
+ f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines")
+ warnedAboutLongLine = true
+ }
+ select {
+ case <-ctx.Done():
+ return nil
+ case rawLines <- append(message, protocol.MessageDelimiter):
+ message = make([]byte, 0, 512)
+ }
+ }
+ message = append(message, b)
}
}
}
diff --git a/internal/io/logger/logger.go b/internal/io/logger/logger.go
index bb9dc02..3a3935d 100644
--- a/internal/io/logger/logger.go
+++ b/internal/io/logger/logger.go
@@ -205,7 +205,7 @@ func Trace(args ...interface{}) string {
// Write log line to buffer and/or log file.
func write(what, severity, message string) {
if Mode.logToStdout {
- line := fmt.Sprintf("%s|%s|%s|%s\n", what, hostname, severity, message)
+ line := fmt.Sprintf("%s|%s|%s|%s", what, hostname, severity, message)
if config.Client.TermColorsEnable {
line = brush.Colorfy(line)
@@ -219,7 +219,7 @@ func write(what, severity, message string) {
timeStr := t.Format("20060102-150405")
fileLogBufCh <- buf{
time: t,
- message: fmt.Sprintf("%s|%s|%s|%s\n", severity, timeStr, what, message),
+ message: fmt.Sprintf("%s|%s|%s|%s", severity, timeStr, what, message),
}
}
}
@@ -326,6 +326,7 @@ func Flush() {
select {
case message := <-stdoutBufCh:
stdoutWriter.WriteString(message)
+ stdoutWriter.WriteString("\n")
default:
stdoutWriter.Flush()
return
@@ -338,6 +339,7 @@ func writeToStdout(ctx context.Context) {
select {
case message := <-stdoutBufCh:
stdoutWriter.WriteString(message)
+ stdoutWriter.WriteString("\n")
case <-time.After(time.Millisecond * 100):
stdoutWriter.Flush()
case <-pauseCh:
@@ -365,6 +367,7 @@ func writeToFile(ctx context.Context) {
dateStr := buf.time.Format("20060102")
w := fileWriter(dateStr)
w.WriteString(buf.message)
+ w.WriteString("\n")
case <-pauseCh:
PAUSE:
for {
diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go
index a6cc6eb..029d87b 100644
--- a/internal/mapr/aggregateset.go
+++ b/internal/mapr/aggregateset.go
@@ -5,6 +5,8 @@ import (
"fmt"
"strconv"
"strings"
+
+ "github.com/mimecast/dtail/internal/protocol"
)
// AggregateSet represents aggregated key/value pairs from the
@@ -70,20 +72,20 @@ func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<-
var sb strings.Builder
sb.WriteString(groupKey)
- sb.WriteString("➔")
- sb.WriteString(fmt.Sprintf("%d➔", s.Samples))
+ sb.WriteString(protocol.AggregateDelimiter)
+ sb.WriteString(fmt.Sprintf("%d%s", s.Samples, protocol.AggregateDelimiter))
for k, v := range s.FValues {
sb.WriteString(k)
sb.WriteString("=")
- sb.WriteString(fmt.Sprintf("%v➔", v))
+ sb.WriteString(fmt.Sprintf("%v%s", v, protocol.AggregateDelimiter))
}
for k, v := range s.SValues {
sb.WriteString(k)
sb.WriteString("=")
sb.WriteString(v)
- sb.WriteString("➔")
+ sb.WriteString(protocol.AggregateDelimiter)
}
select {
diff --git a/internal/protocol/protocol.go b/internal/protocol/protocol.go
new file mode 100644
index 0000000..2a570cd
--- /dev/null
+++ b/internal/protocol/protocol.go
@@ -0,0 +1,10 @@
+package protocol
+
+const (
+ // ProtocolCompat -ibility version
+ ProtocolCompat string = "4"
+ // MessageDelimiter delimits separate messages.
+ MessageDelimiter byte = '¬'
+ // AggregateDelimiter delimits parts of an aggregation message.
+ AggregateDelimiter string = "➔"
+)
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 185e7c2..23e3aeb 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -17,8 +17,8 @@ import (
"github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/mapr/server"
"github.com/mimecast/dtail/internal/omode"
+ "github.com/mimecast/dtail/internal/protocol"
user "github.com/mimecast/dtail/internal/user/server"
- "github.com/mimecast/dtail/internal/version"
)
const (
@@ -92,24 +92,27 @@ func (h *ServerHandler) Read(p []byte) (n int, err error) {
}
if message[0] == '.' {
// Handle hidden message (don't display to the user, interpreted by dtail client)
- wholePayload := []byte(fmt.Sprintf("%s\n", message))
+ wholePayload := []byte(fmt.Sprintf("%s%b", message, protocol.MessageDelimiter))
n = copy(p, wholePayload)
return
}
// Handle normal server message (display to the user)
- wholePayload := []byte(fmt.Sprintf("SERVER|%s|%s\n", h.hostname, message))
+ wholePayload := []byte(fmt.Sprintf("SERVER|%s|%s%b", h.hostname, message, protocol.MessageDelimiter))
n = copy(p, wholePayload)
return
case message := <-h.aggregatedMessages:
// Send mapreduce-aggregated data as a message.
- data := fmt.Sprintf("AGGREGATE➔%s➔%s\n", h.hostname, message)
+ data := fmt.Sprintf("AGGREGATE%s%s%s%s%b",
+ protocol.AggregateDelimiter, h.hostname,
+ protocol.AggregateDelimiter, message, protocol.MessageDelimiter)
wholePayload := []byte(data)
n = copy(p, wholePayload)
return
case line := <-h.lines:
+ //fmt.Printf("<<<%d,%s>>>\n", len(line.Content), line.Content)
// Send normal file content data as a message.
serverInfo := []byte(fmt.Sprintf("REMOTE|%s|%3d|%v|%s|",
h.hostname, line.TransmittedPerc, line.Count, line.SourceID))
@@ -182,8 +185,8 @@ func (h *ServerHandler) handleProtocolVersion(args []string) ([]string, int, err
return args, argc, errors.New("unable to determine protocol version")
}
- if args[1] != version.ProtocolCompat {
- err := fmt.Errorf("server with protocol version '%s' but client with '%s', please update DTail", version.ProtocolCompat, args[1])
+ if args[1] != protocol.ProtocolCompat {
+ err := fmt.Errorf("server with protocol version '%s' but client with '%s', please update DTail", protocol.ProtocolCompat, args[1])
return args, argc, err
}
diff --git a/internal/version/version.go b/internal/version/version.go
index 3e683bd..7f07c83 100644
--- a/internal/version/version.go
+++ b/internal/version/version.go
@@ -6,6 +6,7 @@ import (
"github.com/mimecast/dtail/internal/color"
"github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/protocol"
)
const (
@@ -13,15 +14,13 @@ const (
Name string = "DTail"
// Version of DTail.
Version string = "3.2.0"
- // ProtocolCompat -ibility version.
- ProtocolCompat string = "3"
// Additional information for DTail
Additional string = "Have a lot of fun!"
)
// String representation of the DTail version.
func String() string {
- return fmt.Sprintf("%s %v Protocol %s %s", Name, Version, ProtocolCompat, Additional)
+ return fmt.Sprintf("%s %v Protocol %s %s", Name, Version, protocol.ProtocolCompat, Additional)
}
// PaintedString is a prettier string representation of the DTail version.
@@ -36,7 +35,7 @@ func PaintedString() string {
version := color.PaintWithAttr(fmt.Sprintf(" %s ", Version),
color.FgBlue, color.BgYellow, color.AttrBold)
- protocol := color.Paint(fmt.Sprintf(" Protocol %s ", ProtocolCompat),
+ protocol := color.Paint(fmt.Sprintf(" Protocol %s ", protocol.ProtocolCompat),
color.FgBlack, color.BgGreen)
additional := color.PaintWithAttr(fmt.Sprintf(" %s ", Additional),