summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-09-08 19:10:50 +0300
committerPaul Buetow <paul@buetow.org>2021-10-02 12:26:29 +0300
commit16dc57e1e1c28e9d762424e596223a980770e059 (patch)
treeea5a7d5caa7f4de7bd3b21e57d0e18c0d8507c7d /internal
parentc83c9e61a08c7ea1cb528bc26dfab25b46faa866 (diff)
mapreduce tables are in colors now too
Diffstat (limited to 'internal')
-rw-r--r--internal/clients/baseclient.go1
-rw-r--r--internal/clients/handlers/basehandler.go29
-rw-r--r--internal/clients/handlers/healthhandler.go11
-rw-r--r--internal/clients/handlers/maprhandler.go42
-rw-r--r--internal/clients/maprclient.go31
-rw-r--r--internal/color/paint.go13
-rw-r--r--internal/config/client.go48
-rw-r--r--internal/io/fs/readfile.go5
-rw-r--r--internal/mapr/aggregateset.go17
-rw-r--r--internal/mapr/client/aggregate.go18
-rw-r--r--internal/mapr/globalgroupset.go1
-rw-r--r--internal/mapr/groupset.go258
-rw-r--r--internal/mapr/logformat/default.go28
-rw-r--r--internal/mapr/logformat/default_test.go15
-rw-r--r--internal/mapr/logformat/generickv.go31
-rw-r--r--internal/mapr/logformat/parser.go2
-rw-r--r--internal/mapr/server/aggregate.go17
-rw-r--r--internal/protocol/protocol.go11
-rw-r--r--internal/server/handlers/controlhandler.go3
-rw-r--r--internal/server/handlers/serverhandler.go118
20 files changed, 469 insertions, 230 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index f20156f..de0c101 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -71,6 +71,7 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i
go c.hostKeyCallback.PromptAddHosts(ctx)
// Print client stats every time something on statsCh is recieved.
go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Quiet)
+
// Keep count of active connections
active := make(chan struct{}, len(c.connections))
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index 63ceaac..51f33c1 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -1,6 +1,7 @@
package handlers
import (
+ "bytes"
"encoding/base64"
"fmt"
"io"
@@ -17,7 +18,7 @@ type baseHandler struct {
server string
shellStarted bool
commands chan string
- receiveBuf []byte
+ receiveBuf bytes.Buffer
status int
}
@@ -56,14 +57,23 @@ func (h *baseHandler) SendMessage(command string) error {
// Read data from the dtail server via Writer interface.
func (h *baseHandler) Write(p []byte) (n int, err error) {
for _, b := range p {
- if b == protocol.MessageDelimiter || b == '\n' {
- if len(h.receiveBuf) == 0 {
+ switch b {
+ /*
+ // TODO: Next DTail version make it so that '\n' gets ignored. For now
+ // leave it for compatibility with older DTail server + ability to display
+ // the protocol mismatch warn message.
+ case '\n' {
+ continue
+ */
+ case '\n', protocol.MessageDelimiter:
+ message := h.receiveBuf.String()
+ if len(message) == 0 {
continue
}
- message := string(h.receiveBuf)
h.handleMessageType(message)
- } else {
- h.receiveBuf = append(h.receiveBuf, b)
+ h.receiveBuf.Reset()
+ default:
+ h.receiveBuf.WriteByte(b)
}
}
@@ -78,25 +88,22 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
case <-h.Done():
return 0, io.EOF
}
-
return
}
// Handle various message types.
func (h *baseHandler) handleMessageType(message string) {
- if len(h.receiveBuf) == 0 {
+ if len(message) == 0 {
return
}
// Hidden server commands starti with a dot "."
- if h.receiveBuf[0] == '.' {
+ if message[0] == '.' {
h.handleHiddenMessage(message)
- h.receiveBuf = h.receiveBuf[:0]
return
}
logger.Raw(message)
- h.receiveBuf = h.receiveBuf[:0]
}
// Handle messages received from server which are not meant to be displayed
diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go
index 213748c..eca0348 100644
--- a/internal/clients/handlers/healthhandler.go
+++ b/internal/clients/handlers/healthhandler.go
@@ -1,6 +1,7 @@
package handlers
import (
+ "bytes"
"errors"
"fmt"
"time"
@@ -13,7 +14,7 @@ import (
type HealthHandler struct {
done *internal.Done
// Buffer of incoming data from server.
- receiveBuf []byte
+ receiveBuf bytes.Buffer
// To send commands to the server.
commands chan string
// To receive messages from the server.
@@ -72,10 +73,10 @@ func (h *HealthHandler) SendMessage(command string) error {
// Server writes byte stream to client.
func (h *HealthHandler) Write(p []byte) (n int, err error) {
for _, b := range p {
- h.receiveBuf = append(h.receiveBuf, b)
- if b == protocol.MessageDelimiter { // '\n' {
- h.receive <- string(h.receiveBuf)
- h.receiveBuf = h.receiveBuf[:0]
+ h.receiveBuf.WriteByte(b)
+ if b == protocol.MessageDelimiter {
+ h.receive <- h.receiveBuf.String()
+ h.receiveBuf.Reset()
}
}
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go
index afad507..65b1454 100644
--- a/internal/clients/handlers/maprhandler.go
+++ b/internal/clients/handlers/maprhandler.go
@@ -15,7 +15,6 @@ type MaprHandler struct {
baseHandler
aggregate *client.Aggregate
query *mapr.Query
- count uint64
}
// NewMaprHandler returns a new mapreduce client handler.
@@ -36,19 +35,20 @@ func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGr
// Read data from the dtail server via Writer interface.
func (h *MaprHandler) Write(p []byte) (n int, err error) {
for _, b := range p {
- h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b)
- if b == protocol.MessageDelimiter { // '\n' {
- if len(h.baseHandler.receiveBuf) == 0 {
- continue
+ switch b {
+ case '\n':
+ continue
+ case protocol.MessageDelimiter:
+ message := h.baseHandler.receiveBuf.String()
+ logger.Debug(message)
+ if message[0] == 'A' {
+ h.handleAggregateMessage(message)
+ } else {
+ h.baseHandler.handleMessageType(message)
}
- message := string(h.baseHandler.receiveBuf)
-
- if h.baseHandler.receiveBuf[0] == 'A' {
- h.handleAggregateMessage(strings.TrimSpace(message))
- h.baseHandler.receiveBuf = h.baseHandler.receiveBuf[:0]
- continue
- }
- h.baseHandler.handleMessageType(message)
+ h.baseHandler.receiveBuf.Reset()
+ default:
+ h.baseHandler.receiveBuf.WriteByte(b)
}
}
@@ -58,12 +58,12 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) {
// Handle a message received from server including mapr aggregation
// related data.
func (h *MaprHandler) handleAggregateMessage(message string) {
- h.count++
- parts := strings.Split(message, protocol.AggregateDelimiter)
-
- // Index 0 contains 'AGGREGATE', 1 contains server host.
- // Aggregation data begins from index 2.
- logger.Debug("Received aggregate data", h.server, h.count, parts)
- h.aggregate.Aggregate(parts[2:])
- logger.Debug("Aggregated aggregate data", h.server, h.count)
+ parts := strings.SplitN(message, protocol.FieldDelimiter, 3)
+ if len(parts) != 3 {
+ logger.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts")
+ return
+ }
+ if err := h.aggregate.Aggregate(parts[2]); err != nil {
+ logger.Error("Unable to aggregate data", h.server, message, err)
+ }
}
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index 77b674b..cab9a6c 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -9,6 +9,8 @@ import (
"time"
"github.com/mimecast/dtail/internal/clients/handlers"
+ "github.com/mimecast/dtail/internal/color"
+ "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/omode"
@@ -37,6 +39,8 @@ type MaprClient struct {
query *mapr.Query
// Additative result or new result every interval run?
cumulative bool
+ // The last result string received
+ lastResult string
}
// NewMaprClient returns a new mapreduce client.
@@ -154,24 +158,37 @@ func (c *MaprClient) reportResults() {
func (c *MaprClient) printResults() {
var result string
var err error
- var numLines int
+ var numRows int
if c.cumulative {
- result, numLines, err = c.globalGroup.Result(c.query)
+ result, numRows, err = c.globalGroup.Result(c.query)
} else {
- result, numLines, err = c.globalGroup.SwapOut().Result(c.query)
+ result, numRows, err = c.globalGroup.SwapOut().Result(c.query)
}
+
if err != nil {
logger.FatalExit(err)
}
- if numLines == 0 {
- logger.Warn("Empty result set this time...")
+ if result == c.lastResult {
+ logger.Debug("Result hasn't changed compared to last time...")
return
}
+ c.lastResult = result
- //logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery))
- logger.Raw(c.query.RawQuery)
+ if numRows == 0 {
+ logger.Debug("Empty result set this time...")
+ return
+ }
+
+ rawQuery := c.query.RawQuery
+ if config.Client.TermColorsEnable {
+ rawQuery = color.PaintStrWithAttr(rawQuery,
+ config.Client.TermColors.MaprTable.RawQueryFg,
+ config.Client.TermColors.MaprTable.RawQueryBg,
+ config.Client.TermColors.MaprTable.RawQueryAttr)
+ }
+ logger.Raw(rawQuery)
logger.Raw(result)
}
diff --git a/internal/color/paint.go b/internal/color/paint.go
index 5430acd..53c9abb 100644
--- a/internal/color/paint.go
+++ b/internal/color/paint.go
@@ -63,6 +63,19 @@ func PaintWithAttr(sb *strings.Builder, text string, fg FgColor, bg BgColor, att
sb.WriteString(string(FgDefault))
}
+// PaintWithAttrs is similar to PaintWithAttr, but it takes multiple text attributes.
+func PaintWithAttrs(sb *strings.Builder, text string, fg FgColor, bg BgColor, attrs []Attribute) {
+ sb.WriteString(string(fg))
+ sb.WriteString(string(bg))
+ for _, attr := range attrs {
+ sb.WriteString(string(attr))
+ }
+ sb.WriteString(text)
+ sb.WriteString(string(AttrReset))
+ sb.WriteString(string(BgDefault))
+ sb.WriteString(string(FgDefault))
+}
+
// ResetWithAttr resets background, foreground and attributes.
func ResetWithAttr(sb *strings.Builder) {
sb.WriteString(string(AttrReset))
diff --git a/internal/config/client.go b/internal/config/client.go
index 3c2f7de..795c4a4 100644
--- a/internal/config/client.go
+++ b/internal/config/client.go
@@ -71,11 +71,32 @@ type commonTermColors struct {
SeverityWarnFg color.FgColor
}
+type maprTableTermColors struct {
+ DataAttr color.Attribute
+ DataBg color.BgColor
+ DataFg color.FgColor
+ DelimiterAttr color.Attribute
+ DelimiterBg color.BgColor
+ DelimiterFg color.FgColor
+ HeaderAttr color.Attribute
+ HeaderBg color.BgColor
+ HeaderDelimiterAttr color.Attribute
+ HeaderDelimiterBg color.BgColor
+ HeaderDelimiterFg color.FgColor
+ HeaderFg color.FgColor
+ HeaderGroupKeyAttr color.Attribute
+ HeaderSortKeyAttr color.Attribute
+ RawQueryAttr color.Attribute
+ RawQueryBg color.BgColor
+ RawQueryFg color.FgColor
+}
+
type termColors struct {
- Remote remoteTermColors
- Client clientTermColors
- Server serverTermColors
- Common commonTermColors
+ Remote remoteTermColors
+ Client clientTermColors
+ Server serverTermColors
+ Common commonTermColors
+ MaprTable maprTableTermColors
}
// ClientConfig represents a DTail client configuration (empty as of now as there
@@ -155,6 +176,25 @@ func newDefaultClientConfig() *ClientConfig {
SeverityWarnBg: color.BgBlack,
SeverityWarnFg: color.FgWhite,
},
+ MaprTable: maprTableTermColors{
+ DataAttr: color.AttrNone,
+ DataBg: color.BgBlue,
+ DataFg: color.FgWhite,
+ DelimiterAttr: color.AttrDim,
+ DelimiterBg: color.BgBlue,
+ DelimiterFg: color.FgWhite,
+ HeaderAttr: color.AttrBold,
+ HeaderBg: color.BgBlue,
+ HeaderFg: color.FgWhite,
+ HeaderDelimiterAttr: color.AttrDim,
+ HeaderDelimiterBg: color.BgBlue,
+ HeaderDelimiterFg: color.FgWhite,
+ HeaderSortKeyAttr: color.AttrUnderline,
+ HeaderGroupKeyAttr: color.AttrReverse,
+ RawQueryAttr: color.AttrDim,
+ RawQueryBg: color.BgBlack,
+ RawQueryFg: color.FgCyan,
+ },
},
}
}
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index f2f672a..c0d44dd 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -16,7 +16,6 @@ import (
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/io/pool"
- "github.com/mimecast/dtail/internal/protocol"
"github.com/mimecast/dtail/internal/regex"
"github.com/DataDog/zstd"
@@ -187,7 +186,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu
time.Sleep(time.Millisecond * 100)
continue
}
- message.WriteByte(protocol.MessageDelimiter)
+ message.WriteString("\n")
select {
case rawLines <- message:
message = pool.BytesBuffer.Get().(*bytes.Buffer)
@@ -202,7 +201,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu
f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines")
warnedAboutLongLine = true
}
- message.WriteByte(protocol.MessageDelimiter)
+ message.WriteString("\n")
select {
case rawLines <- message:
message = pool.BytesBuffer.Get().(*bytes.Buffer)
diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go
index 029d87b..47f4925 100644
--- a/internal/mapr/aggregateset.go
+++ b/internal/mapr/aggregateset.go
@@ -6,6 +6,8 @@ import (
"strconv"
"strings"
+ "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/protocol"
)
@@ -68,22 +70,25 @@ func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error {
// Serialize the aggregate set so it can be sent over the wire.
func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<- string) {
- //logger.Trace("Serialising mapr.AggregateSet", s)
- var sb strings.Builder
+ logger.Trace("Serialising mapr.AggregateSet", s)
+ sb := pool.BuilderBuffer.Get().(*strings.Builder)
+ defer pool.RecycleBuilderBuffer(sb)
sb.WriteString(groupKey)
sb.WriteString(protocol.AggregateDelimiter)
- sb.WriteString(fmt.Sprintf("%d%s", s.Samples, protocol.AggregateDelimiter))
+ sb.WriteString(fmt.Sprintf("%d", s.Samples))
+ sb.WriteString(protocol.AggregateDelimiter)
for k, v := range s.FValues {
sb.WriteString(k)
- sb.WriteString("=")
- sb.WriteString(fmt.Sprintf("%v%s", v, protocol.AggregateDelimiter))
+ sb.WriteString(protocol.AggregateKVDelimiter)
+ sb.WriteString(fmt.Sprintf("%v", v))
+ sb.WriteString(protocol.AggregateDelimiter)
}
for k, v := range s.SValues {
sb.WriteString(k)
- sb.WriteString("=")
+ sb.WriteString(protocol.AggregateKVDelimiter)
sb.WriteString(v)
sb.WriteString(protocol.AggregateDelimiter)
}
diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go
index 10b34d4..5cc09a1 100644
--- a/internal/mapr/client/aggregate.go
+++ b/internal/mapr/client/aggregate.go
@@ -1,11 +1,13 @@
package client
import (
+ "fmt"
"strconv"
"strings"
"github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/mapr"
+ "github.com/mimecast/dtail/internal/protocol"
)
// Aggregate mapreduce data on the DTail client side.
@@ -31,12 +33,18 @@ func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGrou
}
// Aggregate data from mapr log line into local (and global) group sets.
-func (a *Aggregate) Aggregate(parts []string) {
+func (a *Aggregate) Aggregate(message string) error {
+ parts := strings.Split(message, protocol.AggregateDelimiter)
+ if len(parts) < 4 {
+ return fmt.Errorf("aggregate message without any real data")
+ }
+
groupKey := parts[0]
samples, err := strconv.Atoi(parts[1])
if err != nil {
- logger.FatalExit("Unable to parse sample count", parts[1], err, parts)
+ return fmt.Errorf("unable to parse sample count '%s': %v", parts[1], err)
}
+
fields := a.makeFields(parts[2:])
set := a.group.GetSet(groupKey)
@@ -63,6 +71,8 @@ func (a *Aggregate) Aggregate(parts []string) {
// Re-init local group (make it empty again).
a.group.InitSet()
}
+
+ return nil
}
// Create a map of key-value pairs from a part list such as ["foo=bar", "bar=baz"].
@@ -70,8 +80,8 @@ func (a *Aggregate) makeFields(parts []string) map[string]string {
fields := make(map[string]string, len(parts))
for _, part := range parts {
- kv := strings.SplitN(part, "=", 2)
- if len(kv) < 2 {
+ kv := strings.SplitN(part, protocol.AggregateKVDelimiter, 2)
+ if len(kv) != 2 {
continue
}
fields[kv[0]] = kv[1]
diff --git a/internal/mapr/globalgroupset.go b/internal/mapr/globalgroupset.go
index cfab506..21bf990 100644
--- a/internal/mapr/globalgroupset.go
+++ b/internal/mapr/globalgroupset.go
@@ -48,6 +48,7 @@ func (g *GlobalGroupSet) MergeNoblock(query *Query, group *GroupSet) (bool, erro
// Merge a group set into the global group set.
func (g *GlobalGroupSet) merge(query *Query, group *GroupSet) error {
+
for groupKey, set := range group.sets {
s := g.GetSet(groupKey)
if err := s.Merge(query, set); err != nil {
diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go
index 6ee2811..d29559a 100644
--- a/internal/mapr/groupset.go
+++ b/internal/mapr/groupset.go
@@ -4,13 +4,16 @@ import (
"context"
"errors"
"fmt"
- "io/ioutil"
"os"
"sort"
"strconv"
"strings"
+ "github.com/mimecast/dtail/internal/color"
+ "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/pool"
+ "github.com/mimecast/dtail/internal/protocol"
)
// GroupSet represents a map of aggregate sets. The group sets
@@ -22,6 +25,14 @@ type GroupSet struct {
sets map[string]*AggregateSet
}
+// Internal helper type
+type result struct {
+ groupKey string
+ values []string
+ widths []int
+ orderBy float64
+}
+
// NewGroupSet returns a new empty group set.
func NewGroupSet() *GroupSet {
g := GroupSet{}
@@ -58,17 +69,118 @@ func (g *GroupSet) Serialize(ctx context.Context, ch chan<- string) {
// Result returns a nicely formated result of the query from the group set.
func (g *GroupSet) Result(query *Query) (string, int, error) {
- return g.limitedResult(query, query.Limit, "\t", " ", false)
+ rows, widths, err := g.result(query, true)
+ if err != nil {
+ return "", 0, err
+ }
+
+ sb := pool.BuilderBuffer.Get().(*strings.Builder)
+ defer pool.RecycleBuilderBuffer(sb)
+
+ // Generate header now
+ lastIndex := len(query.Select) - 1
+ for i, sc := range query.Select {
+ format := fmt.Sprintf(" %%%ds ", widths[i])
+ str := fmt.Sprintf(format, sc.FieldStorage)
+ if config.Client.TermColorsEnable {
+ attrs := []color.Attribute{config.Client.TermColors.MaprTable.HeaderAttr}
+ if sc.FieldStorage == query.OrderBy {
+ attrs = append(attrs, config.Client.TermColors.MaprTable.HeaderSortKeyAttr)
+ }
+ for _, groupBy := range query.GroupBy {
+ if sc.FieldStorage == groupBy {
+ attrs = append(attrs, config.Client.TermColors.MaprTable.HeaderGroupKeyAttr)
+ break
+ }
+ }
+ color.PaintWithAttrs(sb, str,
+ config.Client.TermColors.MaprTable.HeaderFg,
+ config.Client.TermColors.MaprTable.HeaderBg,
+ attrs)
+ } else {
+ sb.WriteString(str)
+ }
+
+ if i == lastIndex {
+ continue
+ }
+ if config.Client.TermColorsEnable {
+ color.PaintWithAttr(sb, protocol.FieldDelimiter,
+ config.Client.TermColors.MaprTable.HeaderDelimiterFg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterBg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterAttr)
+ } else {
+ sb.WriteString(protocol.FieldDelimiter)
+ }
+ }
+ sb.WriteString("\n")
+
+ for i := 0; i < len(query.Select); i++ {
+ str := fmt.Sprintf("-%s-", strings.Repeat("-", widths[i]))
+ if config.Client.TermColorsEnable {
+ color.PaintWithAttr(sb, str,
+ config.Client.TermColors.MaprTable.HeaderDelimiterFg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterBg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterAttr)
+ } else {
+ sb.WriteString(str)
+ }
+ if i == lastIndex {
+ continue
+ }
+ if config.Client.TermColorsEnable {
+ color.PaintWithAttr(sb, protocol.FieldDelimiter,
+ config.Client.TermColors.MaprTable.HeaderDelimiterFg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterBg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterAttr)
+ } else {
+ sb.WriteString(protocol.FieldDelimiter)
+ }
+ }
+ sb.WriteString("\n")
+
+ // And now write the data
+ for i, r := range rows {
+ if i == query.Limit {
+ break
+ }
+ for j, value := range r.values {
+ format := fmt.Sprintf(" %%%ds ", widths[j])
+ str := fmt.Sprintf(format, value)
+ if config.Client.TermColorsEnable {
+ color.PaintWithAttr(sb, str,
+ config.Client.TermColors.MaprTable.DataFg,
+ config.Client.TermColors.MaprTable.DataBg,
+ config.Client.TermColors.MaprTable.DataAttr)
+ } else {
+ sb.WriteString(str)
+ }
+
+ if j == lastIndex {
+ continue
+ }
+ if config.Client.TermColorsEnable {
+ color.PaintWithAttr(sb, protocol.FieldDelimiter,
+ config.Client.TermColors.MaprTable.DelimiterFg,
+ config.Client.TermColors.MaprTable.DelimiterBg,
+ config.Client.TermColors.MaprTable.DelimiterAttr)
+ } else {
+ sb.WriteString(protocol.FieldDelimiter)
+ }
+ }
+ sb.WriteString("\n")
+ }
+
+ return sb.String(), len(rows), nil
}
-// WriteResult writes the result to an outfile.
+// WriteResult writes the result to an CSV outfile.
func (g *GroupSet) WriteResult(query *Query) error {
if !query.HasOutfile() {
return errors.New("No outfile specified")
}
- // -1: Don't limit the result, include all data sets
- result, _, err := g.limitedResult(query, -1, "", ",", true)
+ rows, _, err := g.result(query, false)
if err != nil {
return err
}
@@ -76,9 +188,37 @@ func (g *GroupSet) WriteResult(query *Query) error {
logger.Info("Writing outfile", query.Outfile)
tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile)
- if err := ioutil.WriteFile(tmpOutfile, []byte(result), 0644); err != nil {
+ file, err := os.Create(tmpOutfile)
+ if err != nil {
return err
}
+ defer file.Close()
+
+ // Generate header now
+ lastIndex := len(query.Select) - 1
+ for i, sc := range query.Select {
+ file.WriteString(sc.FieldStorage)
+ if i == lastIndex {
+ continue
+ }
+ file.WriteString(protocol.CSVDelimiter)
+ }
+ file.WriteString("\n")
+
+ // And now write the data
+ for i, r := range rows {
+ if i == query.Limit {
+ break
+ }
+ for j, value := range r.values {
+ file.WriteString(value)
+ if j == lastIndex {
+ continue
+ }
+ file.WriteString(protocol.CSVDelimiter)
+ }
+ file.WriteString("\n")
+ }
if err := os.Rename(tmpOutfile, query.Outfile); err != nil {
os.Remove(tmpOutfile)
@@ -88,32 +228,22 @@ func (g *GroupSet) WriteResult(query *Query) error {
return nil
}
-// Return a nicely formated result of the query from the group set.
-func (g *GroupSet) limitedResult(query *Query, limit int, lineStarter, fieldSeparator string, addHeader bool) (string, int, error) {
- type result struct {
- groupKey string
- resultStr string
- orderBy float64
- }
+// Return a sorted result slice of the query from the group set.
+func (g *GroupSet) result(query *Query, gatherWidths bool) ([]result, []int, error) {
+ var rows []result
+ widths := make([]int, len(query.Select))
- var resultSlice []result
+ var valueStr string
+ var value float64
for groupKey, set := range g.sets {
- var sb strings.Builder
r := result{groupKey: groupKey}
- lastIndex := len(query.Select) - 1
for i, sc := range query.Select {
- storage := sc.FieldStorage
- orderByThis := storage == query.OrderBy
-
switch sc.Operation {
case Count:
- value := set.FValues[storage]
- sb.WriteString(fmt.Sprintf("%d", int(value)))
- if orderByThis {
- r.orderBy = value
- }
+ value = set.FValues[sc.FieldStorage]
+ valueStr = fmt.Sprintf("%d", int(value))
case Len:
fallthrough
case Sum:
@@ -121,74 +251,48 @@ func (g *GroupSet) limitedResult(query *Query, limit int, lineStarter, fieldSepa
case Min:
fallthrough
case Max:
- value := set.FValues[storage]
- sb.WriteString(fmt.Sprintf("%f", value))
- if orderByThis {
- r.orderBy = value
- }
+ value = set.FValues[sc.FieldStorage]
+ valueStr = fmt.Sprintf("%f", value)
case Last:
- value := set.SValues[storage]
- if orderByThis {
- f, err := strconv.ParseFloat(value, 64)
- if err == nil {
- r.orderBy = f
- }
- }
- sb.WriteString(value)
+ valueStr = set.SValues[sc.FieldStorage]
+ value, _ = strconv.ParseFloat(valueStr, 64)
case Avg:
- value := set.FValues[storage] / float64(set.Samples)
- sb.WriteString(fmt.Sprintf("%f", value))
- if orderByThis {
- r.orderBy = value
- }
+ value = set.FValues[sc.FieldStorage] / float64(set.Samples)
+ valueStr = fmt.Sprintf("%f", value)
default:
- return "", 0, fmt.Errorf("Unknown aggregation method '%v'", sc.Operation)
+ return rows, widths, fmt.Errorf("Unknown aggregation method '%v'", sc.Operation)
}
- if i != lastIndex {
- sb.WriteString(fieldSeparator)
+
+ if sc.FieldStorage == query.OrderBy {
+ r.orderBy = value
+ }
+ r.values = append(r.values, valueStr)
+
+ if !gatherWidths {
+ continue
+ }
+ if widths[i] < len(sc.FieldStorage) {
+ widths[i] = len(sc.FieldStorage)
+ }
+ if widths[i] < len(valueStr) {
+ widths[i] = len(valueStr)
}
}
- r.resultStr = sb.String()
- resultSlice = append(resultSlice, r)
+ rows = append(rows, r)
}
if query.OrderBy != "" {
if query.ReverseOrder {
- sort.SliceStable(resultSlice, func(i, j int) bool {
- return resultSlice[i].orderBy < resultSlice[j].orderBy
+ sort.SliceStable(rows, func(i, j int) bool {
+ return rows[i].orderBy < rows[j].orderBy
})
} else {
- sort.SliceStable(resultSlice, func(i, j int) bool {
- return resultSlice[i].orderBy > resultSlice[j].orderBy
+ sort.SliceStable(rows, func(i, j int) bool {
+ return rows[i].orderBy > rows[j].orderBy
})
}
}
- var sb strings.Builder
-
- // Write header first
- if addHeader {
- lastIndex := len(query.Select) - 1
- sb.WriteString(lineStarter)
- for i, sc := range query.Select {
- sb.WriteString(sc.FieldStorage)
- if i != lastIndex {
- sb.WriteString(fieldSeparator)
- }
- }
- sb.WriteString("\n")
- }
-
- // And now write the data
- for i, r := range resultSlice {
- if i == limit {
- break
- }
- sb.WriteString(lineStarter)
- sb.WriteString(r.resultStr)
- sb.WriteString("\n")
- }
-
- return sb.String(), len(resultSlice), nil
+ return rows, widths, nil
}
diff --git a/internal/mapr/logformat/default.go b/internal/mapr/logformat/default.go
index da976bd..c67137e 100644
--- a/internal/mapr/logformat/default.go
+++ b/internal/mapr/logformat/default.go
@@ -1,16 +1,22 @@
package logformat
import (
+ "fmt"
"strings"
- "github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/protocol"
)
-// MakeFieldsDEFAULT is the default log file mapreduce parser.
+// MakeFieldsDEFAULT is the default DTail log file key-value parser.
func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) {
splitted := strings.Split(maprLine, protocol.FieldDelimiter)
- fields := make(map[string]string, len(splitted))
+
+ if len(splitted) < 3 || !strings.HasPrefix(splitted[3], "MAPREDUCE:") || !strings.HasPrefix(splitted[0], "INFO") {
+ // Not a DTail mapreduce log line.
+ return nil, IgnoreFieldsErr
+ }
+
+ fields := make(map[string]string, len(splitted)+8)
fields["*"] = "*"
fields["$line"] = maprLine
@@ -19,20 +25,14 @@ func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) {
fields["$timezone"] = p.timeZoneName
fields["$timeoffset"] = p.timeZoneOffset
- kvStart := 0
- // DTail mapreduce format
- if len(splitted) > 3 && strings.HasPrefix(splitted[3], "MAPREDUCE:") {
- fields["$severity"] = splitted[0]
- // TODO: Parse time like we do at Mimecast
- fields["$time"] = splitted[1]
- kvStart = 4
- }
+ fields["$severity"] = splitted[0]
+ // TODO: Parse time like we do at Mimecast
+ fields["$time"] = splitted[1]
- for _, kv := range splitted[kvStart:] {
+ for _, kv := range splitted[4:] {
keyAndValue := strings.SplitN(kv, "=", 2)
if len(keyAndValue) != 2 {
- logger.Debug("Unable to parse key-value token, ignoring it", kv)
- continue
+ return fields, fmt.Errorf("Unable to parse key-value token '%s'", kv)
}
fields[strings.ToLower(keyAndValue[0])] = keyAndValue[1]
}
diff --git a/internal/mapr/logformat/default_test.go b/internal/mapr/logformat/default_test.go
index 6284008..79911d1 100644
--- a/internal/mapr/logformat/default_test.go
+++ b/internal/mapr/logformat/default_test.go
@@ -11,8 +11,8 @@ func TestDefaultLogFormat(t *testing.T) {
}
inputs := []string{
- "foo=bar|baz=bay",
"INFO|20210907-065632|SERVER|MAPREDUCE:TEST|foo=bar|baz=bay",
+ "INFO|20210907-065732|CLIENT|MAPREDUCE:YOMAN|baz=bay|foo=bar",
}
for _, input := range inputs {
@@ -23,20 +23,21 @@ func TestDefaultLogFormat(t *testing.T) {
}
if bar, ok := fields["foo"]; !ok {
- t.Errorf("Expected field 'foo', but no such field there\n")
+ t.Errorf("Expected field 'foo', but no such field there in '%s'\n", input)
} else if bar != "bar" {
- t.Errorf("Expected 'bar' stored in field 'foo', but got '%s'\n", bar)
+ t.Errorf("Expected 'bar' stored in field 'foo', but got '%s' in '%s'\n", bar, input)
}
if bay, ok := fields["baz"]; !ok {
- t.Errorf("Expected field 'baz', but no such field there\n")
+ t.Errorf("Expected field 'baz', but no such field there in '%s'\n", input)
} else if bay != "bay" {
- t.Errorf("Expected 'bay' stored in field 'baz', but got '%s'\n", bay)
+ t.Errorf("Expected 'bay' stored in field 'baz', but got '%s' in '%s'\n", bay, input)
}
}
- if _, err := parser.MakeFields("foo=bar|bazbay"); err == nil {
- t.Errorf("Expected error but didn't")
+ fields, err := parser.MakeFields("foozoo=bar|bazbay")
+ if _, ok := fields["foo"]; ok {
+ t.Errorf("Expected fiending field 'foo', but found it\n")
}
}
diff --git a/internal/mapr/logformat/generickv.go b/internal/mapr/logformat/generickv.go
new file mode 100644
index 0000000..23d75cb
--- /dev/null
+++ b/internal/mapr/logformat/generickv.go
@@ -0,0 +1,31 @@
+package logformat
+
+import (
+ "strings"
+
+ "github.com/mimecast/dtail/internal/protocol"
+)
+
+// MakeFieldsGENERICKV is the generic key-value logfile parser.
+func (p *Parser) MakeFieldsGENERIGKV(maprLine string) (map[string]string, error) {
+ splitted := strings.Split(maprLine, protocol.FieldDelimiter)
+ fields := make(map[string]string, len(splitted))
+
+ fields["*"] = "*"
+ fields["$line"] = maprLine
+ fields["$empty"] = ""
+ fields["$hostname"] = p.hostname
+ fields["$timezone"] = p.timeZoneName
+ fields["$timeoffset"] = p.timeZoneOffset
+
+ for _, kv := range splitted[0:] {
+ keyAndValue := strings.SplitN(kv, "=", 2)
+ if len(keyAndValue) != 2 {
+ //logger.Debug("Unable to parse key-value token, ignoring it", kv)
+ continue
+ }
+ fields[strings.ToLower(keyAndValue[0])] = keyAndValue[1]
+ }
+
+ return fields, nil
+}
diff --git a/internal/mapr/logformat/parser.go b/internal/mapr/logformat/parser.go
index c53729a..6582b5f 100644
--- a/internal/mapr/logformat/parser.go
+++ b/internal/mapr/logformat/parser.go
@@ -12,6 +12,8 @@ import (
"github.com/mimecast/dtail/internal/mapr"
)
+var IgnoreFieldsErr error = errors.New("Ignore this field set")
+
// Parser is used to parse the mapreduce information from the server log files.
type Parser struct {
hostname string
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 9106f52..a6d6bb1 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -13,6 +13,7 @@ import (
"github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/logformat"
+ "github.com/mimecast/dtail/internal/protocol"
)
// Aggregate is for aggregating mapreduce data on the DTail server side.
@@ -89,7 +90,6 @@ func (a *Aggregate) Shutdown() {
// Start an aggregation.
func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) {
-
myCtx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -109,6 +109,7 @@ func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) {
fieldsCh = a.addFields(myCtx, fieldsCh)
}
+ // Periodically pre-aggregate data every a.query.Interval seconds.
go a.aggregateTimer(myCtx)
a.makeMaprLines(myCtx, fieldsCh, maprLines)
}
@@ -139,13 +140,16 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string {
maprLine := strings.TrimSpace(line.Content.String())
pool.RecycleBytesBuffer(line.Content)
- fields, err := a.parser.MakeFields(maprLine)
- logger.Debug(fields, err)
+ fields, err := a.parser.MakeFields(maprLine)
if err != nil {
- logger.Error(err)
+ // Should fields be ignored anyway?
+ if err != logformat.IgnoreFieldsErr {
+ logger.Error(fields, err)
+ }
continue
}
+
if !a.query.WhereClause(fields) {
continue
}
@@ -170,7 +174,7 @@ func (a *Aggregate) addFields(ctx context.Context, fieldsCh <-chan map[string]st
defer close(ch)
for {
- // fieldsCh will be closed via 'makeFields' if ctx is done
+ // fieldsCh will be closed via 'makeFields' when ctx is done
fields, ok := <-fieldsCh
if !ok {
return
@@ -219,12 +223,11 @@ func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[strin
}
func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
- //logger.Trace("Aggregating", group, fields)
var sb strings.Builder
for i, field := range a.query.GroupBy {
if i > 0 {
- sb.WriteString(" ")
+ sb.WriteString(protocol.AggregateGroupKeyCombinator)
}
if val, ok := fields[field]; ok {
sb.WriteString(val)
diff --git a/internal/protocol/protocol.go b/internal/protocol/protocol.go
index d3035e4..8c9e861 100644
--- a/internal/protocol/protocol.go
+++ b/internal/protocol/protocol.go
@@ -5,8 +5,15 @@ const (
ProtocolCompat string = "4"
// MessageDelimiter delimits separate messages.
MessageDelimiter byte = '¬'
- // FieldDelimiter delimits aggregation fields.
+ // FieldDelimiter delimits messagefields.
FieldDelimiter string = "|"
+ // CSVDelimiter delimits CSV file fields.kj:w
+ CSVDelimiter string = ","
+ // AggregateKVDelimiter delimits key-values of an aggregation message.
+ AggregateKVDelimiter string = "≔"
// AggregateDelimiter delimits parts of an aggregation message.
- AggregateDelimiter string = "➔"
+ AggregateDelimiter string = "∥"
+ // AggregateDelimiter string = "⦀"
+ // AggregateGroupKeyCombinator combines the group set keys.
+ AggregateGroupKeyCombinator string = ","
)
diff --git a/internal/server/handlers/controlhandler.go b/internal/server/handlers/controlhandler.go
index a217b40..1e17c78 100644
--- a/internal/server/handlers/controlhandler.go
+++ b/internal/server/handlers/controlhandler.go
@@ -8,7 +8,6 @@ import (
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/io/logger"
- "github.com/mimecast/dtail/internal/protocol"
user "github.com/mimecast/dtail/internal/user/server"
)
@@ -57,7 +56,7 @@ func (h *ControlHandler) Read(p []byte) (n int, err error) {
for {
select {
case message := <-h.serverMessages:
- wholePayload := []byte(fmt.Sprintf("SERVER|%s|%s%b", h.hostname, message, protocol.MessageDelimiter))
+ wholePayload := []byte(fmt.Sprintf("SERVER|%s|%s\n", h.hostname, message))
n = copy(p, wholePayload)
return
case <-h.done.Done():
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 14f46a3..e74e686 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -38,7 +38,6 @@ type ServerHandler struct {
aggregate *server.Aggregate
aggregatedMessages chan string
serverMessages chan string
- payload []byte
hostname string
user *user.User
catLimiter chan struct{}
@@ -47,6 +46,8 @@ type ServerHandler struct {
activeCommands int32
activeReaders int32
quiet bool
+ readBuf bytes.Buffer
+ writeBuf bytes.Buffer
}
// NewServerHandler returns the server handler.
@@ -86,77 +87,74 @@ func (h *ServerHandler) Done() <-chan struct{} {
// Read is to send data to the dtail client via Reader interface.
func (h *ServerHandler) Read(p []byte) (n int, err error) {
- for {
- select {
- case message := <-h.serverMessages:
- if len(message) == 0 {
- logger.Warn(h.user, "Empty message recieved")
- return
- }
- if message[0] == '.' {
- // Handle hidden message (don't display to the user, interpreted by dtail client)
- payload := []byte(fmt.Sprintf("%s%s", message, string(protocol.MessageDelimiter)))
- n = copy(p, payload)
- return
- }
-
- // Handle normal server message (display to the user)
- payload := []byte(fmt.Sprintf("SERVER|%s|%s%s", h.hostname, message, string(protocol.MessageDelimiter)))
- n = copy(p, payload)
- return
+ defer h.readBuf.Reset()
- case message := <-h.aggregatedMessages:
- // Send mapreduce-aggregated data as a message.
- buf := pool.BytesBuffer.Get().(*bytes.Buffer)
- buf.WriteString("AGGREGATE")
- buf.WriteString(protocol.AggregateDelimiter)
- buf.WriteString(h.hostname)
- buf.WriteString(protocol.AggregateDelimiter)
- buf.WriteString(message)
- buf.WriteByte(protocol.MessageDelimiter)
- n = copy(p, buf.Bytes())
- pool.RecycleBytesBuffer(buf)
+ select {
+ case message := <-h.serverMessages:
+ if message[0] == '.' {
+ // Handle hidden message (don't display to the user, interpreted by dtail client)
+ h.readBuf.WriteString(message)
+ h.readBuf.WriteByte(protocol.MessageDelimiter)
+ n = copy(p, h.readBuf.Bytes())
return
+ }
- case line := <-h.lines:
- buf := pool.BytesBuffer.Get().(*bytes.Buffer)
- buf.WriteString("REMOTE")
- buf.WriteString(protocol.FieldDelimiter)
- buf.WriteString(h.hostname)
- buf.WriteString(protocol.FieldDelimiter)
- buf.WriteString(fmt.Sprintf("%3d", line.TransmittedPerc))
- buf.WriteString(protocol.FieldDelimiter)
- buf.WriteString(fmt.Sprintf("%v", line.Count))
- buf.WriteString(protocol.FieldDelimiter)
- buf.WriteString(line.SourceID)
- buf.WriteString(protocol.FieldDelimiter)
- payload := append(buf.Bytes(), line.Content.Bytes()...)
- n = copy(p, payload)
- pool.RecycleBytesBuffer(buf)
- pool.RecycleBytesBuffer(line.Content)
+ // Handle normal server message (display to the user)
+ h.readBuf.WriteString("SERVER")
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(h.hostname)
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(message)
+ h.readBuf.WriteByte(protocol.MessageDelimiter)
+ n = copy(p, h.readBuf.Bytes())
+
+ case message := <-h.aggregatedMessages:
+ // Send mapreduce-aggregated data as a message.
+ h.readBuf.WriteString("AGGREGATE")
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(h.hostname)
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(message)
+ h.readBuf.WriteByte(protocol.MessageDelimiter)
+ n = copy(p, h.readBuf.Bytes())
+
+ case line := <-h.lines:
+ h.readBuf.WriteString("REMOTE")
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(h.hostname)
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(fmt.Sprintf("%3d", line.TransmittedPerc))
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(fmt.Sprintf("%v", line.Count))
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(line.SourceID)
+ h.readBuf.WriteString(protocol.FieldDelimiter)
+ h.readBuf.WriteString(line.Content.String())
+ h.readBuf.WriteByte(protocol.MessageDelimiter)
+ n = copy(p, h.readBuf.Bytes())
+ pool.RecycleBytesBuffer(line.Content)
+
+ case <-time.After(time.Second):
+ // Once in a while check whether we are done.
+ select {
+ case <-h.done.Done():
+ err = io.EOF
return
-
- case <-time.After(time.Second):
- // Once in a while check whether we are done.
- select {
- case <-h.done.Done():
- return 0, io.EOF
- default:
- }
+ default:
}
}
+ return
}
// Write is to receive data from the dtail client via Writer interface.
func (h *ServerHandler) Write(p []byte) (n int, err error) {
- for _, c := range p {
- switch c {
+ for _, b := range p {
+ switch b {
case ';':
- commandStr := strings.TrimSpace(string(h.payload))
- h.handleCommand(commandStr)
- h.payload = nil
+ h.handleCommand(string(h.writeBuf.Bytes()))
+ h.writeBuf.Reset()
default:
- h.payload = append(h.payload, c)
+ h.writeBuf.WriteByte(b)
}
}