diff options
| author | Paul Buetow <paul@buetow.org> | 2021-08-21 14:54:24 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2021-08-21 14:54:24 +0300 |
| commit | c2522ffb59514443816a96386c16bb7527cbe57c (patch) | |
| tree | 6e6fb065e14b92e362f66103cfed2cbdc51ceccf /internal | |
| parent | 70cc67e78278fcf103acc57dfe513bd6f5f258c9 (diff) | |
read files bytewise for more control of whats happening - change transport protocol for more control over newlines
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/clients/handlers/basehandler.go | 6 | ||||
| -rw-r--r-- | internal/clients/handlers/maprhandler.go | 3 | ||||
| -rw-r--r-- | internal/io/fs/readfile.go | 71 | ||||
| -rw-r--r-- | internal/io/logger/logger.go | 7 | ||||
| -rw-r--r-- | internal/mapr/aggregateset.go | 10 | ||||
| -rw-r--r-- | internal/protocol/protocol.go | 10 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 15 | ||||
| -rw-r--r-- | internal/version/version.go | 7 |
8 files changed, 66 insertions, 63 deletions
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index f07fd90..acafe0e 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -9,7 +9,7 @@ import ( "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/version" + "github.com/mimecast/dtail/internal/protocol" ) type baseHandler struct { @@ -43,7 +43,7 @@ func (h *baseHandler) SendMessage(command string) error { logger.Debug("Sending command", h.server, command, encoded) select { - case h.commands <- fmt.Sprintf("protocol %s base64 %v;", version.ProtocolCompat, encoded): + case h.commands <- fmt.Sprintf("protocol %s base64 %v;", protocol.ProtocolCompat, encoded): case <-time.After(time.Second * 5): return fmt.Errorf("Timed out sending command '%s' (base64: '%s')", command, encoded) case <-h.Done(): @@ -57,7 +57,7 @@ func (h *baseHandler) SendMessage(command string) error { func (h *baseHandler) Write(p []byte) (n int, err error) { for _, b := range p { h.receiveBuf = append(h.receiveBuf, b) - if b == '\n' { + if b == protocol.MessageDelimiter { if len(h.receiveBuf) == 0 { continue } diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index fb71c8f..7ac5895 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -7,6 +7,7 @@ import ( "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/client" + "github.com/mimecast/dtail/internal/protocol" ) // MaprHandler is the handler used on the client side for running mapreduce aggregations. @@ -58,7 +59,7 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { // related data. func (h *MaprHandler) handleAggregateMessage(message string) { h.count++ - parts := strings.Split(message, "➔") + parts := strings.Split(message, protocol.AggregateDelimiter) // Index 0 contains 'AGGREGATE', 1 contains server host. // Aggregation data begins from index 2. diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 6757bd6..8a365a1 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -14,6 +14,7 @@ import ( "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/protocol" "github.com/mimecast/dtail/internal/regex" "github.com/DataDog/zstd" @@ -148,80 +149,64 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t if err != nil { return err } - rawLine := make([]byte, 0, 512) lineLengthThreshold := 1024 * 1024 // 1mb - longLineWarning := false + warnedAboutLongLine := false + message := make([]byte, 0, 512) for { select { case <-ctx.Done(): return nil - default: - } - - select { case <-truncate: if isTruncated, err := f.truncated(fd); isTruncated { return err } - logger.Info(f.filePath, "Current offset", offset) default: } - // Read some bytes (max 4k at once as of go 1.12). isPrefix will - // be set if line does not fit into 4k buffer. - bytes, isPrefix, err := reader.ReadLine() + b, err := reader.ReadByte() if err != nil { - // If EOF, sleep a couple of ms and return with nil error. - // If other error, return with non-nil error. if err != io.EOF { return err } if !f.seekEOF { - logger.Debug(f.FilePath(), "End of file reached") + logger.Info(f.FilePath(), "End of file reached") return nil } time.Sleep(time.Millisecond * 100) continue } + offset++ - rawLine = append(rawLine, bytes...) - offset += uint64(len(bytes)) - - if !isPrefix { - // last LineRead call returned contend until end of line. - rawLine = append(rawLine, '\n') - select { - case rawLines <- rawLine: - case <-ctx.Done(): - return nil + switch b { + case '\n': + if len(message) == 0 { + time.Sleep(time.Millisecond * 100) + continue } - rawLine = make([]byte, 0, 512) - if longLineWarning { - longLineWarning = false - } - continue - } - - // Last LineRead call could not read content until end of line, buffer - // was too small. Determine whether we exceed the max line length we - // want dtail to send to the client at once. Possibly split up log line - // into multiple log lines. - if len(rawLine) >= lineLengthThreshold { - if !longLineWarning { - f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines") - // Only print out one warning per long log line. - longLineWarning = true - } - rawLine = append(rawLine, '\n') select { - case rawLines <- rawLine: + case rawLines <- append(message, protocol.MessageDelimiter): + message = make([]byte, 0, 512) + warnedAboutLongLine = false case <-ctx.Done(): return nil } - rawLine = make([]byte, 0, 512) + default: + if len(message) >= lineLengthThreshold { + if !warnedAboutLongLine { + f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines") + warnedAboutLongLine = true + } + select { + case <-ctx.Done(): + return nil + case rawLines <- append(message, protocol.MessageDelimiter): + message = make([]byte, 0, 512) + } + } + message = append(message, b) } } } diff --git a/internal/io/logger/logger.go b/internal/io/logger/logger.go index bb9dc02..3a3935d 100644 --- a/internal/io/logger/logger.go +++ b/internal/io/logger/logger.go @@ -205,7 +205,7 @@ func Trace(args ...interface{}) string { // Write log line to buffer and/or log file. func write(what, severity, message string) { if Mode.logToStdout { - line := fmt.Sprintf("%s|%s|%s|%s\n", what, hostname, severity, message) + line := fmt.Sprintf("%s|%s|%s|%s", what, hostname, severity, message) if config.Client.TermColorsEnable { line = brush.Colorfy(line) @@ -219,7 +219,7 @@ func write(what, severity, message string) { timeStr := t.Format("20060102-150405") fileLogBufCh <- buf{ time: t, - message: fmt.Sprintf("%s|%s|%s|%s\n", severity, timeStr, what, message), + message: fmt.Sprintf("%s|%s|%s|%s", severity, timeStr, what, message), } } } @@ -326,6 +326,7 @@ func Flush() { select { case message := <-stdoutBufCh: stdoutWriter.WriteString(message) + stdoutWriter.WriteString("\n") default: stdoutWriter.Flush() return @@ -338,6 +339,7 @@ func writeToStdout(ctx context.Context) { select { case message := <-stdoutBufCh: stdoutWriter.WriteString(message) + stdoutWriter.WriteString("\n") case <-time.After(time.Millisecond * 100): stdoutWriter.Flush() case <-pauseCh: @@ -365,6 +367,7 @@ func writeToFile(ctx context.Context) { dateStr := buf.time.Format("20060102") w := fileWriter(dateStr) w.WriteString(buf.message) + w.WriteString("\n") case <-pauseCh: PAUSE: for { diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go index a6cc6eb..029d87b 100644 --- a/internal/mapr/aggregateset.go +++ b/internal/mapr/aggregateset.go @@ -5,6 +5,8 @@ import ( "fmt" "strconv" "strings" + + "github.com/mimecast/dtail/internal/protocol" ) // AggregateSet represents aggregated key/value pairs from the @@ -70,20 +72,20 @@ func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<- var sb strings.Builder sb.WriteString(groupKey) - sb.WriteString("➔") - sb.WriteString(fmt.Sprintf("%d➔", s.Samples)) + sb.WriteString(protocol.AggregateDelimiter) + sb.WriteString(fmt.Sprintf("%d%s", s.Samples, protocol.AggregateDelimiter)) for k, v := range s.FValues { sb.WriteString(k) sb.WriteString("=") - sb.WriteString(fmt.Sprintf("%v➔", v)) + sb.WriteString(fmt.Sprintf("%v%s", v, protocol.AggregateDelimiter)) } for k, v := range s.SValues { sb.WriteString(k) sb.WriteString("=") sb.WriteString(v) - sb.WriteString("➔") + sb.WriteString(protocol.AggregateDelimiter) } select { diff --git a/internal/protocol/protocol.go b/internal/protocol/protocol.go new file mode 100644 index 0000000..2a570cd --- /dev/null +++ b/internal/protocol/protocol.go @@ -0,0 +1,10 @@ +package protocol + +const ( + // ProtocolCompat -ibility version + ProtocolCompat string = "4" + // MessageDelimiter delimits separate messages. + MessageDelimiter byte = '¬' + // AggregateDelimiter delimits parts of an aggregation message. + AggregateDelimiter string = "➔" +) diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 185e7c2..23e3aeb 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -17,8 +17,8 @@ import ( "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr/server" "github.com/mimecast/dtail/internal/omode" + "github.com/mimecast/dtail/internal/protocol" user "github.com/mimecast/dtail/internal/user/server" - "github.com/mimecast/dtail/internal/version" ) const ( @@ -92,24 +92,27 @@ func (h *ServerHandler) Read(p []byte) (n int, err error) { } if message[0] == '.' { // Handle hidden message (don't display to the user, interpreted by dtail client) - wholePayload := []byte(fmt.Sprintf("%s\n", message)) + wholePayload := []byte(fmt.Sprintf("%s%b", message, protocol.MessageDelimiter)) n = copy(p, wholePayload) return } // Handle normal server message (display to the user) - wholePayload := []byte(fmt.Sprintf("SERVER|%s|%s\n", h.hostname, message)) + wholePayload := []byte(fmt.Sprintf("SERVER|%s|%s%b", h.hostname, message, protocol.MessageDelimiter)) n = copy(p, wholePayload) return case message := <-h.aggregatedMessages: // Send mapreduce-aggregated data as a message. - data := fmt.Sprintf("AGGREGATE➔%s➔%s\n", h.hostname, message) + data := fmt.Sprintf("AGGREGATE%s%s%s%s%b", + protocol.AggregateDelimiter, h.hostname, + protocol.AggregateDelimiter, message, protocol.MessageDelimiter) wholePayload := []byte(data) n = copy(p, wholePayload) return case line := <-h.lines: + //fmt.Printf("<<<%d,%s>>>\n", len(line.Content), line.Content) // Send normal file content data as a message. serverInfo := []byte(fmt.Sprintf("REMOTE|%s|%3d|%v|%s|", h.hostname, line.TransmittedPerc, line.Count, line.SourceID)) @@ -182,8 +185,8 @@ func (h *ServerHandler) handleProtocolVersion(args []string) ([]string, int, err return args, argc, errors.New("unable to determine protocol version") } - if args[1] != version.ProtocolCompat { - err := fmt.Errorf("server with protocol version '%s' but client with '%s', please update DTail", version.ProtocolCompat, args[1]) + if args[1] != protocol.ProtocolCompat { + err := fmt.Errorf("server with protocol version '%s' but client with '%s', please update DTail", protocol.ProtocolCompat, args[1]) return args, argc, err } diff --git a/internal/version/version.go b/internal/version/version.go index 3e683bd..7f07c83 100644 --- a/internal/version/version.go +++ b/internal/version/version.go @@ -6,6 +6,7 @@ import ( "github.com/mimecast/dtail/internal/color" "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/protocol" ) const ( @@ -13,15 +14,13 @@ const ( Name string = "DTail" // Version of DTail. Version string = "3.2.0" - // ProtocolCompat -ibility version. - ProtocolCompat string = "3" // Additional information for DTail Additional string = "Have a lot of fun!" ) // String representation of the DTail version. func String() string { - return fmt.Sprintf("%s %v Protocol %s %s", Name, Version, ProtocolCompat, Additional) + return fmt.Sprintf("%s %v Protocol %s %s", Name, Version, protocol.ProtocolCompat, Additional) } // PaintedString is a prettier string representation of the DTail version. @@ -36,7 +35,7 @@ func PaintedString() string { version := color.PaintWithAttr(fmt.Sprintf(" %s ", Version), color.FgBlue, color.BgYellow, color.AttrBold) - protocol := color.Paint(fmt.Sprintf(" Protocol %s ", ProtocolCompat), + protocol := color.Paint(fmt.Sprintf(" Protocol %s ", protocol.ProtocolCompat), color.FgBlack, color.BgGreen) additional := color.PaintWithAttr(fmt.Sprintf(" %s ", Additional), |
