diff options
| author | Paul Buetow <paul@buetow.org> | 2021-09-08 19:10:50 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2021-10-02 12:26:29 +0300 |
| commit | 16dc57e1e1c28e9d762424e596223a980770e059 (patch) | |
| tree | ea5a7d5caa7f4de7bd3b21e57d0e18c0d8507c7d /internal | |
| parent | c83c9e61a08c7ea1cb528bc26dfab25b46faa866 (diff) | |
mapreduce tables are in colors now too
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/clients/baseclient.go | 1 | ||||
| -rw-r--r-- | internal/clients/handlers/basehandler.go | 29 | ||||
| -rw-r--r-- | internal/clients/handlers/healthhandler.go | 11 | ||||
| -rw-r--r-- | internal/clients/handlers/maprhandler.go | 42 | ||||
| -rw-r--r-- | internal/clients/maprclient.go | 31 | ||||
| -rw-r--r-- | internal/color/paint.go | 13 | ||||
| -rw-r--r-- | internal/config/client.go | 48 | ||||
| -rw-r--r-- | internal/io/fs/readfile.go | 5 | ||||
| -rw-r--r-- | internal/mapr/aggregateset.go | 17 | ||||
| -rw-r--r-- | internal/mapr/client/aggregate.go | 18 | ||||
| -rw-r--r-- | internal/mapr/globalgroupset.go | 1 | ||||
| -rw-r--r-- | internal/mapr/groupset.go | 258 | ||||
| -rw-r--r-- | internal/mapr/logformat/default.go | 28 | ||||
| -rw-r--r-- | internal/mapr/logformat/default_test.go | 15 | ||||
| -rw-r--r-- | internal/mapr/logformat/generickv.go | 31 | ||||
| -rw-r--r-- | internal/mapr/logformat/parser.go | 2 | ||||
| -rw-r--r-- | internal/mapr/server/aggregate.go | 17 | ||||
| -rw-r--r-- | internal/protocol/protocol.go | 11 | ||||
| -rw-r--r-- | internal/server/handlers/controlhandler.go | 3 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 118 |
20 files changed, 469 insertions, 230 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index f20156f..de0c101 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -71,6 +71,7 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i go c.hostKeyCallback.PromptAddHosts(ctx) // Print client stats every time something on statsCh is recieved. go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Quiet) + // Keep count of active connections active := make(chan struct{}, len(c.connections)) diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 63ceaac..51f33c1 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -1,6 +1,7 @@ package handlers import ( + "bytes" "encoding/base64" "fmt" "io" @@ -17,7 +18,7 @@ type baseHandler struct { server string shellStarted bool commands chan string - receiveBuf []byte + receiveBuf bytes.Buffer status int } @@ -56,14 +57,23 @@ func (h *baseHandler) SendMessage(command string) error { // Read data from the dtail server via Writer interface. func (h *baseHandler) Write(p []byte) (n int, err error) { for _, b := range p { - if b == protocol.MessageDelimiter || b == '\n' { - if len(h.receiveBuf) == 0 { + switch b { + /* + // TODO: Next DTail version make it so that '\n' gets ignored. For now + // leave it for compatibility with older DTail server + ability to display + // the protocol mismatch warn message. + case '\n' { + continue + */ + case '\n', protocol.MessageDelimiter: + message := h.receiveBuf.String() + if len(message) == 0 { continue } - message := string(h.receiveBuf) h.handleMessageType(message) - } else { - h.receiveBuf = append(h.receiveBuf, b) + h.receiveBuf.Reset() + default: + h.receiveBuf.WriteByte(b) } } @@ -78,25 +88,22 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { case <-h.Done(): return 0, io.EOF } - return } // Handle various message types. func (h *baseHandler) handleMessageType(message string) { - if len(h.receiveBuf) == 0 { + if len(message) == 0 { return } // Hidden server commands starti with a dot "." - if h.receiveBuf[0] == '.' { + if message[0] == '.' { h.handleHiddenMessage(message) - h.receiveBuf = h.receiveBuf[:0] return } logger.Raw(message) - h.receiveBuf = h.receiveBuf[:0] } // Handle messages received from server which are not meant to be displayed diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 213748c..eca0348 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -1,6 +1,7 @@ package handlers import ( + "bytes" "errors" "fmt" "time" @@ -13,7 +14,7 @@ import ( type HealthHandler struct { done *internal.Done // Buffer of incoming data from server. - receiveBuf []byte + receiveBuf bytes.Buffer // To send commands to the server. commands chan string // To receive messages from the server. @@ -72,10 +73,10 @@ func (h *HealthHandler) SendMessage(command string) error { // Server writes byte stream to client. func (h *HealthHandler) Write(p []byte) (n int, err error) { for _, b := range p { - h.receiveBuf = append(h.receiveBuf, b) - if b == protocol.MessageDelimiter { // '\n' { - h.receive <- string(h.receiveBuf) - h.receiveBuf = h.receiveBuf[:0] + h.receiveBuf.WriteByte(b) + if b == protocol.MessageDelimiter { + h.receive <- h.receiveBuf.String() + h.receiveBuf.Reset() } } diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index afad507..65b1454 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -15,7 +15,6 @@ type MaprHandler struct { baseHandler aggregate *client.Aggregate query *mapr.Query - count uint64 } // NewMaprHandler returns a new mapreduce client handler. @@ -36,19 +35,20 @@ func NewMaprHandler(server string, query *mapr.Query, globalGroup *mapr.GlobalGr // Read data from the dtail server via Writer interface. func (h *MaprHandler) Write(p []byte) (n int, err error) { for _, b := range p { - h.baseHandler.receiveBuf = append(h.baseHandler.receiveBuf, b) - if b == protocol.MessageDelimiter { // '\n' { - if len(h.baseHandler.receiveBuf) == 0 { - continue + switch b { + case '\n': + continue + case protocol.MessageDelimiter: + message := h.baseHandler.receiveBuf.String() + logger.Debug(message) + if message[0] == 'A' { + h.handleAggregateMessage(message) + } else { + h.baseHandler.handleMessageType(message) } - message := string(h.baseHandler.receiveBuf) - - if h.baseHandler.receiveBuf[0] == 'A' { - h.handleAggregateMessage(strings.TrimSpace(message)) - h.baseHandler.receiveBuf = h.baseHandler.receiveBuf[:0] - continue - } - h.baseHandler.handleMessageType(message) + h.baseHandler.receiveBuf.Reset() + default: + h.baseHandler.receiveBuf.WriteByte(b) } } @@ -58,12 +58,12 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { // Handle a message received from server including mapr aggregation // related data. func (h *MaprHandler) handleAggregateMessage(message string) { - h.count++ - parts := strings.Split(message, protocol.AggregateDelimiter) - - // Index 0 contains 'AGGREGATE', 1 contains server host. - // Aggregation data begins from index 2. - logger.Debug("Received aggregate data", h.server, h.count, parts) - h.aggregate.Aggregate(parts[2:]) - logger.Debug("Aggregated aggregate data", h.server, h.count) + parts := strings.SplitN(message, protocol.FieldDelimiter, 3) + if len(parts) != 3 { + logger.Error("Unable to aggregate data", h.server, message, parts, len(parts), "expected 3 parts") + return + } + if err := h.aggregate.Aggregate(parts[2]); err != nil { + logger.Error("Unable to aggregate data", h.server, message, err) + } } diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 77b674b..cab9a6c 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -9,6 +9,8 @@ import ( "time" "github.com/mimecast/dtail/internal/clients/handlers" + "github.com/mimecast/dtail/internal/color" + "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/omode" @@ -37,6 +39,8 @@ type MaprClient struct { query *mapr.Query // Additative result or new result every interval run? cumulative bool + // The last result string received + lastResult string } // NewMaprClient returns a new mapreduce client. @@ -154,24 +158,37 @@ func (c *MaprClient) reportResults() { func (c *MaprClient) printResults() { var result string var err error - var numLines int + var numRows int if c.cumulative { - result, numLines, err = c.globalGroup.Result(c.query) + result, numRows, err = c.globalGroup.Result(c.query) } else { - result, numLines, err = c.globalGroup.SwapOut().Result(c.query) + result, numRows, err = c.globalGroup.SwapOut().Result(c.query) } + if err != nil { logger.FatalExit(err) } - if numLines == 0 { - logger.Warn("Empty result set this time...") + if result == c.lastResult { + logger.Debug("Result hasn't changed compared to last time...") return } + c.lastResult = result - //logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery)) - logger.Raw(c.query.RawQuery) + if numRows == 0 { + logger.Debug("Empty result set this time...") + return + } + + rawQuery := c.query.RawQuery + if config.Client.TermColorsEnable { + rawQuery = color.PaintStrWithAttr(rawQuery, + config.Client.TermColors.MaprTable.RawQueryFg, + config.Client.TermColors.MaprTable.RawQueryBg, + config.Client.TermColors.MaprTable.RawQueryAttr) + } + logger.Raw(rawQuery) logger.Raw(result) } diff --git a/internal/color/paint.go b/internal/color/paint.go index 5430acd..53c9abb 100644 --- a/internal/color/paint.go +++ b/internal/color/paint.go @@ -63,6 +63,19 @@ func PaintWithAttr(sb *strings.Builder, text string, fg FgColor, bg BgColor, att sb.WriteString(string(FgDefault)) } +// PaintWithAttrs is similar to PaintWithAttr, but it takes multiple text attributes. +func PaintWithAttrs(sb *strings.Builder, text string, fg FgColor, bg BgColor, attrs []Attribute) { + sb.WriteString(string(fg)) + sb.WriteString(string(bg)) + for _, attr := range attrs { + sb.WriteString(string(attr)) + } + sb.WriteString(text) + sb.WriteString(string(AttrReset)) + sb.WriteString(string(BgDefault)) + sb.WriteString(string(FgDefault)) +} + // ResetWithAttr resets background, foreground and attributes. func ResetWithAttr(sb *strings.Builder) { sb.WriteString(string(AttrReset)) diff --git a/internal/config/client.go b/internal/config/client.go index 3c2f7de..795c4a4 100644 --- a/internal/config/client.go +++ b/internal/config/client.go @@ -71,11 +71,32 @@ type commonTermColors struct { SeverityWarnFg color.FgColor } +type maprTableTermColors struct { + DataAttr color.Attribute + DataBg color.BgColor + DataFg color.FgColor + DelimiterAttr color.Attribute + DelimiterBg color.BgColor + DelimiterFg color.FgColor + HeaderAttr color.Attribute + HeaderBg color.BgColor + HeaderDelimiterAttr color.Attribute + HeaderDelimiterBg color.BgColor + HeaderDelimiterFg color.FgColor + HeaderFg color.FgColor + HeaderGroupKeyAttr color.Attribute + HeaderSortKeyAttr color.Attribute + RawQueryAttr color.Attribute + RawQueryBg color.BgColor + RawQueryFg color.FgColor +} + type termColors struct { - Remote remoteTermColors - Client clientTermColors - Server serverTermColors - Common commonTermColors + Remote remoteTermColors + Client clientTermColors + Server serverTermColors + Common commonTermColors + MaprTable maprTableTermColors } // ClientConfig represents a DTail client configuration (empty as of now as there @@ -155,6 +176,25 @@ func newDefaultClientConfig() *ClientConfig { SeverityWarnBg: color.BgBlack, SeverityWarnFg: color.FgWhite, }, + MaprTable: maprTableTermColors{ + DataAttr: color.AttrNone, + DataBg: color.BgBlue, + DataFg: color.FgWhite, + DelimiterAttr: color.AttrDim, + DelimiterBg: color.BgBlue, + DelimiterFg: color.FgWhite, + HeaderAttr: color.AttrBold, + HeaderBg: color.BgBlue, + HeaderFg: color.FgWhite, + HeaderDelimiterAttr: color.AttrDim, + HeaderDelimiterBg: color.BgBlue, + HeaderDelimiterFg: color.FgWhite, + HeaderSortKeyAttr: color.AttrUnderline, + HeaderGroupKeyAttr: color.AttrReverse, + RawQueryAttr: color.AttrDim, + RawQueryBg: color.BgBlack, + RawQueryFg: color.FgCyan, + }, }, } } diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index f2f672a..c0d44dd 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -16,7 +16,6 @@ import ( "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/io/pool" - "github.com/mimecast/dtail/internal/protocol" "github.com/mimecast/dtail/internal/regex" "github.com/DataDog/zstd" @@ -187,7 +186,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu time.Sleep(time.Millisecond * 100) continue } - message.WriteByte(protocol.MessageDelimiter) + message.WriteString("\n") select { case rawLines <- message: message = pool.BytesBuffer.Get().(*bytes.Buffer) @@ -202,7 +201,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines") warnedAboutLongLine = true } - message.WriteByte(protocol.MessageDelimiter) + message.WriteString("\n") select { case rawLines <- message: message = pool.BytesBuffer.Get().(*bytes.Buffer) diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go index 029d87b..47f4925 100644 --- a/internal/mapr/aggregateset.go +++ b/internal/mapr/aggregateset.go @@ -6,6 +6,8 @@ import ( "strconv" "strings" + "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/protocol" ) @@ -68,22 +70,25 @@ func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error { // Serialize the aggregate set so it can be sent over the wire. func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<- string) { - //logger.Trace("Serialising mapr.AggregateSet", s) - var sb strings.Builder + logger.Trace("Serialising mapr.AggregateSet", s) + sb := pool.BuilderBuffer.Get().(*strings.Builder) + defer pool.RecycleBuilderBuffer(sb) sb.WriteString(groupKey) sb.WriteString(protocol.AggregateDelimiter) - sb.WriteString(fmt.Sprintf("%d%s", s.Samples, protocol.AggregateDelimiter)) + sb.WriteString(fmt.Sprintf("%d", s.Samples)) + sb.WriteString(protocol.AggregateDelimiter) for k, v := range s.FValues { sb.WriteString(k) - sb.WriteString("=") - sb.WriteString(fmt.Sprintf("%v%s", v, protocol.AggregateDelimiter)) + sb.WriteString(protocol.AggregateKVDelimiter) + sb.WriteString(fmt.Sprintf("%v", v)) + sb.WriteString(protocol.AggregateDelimiter) } for k, v := range s.SValues { sb.WriteString(k) - sb.WriteString("=") + sb.WriteString(protocol.AggregateKVDelimiter) sb.WriteString(v) sb.WriteString(protocol.AggregateDelimiter) } diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go index 10b34d4..5cc09a1 100644 --- a/internal/mapr/client/aggregate.go +++ b/internal/mapr/client/aggregate.go @@ -1,11 +1,13 @@ package client import ( + "fmt" "strconv" "strings" "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr" + "github.com/mimecast/dtail/internal/protocol" ) // Aggregate mapreduce data on the DTail client side. @@ -31,12 +33,18 @@ func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGrou } // Aggregate data from mapr log line into local (and global) group sets. -func (a *Aggregate) Aggregate(parts []string) { +func (a *Aggregate) Aggregate(message string) error { + parts := strings.Split(message, protocol.AggregateDelimiter) + if len(parts) < 4 { + return fmt.Errorf("aggregate message without any real data") + } + groupKey := parts[0] samples, err := strconv.Atoi(parts[1]) if err != nil { - logger.FatalExit("Unable to parse sample count", parts[1], err, parts) + return fmt.Errorf("unable to parse sample count '%s': %v", parts[1], err) } + fields := a.makeFields(parts[2:]) set := a.group.GetSet(groupKey) @@ -63,6 +71,8 @@ func (a *Aggregate) Aggregate(parts []string) { // Re-init local group (make it empty again). a.group.InitSet() } + + return nil } // Create a map of key-value pairs from a part list such as ["foo=bar", "bar=baz"]. @@ -70,8 +80,8 @@ func (a *Aggregate) makeFields(parts []string) map[string]string { fields := make(map[string]string, len(parts)) for _, part := range parts { - kv := strings.SplitN(part, "=", 2) - if len(kv) < 2 { + kv := strings.SplitN(part, protocol.AggregateKVDelimiter, 2) + if len(kv) != 2 { continue } fields[kv[0]] = kv[1] diff --git a/internal/mapr/globalgroupset.go b/internal/mapr/globalgroupset.go index cfab506..21bf990 100644 --- a/internal/mapr/globalgroupset.go +++ b/internal/mapr/globalgroupset.go @@ -48,6 +48,7 @@ func (g *GlobalGroupSet) MergeNoblock(query *Query, group *GroupSet) (bool, erro // Merge a group set into the global group set. func (g *GlobalGroupSet) merge(query *Query, group *GroupSet) error { + for groupKey, set := range group.sets { s := g.GetSet(groupKey) if err := s.Merge(query, set); err != nil { diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go index 6ee2811..d29559a 100644 --- a/internal/mapr/groupset.go +++ b/internal/mapr/groupset.go @@ -4,13 +4,16 @@ import ( "context" "errors" "fmt" - "io/ioutil" "os" "sort" "strconv" "strings" + "github.com/mimecast/dtail/internal/color" + "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/pool" + "github.com/mimecast/dtail/internal/protocol" ) // GroupSet represents a map of aggregate sets. The group sets @@ -22,6 +25,14 @@ type GroupSet struct { sets map[string]*AggregateSet } +// Internal helper type +type result struct { + groupKey string + values []string + widths []int + orderBy float64 +} + // NewGroupSet returns a new empty group set. func NewGroupSet() *GroupSet { g := GroupSet{} @@ -58,17 +69,118 @@ func (g *GroupSet) Serialize(ctx context.Context, ch chan<- string) { // Result returns a nicely formated result of the query from the group set. func (g *GroupSet) Result(query *Query) (string, int, error) { - return g.limitedResult(query, query.Limit, "\t", " ", false) + rows, widths, err := g.result(query, true) + if err != nil { + return "", 0, err + } + + sb := pool.BuilderBuffer.Get().(*strings.Builder) + defer pool.RecycleBuilderBuffer(sb) + + // Generate header now + lastIndex := len(query.Select) - 1 + for i, sc := range query.Select { + format := fmt.Sprintf(" %%%ds ", widths[i]) + str := fmt.Sprintf(format, sc.FieldStorage) + if config.Client.TermColorsEnable { + attrs := []color.Attribute{config.Client.TermColors.MaprTable.HeaderAttr} + if sc.FieldStorage == query.OrderBy { + attrs = append(attrs, config.Client.TermColors.MaprTable.HeaderSortKeyAttr) + } + for _, groupBy := range query.GroupBy { + if sc.FieldStorage == groupBy { + attrs = append(attrs, config.Client.TermColors.MaprTable.HeaderGroupKeyAttr) + break + } + } + color.PaintWithAttrs(sb, str, + config.Client.TermColors.MaprTable.HeaderFg, + config.Client.TermColors.MaprTable.HeaderBg, + attrs) + } else { + sb.WriteString(str) + } + + if i == lastIndex { + continue + } + if config.Client.TermColorsEnable { + color.PaintWithAttr(sb, protocol.FieldDelimiter, + config.Client.TermColors.MaprTable.HeaderDelimiterFg, + config.Client.TermColors.MaprTable.HeaderDelimiterBg, + config.Client.TermColors.MaprTable.HeaderDelimiterAttr) + } else { + sb.WriteString(protocol.FieldDelimiter) + } + } + sb.WriteString("\n") + + for i := 0; i < len(query.Select); i++ { + str := fmt.Sprintf("-%s-", strings.Repeat("-", widths[i])) + if config.Client.TermColorsEnable { + color.PaintWithAttr(sb, str, + config.Client.TermColors.MaprTable.HeaderDelimiterFg, + config.Client.TermColors.MaprTable.HeaderDelimiterBg, + config.Client.TermColors.MaprTable.HeaderDelimiterAttr) + } else { + sb.WriteString(str) + } + if i == lastIndex { + continue + } + if config.Client.TermColorsEnable { + color.PaintWithAttr(sb, protocol.FieldDelimiter, + config.Client.TermColors.MaprTable.HeaderDelimiterFg, + config.Client.TermColors.MaprTable.HeaderDelimiterBg, + config.Client.TermColors.MaprTable.HeaderDelimiterAttr) + } else { + sb.WriteString(protocol.FieldDelimiter) + } + } + sb.WriteString("\n") + + // And now write the data + for i, r := range rows { + if i == query.Limit { + break + } + for j, value := range r.values { + format := fmt.Sprintf(" %%%ds ", widths[j]) + str := fmt.Sprintf(format, value) + if config.Client.TermColorsEnable { + color.PaintWithAttr(sb, str, + config.Client.TermColors.MaprTable.DataFg, + config.Client.TermColors.MaprTable.DataBg, + config.Client.TermColors.MaprTable.DataAttr) + } else { + sb.WriteString(str) + } + + if j == lastIndex { + continue + } + if config.Client.TermColorsEnable { + color.PaintWithAttr(sb, protocol.FieldDelimiter, + config.Client.TermColors.MaprTable.DelimiterFg, + config.Client.TermColors.MaprTable.DelimiterBg, + config.Client.TermColors.MaprTable.DelimiterAttr) + } else { + sb.WriteString(protocol.FieldDelimiter) + } + } + sb.WriteString("\n") + } + + return sb.String(), len(rows), nil } -// WriteResult writes the result to an outfile. +// WriteResult writes the result to an CSV outfile. func (g *GroupSet) WriteResult(query *Query) error { if !query.HasOutfile() { return errors.New("No outfile specified") } - // -1: Don't limit the result, include all data sets - result, _, err := g.limitedResult(query, -1, "", ",", true) + rows, _, err := g.result(query, false) if err != nil { return err } @@ -76,9 +188,37 @@ func (g *GroupSet) WriteResult(query *Query) error { logger.Info("Writing outfile", query.Outfile) tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile) - if err := ioutil.WriteFile(tmpOutfile, []byte(result), 0644); err != nil { + file, err := os.Create(tmpOutfile) + if err != nil { return err } + defer file.Close() + + // Generate header now + lastIndex := len(query.Select) - 1 + for i, sc := range query.Select { + file.WriteString(sc.FieldStorage) + if i == lastIndex { + continue + } + file.WriteString(protocol.CSVDelimiter) + } + file.WriteString("\n") + + // And now write the data + for i, r := range rows { + if i == query.Limit { + break + } + for j, value := range r.values { + file.WriteString(value) + if j == lastIndex { + continue + } + file.WriteString(protocol.CSVDelimiter) + } + file.WriteString("\n") + } if err := os.Rename(tmpOutfile, query.Outfile); err != nil { os.Remove(tmpOutfile) @@ -88,32 +228,22 @@ func (g *GroupSet) WriteResult(query *Query) error { return nil } -// Return a nicely formated result of the query from the group set. -func (g *GroupSet) limitedResult(query *Query, limit int, lineStarter, fieldSeparator string, addHeader bool) (string, int, error) { - type result struct { - groupKey string - resultStr string - orderBy float64 - } +// Return a sorted result slice of the query from the group set. +func (g *GroupSet) result(query *Query, gatherWidths bool) ([]result, []int, error) { + var rows []result + widths := make([]int, len(query.Select)) - var resultSlice []result + var valueStr string + var value float64 for groupKey, set := range g.sets { - var sb strings.Builder r := result{groupKey: groupKey} - lastIndex := len(query.Select) - 1 for i, sc := range query.Select { - storage := sc.FieldStorage - orderByThis := storage == query.OrderBy - switch sc.Operation { case Count: - value := set.FValues[storage] - sb.WriteString(fmt.Sprintf("%d", int(value))) - if orderByThis { - r.orderBy = value - } + value = set.FValues[sc.FieldStorage] + valueStr = fmt.Sprintf("%d", int(value)) case Len: fallthrough case Sum: @@ -121,74 +251,48 @@ func (g *GroupSet) limitedResult(query *Query, limit int, lineStarter, fieldSepa case Min: fallthrough case Max: - value := set.FValues[storage] - sb.WriteString(fmt.Sprintf("%f", value)) - if orderByThis { - r.orderBy = value - } + value = set.FValues[sc.FieldStorage] + valueStr = fmt.Sprintf("%f", value) case Last: - value := set.SValues[storage] - if orderByThis { - f, err := strconv.ParseFloat(value, 64) - if err == nil { - r.orderBy = f - } - } - sb.WriteString(value) + valueStr = set.SValues[sc.FieldStorage] + value, _ = strconv.ParseFloat(valueStr, 64) case Avg: - value := set.FValues[storage] / float64(set.Samples) - sb.WriteString(fmt.Sprintf("%f", value)) - if orderByThis { - r.orderBy = value - } + value = set.FValues[sc.FieldStorage] / float64(set.Samples) + valueStr = fmt.Sprintf("%f", value) default: - return "", 0, fmt.Errorf("Unknown aggregation method '%v'", sc.Operation) + return rows, widths, fmt.Errorf("Unknown aggregation method '%v'", sc.Operation) } - if i != lastIndex { - sb.WriteString(fieldSeparator) + + if sc.FieldStorage == query.OrderBy { + r.orderBy = value + } + r.values = append(r.values, valueStr) + + if !gatherWidths { + continue + } + if widths[i] < len(sc.FieldStorage) { + widths[i] = len(sc.FieldStorage) + } + if widths[i] < len(valueStr) { + widths[i] = len(valueStr) } } - r.resultStr = sb.String() - resultSlice = append(resultSlice, r) + rows = append(rows, r) } if query.OrderBy != "" { if query.ReverseOrder { - sort.SliceStable(resultSlice, func(i, j int) bool { - return resultSlice[i].orderBy < resultSlice[j].orderBy + sort.SliceStable(rows, func(i, j int) bool { + return rows[i].orderBy < rows[j].orderBy }) } else { - sort.SliceStable(resultSlice, func(i, j int) bool { - return resultSlice[i].orderBy > resultSlice[j].orderBy + sort.SliceStable(rows, func(i, j int) bool { + return rows[i].orderBy > rows[j].orderBy }) } } - var sb strings.Builder - - // Write header first - if addHeader { - lastIndex := len(query.Select) - 1 - sb.WriteString(lineStarter) - for i, sc := range query.Select { - sb.WriteString(sc.FieldStorage) - if i != lastIndex { - sb.WriteString(fieldSeparator) - } - } - sb.WriteString("\n") - } - - // And now write the data - for i, r := range resultSlice { - if i == limit { - break - } - sb.WriteString(lineStarter) - sb.WriteString(r.resultStr) - sb.WriteString("\n") - } - - return sb.String(), len(resultSlice), nil + return rows, widths, nil } diff --git a/internal/mapr/logformat/default.go b/internal/mapr/logformat/default.go index da976bd..c67137e 100644 --- a/internal/mapr/logformat/default.go +++ b/internal/mapr/logformat/default.go @@ -1,16 +1,22 @@ package logformat import ( + "fmt" "strings" - "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/protocol" ) -// MakeFieldsDEFAULT is the default log file mapreduce parser. +// MakeFieldsDEFAULT is the default DTail log file key-value parser. func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) { splitted := strings.Split(maprLine, protocol.FieldDelimiter) - fields := make(map[string]string, len(splitted)) + + if len(splitted) < 3 || !strings.HasPrefix(splitted[3], "MAPREDUCE:") || !strings.HasPrefix(splitted[0], "INFO") { + // Not a DTail mapreduce log line. + return nil, IgnoreFieldsErr + } + + fields := make(map[string]string, len(splitted)+8) fields["*"] = "*" fields["$line"] = maprLine @@ -19,20 +25,14 @@ func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) { fields["$timezone"] = p.timeZoneName fields["$timeoffset"] = p.timeZoneOffset - kvStart := 0 - // DTail mapreduce format - if len(splitted) > 3 && strings.HasPrefix(splitted[3], "MAPREDUCE:") { - fields["$severity"] = splitted[0] - // TODO: Parse time like we do at Mimecast - fields["$time"] = splitted[1] - kvStart = 4 - } + fields["$severity"] = splitted[0] + // TODO: Parse time like we do at Mimecast + fields["$time"] = splitted[1] - for _, kv := range splitted[kvStart:] { + for _, kv := range splitted[4:] { keyAndValue := strings.SplitN(kv, "=", 2) if len(keyAndValue) != 2 { - logger.Debug("Unable to parse key-value token, ignoring it", kv) - continue + return fields, fmt.Errorf("Unable to parse key-value token '%s'", kv) } fields[strings.ToLower(keyAndValue[0])] = keyAndValue[1] } diff --git a/internal/mapr/logformat/default_test.go b/internal/mapr/logformat/default_test.go index 6284008..79911d1 100644 --- a/internal/mapr/logformat/default_test.go +++ b/internal/mapr/logformat/default_test.go @@ -11,8 +11,8 @@ func TestDefaultLogFormat(t *testing.T) { } inputs := []string{ - "foo=bar|baz=bay", "INFO|20210907-065632|SERVER|MAPREDUCE:TEST|foo=bar|baz=bay", + "INFO|20210907-065732|CLIENT|MAPREDUCE:YOMAN|baz=bay|foo=bar", } for _, input := range inputs { @@ -23,20 +23,21 @@ func TestDefaultLogFormat(t *testing.T) { } if bar, ok := fields["foo"]; !ok { - t.Errorf("Expected field 'foo', but no such field there\n") + t.Errorf("Expected field 'foo', but no such field there in '%s'\n", input) } else if bar != "bar" { - t.Errorf("Expected 'bar' stored in field 'foo', but got '%s'\n", bar) + t.Errorf("Expected 'bar' stored in field 'foo', but got '%s' in '%s'\n", bar, input) } if bay, ok := fields["baz"]; !ok { - t.Errorf("Expected field 'baz', but no such field there\n") + t.Errorf("Expected field 'baz', but no such field there in '%s'\n", input) } else if bay != "bay" { - t.Errorf("Expected 'bay' stored in field 'baz', but got '%s'\n", bay) + t.Errorf("Expected 'bay' stored in field 'baz', but got '%s' in '%s'\n", bay, input) } } - if _, err := parser.MakeFields("foo=bar|bazbay"); err == nil { - t.Errorf("Expected error but didn't") + fields, err := parser.MakeFields("foozoo=bar|bazbay") + if _, ok := fields["foo"]; ok { + t.Errorf("Expected fiending field 'foo', but found it\n") } } diff --git a/internal/mapr/logformat/generickv.go b/internal/mapr/logformat/generickv.go new file mode 100644 index 0000000..23d75cb --- /dev/null +++ b/internal/mapr/logformat/generickv.go @@ -0,0 +1,31 @@ +package logformat + +import ( + "strings" + + "github.com/mimecast/dtail/internal/protocol" +) + +// MakeFieldsGENERICKV is the generic key-value logfile parser. +func (p *Parser) MakeFieldsGENERIGKV(maprLine string) (map[string]string, error) { + splitted := strings.Split(maprLine, protocol.FieldDelimiter) + fields := make(map[string]string, len(splitted)) + + fields["*"] = "*" + fields["$line"] = maprLine + fields["$empty"] = "" + fields["$hostname"] = p.hostname + fields["$timezone"] = p.timeZoneName + fields["$timeoffset"] = p.timeZoneOffset + + for _, kv := range splitted[0:] { + keyAndValue := strings.SplitN(kv, "=", 2) + if len(keyAndValue) != 2 { + //logger.Debug("Unable to parse key-value token, ignoring it", kv) + continue + } + fields[strings.ToLower(keyAndValue[0])] = keyAndValue[1] + } + + return fields, nil +} diff --git a/internal/mapr/logformat/parser.go b/internal/mapr/logformat/parser.go index c53729a..6582b5f 100644 --- a/internal/mapr/logformat/parser.go +++ b/internal/mapr/logformat/parser.go @@ -12,6 +12,8 @@ import ( "github.com/mimecast/dtail/internal/mapr" ) +var IgnoreFieldsErr error = errors.New("Ignore this field set") + // Parser is used to parse the mapreduce information from the server log files. type Parser struct { hostname string diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 9106f52..a6d6bb1 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -13,6 +13,7 @@ import ( "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/logformat" + "github.com/mimecast/dtail/internal/protocol" ) // Aggregate is for aggregating mapreduce data on the DTail server side. @@ -89,7 +90,6 @@ func (a *Aggregate) Shutdown() { // Start an aggregation. func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) { - myCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -109,6 +109,7 @@ func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) { fieldsCh = a.addFields(myCtx, fieldsCh) } + // Periodically pre-aggregate data every a.query.Interval seconds. go a.aggregateTimer(myCtx) a.makeMaprLines(myCtx, fieldsCh, maprLines) } @@ -139,13 +140,16 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string { maprLine := strings.TrimSpace(line.Content.String()) pool.RecycleBytesBuffer(line.Content) - fields, err := a.parser.MakeFields(maprLine) - logger.Debug(fields, err) + fields, err := a.parser.MakeFields(maprLine) if err != nil { - logger.Error(err) + // Should fields be ignored anyway? + if err != logformat.IgnoreFieldsErr { + logger.Error(fields, err) + } continue } + if !a.query.WhereClause(fields) { continue } @@ -170,7 +174,7 @@ func (a *Aggregate) addFields(ctx context.Context, fieldsCh <-chan map[string]st defer close(ch) for { - // fieldsCh will be closed via 'makeFields' if ctx is done + // fieldsCh will be closed via 'makeFields' when ctx is done fields, ok := <-fieldsCh if !ok { return @@ -219,12 +223,11 @@ func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[strin } func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { - //logger.Trace("Aggregating", group, fields) var sb strings.Builder for i, field := range a.query.GroupBy { if i > 0 { - sb.WriteString(" ") + sb.WriteString(protocol.AggregateGroupKeyCombinator) } if val, ok := fields[field]; ok { sb.WriteString(val) diff --git a/internal/protocol/protocol.go b/internal/protocol/protocol.go index d3035e4..8c9e861 100644 --- a/internal/protocol/protocol.go +++ b/internal/protocol/protocol.go @@ -5,8 +5,15 @@ const ( ProtocolCompat string = "4" // MessageDelimiter delimits separate messages. MessageDelimiter byte = '¬' - // FieldDelimiter delimits aggregation fields. + // FieldDelimiter delimits messagefields. FieldDelimiter string = "|" + // CSVDelimiter delimits CSV file fields.kj:w + CSVDelimiter string = "," + // AggregateKVDelimiter delimits key-values of an aggregation message. + AggregateKVDelimiter string = "≔" // AggregateDelimiter delimits parts of an aggregation message. - AggregateDelimiter string = "➔" + AggregateDelimiter string = "∥" + // AggregateDelimiter string = "⦀" + // AggregateGroupKeyCombinator combines the group set keys. + AggregateGroupKeyCombinator string = "," ) diff --git a/internal/server/handlers/controlhandler.go b/internal/server/handlers/controlhandler.go index a217b40..1e17c78 100644 --- a/internal/server/handlers/controlhandler.go +++ b/internal/server/handlers/controlhandler.go @@ -8,7 +8,6 @@ import ( "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/protocol" user "github.com/mimecast/dtail/internal/user/server" ) @@ -57,7 +56,7 @@ func (h *ControlHandler) Read(p []byte) (n int, err error) { for { select { case message := <-h.serverMessages: - wholePayload := []byte(fmt.Sprintf("SERVER|%s|%s%b", h.hostname, message, protocol.MessageDelimiter)) + wholePayload := []byte(fmt.Sprintf("SERVER|%s|%s\n", h.hostname, message)) n = copy(p, wholePayload) return case <-h.done.Done(): diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 14f46a3..e74e686 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -38,7 +38,6 @@ type ServerHandler struct { aggregate *server.Aggregate aggregatedMessages chan string serverMessages chan string - payload []byte hostname string user *user.User catLimiter chan struct{} @@ -47,6 +46,8 @@ type ServerHandler struct { activeCommands int32 activeReaders int32 quiet bool + readBuf bytes.Buffer + writeBuf bytes.Buffer } // NewServerHandler returns the server handler. @@ -86,77 +87,74 @@ func (h *ServerHandler) Done() <-chan struct{} { // Read is to send data to the dtail client via Reader interface. func (h *ServerHandler) Read(p []byte) (n int, err error) { - for { - select { - case message := <-h.serverMessages: - if len(message) == 0 { - logger.Warn(h.user, "Empty message recieved") - return - } - if message[0] == '.' { - // Handle hidden message (don't display to the user, interpreted by dtail client) - payload := []byte(fmt.Sprintf("%s%s", message, string(protocol.MessageDelimiter))) - n = copy(p, payload) - return - } - - // Handle normal server message (display to the user) - payload := []byte(fmt.Sprintf("SERVER|%s|%s%s", h.hostname, message, string(protocol.MessageDelimiter))) - n = copy(p, payload) - return + defer h.readBuf.Reset() - case message := <-h.aggregatedMessages: - // Send mapreduce-aggregated data as a message. - buf := pool.BytesBuffer.Get().(*bytes.Buffer) - buf.WriteString("AGGREGATE") - buf.WriteString(protocol.AggregateDelimiter) - buf.WriteString(h.hostname) - buf.WriteString(protocol.AggregateDelimiter) - buf.WriteString(message) - buf.WriteByte(protocol.MessageDelimiter) - n = copy(p, buf.Bytes()) - pool.RecycleBytesBuffer(buf) + select { + case message := <-h.serverMessages: + if message[0] == '.' { + // Handle hidden message (don't display to the user, interpreted by dtail client) + h.readBuf.WriteString(message) + h.readBuf.WriteByte(protocol.MessageDelimiter) + n = copy(p, h.readBuf.Bytes()) return + } - case line := <-h.lines: - buf := pool.BytesBuffer.Get().(*bytes.Buffer) - buf.WriteString("REMOTE") - buf.WriteString(protocol.FieldDelimiter) - buf.WriteString(h.hostname) - buf.WriteString(protocol.FieldDelimiter) - buf.WriteString(fmt.Sprintf("%3d", line.TransmittedPerc)) - buf.WriteString(protocol.FieldDelimiter) - buf.WriteString(fmt.Sprintf("%v", line.Count)) - buf.WriteString(protocol.FieldDelimiter) - buf.WriteString(line.SourceID) - buf.WriteString(protocol.FieldDelimiter) - payload := append(buf.Bytes(), line.Content.Bytes()...) - n = copy(p, payload) - pool.RecycleBytesBuffer(buf) - pool.RecycleBytesBuffer(line.Content) + // Handle normal server message (display to the user) + h.readBuf.WriteString("SERVER") + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(h.hostname) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(message) + h.readBuf.WriteByte(protocol.MessageDelimiter) + n = copy(p, h.readBuf.Bytes()) + + case message := <-h.aggregatedMessages: + // Send mapreduce-aggregated data as a message. + h.readBuf.WriteString("AGGREGATE") + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(h.hostname) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(message) + h.readBuf.WriteByte(protocol.MessageDelimiter) + n = copy(p, h.readBuf.Bytes()) + + case line := <-h.lines: + h.readBuf.WriteString("REMOTE") + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(h.hostname) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(fmt.Sprintf("%3d", line.TransmittedPerc)) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(fmt.Sprintf("%v", line.Count)) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(line.SourceID) + h.readBuf.WriteString(protocol.FieldDelimiter) + h.readBuf.WriteString(line.Content.String()) + h.readBuf.WriteByte(protocol.MessageDelimiter) + n = copy(p, h.readBuf.Bytes()) + pool.RecycleBytesBuffer(line.Content) + + case <-time.After(time.Second): + // Once in a while check whether we are done. + select { + case <-h.done.Done(): + err = io.EOF return - - case <-time.After(time.Second): - // Once in a while check whether we are done. - select { - case <-h.done.Done(): - return 0, io.EOF - default: - } + default: } } + return } // Write is to receive data from the dtail client via Writer interface. func (h *ServerHandler) Write(p []byte) (n int, err error) { - for _, c := range p { - switch c { + for _, b := range p { + switch b { case ';': - commandStr := strings.TrimSpace(string(h.payload)) - h.handleCommand(commandStr) - h.payload = nil + h.handleCommand(string(h.writeBuf.Bytes())) + h.writeBuf.Reset() default: - h.payload = append(h.payload, c) + h.writeBuf.WriteByte(b) } } |
