From c2522ffb59514443816a96386c16bb7527cbe57c Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 21 Aug 2021 14:54:24 +0300 Subject: read files bytewise for more control of whats happening - change transport protocol for more control over newlines --- internal/mapr/aggregateset.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'internal/mapr') diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go index a6cc6eb..029d87b 100644 --- a/internal/mapr/aggregateset.go +++ b/internal/mapr/aggregateset.go @@ -5,6 +5,8 @@ import ( "fmt" "strconv" "strings" + + "github.com/mimecast/dtail/internal/protocol" ) // AggregateSet represents aggregated key/value pairs from the @@ -70,20 +72,20 @@ func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<- var sb strings.Builder sb.WriteString(groupKey) - sb.WriteString("āž”") - sb.WriteString(fmt.Sprintf("%dāž”", s.Samples)) + sb.WriteString(protocol.AggregateDelimiter) + sb.WriteString(fmt.Sprintf("%d%s", s.Samples, protocol.AggregateDelimiter)) for k, v := range s.FValues { sb.WriteString(k) sb.WriteString("=") - sb.WriteString(fmt.Sprintf("%vāž”", v)) + sb.WriteString(fmt.Sprintf("%v%s", v, protocol.AggregateDelimiter)) } for k, v := range s.SValues { sb.WriteString(k) sb.WriteString("=") sb.WriteString(v) - sb.WriteString("āž”") + sb.WriteString(protocol.AggregateDelimiter) } select { -- cgit v1.2.3 From 6d727b9bdbc387c8a5c34406a2c4de9140face38 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 28 Aug 2021 19:36:46 +0100 Subject: use a byte.Buffer in the file reader --- internal/mapr/server/aggregate.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'internal/mapr') diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 28bb074..9106f52 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -10,6 +10,7 @@ import ( "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/logformat" ) @@ -136,7 +137,8 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string { return } - maprLine := strings.TrimSpace(string(line.Content)) + maprLine := strings.TrimSpace(line.Content.String()) + pool.RecycleBytesBuffer(line.Content) fields, err := a.parser.MakeFields(maprLine) logger.Debug(fields, err) -- cgit v1.2.3 From 23982f331c2154a66b86d596226c24454fd06be5 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 28 Aug 2021 20:26:32 +0100 Subject: 1. Major performance gain by not checking for file truncation aftter each bytes read. 2. Introduce field separator to the protocol package. --- internal/mapr/logformat/default.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'internal/mapr') diff --git a/internal/mapr/logformat/default.go b/internal/mapr/logformat/default.go index 44bf558..32a34bd 100644 --- a/internal/mapr/logformat/default.go +++ b/internal/mapr/logformat/default.go @@ -3,12 +3,14 @@ package logformat import ( "errors" "strings" + + "github.com/mimecast/dtail/internal/protocol" ) // MakeFieldsDEFAULT is the default log file mapreduce parser. func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) { fields := make(map[string]string, 20) - splitted := strings.Split(maprLine, "|") + splitted := strings.Split(maprLine, protocol.FieldDelimiter) fields["*"] = "*" fields["$line"] = maprLine -- cgit v1.2.3 From f74a9e4b35feb8c07d8a70b5a581088a0a59889d Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Tue, 7 Sep 2021 10:01:32 +0300 Subject: Produce MAPREDUCE lines, can aggregate these via default log format --- internal/mapr/logformat/default.go | 15 ++++++++++--- internal/mapr/logformat/default_test.go | 39 +++++++++++++++++++-------------- 2 files changed, 35 insertions(+), 19 deletions(-) (limited to 'internal/mapr') diff --git a/internal/mapr/logformat/default.go b/internal/mapr/logformat/default.go index 32a34bd..2881047 100644 --- a/internal/mapr/logformat/default.go +++ b/internal/mapr/logformat/default.go @@ -9,8 +9,8 @@ import ( // MakeFieldsDEFAULT is the default log file mapreduce parser. func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) { - fields := make(map[string]string, 20) splitted := strings.Split(maprLine, protocol.FieldDelimiter) + fields := make(map[string]string, len(splitted)) fields["*"] = "*" fields["$line"] = maprLine @@ -19,10 +19,19 @@ func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) { fields["$timezone"] = p.timeZoneName fields["$timeoffset"] = p.timeZoneOffset - for _, kv := range splitted { + kvStart := 0 + // DTail mapreduce format + if len(splitted) > 3 && strings.HasPrefix(splitted[3], "MAPREDUCE:") { + fields["$severity"] = splitted[0] + // TODO: Parse time like we do at Mimecast + fields["$time"] = splitted[1] + kvStart = 4 + } + + for _, kv := range splitted[kvStart:] { keyAndValue := strings.SplitN(kv, "=", 2) if len(keyAndValue) != 2 { - return fields, errors.New("Error parsing mapr token: " + kv) + return fields, errors.New("Error parsing mapreduce token: " + kv) } fields[strings.ToLower(keyAndValue[0])] = keyAndValue[1] } diff --git a/internal/mapr/logformat/default_test.go b/internal/mapr/logformat/default_test.go index d7a4da4..6284008 100644 --- a/internal/mapr/logformat/default_test.go +++ b/internal/mapr/logformat/default_test.go @@ -10,26 +10,33 @@ func TestDefaultLogFormat(t *testing.T) { t.Errorf("Unable to create parser: %s", err.Error()) } - fields, err := parser.MakeFields("foo=bar|baz=bay") - - if err != nil { - t.Errorf("Unable to parse: %s", err.Error()) + inputs := []string{ + "foo=bar|baz=bay", + "INFO|20210907-065632|SERVER|MAPREDUCE:TEST|foo=bar|baz=bay", } - if bar, ok := fields["foo"]; !ok { - t.Errorf("Expected field 'foo', but no such field there\n") - } else if bar != "bar" { - t.Errorf("Expected 'bar' stored in field 'foo', but got '%s'\n", bar) - } + for _, input := range inputs { + fields, err := parser.MakeFields(input) + + if err != nil { + t.Errorf("Parser unable to make fields: %s", err.Error()) + } + + if bar, ok := fields["foo"]; !ok { + t.Errorf("Expected field 'foo', but no such field there\n") + } else if bar != "bar" { + t.Errorf("Expected 'bar' stored in field 'foo', but got '%s'\n", bar) + } + + if bay, ok := fields["baz"]; !ok { + t.Errorf("Expected field 'baz', but no such field there\n") + } else if bay != "bay" { + t.Errorf("Expected 'bay' stored in field 'baz', but got '%s'\n", bay) + } - if bay, ok := fields["baz"]; !ok { - t.Errorf("Expected field 'baz', but no such field there\n") - } else if bay != "bay" { - t.Errorf("Expected 'bay' stored in field 'baz', but got '%s'\n", bay) } - fields, err = parser.MakeFields("foo=bar|bazbay") - if err == nil { - t.Errorf("Expected error but didn't: %s", err.Error()) + if _, err := parser.MakeFields("foo=bar|bazbay"); err == nil { + t.Errorf("Expected error but didn't") } } -- cgit v1.2.3 From c83c9e61a08c7ea1cb528bc26dfab25b46faa866 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 8 Sep 2021 08:39:41 +0300 Subject: Don't fail parsing whole line when one kv could not be parsed --- internal/mapr/logformat/default.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'internal/mapr') diff --git a/internal/mapr/logformat/default.go b/internal/mapr/logformat/default.go index 2881047..da976bd 100644 --- a/internal/mapr/logformat/default.go +++ b/internal/mapr/logformat/default.go @@ -1,9 +1,9 @@ package logformat import ( - "errors" "strings" + "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/protocol" ) @@ -31,7 +31,8 @@ func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) { for _, kv := range splitted[kvStart:] { keyAndValue := strings.SplitN(kv, "=", 2) if len(keyAndValue) != 2 { - return fields, errors.New("Error parsing mapreduce token: " + kv) + logger.Debug("Unable to parse key-value token, ignoring it", kv) + continue } fields[strings.ToLower(keyAndValue[0])] = keyAndValue[1] } -- cgit v1.2.3 From 16dc57e1e1c28e9d762424e596223a980770e059 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Wed, 8 Sep 2021 19:10:50 +0300 Subject: mapreduce tables are in colors now too --- internal/mapr/aggregateset.go | 17 ++- internal/mapr/client/aggregate.go | 18 ++- internal/mapr/globalgroupset.go | 1 + internal/mapr/groupset.go | 258 ++++++++++++++++++++++---------- internal/mapr/logformat/default.go | 28 ++-- internal/mapr/logformat/default_test.go | 15 +- internal/mapr/logformat/generickv.go | 31 ++++ internal/mapr/logformat/parser.go | 2 + internal/mapr/server/aggregate.go | 17 ++- 9 files changed, 272 insertions(+), 115 deletions(-) create mode 100644 internal/mapr/logformat/generickv.go (limited to 'internal/mapr') diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go index 029d87b..47f4925 100644 --- a/internal/mapr/aggregateset.go +++ b/internal/mapr/aggregateset.go @@ -6,6 +6,8 @@ import ( "strconv" "strings" + "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/protocol" ) @@ -68,22 +70,25 @@ func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error { // Serialize the aggregate set so it can be sent over the wire. func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<- string) { - //logger.Trace("Serialising mapr.AggregateSet", s) - var sb strings.Builder + logger.Trace("Serialising mapr.AggregateSet", s) + sb := pool.BuilderBuffer.Get().(*strings.Builder) + defer pool.RecycleBuilderBuffer(sb) sb.WriteString(groupKey) sb.WriteString(protocol.AggregateDelimiter) - sb.WriteString(fmt.Sprintf("%d%s", s.Samples, protocol.AggregateDelimiter)) + sb.WriteString(fmt.Sprintf("%d", s.Samples)) + sb.WriteString(protocol.AggregateDelimiter) for k, v := range s.FValues { sb.WriteString(k) - sb.WriteString("=") - sb.WriteString(fmt.Sprintf("%v%s", v, protocol.AggregateDelimiter)) + sb.WriteString(protocol.AggregateKVDelimiter) + sb.WriteString(fmt.Sprintf("%v", v)) + sb.WriteString(protocol.AggregateDelimiter) } for k, v := range s.SValues { sb.WriteString(k) - sb.WriteString("=") + sb.WriteString(protocol.AggregateKVDelimiter) sb.WriteString(v) sb.WriteString(protocol.AggregateDelimiter) } diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go index 10b34d4..5cc09a1 100644 --- a/internal/mapr/client/aggregate.go +++ b/internal/mapr/client/aggregate.go @@ -1,11 +1,13 @@ package client import ( + "fmt" "strconv" "strings" "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr" + "github.com/mimecast/dtail/internal/protocol" ) // Aggregate mapreduce data on the DTail client side. @@ -31,12 +33,18 @@ func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGrou } // Aggregate data from mapr log line into local (and global) group sets. -func (a *Aggregate) Aggregate(parts []string) { +func (a *Aggregate) Aggregate(message string) error { + parts := strings.Split(message, protocol.AggregateDelimiter) + if len(parts) < 4 { + return fmt.Errorf("aggregate message without any real data") + } + groupKey := parts[0] samples, err := strconv.Atoi(parts[1]) if err != nil { - logger.FatalExit("Unable to parse sample count", parts[1], err, parts) + return fmt.Errorf("unable to parse sample count '%s': %v", parts[1], err) } + fields := a.makeFields(parts[2:]) set := a.group.GetSet(groupKey) @@ -63,6 +71,8 @@ func (a *Aggregate) Aggregate(parts []string) { // Re-init local group (make it empty again). a.group.InitSet() } + + return nil } // Create a map of key-value pairs from a part list such as ["foo=bar", "bar=baz"]. @@ -70,8 +80,8 @@ func (a *Aggregate) makeFields(parts []string) map[string]string { fields := make(map[string]string, len(parts)) for _, part := range parts { - kv := strings.SplitN(part, "=", 2) - if len(kv) < 2 { + kv := strings.SplitN(part, protocol.AggregateKVDelimiter, 2) + if len(kv) != 2 { continue } fields[kv[0]] = kv[1] diff --git a/internal/mapr/globalgroupset.go b/internal/mapr/globalgroupset.go index cfab506..21bf990 100644 --- a/internal/mapr/globalgroupset.go +++ b/internal/mapr/globalgroupset.go @@ -48,6 +48,7 @@ func (g *GlobalGroupSet) MergeNoblock(query *Query, group *GroupSet) (bool, erro // Merge a group set into the global group set. func (g *GlobalGroupSet) merge(query *Query, group *GroupSet) error { + for groupKey, set := range group.sets { s := g.GetSet(groupKey) if err := s.Merge(query, set); err != nil { diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go index 6ee2811..d29559a 100644 --- a/internal/mapr/groupset.go +++ b/internal/mapr/groupset.go @@ -4,13 +4,16 @@ import ( "context" "errors" "fmt" - "io/ioutil" "os" "sort" "strconv" "strings" + "github.com/mimecast/dtail/internal/color" + "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/pool" + "github.com/mimecast/dtail/internal/protocol" ) // GroupSet represents a map of aggregate sets. The group sets @@ -22,6 +25,14 @@ type GroupSet struct { sets map[string]*AggregateSet } +// Internal helper type +type result struct { + groupKey string + values []string + widths []int + orderBy float64 +} + // NewGroupSet returns a new empty group set. func NewGroupSet() *GroupSet { g := GroupSet{} @@ -58,17 +69,118 @@ func (g *GroupSet) Serialize(ctx context.Context, ch chan<- string) { // Result returns a nicely formated result of the query from the group set. func (g *GroupSet) Result(query *Query) (string, int, error) { - return g.limitedResult(query, query.Limit, "\t", " ", false) + rows, widths, err := g.result(query, true) + if err != nil { + return "", 0, err + } + + sb := pool.BuilderBuffer.Get().(*strings.Builder) + defer pool.RecycleBuilderBuffer(sb) + + // Generate header now + lastIndex := len(query.Select) - 1 + for i, sc := range query.Select { + format := fmt.Sprintf(" %%%ds ", widths[i]) + str := fmt.Sprintf(format, sc.FieldStorage) + if config.Client.TermColorsEnable { + attrs := []color.Attribute{config.Client.TermColors.MaprTable.HeaderAttr} + if sc.FieldStorage == query.OrderBy { + attrs = append(attrs, config.Client.TermColors.MaprTable.HeaderSortKeyAttr) + } + for _, groupBy := range query.GroupBy { + if sc.FieldStorage == groupBy { + attrs = append(attrs, config.Client.TermColors.MaprTable.HeaderGroupKeyAttr) + break + } + } + color.PaintWithAttrs(sb, str, + config.Client.TermColors.MaprTable.HeaderFg, + config.Client.TermColors.MaprTable.HeaderBg, + attrs) + } else { + sb.WriteString(str) + } + + if i == lastIndex { + continue + } + if config.Client.TermColorsEnable { + color.PaintWithAttr(sb, protocol.FieldDelimiter, + config.Client.TermColors.MaprTable.HeaderDelimiterFg, + config.Client.TermColors.MaprTable.HeaderDelimiterBg, + config.Client.TermColors.MaprTable.HeaderDelimiterAttr) + } else { + sb.WriteString(protocol.FieldDelimiter) + } + } + sb.WriteString("\n") + + for i := 0; i < len(query.Select); i++ { + str := fmt.Sprintf("-%s-", strings.Repeat("-", widths[i])) + if config.Client.TermColorsEnable { + color.PaintWithAttr(sb, str, + config.Client.TermColors.MaprTable.HeaderDelimiterFg, + config.Client.TermColors.MaprTable.HeaderDelimiterBg, + config.Client.TermColors.MaprTable.HeaderDelimiterAttr) + } else { + sb.WriteString(str) + } + if i == lastIndex { + continue + } + if config.Client.TermColorsEnable { + color.PaintWithAttr(sb, protocol.FieldDelimiter, + config.Client.TermColors.MaprTable.HeaderDelimiterFg, + config.Client.TermColors.MaprTable.HeaderDelimiterBg, + config.Client.TermColors.MaprTable.HeaderDelimiterAttr) + } else { + sb.WriteString(protocol.FieldDelimiter) + } + } + sb.WriteString("\n") + + // And now write the data + for i, r := range rows { + if i == query.Limit { + break + } + for j, value := range r.values { + format := fmt.Sprintf(" %%%ds ", widths[j]) + str := fmt.Sprintf(format, value) + if config.Client.TermColorsEnable { + color.PaintWithAttr(sb, str, + config.Client.TermColors.MaprTable.DataFg, + config.Client.TermColors.MaprTable.DataBg, + config.Client.TermColors.MaprTable.DataAttr) + } else { + sb.WriteString(str) + } + + if j == lastIndex { + continue + } + if config.Client.TermColorsEnable { + color.PaintWithAttr(sb, protocol.FieldDelimiter, + config.Client.TermColors.MaprTable.DelimiterFg, + config.Client.TermColors.MaprTable.DelimiterBg, + config.Client.TermColors.MaprTable.DelimiterAttr) + } else { + sb.WriteString(protocol.FieldDelimiter) + } + } + sb.WriteString("\n") + } + + return sb.String(), len(rows), nil } -// WriteResult writes the result to an outfile. +// WriteResult writes the result to an CSV outfile. func (g *GroupSet) WriteResult(query *Query) error { if !query.HasOutfile() { return errors.New("No outfile specified") } - // -1: Don't limit the result, include all data sets - result, _, err := g.limitedResult(query, -1, "", ",", true) + rows, _, err := g.result(query, false) if err != nil { return err } @@ -76,9 +188,37 @@ func (g *GroupSet) WriteResult(query *Query) error { logger.Info("Writing outfile", query.Outfile) tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile) - if err := ioutil.WriteFile(tmpOutfile, []byte(result), 0644); err != nil { + file, err := os.Create(tmpOutfile) + if err != nil { return err } + defer file.Close() + + // Generate header now + lastIndex := len(query.Select) - 1 + for i, sc := range query.Select { + file.WriteString(sc.FieldStorage) + if i == lastIndex { + continue + } + file.WriteString(protocol.CSVDelimiter) + } + file.WriteString("\n") + + // And now write the data + for i, r := range rows { + if i == query.Limit { + break + } + for j, value := range r.values { + file.WriteString(value) + if j == lastIndex { + continue + } + file.WriteString(protocol.CSVDelimiter) + } + file.WriteString("\n") + } if err := os.Rename(tmpOutfile, query.Outfile); err != nil { os.Remove(tmpOutfile) @@ -88,32 +228,22 @@ func (g *GroupSet) WriteResult(query *Query) error { return nil } -// Return a nicely formated result of the query from the group set. -func (g *GroupSet) limitedResult(query *Query, limit int, lineStarter, fieldSeparator string, addHeader bool) (string, int, error) { - type result struct { - groupKey string - resultStr string - orderBy float64 - } +// Return a sorted result slice of the query from the group set. +func (g *GroupSet) result(query *Query, gatherWidths bool) ([]result, []int, error) { + var rows []result + widths := make([]int, len(query.Select)) - var resultSlice []result + var valueStr string + var value float64 for groupKey, set := range g.sets { - var sb strings.Builder r := result{groupKey: groupKey} - lastIndex := len(query.Select) - 1 for i, sc := range query.Select { - storage := sc.FieldStorage - orderByThis := storage == query.OrderBy - switch sc.Operation { case Count: - value := set.FValues[storage] - sb.WriteString(fmt.Sprintf("%d", int(value))) - if orderByThis { - r.orderBy = value - } + value = set.FValues[sc.FieldStorage] + valueStr = fmt.Sprintf("%d", int(value)) case Len: fallthrough case Sum: @@ -121,74 +251,48 @@ func (g *GroupSet) limitedResult(query *Query, limit int, lineStarter, fieldSepa case Min: fallthrough case Max: - value := set.FValues[storage] - sb.WriteString(fmt.Sprintf("%f", value)) - if orderByThis { - r.orderBy = value - } + value = set.FValues[sc.FieldStorage] + valueStr = fmt.Sprintf("%f", value) case Last: - value := set.SValues[storage] - if orderByThis { - f, err := strconv.ParseFloat(value, 64) - if err == nil { - r.orderBy = f - } - } - sb.WriteString(value) + valueStr = set.SValues[sc.FieldStorage] + value, _ = strconv.ParseFloat(valueStr, 64) case Avg: - value := set.FValues[storage] / float64(set.Samples) - sb.WriteString(fmt.Sprintf("%f", value)) - if orderByThis { - r.orderBy = value - } + value = set.FValues[sc.FieldStorage] / float64(set.Samples) + valueStr = fmt.Sprintf("%f", value) default: - return "", 0, fmt.Errorf("Unknown aggregation method '%v'", sc.Operation) + return rows, widths, fmt.Errorf("Unknown aggregation method '%v'", sc.Operation) } - if i != lastIndex { - sb.WriteString(fieldSeparator) + + if sc.FieldStorage == query.OrderBy { + r.orderBy = value + } + r.values = append(r.values, valueStr) + + if !gatherWidths { + continue + } + if widths[i] < len(sc.FieldStorage) { + widths[i] = len(sc.FieldStorage) + } + if widths[i] < len(valueStr) { + widths[i] = len(valueStr) } } - r.resultStr = sb.String() - resultSlice = append(resultSlice, r) + rows = append(rows, r) } if query.OrderBy != "" { if query.ReverseOrder { - sort.SliceStable(resultSlice, func(i, j int) bool { - return resultSlice[i].orderBy < resultSlice[j].orderBy + sort.SliceStable(rows, func(i, j int) bool { + return rows[i].orderBy < rows[j].orderBy }) } else { - sort.SliceStable(resultSlice, func(i, j int) bool { - return resultSlice[i].orderBy > resultSlice[j].orderBy + sort.SliceStable(rows, func(i, j int) bool { + return rows[i].orderBy > rows[j].orderBy }) } } - var sb strings.Builder - - // Write header first - if addHeader { - lastIndex := len(query.Select) - 1 - sb.WriteString(lineStarter) - for i, sc := range query.Select { - sb.WriteString(sc.FieldStorage) - if i != lastIndex { - sb.WriteString(fieldSeparator) - } - } - sb.WriteString("\n") - } - - // And now write the data - for i, r := range resultSlice { - if i == limit { - break - } - sb.WriteString(lineStarter) - sb.WriteString(r.resultStr) - sb.WriteString("\n") - } - - return sb.String(), len(resultSlice), nil + return rows, widths, nil } diff --git a/internal/mapr/logformat/default.go b/internal/mapr/logformat/default.go index da976bd..c67137e 100644 --- a/internal/mapr/logformat/default.go +++ b/internal/mapr/logformat/default.go @@ -1,16 +1,22 @@ package logformat import ( + "fmt" "strings" - "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/protocol" ) -// MakeFieldsDEFAULT is the default log file mapreduce parser. +// MakeFieldsDEFAULT is the default DTail log file key-value parser. func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) { splitted := strings.Split(maprLine, protocol.FieldDelimiter) - fields := make(map[string]string, len(splitted)) + + if len(splitted) < 3 || !strings.HasPrefix(splitted[3], "MAPREDUCE:") || !strings.HasPrefix(splitted[0], "INFO") { + // Not a DTail mapreduce log line. + return nil, IgnoreFieldsErr + } + + fields := make(map[string]string, len(splitted)+8) fields["*"] = "*" fields["$line"] = maprLine @@ -19,20 +25,14 @@ func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) { fields["$timezone"] = p.timeZoneName fields["$timeoffset"] = p.timeZoneOffset - kvStart := 0 - // DTail mapreduce format - if len(splitted) > 3 && strings.HasPrefix(splitted[3], "MAPREDUCE:") { - fields["$severity"] = splitted[0] - // TODO: Parse time like we do at Mimecast - fields["$time"] = splitted[1] - kvStart = 4 - } + fields["$severity"] = splitted[0] + // TODO: Parse time like we do at Mimecast + fields["$time"] = splitted[1] - for _, kv := range splitted[kvStart:] { + for _, kv := range splitted[4:] { keyAndValue := strings.SplitN(kv, "=", 2) if len(keyAndValue) != 2 { - logger.Debug("Unable to parse key-value token, ignoring it", kv) - continue + return fields, fmt.Errorf("Unable to parse key-value token '%s'", kv) } fields[strings.ToLower(keyAndValue[0])] = keyAndValue[1] } diff --git a/internal/mapr/logformat/default_test.go b/internal/mapr/logformat/default_test.go index 6284008..79911d1 100644 --- a/internal/mapr/logformat/default_test.go +++ b/internal/mapr/logformat/default_test.go @@ -11,8 +11,8 @@ func TestDefaultLogFormat(t *testing.T) { } inputs := []string{ - "foo=bar|baz=bay", "INFO|20210907-065632|SERVER|MAPREDUCE:TEST|foo=bar|baz=bay", + "INFO|20210907-065732|CLIENT|MAPREDUCE:YOMAN|baz=bay|foo=bar", } for _, input := range inputs { @@ -23,20 +23,21 @@ func TestDefaultLogFormat(t *testing.T) { } if bar, ok := fields["foo"]; !ok { - t.Errorf("Expected field 'foo', but no such field there\n") + t.Errorf("Expected field 'foo', but no such field there in '%s'\n", input) } else if bar != "bar" { - t.Errorf("Expected 'bar' stored in field 'foo', but got '%s'\n", bar) + t.Errorf("Expected 'bar' stored in field 'foo', but got '%s' in '%s'\n", bar, input) } if bay, ok := fields["baz"]; !ok { - t.Errorf("Expected field 'baz', but no such field there\n") + t.Errorf("Expected field 'baz', but no such field there in '%s'\n", input) } else if bay != "bay" { - t.Errorf("Expected 'bay' stored in field 'baz', but got '%s'\n", bay) + t.Errorf("Expected 'bay' stored in field 'baz', but got '%s' in '%s'\n", bay, input) } } - if _, err := parser.MakeFields("foo=bar|bazbay"); err == nil { - t.Errorf("Expected error but didn't") + fields, err := parser.MakeFields("foozoo=bar|bazbay") + if _, ok := fields["foo"]; ok { + t.Errorf("Expected fiending field 'foo', but found it\n") } } diff --git a/internal/mapr/logformat/generickv.go b/internal/mapr/logformat/generickv.go new file mode 100644 index 0000000..23d75cb --- /dev/null +++ b/internal/mapr/logformat/generickv.go @@ -0,0 +1,31 @@ +package logformat + +import ( + "strings" + + "github.com/mimecast/dtail/internal/protocol" +) + +// MakeFieldsGENERICKV is the generic key-value logfile parser. +func (p *Parser) MakeFieldsGENERIGKV(maprLine string) (map[string]string, error) { + splitted := strings.Split(maprLine, protocol.FieldDelimiter) + fields := make(map[string]string, len(splitted)) + + fields["*"] = "*" + fields["$line"] = maprLine + fields["$empty"] = "" + fields["$hostname"] = p.hostname + fields["$timezone"] = p.timeZoneName + fields["$timeoffset"] = p.timeZoneOffset + + for _, kv := range splitted[0:] { + keyAndValue := strings.SplitN(kv, "=", 2) + if len(keyAndValue) != 2 { + //logger.Debug("Unable to parse key-value token, ignoring it", kv) + continue + } + fields[strings.ToLower(keyAndValue[0])] = keyAndValue[1] + } + + return fields, nil +} diff --git a/internal/mapr/logformat/parser.go b/internal/mapr/logformat/parser.go index c53729a..6582b5f 100644 --- a/internal/mapr/logformat/parser.go +++ b/internal/mapr/logformat/parser.go @@ -12,6 +12,8 @@ import ( "github.com/mimecast/dtail/internal/mapr" ) +var IgnoreFieldsErr error = errors.New("Ignore this field set") + // Parser is used to parse the mapreduce information from the server log files. type Parser struct { hostname string diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 9106f52..a6d6bb1 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -13,6 +13,7 @@ import ( "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/logformat" + "github.com/mimecast/dtail/internal/protocol" ) // Aggregate is for aggregating mapreduce data on the DTail server side. @@ -89,7 +90,6 @@ func (a *Aggregate) Shutdown() { // Start an aggregation. func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) { - myCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -109,6 +109,7 @@ func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) { fieldsCh = a.addFields(myCtx, fieldsCh) } + // Periodically pre-aggregate data every a.query.Interval seconds. go a.aggregateTimer(myCtx) a.makeMaprLines(myCtx, fieldsCh, maprLines) } @@ -139,13 +140,16 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string { maprLine := strings.TrimSpace(line.Content.String()) pool.RecycleBytesBuffer(line.Content) - fields, err := a.parser.MakeFields(maprLine) - logger.Debug(fields, err) + fields, err := a.parser.MakeFields(maprLine) if err != nil { - logger.Error(err) + // Should fields be ignored anyway? + if err != logformat.IgnoreFieldsErr { + logger.Error(fields, err) + } continue } + if !a.query.WhereClause(fields) { continue } @@ -170,7 +174,7 @@ func (a *Aggregate) addFields(ctx context.Context, fieldsCh <-chan map[string]st defer close(ch) for { - // fieldsCh will be closed via 'makeFields' if ctx is done + // fieldsCh will be closed via 'makeFields' when ctx is done fields, ok := <-fieldsCh if !ok { return @@ -219,12 +223,11 @@ func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[strin } func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { - //logger.Trace("Aggregating", group, fields) var sb strings.Builder for i, field := range a.query.GroupBy { if i > 0 { - sb.WriteString(" ") + sb.WriteString(protocol.AggregateGroupKeyCombinator) } if val, ok := fields[field]; ok { sb.WriteString(val) -- cgit v1.2.3 From 842fd5800000bb68d6306a9ecad80a98ed762a2f Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 12 Sep 2021 19:01:34 +0300 Subject: limit mapreduce table output to 10 rows by default --- internal/mapr/globalgroupset.go | 4 ++-- internal/mapr/groupset.go | 8 ++++++-- 2 files changed, 8 insertions(+), 4 deletions(-) (limited to 'internal/mapr') diff --git a/internal/mapr/globalgroupset.go b/internal/mapr/globalgroupset.go index 21bf990..50bac37 100644 --- a/internal/mapr/globalgroupset.go +++ b/internal/mapr/globalgroupset.go @@ -93,9 +93,9 @@ func (g *GlobalGroupSet) WriteResult(query *Query) error { } // Result returns the result of the mapreduce aggregation as a string. -func (g *GlobalGroupSet) Result(query *Query) (string, int, error) { +func (g *GlobalGroupSet) Result(query *Query, rowsLimit int) (string, int, error) { g.semaphore <- struct{}{} defer func() { <-g.semaphore }() - return g.GroupSet.Result(query) + return g.GroupSet.Result(query, rowsLimit) } diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go index d29559a..9bff790 100644 --- a/internal/mapr/groupset.go +++ b/internal/mapr/groupset.go @@ -68,12 +68,16 @@ func (g *GroupSet) Serialize(ctx context.Context, ch chan<- string) { } // Result returns a nicely formated result of the query from the group set. -func (g *GroupSet) Result(query *Query) (string, int, error) { +func (g *GroupSet) Result(query *Query, rowsLimit int) (string, int, error) { rows, widths, err := g.result(query, true) if err != nil { return "", 0, err } + if query.Limit != -1 { + rowsLimit = query.Limit + } + sb := pool.BuilderBuffer.Get().(*strings.Builder) defer pool.RecycleBuilderBuffer(sb) @@ -141,7 +145,7 @@ func (g *GroupSet) Result(query *Query) (string, int, error) { // And now write the data for i, r := range rows { - if i == query.Limit { + if i == rowsLimit { break } for j, value := range r.values { -- cgit v1.2.3 From 2ebe7e9d63ba62c6f19749c39fe0a577d86ca775 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 12 Sep 2021 19:04:42 +0300 Subject: bugfix: dmap skipped the last couple of mapreduce lines --- internal/mapr/server/aggregate.go | 102 ++++++++++++++++---------------------- 1 file changed, 42 insertions(+), 60 deletions(-) (limited to 'internal/mapr') diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index a6d6bb1..d11ed7d 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -19,16 +19,12 @@ import ( // Aggregate is for aggregating mapreduce data on the DTail server side. type Aggregate struct { done *internal.Done - // Log lines to process (parsing MAPREDUCE lines). - Lines chan line.Line + // NextLinesCh can be used to use a new line ch. + NextLinesCh chan chan line.Line // Hostname of the current server (used to populate $hostname field). hostname string // Signals to serialize data. serialize chan struct{} - // Signals to flush data. - flush chan struct{} - // Signals that data has been flushed - flushed chan struct{} // The mapr query query *mapr.Query // The mapr log format parser @@ -69,14 +65,12 @@ func NewAggregate(queryStr string) (*Aggregate, error) { } a := Aggregate{ - done: internal.NewDone(), - Lines: make(chan line.Line, 100), - serialize: make(chan struct{}), - flush: make(chan struct{}), - flushed: make(chan struct{}), - hostname: s[0], - query: query, - parser: logParser, + done: internal.NewDone(), + NextLinesCh: make(chan chan line.Line, 10), + serialize: make(chan struct{}), + hostname: s[0], + query: query, + parser: logParser, } return &a, nil @@ -84,12 +78,11 @@ func NewAggregate(queryStr string) (*Aggregate, error) { // Shutdown the aggregation engine. func (a *Aggregate) Shutdown() { - a.Flush() a.done.Shutdown() } // Start an aggregation. -func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) { +func (a *Aggregate) Start(ctx context.Context, maprMessages chan<- string) { myCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -102,16 +95,16 @@ func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) { } }() - fieldsCh := a.makeFields(myCtx) + fieldsCh := a.fieldsFromLines(myCtx) // Add fields (e.g. via 'set' clause) if len(a.query.Set) > 0 { - fieldsCh = a.addFields(myCtx, fieldsCh) + fieldsCh = a.setAdditionalFields(myCtx, fieldsCh) } // Periodically pre-aggregate data every a.query.Interval seconds. go a.aggregateTimer(myCtx) - a.makeMaprLines(myCtx, fieldsCh, maprLines) + a.aggregateAndSerialize(myCtx, fieldsCh, maprMessages) } func (a *Aggregate) aggregateTimer(ctx context.Context) { @@ -125,23 +118,38 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) { } } -func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string { - ch := make(chan map[string]string) +func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]string { + fieldsCh := make(chan map[string]string) go func() { - defer close(ch) + defer close(fieldsCh) + var lines chan line.Line + + // Gather first lines channel (first input file) + select { + case lines = <-a.NextLinesCh: + case <-ctx.Done(): + return + } for { select { - case line, ok := <-a.Lines: + case line, ok := <-lines: if !ok { - return + select { + case lines = <-a.NextLinesCh: + // Have a new lines channel (e.g. new input file) + case <-ctx.Done(): + default: + // No new lines channel found. + return + } } maprLine := strings.TrimSpace(line.Content.String()) + fields, err := a.parser.MakeFields(maprLine) pool.RecycleBytesBuffer(line.Content) - fields, err := a.parser.MakeFields(maprLine) if err != nil { // Should fields be ignored anyway? if err != logformat.IgnoreFieldsErr { @@ -155,7 +163,7 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string { } select { - case ch <- fields: + case fieldsCh <- fields: case <-ctx.Done(): } case <-ctx.Done(): @@ -164,17 +172,16 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string { } }() - return ch + return fieldsCh } -func (a *Aggregate) addFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string { - ch := make(chan map[string]string) +func (a *Aggregate) setAdditionalFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string { + newFieldsCh := make(chan map[string]string) go func() { - defer close(ch) + defer close(newFieldsCh) for { - // fieldsCh will be closed via 'makeFields' when ctx is done fields, ok := <-fieldsCh if !ok { return @@ -184,23 +191,22 @@ func (a *Aggregate) addFields(ctx context.Context, fieldsCh <-chan map[string]st } select { - case ch <- fields: + case newFieldsCh <- fields: case <-ctx.Done(): } } }() - return ch + return newFieldsCh } -func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) { +func (a *Aggregate) aggregateAndSerialize(ctx context.Context, fieldsCh <-chan map[string]string, maprMessages chan<- string) { group := mapr.NewGroupSet() serialize := func() { logger.Info("Serializing mapreduce result") - group.Serialize(ctx, maprLines) + group.Serialize(ctx, maprMessages) group = mapr.NewGroupSet() - logger.Info("Done serializing mapreduce result") } for { @@ -213,9 +219,6 @@ func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[strin a.aggregate(group, fields) case <-a.serialize: serialize() - case <-a.flush: - serialize() - a.flushed <- struct{}{} case <-ctx.Done(): return } @@ -264,24 +267,3 @@ func (a *Aggregate) Serialize(ctx context.Context) { case <-ctx.Done(): } } - -// Flush all data. -func (a *Aggregate) Flush() { - select { - case a.flush <- struct{}{}: - logger.Info("Flushing mapreduce data") - case <-time.After(time.Minute): - logger.Warn("Starting to flush mapreduce data takes over a minute") - return - case <-a.done.Done(): - return - } - - select { - case <-a.flushed: - logger.Info("Done flushing") - case <-time.After(time.Minute): - logger.Warn("Waiting for data to be flushed takes over a minute") - case <-a.done.Done(): - } -} -- cgit v1.2.3 From fe3e68afd99d8ea246be52893730f987e138ec24 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 19 Sep 2021 13:22:59 +0300 Subject: move args to config package logger package rewrite as dlog --- internal/mapr/aggregateset.go | 6 +++--- internal/mapr/client/aggregate.go | 4 ++-- internal/mapr/funcs/function.go | 4 ++-- internal/mapr/groupset.go | 4 ++-- internal/mapr/logformat/default.go | 1 + internal/mapr/logformat/generickv.go | 2 +- internal/mapr/logformat/parser.go | 3 --- internal/mapr/query.go | 4 ---- internal/mapr/server/aggregate.go | 22 +++++++++++----------- internal/mapr/token.go | 10 +++++----- internal/mapr/whereclause.go | 6 +++--- internal/mapr/wherecondition.go | 6 +++--- 12 files changed, 33 insertions(+), 39 deletions(-) (limited to 'internal/mapr') diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go index 47f4925..14e6943 100644 --- a/internal/mapr/aggregateset.go +++ b/internal/mapr/aggregateset.go @@ -6,7 +6,7 @@ import ( "strconv" "strings" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/protocol" ) @@ -37,7 +37,7 @@ func (s *AggregateSet) String() string { // Merge one aggregate set into this one. func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error { s.Samples += set.Samples - //logger.Trace("Merge", set) + //dlog.Common.Trace("Merge", set) for _, sc := range query.Select { storage := sc.FieldStorage @@ -70,7 +70,7 @@ func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error { // Serialize the aggregate set so it can be sent over the wire. func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<- string) { - logger.Trace("Serialising mapr.AggregateSet", s) + dlog.Common.Trace("Serialising mapr.AggregateSet", s) sb := pool.BuilderBuffer.Get().(*strings.Builder) defer pool.RecycleBuilderBuffer(sb) diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go index 5cc09a1..d0c1d70 100644 --- a/internal/mapr/client/aggregate.go +++ b/internal/mapr/client/aggregate.go @@ -5,7 +5,7 @@ import ( "strconv" "strings" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/protocol" ) @@ -52,7 +52,7 @@ func (a *Aggregate) Aggregate(message string) error { for _, sc := range a.query.Select { if val, ok := fields[sc.FieldStorage]; ok { if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, true); err != nil { - logger.Error(err) + dlog.Common.Error(err) continue } addedSamples = true diff --git a/internal/mapr/funcs/function.go b/internal/mapr/funcs/function.go index 1a89c3a..0433b9a 100644 --- a/internal/mapr/funcs/function.go +++ b/internal/mapr/funcs/function.go @@ -58,9 +58,9 @@ func NewFunctionStack(in string) (FunctionStack, string, error) { // Call the function stack. func (fs FunctionStack) Call(str string) string { for i := len(fs) - 1; i >= 0; i-- { - //logger.Debug("Call", fs[i].Name, str) + //dlog.Common.Debug("Call", fs[i].Name, str) str = fs[i].call(str) - //logger.Debug("Call.result", fs[i].Name, str) + //dlog.Common.Debug("Call.result", fs[i].Name, str) } return str diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go index 9bff790..df8c603 100644 --- a/internal/mapr/groupset.go +++ b/internal/mapr/groupset.go @@ -11,7 +11,7 @@ import ( "github.com/mimecast/dtail/internal/color" "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/protocol" ) @@ -189,7 +189,7 @@ func (g *GroupSet) WriteResult(query *Query) error { return err } - logger.Info("Writing outfile", query.Outfile) + dlog.Common.Info("Writing outfile", query.Outfile) tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile) file, err := os.Create(tmpOutfile) diff --git a/internal/mapr/logformat/default.go b/internal/mapr/logformat/default.go index c67137e..e0bbc30 100644 --- a/internal/mapr/logformat/default.go +++ b/internal/mapr/logformat/default.go @@ -26,6 +26,7 @@ func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) { fields["$timeoffset"] = p.timeZoneOffset fields["$severity"] = splitted[0] + fields["$loglevel"] = splitted[0] // TODO: Parse time like we do at Mimecast fields["$time"] = splitted[1] diff --git a/internal/mapr/logformat/generickv.go b/internal/mapr/logformat/generickv.go index 23d75cb..3769c22 100644 --- a/internal/mapr/logformat/generickv.go +++ b/internal/mapr/logformat/generickv.go @@ -21,7 +21,7 @@ func (p *Parser) MakeFieldsGENERIGKV(maprLine string) (map[string]string, error) for _, kv := range splitted[0:] { keyAndValue := strings.SplitN(kv, "=", 2) if len(keyAndValue) != 2 { - //logger.Debug("Unable to parse key-value token, ignoring it", kv) + //dlog.Common.Debug("Unable to parse key-value token, ignoring it", kv) continue } fields[strings.ToLower(keyAndValue[0])] = keyAndValue[1] diff --git a/internal/mapr/logformat/parser.go b/internal/mapr/logformat/parser.go index 6582b5f..a352580 100644 --- a/internal/mapr/logformat/parser.go +++ b/internal/mapr/logformat/parser.go @@ -8,7 +8,6 @@ import ( "strings" "time" - "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr" ) @@ -76,12 +75,10 @@ func (p *Parser) MakeFields(maprLine string) (fields map[string]string, err erro if errInterface == nil { fields, err = returnValues[0].Interface().(map[string]string), nil - logger.Trace("parser.MakeFields", fields, err) return } fields, err = returnValues[0].Interface().(map[string]string), errInterface.(error) - logger.Trace("parser.MakeFields", fields, err) return } diff --git a/internal/mapr/query.go b/internal/mapr/query.go index 01852da..6c1d849 100644 --- a/internal/mapr/query.go +++ b/internal/mapr/query.go @@ -6,8 +6,6 @@ import ( "strconv" "strings" "time" - - "github.com/mimecast/dtail/internal/io/logger" ) const ( @@ -67,8 +65,6 @@ func NewQuery(queryStr string) (*Query, error) { } err := q.parse(tokens) - - logger.Debug(q) return &q, err } diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index d11ed7d..767aada 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -9,7 +9,7 @@ import ( "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/line" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/logformat" @@ -40,7 +40,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) { fqdn, err := os.Hostname() if err != nil { - logger.Error(err) + dlog.Common.Error(err) } s := strings.Split(fqdn, ".") @@ -55,12 +55,12 @@ func NewAggregate(queryStr string) (*Aggregate, error) { parserName = query.LogFormat } - logger.Info("Creating log format parser", parserName) + dlog.Common.Info("Creating log format parser", parserName) logParser, err := logformat.NewParser(parserName, query) if err != nil { - logger.Error("Could not create log format parser. Falling back to 'generic'", err) + dlog.Common.Error("Could not create log format parser. Falling back to 'generic'", err) if logParser, err = logformat.NewParser("generic", query); err != nil { - logger.FatalExit("Could not create log format parser", err) + dlog.Common.FatalPanic("Could not create log format parser", err) } } @@ -153,7 +153,7 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin if err != nil { // Should fields be ignored anyway? if err != logformat.IgnoreFieldsErr { - logger.Error(fields, err) + dlog.Common.Error(fields, err) } continue } @@ -187,7 +187,7 @@ func (a *Aggregate) setAdditionalFields(ctx context.Context, fieldsCh <-chan map return } if err := a.query.SetClause(fields); err != nil { - logger.Error(err) + dlog.Common.Error(err) } select { @@ -204,7 +204,7 @@ func (a *Aggregate) aggregateAndSerialize(ctx context.Context, fieldsCh <-chan m group := mapr.NewGroupSet() serialize := func() { - logger.Info("Serializing mapreduce result") + dlog.Common.Info("Serializing mapreduce result") group.Serialize(ctx, maprMessages) group = mapr.NewGroupSet() } @@ -243,7 +243,7 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { for _, sc := range a.query.Select { if val, ok := fields[sc.Field]; ok { if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil { - logger.Error(err) + dlog.Common.Error(err) continue } addedSample = true @@ -255,7 +255,7 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { return } - logger.Trace("Aggregated data locally without adding new samples") + dlog.Common.Trace("Aggregated data locally without adding new samples") } // Serialize all the aggregated data. @@ -263,7 +263,7 @@ func (a *Aggregate) Serialize(ctx context.Context) { select { case a.serialize <- struct{}{}: case <-time.After(time.Minute): - logger.Warn("Starting to serialize mapredice data takes over a minute") + dlog.Common.Warn("Starting to serialize mapredice data takes over a minute") case <-ctx.Done(): } } diff --git a/internal/mapr/token.go b/internal/mapr/token.go index 8972188..7c6578b 100644 --- a/internal/mapr/token.go +++ b/internal/mapr/token.go @@ -58,12 +58,12 @@ func tokenize(queryStr string) []token { } func tokensConsume(tokens []token) ([]token, []token) { - //logger.Trace("=====================") + //dlog.Common.Trace("=====================") var consumed []token for i, t := range tokens { if t.isKeyword() { - //logger.Trace("keyword", t) + //dlog.Common.Trace("keyword", t) return tokens[i:], consumed } // strip escapes, such as ` from `foo`, this allows to use keywords as field names @@ -73,7 +73,7 @@ func tokensConsume(tokens []token) ([]token, []token) { } if t.str[0] == '`' && t.str[length-1] == '`' { stripped := t.str[1 : length-1] - //logger.Trace("stripped", stripped) + //dlog.Common.Trace("stripped", stripped) t := token{ str: stripped, isBareword: t.isBareword, @@ -81,11 +81,11 @@ func tokensConsume(tokens []token) ([]token, []token) { consumed = append(consumed, t) continue } - //logger.Trace("bare", token) + //dlog.Common.Trace("bare", token) consumed = append(consumed, t) } - //logger.Trace("result", consumed) + //dlog.Common.Trace("result", consumed) return nil, consumed } diff --git a/internal/mapr/whereclause.go b/internal/mapr/whereclause.go index cc1c164..6356d94 100644 --- a/internal/mapr/whereclause.go +++ b/internal/mapr/whereclause.go @@ -3,7 +3,7 @@ package mapr import ( "strconv" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" ) // WhereClause interprets the where clause of the mapreduce query. @@ -55,7 +55,7 @@ func whereClauseFloatValue(fields map[string]string, str string, float float64, } return f, true default: - logger.Error("Unexpected argument in 'where' clause", str, float, t) + dlog.Common.Error("Unexpected argument in 'where' clause", str, float, t) return 0, false } } @@ -71,7 +71,7 @@ func whereClauseStringValue(fields map[string]string, str string, t fieldType) ( case String: return str, true default: - logger.Error("Unexpected argument in 'where' clause", str, t) + dlog.Common.Error("Unexpected argument in 'where' clause", str, t) return str, false } } diff --git a/internal/mapr/wherecondition.go b/internal/mapr/wherecondition.go index 7a60dba..c60c0a5 100644 --- a/internal/mapr/wherecondition.go +++ b/internal/mapr/wherecondition.go @@ -6,7 +6,7 @@ import ( "strconv" "strings" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" ) // QueryOperation determines the mapreduce operation. @@ -168,7 +168,7 @@ func (wc *whereCondition) floatClause(lValue float64, rValue float64) bool { case FloatGe: return lValue >= rValue default: - logger.Error("Unknown float operation", lValue, wc.Operation, rValue) + dlog.Common.Error("Unknown float operation", lValue, wc.Operation, rValue) } return false @@ -193,7 +193,7 @@ func (wc *whereCondition) stringClause(lValue string, rValue string) bool { case StringNotHasSuffix: return !strings.HasSuffix(lValue, rValue) default: - logger.Error("Unknown string operation", lValue, wc.Operation, rValue) + dlog.Common.Error("Unknown string operation", lValue, wc.Operation, rValue) } return false -- cgit v1.2.3 From fcaa94c7453efa0d74e330128c0f5c2cde8f11b3 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 26 Sep 2021 16:42:47 +0300 Subject: refactor config reader - also looks in additional search paths for config file unless NONE is specified --- internal/mapr/logformat/default.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'internal/mapr') diff --git a/internal/mapr/logformat/default.go b/internal/mapr/logformat/default.go index e0bbc30..7fb1700 100644 --- a/internal/mapr/logformat/default.go +++ b/internal/mapr/logformat/default.go @@ -27,7 +27,7 @@ func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) { fields["$severity"] = splitted[0] fields["$loglevel"] = splitted[0] - // TODO: Parse time like we do at Mimecast + // NEXT: Parse time like we do at Mimecast fields["$time"] = splitted[1] for _, kv := range splitted[4:] { -- cgit v1.2.3 From 764ef99a3d779a0db1fb60679292af52425ba2f6 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 2 Oct 2021 10:46:47 +0300 Subject: add more default fields to MAPREDUCE --- internal/mapr/logformat/default.go | 23 +++++++++--- internal/mapr/logformat/default_test.go | 62 +++++++++++++++++++++++++++------ 2 files changed, 71 insertions(+), 14 deletions(-) (limited to 'internal/mapr') diff --git a/internal/mapr/logformat/default.go b/internal/mapr/logformat/default.go index 7fb1700..8016667 100644 --- a/internal/mapr/logformat/default.go +++ b/internal/mapr/logformat/default.go @@ -11,7 +11,7 @@ import ( func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) { splitted := strings.Split(maprLine, protocol.FieldDelimiter) - if len(splitted) < 3 || !strings.HasPrefix(splitted[3], "MAPREDUCE:") || !strings.HasPrefix(splitted[0], "INFO") { + if len(splitted) < 11 || !strings.HasPrefix(splitted[9], "MAPREDUCE:") || !strings.HasPrefix(splitted[0], "INFO") { // Not a DTail mapreduce log line. return nil, IgnoreFieldsErr } @@ -27,10 +27,25 @@ func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) { fields["$severity"] = splitted[0] fields["$loglevel"] = splitted[0] - // NEXT: Parse time like we do at Mimecast - fields["$time"] = splitted[1] - for _, kv := range splitted[4:] { + time := splitted[1] + fields["$time"] = time + if len(time) == 15 { + // Example: 20211002-071209 + fields["$date"] = time[0:8] + fields["$hour"] = time[9:11] + fields["$minute"] = time[11:13] + fields["$second"] = time[13:] + } + fields["$pid"] = splitted[2] + fields["$caller"] = splitted[3] + fields["$cpus"] = splitted[4] + fields["$goroutines"] = splitted[5] + fields["$cgocalls"] = splitted[6] + fields["$loadavg"] = splitted[7] + fields["$uptime"] = splitted[8] + + for _, kv := range splitted[10:] { keyAndValue := strings.SplitN(kv, "=", 2) if len(keyAndValue) != 2 { return fields, fmt.Errorf("Unable to parse key-value token '%s'", kv) diff --git a/internal/mapr/logformat/default_test.go b/internal/mapr/logformat/default_test.go index 79911d1..02c03a3 100644 --- a/internal/mapr/logformat/default_test.go +++ b/internal/mapr/logformat/default_test.go @@ -1,6 +1,7 @@ package logformat import ( + "fmt" "testing" ) @@ -10,9 +11,15 @@ func TestDefaultLogFormat(t *testing.T) { t.Errorf("Unable to create parser: %s", err.Error()) } + date := "20211002" + hour := "07" + minute := "23" + second := "42" + time := fmt.Sprintf("%s-%s%s%s", date, hour, minute, second) + inputs := []string{ - "INFO|20210907-065632|SERVER|MAPREDUCE:TEST|foo=bar|baz=bay", - "INFO|20210907-065732|CLIENT|MAPREDUCE:YOMAN|baz=bay|foo=bar", + fmt.Sprintf("INFO|%s|1|default_test.go:0|8|14|7|0.21|471h0m21s|MAPREDUCE:STATS|foo=bar|bar=foo", time), + fmt.Sprintf("INFO|%s|1|default_test.go:0|8|14|7|0.21|471h0m21s|MAPREDUCE:STATS|bar=foo|foo=bar", time), } for _, input := range inputs { @@ -22,18 +29,53 @@ func TestDefaultLogFormat(t *testing.T) { t.Errorf("Parser unable to make fields: %s", err.Error()) } - if bar, ok := fields["foo"]; !ok { - t.Errorf("Expected field 'foo', but no such field there in '%s'\n", input) - } else if bar != "bar" { - t.Errorf("Expected 'bar' stored in field 'foo', but got '%s' in '%s'\n", bar, input) + if val, ok := fields["$severity"]; !ok { + t.Errorf("Expected field '$severity', but no such field there in '%s'\n", input) + } else if val != "INFO" { + t.Errorf("Expected 'INFO' stored in field '$severity', but got '%s' in '%s'\n", val, input) + } + + if val, ok := fields["$time"]; !ok { + t.Errorf("Expected field '$time', but no such field there in '%s'\n", input) + } else if val != time { + t.Errorf("Expected '%s' stored in field '$time', but got '%s' in '%s'\n", time, val, input) + } + + if val, ok := fields["$date"]; !ok { + t.Errorf("Expected field '$date', but no such field there in '%s'\n", input) + } else if val != date { + t.Errorf("Expected '%s' stored in field '$date', but got '%s' in '%s'\n", date, val, input) + } + + if val, ok := fields["$hour"]; !ok { + t.Errorf("Expected field '$hour', but no such field there in '%s'\n", input) + } else if val != hour { + t.Errorf("Expected '%s' stored in field '$hour', but got '%s' in '%s'\n", hour, val, input) + } + + if val, ok := fields["$minute"]; !ok { + t.Errorf("Expected field '$minute', but no such field there in '%s'\n", input) + } else if val != minute { + t.Errorf("Expected '%s' stored in field '$minute', but got '%s' in '%s'\n", minute, val, input) } - if bay, ok := fields["baz"]; !ok { - t.Errorf("Expected field 'baz', but no such field there in '%s'\n", input) - } else if bay != "bay" { - t.Errorf("Expected 'bay' stored in field 'baz', but got '%s' in '%s'\n", bay, input) + if val, ok := fields["$second"]; !ok { + t.Errorf("Expected field '$second', but no such field there in '%s'\n", input) + } else if val != second { + t.Errorf("Expected '%s' stored in field '$second', but got '%s' in '%s'\n", second, val, input) } + if val, ok := fields["foo"]; !ok { + t.Errorf("Expected field 'foo', but no such field there in '%s'\n", input) + } else if val != "bar" { + t.Errorf("Expected 'bar' stored in field 'foo', but got '%s' in '%s'\n", val, input) + } + + if val, ok := fields["bar"]; !ok { + t.Errorf("Expected field 'bar', but no such field there in '%s'\n", input) + } else if val != "foo" { + t.Errorf("Expected 'foo' stored in field 'bar', but got '%s' in '%s'\n", val, input) + } } fields, err := parser.MakeFields("foozoo=bar|bazbay") -- cgit v1.2.3 From b2dbe133347ef220ff781ffeb1f8137245f5235f Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 3 Oct 2021 13:32:20 +0300 Subject: when a mapreduce outfile is specified also always write a outfile.query file --- internal/mapr/groupset.go | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) (limited to 'internal/mapr') diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go index df8c603..ce7630d 100644 --- a/internal/mapr/groupset.go +++ b/internal/mapr/groupset.go @@ -178,11 +178,31 @@ func (g *GroupSet) Result(query *Query, rowsLimit int) (string, int, error) { return sb.String(), len(rows), nil } +func (*GroupSet) writeQueryFile(query *Query) error { + queryFile := fmt.Sprintf("%s.query", query.Outfile) + tmpQueryFile := fmt.Sprintf("%s.tmp", queryFile) + dlog.Common.Debug("Writing query file", queryFile) + + fd, err := os.Create(tmpQueryFile) + if err != nil { + return err + } + defer fd.Close() + + fd.WriteString(query.RawQuery) + os.Rename(tmpQueryFile, queryFile) + + return nil +} + // WriteResult writes the result to an CSV outfile. func (g *GroupSet) WriteResult(query *Query) error { if !query.HasOutfile() { return errors.New("No outfile specified") } + if err := g.writeQueryFile(query); err != nil { + return err + } rows, _, err := g.result(query, false) if err != nil { @@ -192,22 +212,22 @@ func (g *GroupSet) WriteResult(query *Query) error { dlog.Common.Info("Writing outfile", query.Outfile) tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile) - file, err := os.Create(tmpOutfile) + fd, err := os.Create(tmpOutfile) if err != nil { return err } - defer file.Close() + defer fd.Close() // Generate header now lastIndex := len(query.Select) - 1 for i, sc := range query.Select { - file.WriteString(sc.FieldStorage) + fd.WriteString(sc.FieldStorage) if i == lastIndex { continue } - file.WriteString(protocol.CSVDelimiter) + fd.WriteString(protocol.CSVDelimiter) } - file.WriteString("\n") + fd.WriteString("\n") // And now write the data for i, r := range rows { @@ -215,13 +235,13 @@ func (g *GroupSet) WriteResult(query *Query) error { break } for j, value := range r.values { - file.WriteString(value) + fd.WriteString(value) if j == lastIndex { continue } - file.WriteString(protocol.CSVDelimiter) + fd.WriteString(protocol.CSVDelimiter) } - file.WriteString("\n") + fd.WriteString("\n") } if err := os.Rename(tmpOutfile, query.Outfile); err != nil { -- cgit v1.2.3 From 2d7ddbeae8286373ac19787dc7dde598a7cb0598 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Fri, 8 Oct 2021 11:43:43 +0300 Subject: refactor --- internal/mapr/logformat/default_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'internal/mapr') diff --git a/internal/mapr/logformat/default_test.go b/internal/mapr/logformat/default_test.go index 02c03a3..a777156 100644 --- a/internal/mapr/logformat/default_test.go +++ b/internal/mapr/logformat/default_test.go @@ -32,7 +32,7 @@ func TestDefaultLogFormat(t *testing.T) { if val, ok := fields["$severity"]; !ok { t.Errorf("Expected field '$severity', but no such field there in '%s'\n", input) } else if val != "INFO" { - t.Errorf("Expected 'INFO' stored in field '$severity', but got '%s' in '%s'\n", val, input) + t.Errorf("Expected 'Info' stored in field '$severity', but got '%s' in '%s'\n", val, input) } if val, ok := fields["$time"]; !ok { -- cgit v1.2.3 From 7a7169791a64190e1002e38bc9c04ad0d5c1ce1f Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 9 Oct 2021 16:44:28 +0300 Subject: add dtail health check unit test. --- internal/mapr/server/aggregate.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'internal/mapr') diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 767aada..1f5d1c3 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -8,9 +8,8 @@ import ( "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/dlog" - "github.com/mimecast/dtail/internal/io/pool" + "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/logformat" "github.com/mimecast/dtail/internal/protocol" @@ -148,7 +147,8 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin maprLine := strings.TrimSpace(line.Content.String()) fields, err := a.parser.MakeFields(maprLine) - pool.RecycleBytesBuffer(line.Content) + // Can not recycle here for some rason. + //pool.RecycleBytesBuffer(line.Content) if err != nil { // Should fields be ignored anyway? -- cgit v1.2.3 From 97747ea0f3178f7f5890512d483fdccaa82846b0 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 9 Oct 2021 21:10:29 +0300 Subject: vetting and linting and some code restyling --- internal/mapr/aggregateset.go | 4 - internal/mapr/client/aggregate.go | 9 +-- internal/mapr/funcs/function.go | 7 +- internal/mapr/funcs/function_test.go | 21 ++++-- internal/mapr/funcs/maskdigits.go | 2 - internal/mapr/globalgroupset.go | 8 -- internal/mapr/groupset.go | 9 +-- internal/mapr/logformat/default.go | 5 +- internal/mapr/logformat/default_test.go | 24 ++++-- internal/mapr/logformat/generickv.go | 2 +- internal/mapr/logformat/parser.go | 12 +-- internal/mapr/query.go | 17 ++--- internal/mapr/query_test.go | 125 +++++++++++++++++++++----------- internal/mapr/selectcondition.go | 12 +-- internal/mapr/server/aggregate.go | 30 +++----- internal/mapr/setclause.go | 2 - internal/mapr/setcondition.go | 15 ++-- internal/mapr/token.go | 11 +-- internal/mapr/whereclause.go | 10 ++- internal/mapr/wherecondition.go | 24 +++--- 20 files changed, 180 insertions(+), 169 deletions(-) (limited to 'internal/mapr') diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go index 14e6943..c50c7a1 100644 --- a/internal/mapr/aggregateset.go +++ b/internal/mapr/aggregateset.go @@ -38,7 +38,6 @@ func (s *AggregateSet) String() string { func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error { s.Samples += set.Samples //dlog.Common.Trace("Merge", set) - for _, sc := range query.Select { storage := sc.FieldStorage switch sc.Operation { @@ -115,7 +114,6 @@ func (s *AggregateSet) addFloatMin(key string, value float64) { s.FValues[key] = value return } - if f > value { s.FValues[key] = value } @@ -128,7 +126,6 @@ func (s *AggregateSet) addFloatMax(key string, value float64) { s.FValues[key] = value return } - if f < value { s.FValues[key] = value } @@ -147,7 +144,6 @@ func (s *AggregateSet) setFloat(key string, value float64) { // Aggregate data to the aggregate set. func (s *AggregateSet) Aggregate(key string, agg AggregateOperation, value string, clientAggregation bool) (err error) { var f float64 - // First check if we can aggregate anything without converting value to float. switch agg { case Count: diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go index d0c1d70..02a6a5a 100644 --- a/internal/mapr/client/aggregate.go +++ b/internal/mapr/client/aggregate.go @@ -23,7 +23,9 @@ type Aggregate struct { } // NewAggregate create new client aggregator. -func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *Aggregate { +func NewAggregate(server string, query *mapr.Query, + globalGroup *mapr.GlobalGroupSet) *Aggregate { + return &Aggregate{ query: query, group: mapr.NewGroupSet(), @@ -47,8 +49,8 @@ func (a *Aggregate) Aggregate(message string) error { fields := a.makeFields(parts[2:]) set := a.group.GetSet(groupKey) - var addedSamples bool + for _, sc := range a.query.Select { if val, ok := fields[sc.FieldStorage]; ok { if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, true); err != nil { @@ -71,14 +73,12 @@ func (a *Aggregate) Aggregate(message string) error { // Re-init local group (make it empty again). a.group.InitSet() } - return nil } // Create a map of key-value pairs from a part list such as ["foo=bar", "bar=baz"]. func (a *Aggregate) makeFields(parts []string) map[string]string { fields := make(map[string]string, len(parts)) - for _, part := range parts { kv := strings.SplitN(part, protocol.AggregateKVDelimiter, 2) if len(kv) != 2 { @@ -86,6 +86,5 @@ func (a *Aggregate) makeFields(parts []string) map[string]string { } fields[kv[0]] = kv[1] } - return fields } diff --git a/internal/mapr/funcs/function.go b/internal/mapr/funcs/function.go index 0433b9a..418d86f 100644 --- a/internal/mapr/funcs/function.go +++ b/internal/mapr/funcs/function.go @@ -19,13 +19,12 @@ type Function struct { // FunctionStack is a list of functions stacked each other type FunctionStack []Function -// NewFunctionStack parses the input string, e.g. foo(bar("arg")) and returns a corresponding function stack. +// NewFunctionStack parses the input string, e.g. foo(bar("arg")) and returns +// a corresponding function stack. func NewFunctionStack(in string) (FunctionStack, string, error) { var fs FunctionStack - getCallback := func(name string) (CallbackFunc, error) { var cb CallbackFunc - switch name { case "md5sum": return Md5Sum, nil @@ -51,7 +50,6 @@ func NewFunctionStack(in string) (FunctionStack, string, error) { fs = append(fs, Function{name, call}) aux = aux[index+1 : len(aux)-1] } - return fs, aux, nil } @@ -62,6 +60,5 @@ func (fs FunctionStack) Call(str string) string { str = fs[i].call(str) //dlog.Common.Debug("Call.result", fs[i].Name, str) } - return str } diff --git a/internal/mapr/funcs/function_test.go b/internal/mapr/funcs/function_test.go index 415683c..8b5d8b7 100644 --- a/internal/mapr/funcs/function_test.go +++ b/internal/mapr/funcs/function_test.go @@ -6,16 +6,19 @@ func TestFunction(t *testing.T) { input := "md5sum($line)" fs, arg, err := NewFunctionStack(input) if err != nil { - t.Errorf("error parsing function input '%s': %s (%v)\n", input, err.Error(), fs) + t.Errorf("error parsing function input '%s': %s (%v)\n", + input, err.Error(), fs) } if arg != "$line" { - t.Errorf("error parsing function input '%s': expected argument '$line' but got '%s' (%v)\n", input, arg, fs) + t.Errorf("error parsing function input '%s': expected argument '$line' but "+ + "got '%s' (%v)\n", input, arg, fs) } t.Log(input, fs, arg) result := fs.Call(input) if result != "b38699013d79e50d9d122433753959c1" { - t.Errorf("error executing function stack '%s': expected result 'b38699013d79e50d9d122433753959c1' but got '%s' (%v)\n", input, result, fs) + t.Errorf("error executing function stack '%s': expected result "+ + "'b38699013d79e50d9d122433753959c1' but got '%s' (%v)\n", input, result, fs) } input = "maskdigits(md5sum(maskdigits($line)))" @@ -24,22 +27,26 @@ func TestFunction(t *testing.T) { t.Errorf("error parsing function input '%s': %s (%v)\n", input, err.Error(), fs) } if arg != "$line" { - t.Errorf("error parsing function input '%s': expected argument '$line' but got '%s' (%v)\n", input, arg, fs) + t.Errorf("error parsing function input '%s': expected argument '$line' but "+ + "got '%s' (%v)\n", input, arg, fs) } t.Log(input, fs, arg) result = fs.Call(input) if result != ".fac.bbe..bb.........d...a.c..b." { - t.Errorf("error executing function stack '%s': expected result '.fac.bbe..bb.........d...a.c..b.' but got '%s' (%v)\n", input, result, fs) + t.Errorf("error executing function stack '%s': expected result "+ + "'.fac.bbe..bb.........d...a.c..b.' but got '%s' (%v)\n", input, result, fs) } input = "md5sum$line)" if fs, _, err := NewFunctionStack(input); err == nil { - t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n", input, fs) + t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n", + input, fs) } input = "md5sum(makedigits$line))" if fs, _, err := NewFunctionStack(input); err == nil { - t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n", input, fs) + t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n", + input, fs) } } diff --git a/internal/mapr/funcs/maskdigits.go b/internal/mapr/funcs/maskdigits.go index d51f3d8..925ec4d 100644 --- a/internal/mapr/funcs/maskdigits.go +++ b/internal/mapr/funcs/maskdigits.go @@ -3,12 +3,10 @@ package funcs // MaskDigits masks all digits (replaces them with .) func MaskDigits(input string) string { s := []byte(input) - for i, b := range s { if '0' <= b && b <= '9' { s[i] = '.' } } - return string(s) } diff --git a/internal/mapr/globalgroupset.go b/internal/mapr/globalgroupset.go index 50bac37..2d7f10b 100644 --- a/internal/mapr/globalgroupset.go +++ b/internal/mapr/globalgroupset.go @@ -17,7 +17,6 @@ func NewGlobalGroupSet() *GlobalGroupSet { semaphore: make(chan struct{}, 1), } g.InitSet() - return &g } @@ -30,7 +29,6 @@ func (g *GlobalGroupSet) String() string { func (g *GlobalGroupSet) Merge(query *Query, group *GroupSet) error { g.semaphore <- struct{}{} defer func() { <-g.semaphore }() - return g.merge(query, group) } @@ -48,14 +46,12 @@ func (g *GlobalGroupSet) MergeNoblock(query *Query, group *GroupSet) (bool, erro // Merge a group set into the global group set. func (g *GlobalGroupSet) merge(query *Query, group *GroupSet) error { - for groupKey, set := range group.sets { s := g.GetSet(groupKey) if err := s.Merge(query, set); err != nil { return err } } - return nil } @@ -68,7 +64,6 @@ func (g *GlobalGroupSet) IsEmpty() bool { func (g *GlobalGroupSet) NumSets() int { g.semaphore <- struct{}{} defer func() { <-g.semaphore }() - return len(g.sets) } @@ -80,7 +75,6 @@ func (g *GlobalGroupSet) SwapOut() *GroupSet { set := &GroupSet{sets: g.sets} g.InitSet() - return set } @@ -88,7 +82,6 @@ func (g *GlobalGroupSet) SwapOut() *GroupSet { func (g *GlobalGroupSet) WriteResult(query *Query) error { g.semaphore <- struct{}{} defer func() { <-g.semaphore }() - return g.GroupSet.WriteResult(query) } @@ -96,6 +89,5 @@ func (g *GlobalGroupSet) WriteResult(query *Query) error { func (g *GlobalGroupSet) Result(query *Query, rowsLimit int) (string, int, error) { g.semaphore <- struct{}{} defer func() { <-g.semaphore }() - return g.GroupSet.Result(query, rowsLimit) } diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go index ce7630d..6ffc8b9 100644 --- a/internal/mapr/groupset.go +++ b/internal/mapr/groupset.go @@ -73,7 +73,6 @@ func (g *GroupSet) Result(query *Query, rowsLimit int) (string, int, error) { if err != nil { return "", 0, err } - if query.Limit != -1 { rowsLimit = query.Limit } @@ -91,12 +90,14 @@ func (g *GroupSet) Result(query *Query, rowsLimit int) (string, int, error) { if sc.FieldStorage == query.OrderBy { attrs = append(attrs, config.Client.TermColors.MaprTable.HeaderSortKeyAttr) } + for _, groupBy := range query.GroupBy { if sc.FieldStorage == groupBy { attrs = append(attrs, config.Client.TermColors.MaprTable.HeaderGroupKeyAttr) break } } + color.PaintWithAttrs(sb, str, config.Client.TermColors.MaprTable.HeaderFg, config.Client.TermColors.MaprTable.HeaderBg, @@ -191,7 +192,6 @@ func (*GroupSet) writeQueryFile(query *Query) error { fd.WriteString(query.RawQuery) os.Rename(tmpQueryFile, queryFile) - return nil } @@ -256,7 +256,6 @@ func (g *GroupSet) WriteResult(query *Query) error { func (g *GroupSet) result(query *Query, gatherWidths bool) ([]result, []int, error) { var rows []result widths := make([]int, len(query.Select)) - var valueStr string var value float64 @@ -284,7 +283,8 @@ func (g *GroupSet) result(query *Query, gatherWidths bool) ([]result, []int, err value = set.FValues[sc.FieldStorage] / float64(set.Samples) valueStr = fmt.Sprintf("%f", value) default: - return rows, widths, fmt.Errorf("Unknown aggregation method '%v'", sc.Operation) + return rows, widths, fmt.Errorf("Unknown aggregation method '%v'", + sc.Operation) } if sc.FieldStorage == query.OrderBy { @@ -302,7 +302,6 @@ func (g *GroupSet) result(query *Query, gatherWidths bool) ([]result, []int, err widths[i] = len(valueStr) } } - rows = append(rows, r) } diff --git a/internal/mapr/logformat/default.go b/internal/mapr/logformat/default.go index 8016667..9b6c855 100644 --- a/internal/mapr/logformat/default.go +++ b/internal/mapr/logformat/default.go @@ -11,9 +11,10 @@ import ( func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) { splitted := strings.Split(maprLine, protocol.FieldDelimiter) - if len(splitted) < 11 || !strings.HasPrefix(splitted[9], "MAPREDUCE:") || !strings.HasPrefix(splitted[0], "INFO") { + if len(splitted) < 11 || !strings.HasPrefix(splitted[9], "MAPREDUCE:") || + !strings.HasPrefix(splitted[0], "INFO") { // Not a DTail mapreduce log line. - return nil, IgnoreFieldsErr + return nil, ErrIgnoreFields } fields := make(map[string]string, len(splitted)+8) diff --git a/internal/mapr/logformat/default_test.go b/internal/mapr/logformat/default_test.go index a777156..28e1acc 100644 --- a/internal/mapr/logformat/default_test.go +++ b/internal/mapr/logformat/default_test.go @@ -32,49 +32,57 @@ func TestDefaultLogFormat(t *testing.T) { if val, ok := fields["$severity"]; !ok { t.Errorf("Expected field '$severity', but no such field there in '%s'\n", input) } else if val != "INFO" { - t.Errorf("Expected 'Info' stored in field '$severity', but got '%s' in '%s'\n", val, input) + t.Errorf("Expected 'Info' stored in field '$severity', but got '%s' in '%s'\n", + val, input) } if val, ok := fields["$time"]; !ok { t.Errorf("Expected field '$time', but no such field there in '%s'\n", input) } else if val != time { - t.Errorf("Expected '%s' stored in field '$time', but got '%s' in '%s'\n", time, val, input) + t.Errorf("Expected '%s' stored in field '$time', but got '%s' in '%s'\n", + time, val, input) } if val, ok := fields["$date"]; !ok { t.Errorf("Expected field '$date', but no such field there in '%s'\n", input) } else if val != date { - t.Errorf("Expected '%s' stored in field '$date', but got '%s' in '%s'\n", date, val, input) + t.Errorf("Expected '%s' stored in field '$date', but got '%s' in '%s'\n", + date, val, input) } if val, ok := fields["$hour"]; !ok { t.Errorf("Expected field '$hour', but no such field there in '%s'\n", input) } else if val != hour { - t.Errorf("Expected '%s' stored in field '$hour', but got '%s' in '%s'\n", hour, val, input) + t.Errorf("Expected '%s' stored in field '$hour', but got '%s' in '%s'\n", + hour, val, input) } if val, ok := fields["$minute"]; !ok { t.Errorf("Expected field '$minute', but no such field there in '%s'\n", input) } else if val != minute { - t.Errorf("Expected '%s' stored in field '$minute', but got '%s' in '%s'\n", minute, val, input) + t.Errorf("Expected '%s' stored in field '$minute', but got '%s' in '%s'\n", + minute, val, input) } if val, ok := fields["$second"]; !ok { t.Errorf("Expected field '$second', but no such field there in '%s'\n", input) } else if val != second { - t.Errorf("Expected '%s' stored in field '$second', but got '%s' in '%s'\n", second, val, input) + t.Errorf("Expected '%s' stored in field '$second', but got '%s' in '%s'\n", + second, val, input) } if val, ok := fields["foo"]; !ok { t.Errorf("Expected field 'foo', but no such field there in '%s'\n", input) } else if val != "bar" { - t.Errorf("Expected 'bar' stored in field 'foo', but got '%s' in '%s'\n", val, input) + t.Errorf("Expected 'bar' stored in field 'foo', but got '%s' in '%s'\n", + val, input) } if val, ok := fields["bar"]; !ok { t.Errorf("Expected field 'bar', but no such field there in '%s'\n", input) } else if val != "foo" { - t.Errorf("Expected 'foo' stored in field 'bar', but got '%s' in '%s'\n", val, input) + t.Errorf("Expected 'foo' stored in field 'bar', but got '%s' in '%s'\n", + val, input) } } diff --git a/internal/mapr/logformat/generickv.go b/internal/mapr/logformat/generickv.go index 3769c22..433eb5f 100644 --- a/internal/mapr/logformat/generickv.go +++ b/internal/mapr/logformat/generickv.go @@ -6,7 +6,7 @@ import ( "github.com/mimecast/dtail/internal/protocol" ) -// MakeFieldsGENERICKV is the generic key-value logfile parser. +// MakeFieldsGENERIGKV is the generic key-value logfile parser. func (p *Parser) MakeFieldsGENERIGKV(maprLine string) (map[string]string, error) { splitted := strings.Split(maprLine, protocol.FieldDelimiter) fields := make(map[string]string, len(splitted)) diff --git a/internal/mapr/logformat/parser.go b/internal/mapr/logformat/parser.go index a352580..129081d 100644 --- a/internal/mapr/logformat/parser.go +++ b/internal/mapr/logformat/parser.go @@ -11,7 +11,8 @@ import ( "github.com/mimecast/dtail/internal/mapr" ) -var IgnoreFieldsErr error = errors.New("Ignore this field set") +// ErrIgnoreFields indicates that the fields should be ignored. +var ErrIgnoreFields error = errors.New("Ignore this field set") // Parser is used to parse the mapreduce information from the server log files. type Parser struct { @@ -26,11 +27,9 @@ type Parser struct { // NewParser returns a new log parser. func NewParser(logFormatName string, query *mapr.Query) (*Parser, error) { hostname, err := os.Hostname() - if err != nil { return nil, err } - now := time.Now() zone, offset := now.Zone() @@ -44,7 +43,6 @@ func NewParser(logFormatName string, query *mapr.Query) (*Parser, error) { if err != nil { return nil, err } - return &p, nil } @@ -53,7 +51,6 @@ func NewParser(logFormatName string, query *mapr.Query) (*Parser, error) { // Parser. Whereas MODULENAME must be a upeprcase string. func (p *Parser) reflectLogFormat(logFormatName string) error { methodName := fmt.Sprintf("MakeFields%s", strings.ToUpper(logFormatName)) - rt := reflect.TypeOf(p) method, ok := rt.MethodByName(methodName) if !ok { @@ -62,7 +59,6 @@ func (p *Parser) reflectLogFormat(logFormatName string) error { p.makeFieldsFunc = method.Func p.makeFieldsReceiver = reflect.ValueOf(p) - return nil } @@ -70,15 +66,11 @@ func (p *Parser) reflectLogFormat(logFormatName string) error { func (p *Parser) MakeFields(maprLine string) (fields map[string]string, err error) { inputValues := []reflect.Value{p.makeFieldsReceiver, reflect.ValueOf(maprLine)} returnValues := p.makeFieldsFunc.Call(inputValues) - errInterface := returnValues[1].Interface() - if errInterface == nil { fields, err = returnValues[0].Interface().(map[string]string), nil return } - fields, err = returnValues[0].Interface().(map[string]string), errInterface.(error) - return } diff --git a/internal/mapr/query.go b/internal/mapr/query.go index 6c1d849..d7c32bd 100644 --- a/internal/mapr/query.go +++ b/internal/mapr/query.go @@ -32,7 +32,9 @@ type Query struct { } func (q Query) String() string { - return fmt.Sprintf("Query(Select:%v,Table:%s,Where:%v,Set:%vGroupBy:%v,GroupKey:%s,OrderBy:%v,ReverseOrder:%v,Interval:%v,Limit:%d,Outfile:%s,RawQuery:%s,tokens:%v,LogFormat:%s)", + return fmt.Sprintf("Query(Select:%v,Table:%s,Where:%v,Set:%vGroupBy:%v,"+ + "GroupKey:%s,OrderBy:%v,ReverseOrder:%v,Interval:%v,Limit:%d,Outfile:%s,"+ + "RawQuery:%s,tokens:%v,LogFormat:%s)", q.Select, q.Table, q.Where, @@ -54,18 +56,14 @@ func NewQuery(queryStr string) (*Query, error) { if queryStr == "" { return nil, nil } - tokens := tokenize(queryStr) - q := Query{ RawQuery: queryStr, tokens: tokens, Interval: time.Second * 5, Limit: -1, } - - err := q.parse(tokens) - return &q, err + return &q, q.parse(tokens) } // HasOutfile returns true if query result will be written to a CVS output file. @@ -174,13 +172,13 @@ func (q *Query) parse(tokens []token) error { } if len(q.Select) < 1 { - return errors.New(invalidQuery + "Expected at least one field in 'select' clause but got none") + return errors.New(invalidQuery + "Expected at least one field in 'select' " + + "clause but got none") } if len(q.GroupBy) == 0 { field := q.Select[0].Field q.GroupBy = append(q.GroupBy, field) } - if q.OrderBy != "" { var orderFieldIsValid bool for _, sc := range q.Select { @@ -190,7 +188,8 @@ func (q *Query) parse(tokens []token) error { } } if !orderFieldIsValid { - return errors.New(invalidQuery + fmt.Sprintf("Can not '(r)order by' '%s', must be present in 'select' clause", q.OrderBy)) + return errors.New(invalidQuery + fmt.Sprintf("Can not '(r)order by' '%s',"+ + "must be present in 'select' clause", q.OrderBy)) } } diff --git a/internal/mapr/query_test.go b/internal/mapr/query_test.go index b0b6c3a..88f7387 100644 --- a/internal/mapr/query_test.go +++ b/internal/mapr/query_test.go @@ -13,18 +13,25 @@ func TestParseQuerySimple(t *testing.T) { "select foo from bar where baz <", "select foo from bar where baz < 100 bay eq 12 group", "select foo from bar where baz < 100 bay eq 12 group by foo order by", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit set foo = bar;", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " + + "order by foo limit", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " + + "order by foo limit set foo = bar;", } okQueries := []string{"select foo from bar", "select foo from bar where", "select foo from bar where baz < 100 bay eq 12", "select foo from bar where baz < 100, bay eq 12", "select foo from bar where baz < 100 and bay eq 12", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit 23", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit 23 outfile \"result.csv\"", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit 23 outfile \"result.csv\" set $foo = maskdigits(bar), $baz = 12, $bay = $foo;", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " + + "order by foo", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " + + "order by foo limit 23", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " + + "order by foo limit 23 outfile \"result.csv\"", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " + + "order by foo limit 23 outfile \"result.csv\" " + + "set $foo = maskdigits(bar), $baz = 12, $bay = $foo;", } for _, queryStr := range errorQueries { @@ -46,8 +53,13 @@ func TestParseQuerySimple(t *testing.T) { func TestParseQueryDeep(t *testing.T) { dialects := []string{ - "select s1, `from`, count(s3) from table where w1 == 2 and w2 eq \"free beer\" group by g1, g2 order by count(s3) interval 10 limit 23 set $foo = maskdigits(bar), $baz = 12, $bay = $foo logformat generic", - "SELECT s1, `from`, COUNT(s3) FROM table WHERE w1 == 2 AND w2 eq \"free beer\" GROUP g1, g2 ORDER count(s3) INTERVAL 10 LIMIT 23 SET $foo = maskdigits(bar), $baz = 12, $bay = $foo logformat generic", + "select s1, `from`, count(s3) from table where w1 == 2 and w2 eq " + + "\"free beer\" group by g1, g2 order by count(s3) interval 10 limit 23 " + + "set $foo = maskdigits(bar), $baz = 12, $bay = $foo logformat generic", + + "SELECT s1, `from`, COUNT(s3) FROM table WHERE w1 == 2 AND w2 eq " + + "\"free beer\" GROUP g1, g2 ORDER count(s3) INTERVAL 10 LIMIT 23 " + + "SET $foo = maskdigits(bar), $baz = 12, $bay = $foo logformat generic", } for _, queryStr := range dialects { @@ -55,119 +67,144 @@ func TestParseQueryDeep(t *testing.T) { if err != nil { t.Errorf("%s: %s", err.Error(), queryStr) } - t.Log(q) // 'select' clause if len(q.Select) != 3 { - t.Errorf("Expected three elements in 'select' clause but got '%v': %s\n%v", q.Select, queryStr, q) + t.Errorf("Expected three elements in 'select' clause but got '%v': %s\n%v", + q.Select, queryStr, q) } - if q.Select[0].Field != "s1" { - t.Errorf("Expected 's1' as first element in 'select' clause but got '%v': %s\n%v", q.Select[0].Field, queryStr, q) + t.Errorf("Expected 's1' as first element in 'select' clause but got '%v': %s\n%v", + q.Select[0].Field, queryStr, q) } if q.Select[0].Operation != Last { - t.Errorf("Expected 'last' as aggregation function of first element in 'select' clause but got '%v': %s\n%v", q.Select[0].Operation, queryStr, q) + t.Errorf("Expected 'last' as aggregation function of first element in "+ + "'select' clause but got '%v': %s\n%v", q.Select[0].Operation, queryStr, q) } - if q.Select[1].Field != "from" { - t.Errorf("Expected 'from' as second element in 'select' clause but got '%v': %s\n%v", q.Select[1].Field, queryStr, q) + t.Errorf("Expected 'from' as second element in 'select' clause but got "+ + "'%v': %s\n%v", q.Select[1].Field, queryStr, q) } if q.Select[1].Operation != Last { - t.Errorf("Expected 'last' as aggregation function of second element in 'select' clause but got '%v': %s\n%v", q.Select[1].Operation, queryStr, q) + t.Errorf("Expected 'last' as aggregation function of second element in "+ + "'select' clause but got '%v': %s\n%v", q.Select[1].Operation, queryStr, q) } - if q.Select[2].Field != "s3" { - t.Errorf("Expected 's3' as third element in 'select' clause but got '%v': %s\n%v", q.Select[2].Field, queryStr, q) + t.Errorf("Expected 's3' as third element in 'select' clause but got "+ + "'%v': %s\n%v", q.Select[2].Field, queryStr, q) } if q.Select[2].Operation != Count { - t.Errorf("Expected 'count' as aggregation function of third element in 'select' clause but got '%v': %s\n%v", q.Select[2].Operation, queryStr, q) + t.Errorf("Expected 'count' as aggregation function of third element in "+ + "'select' clause but got '%v': %s\n%v", q.Select[2].Operation, queryStr, q) } if q.Select[2].FieldStorage != "count(s3)" { - t.Errorf("Expected 'count(s3)' as third element's storage in 'select' clause but got '%v': %s\n%v", q.Select[2].FieldStorage, queryStr, q) + t.Errorf("Expected 'count(s3)' as third element's storage in 'select' "+ + "clause but got '%v': %s\n%v", q.Select[2].FieldStorage, queryStr, q) } // 'from' clause if q.Table != "TABLE" { - t.Errorf("Expected 'TABLE' in 'from' clause but got '%v': %s\n%v", q.Table, queryStr, q) + t.Errorf("Expected 'TABLE' in 'from' clause but got '%v': %s\n%v", + q.Table, queryStr, q) } // 'where' clause if len(q.Where) != 2 { - t.Errorf("Expected two elements in 'where' clause but got '%v': %s\n%v", q.Where, queryStr, q) + t.Errorf("Expected two elements in 'where' clause but got '%v': %s\n%v", + q.Where, queryStr, q) } if q.Where[0].lString != "w1" { - t.Errorf("Expected w1 as first element in 'where' clause but got '%v': %s\n%v", q.Where[0].lString, queryStr, q) + t.Errorf("Expected w1 as first element in 'where' clause but got '%v': %s\n%v", + q.Where[0].lString, queryStr, q) } if q.Where[0].Operation != FloatEq { - t.Errorf("Expected FloatEq operation in first 'where' condition but got '%v': %s\n%v", q.Where[0].Operation, queryStr, q) + t.Errorf("Expected FloatEq operation in first 'where' condition but got "+ + "'%v': %s\n%v", q.Where[0].Operation, queryStr, q) } if q.Where[0].rFloat != 2 { - t.Errorf("Expected '2' as float argument in first 'where' condition but got '%v': %s\n%v", q.Where[0].rFloat, queryStr, q) + t.Errorf("Expected '2' as float argument in first 'where' condition but "+ + "got '%v': %s\n%v", q.Where[0].rFloat, queryStr, q) } if q.Where[1].lString != "w2" { - t.Errorf("Expected w2 as second element in 'where' clause but got '%v': %s\n%v", q.Where[1].lString, queryStr, q) + t.Errorf("Expected w2 as second element in 'where' clause but got '%v': "+ + "%s\n%v", q.Where[1].lString, queryStr, q) } if q.Where[1].Operation != StringEq { - t.Errorf("Expected StringEq operation in second 'where' condition but got '%v': %s\n%v", q.Where[0].Operation, queryStr, q) + t.Errorf("Expected StringEq operation in second 'where' condition but got "+ + "'%v': %s\n%v", q.Where[0].Operation, queryStr, q) } if q.Where[1].rString != "free beer" { - t.Errorf("Expected 'free beer' as string argument in second 'where' condition but got '%v': %s\n%v", q.Where[0].rString, queryStr, q) + t.Errorf("Expected 'free beer' as string argument in second 'where' "+ + "condition but got '%v': %s\n%v", q.Where[0].rString, queryStr, q) } // 'group by' clause if len(q.GroupBy) != 2 { - t.Errorf("Expected two elements in 'group by' clause but got '%v': %s\n%v", q.GroupBy, queryStr, q) + t.Errorf("Expected two elements in 'group by' clause but got '%v': %s\n%v", + q.GroupBy, queryStr, q) } if q.GroupBy[0] != "g1" { - t.Errorf("Expected 'g1' as first element in 'group by' clause but got '%v': %s\n%v", q.GroupBy[0], queryStr, q) + t.Errorf("Expected 'g1' as first element in 'group by' clause but got "+ + "'%v': %s\n%v", q.GroupBy[0], queryStr, q) } if q.GroupBy[1] != "g2" { - t.Errorf("Expected 'g2' as second element in 'group by' clause but got '%v': %s\n%v", q.GroupBy[1], queryStr, q) + t.Errorf("Expected 'g2' as second element in 'group by' clause but got "+ + "'%v': %s\n%v", q.GroupBy[1], queryStr, q) } if q.GroupKey != "g1,g2" { - t.Errorf("Expected 'g1,g2' as group key in 'group by' clause but got '%v': %s\n%v", q.GroupKey, queryStr, q) + t.Errorf("Expected 'g1,g2' as group key in 'group by' clause but got "+ + "'%v': %s\n%v", q.GroupKey, queryStr, q) } // 'order by' clause if q.OrderBy != "count(s3)" { - t.Errorf("Expected 'count(s3)' as element in 'order by' clause but got '%v': %s\n%v", q.OrderBy, queryStr, q) + t.Errorf("Expected 'count(s3)' as element in 'order by' clause but got "+ + "'%v': %s\n%v", q.OrderBy, queryStr, q) } // 'interval' clause if q.Interval != time.Second*time.Duration(10) { - t.Errorf("Expected '10s' as duration 'interval' clause but got '%v': %s\n%v", q.Interval, queryStr, q) + t.Errorf("Expected '10s' as duration 'interval' clause but got '%v': %s\n%v", + q.Interval, queryStr, q) } // 'limit' clause if q.Limit != 23 { - t.Errorf("Expected '23' as limit in 'limit' clause but got '%v': %s\n%v", q.Limit, queryStr, q) + t.Errorf("Expected '23' as limit in 'limit' clause but got '%v': %s\n%v", + q.Limit, queryStr, q) } // 'set' clause if q.Set[0].lString != "$foo" { - t.Errorf("Expected '$foo' lvalue in first 'set' condition clause but got '%v': %s\n%v", q.Set[0].lString, queryStr, q) + t.Errorf("Expected '$foo' lvalue in first 'set' condition clause but got "+ + "'%v': %s\n%v", q.Set[0].lString, queryStr, q) } if q.Set[0].rString != "bar" { - t.Errorf("Expected 'bar' rvalue in first 'set' condition clause but got '%v': %s\n%v", q.Set[0].rString, queryStr, q) + t.Errorf("Expected 'bar' rvalue in first 'set' condition clause but got "+ + "'%v': %s\n%v", q.Set[0].rString, queryStr, q) } - if q.Set[1].lString != "$baz" { - t.Errorf("Expected '$baz' lvalue in second 'set' condition clause but got '%v': %s\n%v", q.Set[1].lString, queryStr, q) + t.Errorf("Expected '$baz' lvalue in second 'set' condition clause but got "+ + "'%v': %s\n%v", q.Set[1].lString, queryStr, q) } if q.Set[1].rString != "12" { - t.Errorf("Expected '12' rvalue in second 'set' condition clause but got '%v': %s\n%v", q.Set[1].rString, queryStr, q) + t.Errorf("Expected '12' rvalue in second 'set' condition clause but got "+ + "'%v': %s\n%v", q.Set[1].rString, queryStr, q) } - if q.Set[2].lString != "$bay" { - t.Errorf("Expected '$bay' lvalue in third 'set' condition clause but got '%v': %s\n%v", q.Set[2].lString, queryStr, q) + t.Errorf("Expected '$bay' lvalue in third 'set' condition clause but got "+ + "'%v': %s\n%v", q.Set[2].lString, queryStr, q) } if q.Set[2].rString != "$foo" { - t.Errorf("Expected '$foo' rvalue in third 'set' condition clause but got '%v': %s\n%v", q.Set[2].rString, queryStr, q) + t.Errorf("Expected '$foo' rvalue in third 'set' condition clause but got "+ + "'%v': %s\n%v", q.Set[2].rString, queryStr, q) } + // 'logformat' clause if q.LogFormat != "generic" { - t.Errorf("Expected 'generic' logformat got '%v': %s\n%v", q.LogFormat, queryStr, q) + t.Errorf("Expected 'generic' logformat got '%v': %s\n%v", + q.LogFormat, queryStr, q) } } } diff --git a/internal/mapr/selectcondition.go b/internal/mapr/selectcondition.go index d6aa0d4..5cfb8c7 100644 --- a/internal/mapr/selectcondition.go +++ b/internal/mapr/selectcondition.go @@ -37,7 +37,6 @@ func (sc selectCondition) String() string { func makeSelectConditions(tokens []token) ([]selectCondition, error) { var sel []selectCondition - // Parse select aggregation, e.g. sum(foo) parse := func(token token) (selectCondition, error) { var sc selectCondition @@ -52,13 +51,15 @@ func makeSelectConditions(tokens []token) ([]selectCondition, error) { a := strings.Split(tokenStr, "(") if len(a) != 2 { - return sc, errors.New(invalidQuery + "Can't parse 'select' aggregation: " + token.str) + return sc, errors.New(invalidQuery + "Can't parse 'select' aggregation: " + + token.str) } agg := a[0] // Aggregation, e.g. 'sum' b := strings.Split(a[1], ")") if len(b) != 2 { - return sc, errors.New(invalidQuery + "Can't parse 'select' field name from aggregation: " + token.str) + return sc, errors.New(invalidQuery + "Can't parse 'select' field name " + + "from aggregation: " + token.str) } sc.Field = b[0] // Field name, e.g. 'foo' sc.FieldStorage = tokenStr // e.g. 'sum(foo)' @@ -79,9 +80,9 @@ func makeSelectConditions(tokens []token) ([]selectCondition, error) { case "len": sc.Operation = Len default: - return sc, errors.New(invalidQuery + "Unknown aggregation in 'select' clause: " + agg) + return sc, errors.New(invalidQuery + + "Unknown aggregation in 'select' clause: " + agg) } - return sc, nil } @@ -92,6 +93,5 @@ func makeSelectConditions(tokens []token) ([]selectCondition, error) { } sel = append(sel, sc) } - return sel, nil } diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 1f5d1c3..97fee11 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -63,16 +63,14 @@ func NewAggregate(queryStr string) (*Aggregate, error) { } } - a := Aggregate{ + return &Aggregate{ done: internal.NewDone(), NextLinesCh: make(chan chan line.Line, 10), serialize: make(chan struct{}), hostname: s[0], query: query, parser: logParser, - } - - return &a, nil + }, nil } // Shutdown the aggregation engine. @@ -95,12 +93,10 @@ func (a *Aggregate) Start(ctx context.Context, maprMessages chan<- string) { }() fieldsCh := a.fieldsFromLines(myCtx) - // Add fields (e.g. via 'set' clause) if len(a.query.Set) > 0 { fieldsCh = a.setAdditionalFields(myCtx, fieldsCh) } - // Periodically pre-aggregate data every a.query.Interval seconds. go a.aggregateTimer(myCtx) a.aggregateAndSerialize(myCtx, fieldsCh, maprMessages) @@ -147,17 +143,18 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin maprLine := strings.TrimSpace(line.Content.String()) fields, err := a.parser.MakeFields(maprLine) - // Can not recycle here for some rason. + // Can't recycle it here yet, as field slices are still + // TODO: Add unit test reading from multiple mapreduce files lines. + // TODO: Add capability to recycle this bytes buffer. //pool.RecycleBytesBuffer(line.Content) if err != nil { // Should fields be ignored anyway? - if err != logformat.IgnoreFieldsErr { + if err != logformat.ErrIgnoreFields { dlog.Common.Error(fields, err) } continue } - if !a.query.WhereClause(fields) { continue } @@ -175,12 +172,12 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin return fieldsCh } -func (a *Aggregate) setAdditionalFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string { - newFieldsCh := make(chan map[string]string) +func (a *Aggregate) setAdditionalFields(ctx context.Context, + fieldsCh <-chan map[string]string) <-chan map[string]string { + newFieldsCh := make(chan map[string]string) go func() { defer close(newFieldsCh) - for { fields, ok := <-fieldsCh if !ok { @@ -196,19 +193,18 @@ func (a *Aggregate) setAdditionalFields(ctx context.Context, fieldsCh <-chan map } } }() - return newFieldsCh } -func (a *Aggregate) aggregateAndSerialize(ctx context.Context, fieldsCh <-chan map[string]string, maprMessages chan<- string) { - group := mapr.NewGroupSet() +func (a *Aggregate) aggregateAndSerialize(ctx context.Context, + fieldsCh <-chan map[string]string, maprMessages chan<- string) { + group := mapr.NewGroupSet() serialize := func() { dlog.Common.Info("Serializing mapreduce result") group.Serialize(ctx, maprMessages) group = mapr.NewGroupSet() } - for { select { case fields, ok := <-fieldsCh: @@ -227,7 +223,6 @@ func (a *Aggregate) aggregateAndSerialize(ctx context.Context, fieldsCh <-chan m func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { var sb strings.Builder - for i, field := range a.query.GroupBy { if i > 0 { sb.WriteString(protocol.AggregateGroupKeyCombinator) @@ -254,7 +249,6 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { set.Samples++ return } - dlog.Common.Trace("Aggregated data locally without adding new samples") } diff --git a/internal/mapr/setclause.go b/internal/mapr/setclause.go index b4c2f73..1843d31 100644 --- a/internal/mapr/setclause.go +++ b/internal/mapr/setclause.go @@ -7,7 +7,6 @@ func (q *Query) SetClause(fields map[string]string) error { if !ok { continue } - switch sc.rType { case FunctionStack: fields[sc.lString] = sc.functionStack.Call(value) @@ -15,6 +14,5 @@ func (q *Query) SetClause(fields map[string]string) error { fields[sc.lString] = value } } - return nil } diff --git a/internal/mapr/setcondition.go b/internal/mapr/setcondition.go index 8c5cfc9..92b21f4 100644 --- a/internal/mapr/setcondition.go +++ b/internal/mapr/setcondition.go @@ -39,20 +39,22 @@ func makeSetConditions(tokens []token) (set []setCondition, err error) { switch setOp { case "=": default: - return sc, nil, errors.New(invalidQuery + "Unknown operation in 'set' clause: " + setOp) + return sc, nil, errors.New(invalidQuery + "Unknown operation in 'set' " + + "clause: " + setOp) } if !tokens[0].isBareword { - return sc, nil, errors.New(invalidQuery + "Expected bareword at 'set' clause's lValue: " + tokens[0].str) + return sc, nil, errors.New(invalidQuery + "Expected bareword at 'set' " + + "clause's lValue: " + tokens[0].str) } - sc.lString = tokens[0].str if !strings.HasPrefix(sc.lString, "$") { - return sc, nil, errors.New(invalidQuery + "Expected field variable name (starting with $) at 'set' clause's lValue: " + tokens[0].str) + return sc, nil, errors.New(invalidQuery + "Expected field variable name " + + "(starting with $) at 'set' clause's lValue: " + tokens[0].str) } sc.rType = Field - rString := tokens[2].str + // Seems like a function call? if strings.HasSuffix(rString, ")") { functionStack, functionArg, err := funcs.NewFunctionStack(tokens[2].str) @@ -72,7 +74,6 @@ func makeSetConditions(tokens []token) (set []setCondition, err error) { } else { sc.rType = Field } - return sc, tokens[3:], nil } @@ -84,10 +85,8 @@ func makeSetConditions(tokens []token) (set []setCondition, err error) { if err != nil { return nil, err } - set = append(set, sc) tokens = tokensConsumeOptional(tokens, ",") } - return } diff --git a/internal/mapr/token.go b/internal/mapr/token.go index 7c6578b..6ac7631 100644 --- a/internal/mapr/token.go +++ b/internal/mapr/token.go @@ -4,7 +4,8 @@ import ( "strings" ) -var keywords = [...]string{"select", "from", "where", "set", "group", "rorder", "order", "interval", "limit", "outfile", "logformat"} +var keywords = [...]string{"select", "from", "where", "set", "group", "rorder", + "order", "interval", "limit", "outfile", "logformat"} // Represents a parsed token, used to parse the mapr query. type token struct { @@ -16,13 +17,11 @@ func (t token) isKeyword() bool { if !t.isBareword { return false } - for _, keyword := range keywords { if strings.ToLower(t.str) == keyword { return true } } - return false } @@ -32,7 +31,6 @@ func (t token) String() string { func tokenize(queryStr string) []token { var tokens []token - for i, part := range strings.Split(queryStr, "\"") { // Even i, means that it is not a quoted string if i%2 == 0 { @@ -53,14 +51,12 @@ func tokenize(queryStr string) []token { } tokens = append(tokens, token) } - return tokens } func tokensConsume(tokens []token) ([]token, []token) { //dlog.Common.Trace("=====================") var consumed []token - for i, t := range tokens { if t.isKeyword() { //dlog.Common.Trace("keyword", t) @@ -84,7 +80,6 @@ func tokensConsume(tokens []token) ([]token, []token) { //dlog.Common.Trace("bare", token) consumed = append(consumed, t) } - //dlog.Common.Trace("result", consumed) return nil, consumed } @@ -95,7 +90,6 @@ func tokensConsumeStr(tokens []token) ([]token, []string) { for _, token := range found { strings = append(strings, token.str) } - return tokens, strings } @@ -106,6 +100,5 @@ func tokensConsumeOptional(tokens []token, optional string) []token { if strings.ToLower(tokens[0].str) == strings.ToLower(optional) { return tokens[1:] } - return tokens } diff --git a/internal/mapr/whereclause.go b/internal/mapr/whereclause.go index 6356d94..d9f32eb 100644 --- a/internal/mapr/whereclause.go +++ b/internal/mapr/whereclause.go @@ -10,7 +10,6 @@ import ( func (q *Query) WhereClause(fields map[string]string) bool { for _, wc := range q.Where { var ok bool - if wc.Operation > FloatOperation { var lValue, rValue float64 if lValue, ok = whereClauseFloatValue(fields, wc.lString, wc.lFloat, wc.lType); !ok { @@ -36,11 +35,12 @@ func (q *Query) WhereClause(fields map[string]string) bool { return false } } - return true } -func whereClauseFloatValue(fields map[string]string, str string, float float64, t fieldType) (float64, bool) { +func whereClauseFloatValue(fields map[string]string, str string, float float64, + t fieldType) (float64, bool) { + switch t { case Float: return float, true @@ -60,7 +60,9 @@ func whereClauseFloatValue(fields map[string]string, str string, float float64, } } -func whereClauseStringValue(fields map[string]string, str string, t fieldType) (string, bool) { +func whereClauseStringValue(fields map[string]string, str string, + t fieldType) (string, bool) { + switch t { case Field: value, ok := fields[str] diff --git a/internal/mapr/wherecondition.go b/internal/mapr/wherecondition.go index c60c0a5..280dcfb 100644 --- a/internal/mapr/wherecondition.go +++ b/internal/mapr/wherecondition.go @@ -46,15 +46,18 @@ type whereCondition struct { } func (wc *whereCondition) String() string { - return fmt.Sprintf("whereCondition(Operation:%v,lString:%s,lFloat:%v,lType:%s,rString:%s,rFloat:%v,rType:%s)", - wc.Operation, wc.lString, wc.lFloat, wc.lType.String(), wc.rString, wc.rFloat, wc.rType.String()) + return fmt.Sprintf("whereCondition(Operation:%v,lString:%s,lFloat:%v,"+ + "lType:%s,rString:%s,rFloat:%v,rType:%s)", + wc.Operation, wc.lString, wc.lFloat, wc.lType.String(), wc.rString, + wc.rFloat, wc.rType.String()) } func makeWhereConditions(tokens []token) (where []whereCondition, err error) { parse := func(tokens []token) (whereCondition, []token, error) { var wc whereCondition if len(tokens) < 3 { - return wc, nil, errors.New(invalidQuery + "Not enough arguments in 'where' clause") + err := errors.New(invalidQuery + "Not enough arguments in 'where' clause") + return wc, nil, err } whereOp := strings.ToLower(tokens[1].str) @@ -94,7 +97,8 @@ func makeWhereConditions(tokens []token) (where []whereCondition, err error) { case "nhassuffix": wc.Operation = StringNotHasSuffix default: - return wc, nil, errors.New(invalidQuery + "Unknown operation in 'where' clause: " + whereOp) + return wc, nil, errors.New(invalidQuery + + "Unknown operation in 'where' clause: " + whereOp) } wc.lString = tokens[0].str @@ -102,7 +106,8 @@ func makeWhereConditions(tokens []token) (where []whereCondition, err error) { if wc.Operation > FloatOperation { if !tokens[0].isBareword { - return wc, nil, errors.New(invalidQuery + "Expected bareword at 'where' clause's lValue: " + tokens[0].str) + return wc, nil, errors.New(invalidQuery + + "Expected bareword at 'where' clause's lValue: " + tokens[0].str) } if f, err := strconv.ParseFloat(wc.lString, 64); err == nil { wc.lFloat = f @@ -112,7 +117,8 @@ func makeWhereConditions(tokens []token) (where []whereCondition, err error) { } if !tokens[2].isBareword { - return wc, nil, errors.New(invalidQuery + "Expected bareword at 'where' clause's rValue: " + tokens[2].str) + return wc, nil, errors.New(invalidQuery + + "Expected bareword at 'where' clause's rValue: " + tokens[2].str) } if f, err := strconv.ParseFloat(wc.rString, 64); err == nil { wc.rFloat = f @@ -133,23 +139,19 @@ func makeWhereConditions(tokens []token) (where []whereCondition, err error) { } else { wc.rType = String } - return wc, tokens[3:], nil } for len(tokens) > 0 { var wc whereCondition var err error - wc, tokens, err = parse(tokens) if err != nil { return nil, err } - where = append(where, wc) tokens = tokensConsumeOptional(tokens, "and") } - return } @@ -170,7 +172,6 @@ func (wc *whereCondition) floatClause(lValue float64, rValue float64) bool { default: dlog.Common.Error("Unknown float operation", lValue, wc.Operation, rValue) } - return false } @@ -195,6 +196,5 @@ func (wc *whereCondition) stringClause(lValue string, rValue string) bool { default: dlog.Common.Error("Unknown string operation", lValue, wc.Operation, rValue) } - return false } -- cgit v1.2.3