diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2021-10-21 21:28:49 +0300 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2021-10-21 21:28:49 +0300 |
| commit | f4207a55f71bfbcfdc532d5cdd3befaa3474a157 (patch) | |
| tree | ea5e4a2d2a67035f645bdee496ae55a52034178a /internal/mapr | |
| parent | d80d6070557e3a800e3a54967af9eced518f116b (diff) | |
| parent | 739205206d63bf42f4e843b39d04d4c8cd8207c3 (diff) | |
merge develop
Diffstat (limited to 'internal/mapr')
| -rw-r--r-- | internal/mapr/aggregateset.go | 29 | ||||
| -rw-r--r-- | internal/mapr/client/aggregate.go | 29 | ||||
| -rw-r--r-- | internal/mapr/funcs/function.go | 11 | ||||
| -rw-r--r-- | internal/mapr/funcs/function_test.go | 21 | ||||
| -rw-r--r-- | internal/mapr/funcs/maskdigits.go | 2 | ||||
| -rw-r--r-- | internal/mapr/globalgroupset.go | 11 | ||||
| -rw-r--r-- | internal/mapr/groupset.go | 291 | ||||
| -rw-r--r-- | internal/mapr/logformat/default.go | 41 | ||||
| -rw-r--r-- | internal/mapr/logformat/default_test.go | 88 | ||||
| -rw-r--r-- | internal/mapr/logformat/generickv.go | 31 | ||||
| -rw-r--r-- | internal/mapr/logformat/parser.go | 15 | ||||
| -rw-r--r-- | internal/mapr/query.go | 21 | ||||
| -rw-r--r-- | internal/mapr/query_test.go | 125 | ||||
| -rw-r--r-- | internal/mapr/selectcondition.go | 12 | ||||
| -rw-r--r-- | internal/mapr/server/aggregate.go | 155 | ||||
| -rw-r--r-- | internal/mapr/setclause.go | 2 | ||||
| -rw-r--r-- | internal/mapr/setcondition.go | 15 | ||||
| -rw-r--r-- | internal/mapr/token.go | 21 | ||||
| -rw-r--r-- | internal/mapr/whereclause.go | 16 | ||||
| -rw-r--r-- | internal/mapr/wherecondition.go | 30 |
20 files changed, 607 insertions, 359 deletions
diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go index a6cc6eb..c50c7a1 100644 --- a/internal/mapr/aggregateset.go +++ b/internal/mapr/aggregateset.go @@ -5,6 +5,10 @@ import ( "fmt" "strconv" "strings" + + "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/io/pool" + "github.com/mimecast/dtail/internal/protocol" ) // AggregateSet represents aggregated key/value pairs from the @@ -33,8 +37,7 @@ func (s *AggregateSet) String() string { // Merge one aggregate set into this one. func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error { s.Samples += set.Samples - //logger.Trace("Merge", set) - + //dlog.Common.Trace("Merge", set) for _, sc := range query.Select { storage := sc.FieldStorage switch sc.Operation { @@ -66,24 +69,27 @@ func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error { // Serialize the aggregate set so it can be sent over the wire. func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<- string) { - //logger.Trace("Serialising mapr.AggregateSet", s) - var sb strings.Builder + dlog.Common.Trace("Serialising mapr.AggregateSet", s) + sb := pool.BuilderBuffer.Get().(*strings.Builder) + defer pool.RecycleBuilderBuffer(sb) sb.WriteString(groupKey) - sb.WriteString("ā") - sb.WriteString(fmt.Sprintf("%dā", s.Samples)) + sb.WriteString(protocol.AggregateDelimiter) + sb.WriteString(fmt.Sprintf("%d", s.Samples)) + sb.WriteString(protocol.AggregateDelimiter) for k, v := range s.FValues { sb.WriteString(k) - sb.WriteString("=") - sb.WriteString(fmt.Sprintf("%vā", v)) + sb.WriteString(protocol.AggregateKVDelimiter) + sb.WriteString(fmt.Sprintf("%v", v)) + sb.WriteString(protocol.AggregateDelimiter) } for k, v := range s.SValues { sb.WriteString(k) - sb.WriteString("=") + sb.WriteString(protocol.AggregateKVDelimiter) sb.WriteString(v) - sb.WriteString("ā") + sb.WriteString(protocol.AggregateDelimiter) } select { @@ -108,7 +114,6 @@ func (s *AggregateSet) addFloatMin(key string, value float64) { s.FValues[key] = value return } - if f > value { s.FValues[key] = value } @@ -121,7 +126,6 @@ func (s *AggregateSet) addFloatMax(key string, value float64) { s.FValues[key] = value return } - if f < value { s.FValues[key] = value } @@ -140,7 +144,6 @@ func (s *AggregateSet) setFloat(key string, value float64) { // Aggregate data to the aggregate set. func (s *AggregateSet) Aggregate(key string, agg AggregateOperation, value string, clientAggregation bool) (err error) { var f float64 - // First check if we can aggregate anything without converting value to float. switch agg { case Count: diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go index 10b34d4..02a6a5a 100644 --- a/internal/mapr/client/aggregate.go +++ b/internal/mapr/client/aggregate.go @@ -1,11 +1,13 @@ package client import ( + "fmt" "strconv" "strings" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/mapr" + "github.com/mimecast/dtail/internal/protocol" ) // Aggregate mapreduce data on the DTail client side. @@ -21,7 +23,9 @@ type Aggregate struct { } // NewAggregate create new client aggregator. -func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *Aggregate { +func NewAggregate(server string, query *mapr.Query, + globalGroup *mapr.GlobalGroupSet) *Aggregate { + return &Aggregate{ query: query, group: mapr.NewGroupSet(), @@ -31,20 +35,26 @@ func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGrou } // Aggregate data from mapr log line into local (and global) group sets. -func (a *Aggregate) Aggregate(parts []string) { +func (a *Aggregate) Aggregate(message string) error { + parts := strings.Split(message, protocol.AggregateDelimiter) + if len(parts) < 4 { + return fmt.Errorf("aggregate message without any real data") + } + groupKey := parts[0] samples, err := strconv.Atoi(parts[1]) if err != nil { - logger.FatalExit("Unable to parse sample count", parts[1], err, parts) + return fmt.Errorf("unable to parse sample count '%s': %v", parts[1], err) } + fields := a.makeFields(parts[2:]) set := a.group.GetSet(groupKey) - var addedSamples bool + for _, sc := range a.query.Select { if val, ok := fields[sc.FieldStorage]; ok { if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, true); err != nil { - logger.Error(err) + dlog.Common.Error(err) continue } addedSamples = true @@ -63,19 +73,18 @@ func (a *Aggregate) Aggregate(parts []string) { // Re-init local group (make it empty again). a.group.InitSet() } + return nil } // Create a map of key-value pairs from a part list such as ["foo=bar", "bar=baz"]. func (a *Aggregate) makeFields(parts []string) map[string]string { fields := make(map[string]string, len(parts)) - for _, part := range parts { - kv := strings.SplitN(part, "=", 2) - if len(kv) < 2 { + kv := strings.SplitN(part, protocol.AggregateKVDelimiter, 2) + if len(kv) != 2 { continue } fields[kv[0]] = kv[1] } - return fields } diff --git a/internal/mapr/funcs/function.go b/internal/mapr/funcs/function.go index 1a89c3a..418d86f 100644 --- a/internal/mapr/funcs/function.go +++ b/internal/mapr/funcs/function.go @@ -19,13 +19,12 @@ type Function struct { // FunctionStack is a list of functions stacked each other type FunctionStack []Function -// NewFunctionStack parses the input string, e.g. foo(bar("arg")) and returns a corresponding function stack. +// NewFunctionStack parses the input string, e.g. foo(bar("arg")) and returns +// a corresponding function stack. func NewFunctionStack(in string) (FunctionStack, string, error) { var fs FunctionStack - getCallback := func(name string) (CallbackFunc, error) { var cb CallbackFunc - switch name { case "md5sum": return Md5Sum, nil @@ -51,17 +50,15 @@ func NewFunctionStack(in string) (FunctionStack, string, error) { fs = append(fs, Function{name, call}) aux = aux[index+1 : len(aux)-1] } - return fs, aux, nil } // Call the function stack. func (fs FunctionStack) Call(str string) string { for i := len(fs) - 1; i >= 0; i-- { - //logger.Debug("Call", fs[i].Name, str) + //dlog.Common.Debug("Call", fs[i].Name, str) str = fs[i].call(str) - //logger.Debug("Call.result", fs[i].Name, str) + //dlog.Common.Debug("Call.result", fs[i].Name, str) } - return str } diff --git a/internal/mapr/funcs/function_test.go b/internal/mapr/funcs/function_test.go index 415683c..8b5d8b7 100644 --- a/internal/mapr/funcs/function_test.go +++ b/internal/mapr/funcs/function_test.go @@ -6,16 +6,19 @@ func TestFunction(t *testing.T) { input := "md5sum($line)" fs, arg, err := NewFunctionStack(input) if err != nil { - t.Errorf("error parsing function input '%s': %s (%v)\n", input, err.Error(), fs) + t.Errorf("error parsing function input '%s': %s (%v)\n", + input, err.Error(), fs) } if arg != "$line" { - t.Errorf("error parsing function input '%s': expected argument '$line' but got '%s' (%v)\n", input, arg, fs) + t.Errorf("error parsing function input '%s': expected argument '$line' but "+ + "got '%s' (%v)\n", input, arg, fs) } t.Log(input, fs, arg) result := fs.Call(input) if result != "b38699013d79e50d9d122433753959c1" { - t.Errorf("error executing function stack '%s': expected result 'b38699013d79e50d9d122433753959c1' but got '%s' (%v)\n", input, result, fs) + t.Errorf("error executing function stack '%s': expected result "+ + "'b38699013d79e50d9d122433753959c1' but got '%s' (%v)\n", input, result, fs) } input = "maskdigits(md5sum(maskdigits($line)))" @@ -24,22 +27,26 @@ func TestFunction(t *testing.T) { t.Errorf("error parsing function input '%s': %s (%v)\n", input, err.Error(), fs) } if arg != "$line" { - t.Errorf("error parsing function input '%s': expected argument '$line' but got '%s' (%v)\n", input, arg, fs) + t.Errorf("error parsing function input '%s': expected argument '$line' but "+ + "got '%s' (%v)\n", input, arg, fs) } t.Log(input, fs, arg) result = fs.Call(input) if result != ".fac.bbe..bb.........d...a.c..b." { - t.Errorf("error executing function stack '%s': expected result '.fac.bbe..bb.........d...a.c..b.' but got '%s' (%v)\n", input, result, fs) + t.Errorf("error executing function stack '%s': expected result "+ + "'.fac.bbe..bb.........d...a.c..b.' but got '%s' (%v)\n", input, result, fs) } input = "md5sum$line)" if fs, _, err := NewFunctionStack(input); err == nil { - t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n", input, fs) + t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n", + input, fs) } input = "md5sum(makedigits$line))" if fs, _, err := NewFunctionStack(input); err == nil { - t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n", input, fs) + t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n", + input, fs) } } diff --git a/internal/mapr/funcs/maskdigits.go b/internal/mapr/funcs/maskdigits.go index d51f3d8..925ec4d 100644 --- a/internal/mapr/funcs/maskdigits.go +++ b/internal/mapr/funcs/maskdigits.go @@ -3,12 +3,10 @@ package funcs // MaskDigits masks all digits (replaces them with .) func MaskDigits(input string) string { s := []byte(input) - for i, b := range s { if '0' <= b && b <= '9' { s[i] = '.' } } - return string(s) } diff --git a/internal/mapr/globalgroupset.go b/internal/mapr/globalgroupset.go index cfab506..2d7f10b 100644 --- a/internal/mapr/globalgroupset.go +++ b/internal/mapr/globalgroupset.go @@ -17,7 +17,6 @@ func NewGlobalGroupSet() *GlobalGroupSet { semaphore: make(chan struct{}, 1), } g.InitSet() - return &g } @@ -30,7 +29,6 @@ func (g *GlobalGroupSet) String() string { func (g *GlobalGroupSet) Merge(query *Query, group *GroupSet) error { g.semaphore <- struct{}{} defer func() { <-g.semaphore }() - return g.merge(query, group) } @@ -54,7 +52,6 @@ func (g *GlobalGroupSet) merge(query *Query, group *GroupSet) error { return err } } - return nil } @@ -67,7 +64,6 @@ func (g *GlobalGroupSet) IsEmpty() bool { func (g *GlobalGroupSet) NumSets() int { g.semaphore <- struct{}{} defer func() { <-g.semaphore }() - return len(g.sets) } @@ -79,7 +75,6 @@ func (g *GlobalGroupSet) SwapOut() *GroupSet { set := &GroupSet{sets: g.sets} g.InitSet() - return set } @@ -87,14 +82,12 @@ func (g *GlobalGroupSet) SwapOut() *GroupSet { func (g *GlobalGroupSet) WriteResult(query *Query) error { g.semaphore <- struct{}{} defer func() { <-g.semaphore }() - return g.GroupSet.WriteResult(query) } // Result returns the result of the mapreduce aggregation as a string. -func (g *GlobalGroupSet) Result(query *Query) (string, int, error) { +func (g *GlobalGroupSet) Result(query *Query, rowsLimit int) (string, int, error) { g.semaphore <- struct{}{} defer func() { <-g.semaphore }() - - return g.GroupSet.Result(query) + return g.GroupSet.Result(query, rowsLimit) } diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go index b5c8a48..6ffc8b9 100644 --- a/internal/mapr/groupset.go +++ b/internal/mapr/groupset.go @@ -4,13 +4,16 @@ import ( "context" "errors" "fmt" - "io/ioutil" "os" "sort" "strconv" "strings" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/color" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/io/pool" + "github.com/mimecast/dtail/internal/protocol" ) // GroupSet represents a map of aggregate sets. The group sets @@ -22,6 +25,14 @@ type GroupSet struct { sets map[string]*AggregateSet } +// Internal helper type +type result struct { + groupKey string + values []string + widths []int + orderBy float64 +} + // NewGroupSet returns a new empty group set. func NewGroupSet() *GroupSet { g := GroupSet{} @@ -57,28 +68,181 @@ func (g *GroupSet) Serialize(ctx context.Context, ch chan<- string) { } // Result returns a nicely formated result of the query from the group set. -func (g *GroupSet) Result(query *Query) (string, int, error) { - return g.limitedResult(query, query.Limit, "\t", " ", false) +func (g *GroupSet) Result(query *Query, rowsLimit int) (string, int, error) { + rows, widths, err := g.result(query, true) + if err != nil { + return "", 0, err + } + if query.Limit != -1 { + rowsLimit = query.Limit + } + + sb := pool.BuilderBuffer.Get().(*strings.Builder) + defer pool.RecycleBuilderBuffer(sb) + + // Generate header now + lastIndex := len(query.Select) - 1 + for i, sc := range query.Select { + format := fmt.Sprintf(" %%%ds ", widths[i]) + str := fmt.Sprintf(format, sc.FieldStorage) + if config.Client.TermColorsEnable { + attrs := []color.Attribute{config.Client.TermColors.MaprTable.HeaderAttr} + if sc.FieldStorage == query.OrderBy { + attrs = append(attrs, config.Client.TermColors.MaprTable.HeaderSortKeyAttr) + } + + for _, groupBy := range query.GroupBy { + if sc.FieldStorage == groupBy { + attrs = append(attrs, config.Client.TermColors.MaprTable.HeaderGroupKeyAttr) + break + } + } + + color.PaintWithAttrs(sb, str, + config.Client.TermColors.MaprTable.HeaderFg, + config.Client.TermColors.MaprTable.HeaderBg, + attrs) + } else { + sb.WriteString(str) + } + + if i == lastIndex { + continue + } + if config.Client.TermColorsEnable { + color.PaintWithAttr(sb, protocol.FieldDelimiter, + config.Client.TermColors.MaprTable.HeaderDelimiterFg, + config.Client.TermColors.MaprTable.HeaderDelimiterBg, + config.Client.TermColors.MaprTable.HeaderDelimiterAttr) + } else { + sb.WriteString(protocol.FieldDelimiter) + } + } + sb.WriteString("\n") + + for i := 0; i < len(query.Select); i++ { + str := fmt.Sprintf("-%s-", strings.Repeat("-", widths[i])) + if config.Client.TermColorsEnable { + color.PaintWithAttr(sb, str, + config.Client.TermColors.MaprTable.HeaderDelimiterFg, + config.Client.TermColors.MaprTable.HeaderDelimiterBg, + config.Client.TermColors.MaprTable.HeaderDelimiterAttr) + } else { + sb.WriteString(str) + } + if i == lastIndex { + continue + } + if config.Client.TermColorsEnable { + color.PaintWithAttr(sb, protocol.FieldDelimiter, + config.Client.TermColors.MaprTable.HeaderDelimiterFg, + config.Client.TermColors.MaprTable.HeaderDelimiterBg, + config.Client.TermColors.MaprTable.HeaderDelimiterAttr) + } else { + sb.WriteString(protocol.FieldDelimiter) + } + } + sb.WriteString("\n") + + // And now write the data + for i, r := range rows { + if i == rowsLimit { + break + } + for j, value := range r.values { + format := fmt.Sprintf(" %%%ds ", widths[j]) + str := fmt.Sprintf(format, value) + if config.Client.TermColorsEnable { + color.PaintWithAttr(sb, str, + config.Client.TermColors.MaprTable.DataFg, + config.Client.TermColors.MaprTable.DataBg, + config.Client.TermColors.MaprTable.DataAttr) + } else { + sb.WriteString(str) + } + + if j == lastIndex { + continue + } + if config.Client.TermColorsEnable { + color.PaintWithAttr(sb, protocol.FieldDelimiter, + config.Client.TermColors.MaprTable.DelimiterFg, + config.Client.TermColors.MaprTable.DelimiterBg, + config.Client.TermColors.MaprTable.DelimiterAttr) + } else { + sb.WriteString(protocol.FieldDelimiter) + } + } + sb.WriteString("\n") + } + + return sb.String(), len(rows), nil } -// WriteResult writes the result to an outfile. +func (*GroupSet) writeQueryFile(query *Query) error { + queryFile := fmt.Sprintf("%s.query", query.Outfile) + tmpQueryFile := fmt.Sprintf("%s.tmp", queryFile) + dlog.Common.Debug("Writing query file", queryFile) + + fd, err := os.Create(tmpQueryFile) + if err != nil { + return err + } + defer fd.Close() + + fd.WriteString(query.RawQuery) + os.Rename(tmpQueryFile, queryFile) + return nil +} + +// WriteResult writes the result to an CSV outfile. func (g *GroupSet) WriteResult(query *Query) error { if !query.HasOutfile() { return errors.New("No outfile specified") } + if err := g.writeQueryFile(query); err != nil { + return err + } - // -1: Don't limit the result, include all data sets - result, _, err := g.limitedResult(query, query.Limit, "", ",", true) + rows, _, err := g.result(query, false) if err != nil { return err } - logger.Info("Writing outfile", query.Outfile) + dlog.Common.Info("Writing outfile", query.Outfile) tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile) - if err := ioutil.WriteFile(tmpOutfile, []byte(result), 0644); err != nil { + fd, err := os.Create(tmpOutfile) + if err != nil { return err } + defer fd.Close() + + // Generate header now + lastIndex := len(query.Select) - 1 + for i, sc := range query.Select { + fd.WriteString(sc.FieldStorage) + if i == lastIndex { + continue + } + fd.WriteString(protocol.CSVDelimiter) + } + fd.WriteString("\n") + + // And now write the data + for i, r := range rows { + if i == query.Limit { + break + } + for j, value := range r.values { + fd.WriteString(value) + if j == lastIndex { + continue + } + fd.WriteString(protocol.CSVDelimiter) + } + fd.WriteString("\n") + } if err := os.Rename(tmpOutfile, query.Outfile); err != nil { os.Remove(tmpOutfile) @@ -88,32 +252,21 @@ func (g *GroupSet) WriteResult(query *Query) error { return nil } -// Return a nicely formated result of the query from the group set. -func (g *GroupSet) limitedResult(query *Query, limit int, lineStarter, fieldSeparator string, addHeader bool) (string, int, error) { - type result struct { - groupKey string - resultStr string - orderBy float64 - } - - var resultSlice []result +// Return a sorted result slice of the query from the group set. +func (g *GroupSet) result(query *Query, gatherWidths bool) ([]result, []int, error) { + var rows []result + widths := make([]int, len(query.Select)) + var valueStr string + var value float64 for groupKey, set := range g.sets { - var sb strings.Builder r := result{groupKey: groupKey} - lastIndex := len(query.Select) - 1 for i, sc := range query.Select { - storage := sc.FieldStorage - orderByThis := storage == query.OrderBy - switch sc.Operation { case Count: - value := set.FValues[storage] - sb.WriteString(fmt.Sprintf("%d", int(value))) - if orderByThis { - r.orderBy = value - } + value = set.FValues[sc.FieldStorage] + valueStr = fmt.Sprintf("%d", int(value)) case Len: fallthrough case Sum: @@ -121,74 +274,48 @@ func (g *GroupSet) limitedResult(query *Query, limit int, lineStarter, fieldSepa case Min: fallthrough case Max: - value := set.FValues[storage] - sb.WriteString(fmt.Sprintf("%f", value)) - if orderByThis { - r.orderBy = value - } + value = set.FValues[sc.FieldStorage] + valueStr = fmt.Sprintf("%f", value) case Last: - value := set.SValues[storage] - if orderByThis { - f, err := strconv.ParseFloat(value, 64) - if err == nil { - r.orderBy = f - } - } - sb.WriteString(value) + valueStr = set.SValues[sc.FieldStorage] + value, _ = strconv.ParseFloat(valueStr, 64) case Avg: - value := set.FValues[storage] / float64(set.Samples) - sb.WriteString(fmt.Sprintf("%f", value)) - if orderByThis { - r.orderBy = value - } + value = set.FValues[sc.FieldStorage] / float64(set.Samples) + valueStr = fmt.Sprintf("%f", value) default: - return "", 0, fmt.Errorf("Unknown aggregation method '%v'", sc.Operation) + return rows, widths, fmt.Errorf("Unknown aggregation method '%v'", + sc.Operation) } - if i != lastIndex { - sb.WriteString(fieldSeparator) + + if sc.FieldStorage == query.OrderBy { + r.orderBy = value } - } + r.values = append(r.values, valueStr) - r.resultStr = sb.String() - resultSlice = append(resultSlice, r) + if !gatherWidths { + continue + } + if widths[i] < len(sc.FieldStorage) { + widths[i] = len(sc.FieldStorage) + } + if widths[i] < len(valueStr) { + widths[i] = len(valueStr) + } + } + rows = append(rows, r) } if query.OrderBy != "" { if query.ReverseOrder { - sort.SliceStable(resultSlice, func(i, j int) bool { - return resultSlice[i].orderBy < resultSlice[j].orderBy + sort.SliceStable(rows, func(i, j int) bool { + return rows[i].orderBy < rows[j].orderBy }) } else { - sort.SliceStable(resultSlice, func(i, j int) bool { - return resultSlice[i].orderBy > resultSlice[j].orderBy + sort.SliceStable(rows, func(i, j int) bool { + return rows[i].orderBy > rows[j].orderBy }) } } - var sb strings.Builder - - // Write header first - if addHeader { - lastIndex := len(query.Select) - 1 - sb.WriteString(lineStarter) - for i, sc := range query.Select { - sb.WriteString(sc.FieldStorage) - if i != lastIndex { - sb.WriteString(fieldSeparator) - } - } - sb.WriteString("\n") - } - - // And now write the data - for i, r := range resultSlice { - if i == limit { - break - } - sb.WriteString(lineStarter) - sb.WriteString(r.resultStr) - sb.WriteString("\n") - } - - return sb.String(), len(resultSlice), nil + return rows, widths, nil } diff --git a/internal/mapr/logformat/default.go b/internal/mapr/logformat/default.go index 44bf558..9b6c855 100644 --- a/internal/mapr/logformat/default.go +++ b/internal/mapr/logformat/default.go @@ -1,14 +1,23 @@ package logformat import ( - "errors" + "fmt" "strings" + + "github.com/mimecast/dtail/internal/protocol" ) -// MakeFieldsDEFAULT is the default log file mapreduce parser. +// MakeFieldsDEFAULT is the default DTail log file key-value parser. func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) { - fields := make(map[string]string, 20) - splitted := strings.Split(maprLine, "|") + splitted := strings.Split(maprLine, protocol.FieldDelimiter) + + if len(splitted) < 11 || !strings.HasPrefix(splitted[9], "MAPREDUCE:") || + !strings.HasPrefix(splitted[0], "INFO") { + // Not a DTail mapreduce log line. + return nil, ErrIgnoreFields + } + + fields := make(map[string]string, len(splitted)+8) fields["*"] = "*" fields["$line"] = maprLine @@ -17,10 +26,30 @@ func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) { fields["$timezone"] = p.timeZoneName fields["$timeoffset"] = p.timeZoneOffset - for _, kv := range splitted { + fields["$severity"] = splitted[0] + fields["$loglevel"] = splitted[0] + + time := splitted[1] + fields["$time"] = time + if len(time) == 15 { + // Example: 20211002-071209 + fields["$date"] = time[0:8] + fields["$hour"] = time[9:11] + fields["$minute"] = time[11:13] + fields["$second"] = time[13:] + } + fields["$pid"] = splitted[2] + fields["$caller"] = splitted[3] + fields["$cpus"] = splitted[4] + fields["$goroutines"] = splitted[5] + fields["$cgocalls"] = splitted[6] + fields["$loadavg"] = splitted[7] + fields["$uptime"] = splitted[8] + + for _, kv := range splitted[10:] { keyAndValue := strings.SplitN(kv, "=", 2) if len(keyAndValue) != 2 { - return fields, errors.New("Error parsing mapr token: " + kv) + return fields, fmt.Errorf("Unable to parse key-value token '%s'", kv) } fields[strings.ToLower(keyAndValue[0])] = keyAndValue[1] } diff --git a/internal/mapr/logformat/default_test.go b/internal/mapr/logformat/default_test.go index 10ec8b7..28e1acc 100644 --- a/internal/mapr/logformat/default_test.go +++ b/internal/mapr/logformat/default_test.go @@ -1,6 +1,7 @@ package logformat import ( + "fmt" "testing" ) @@ -10,26 +11,83 @@ func TestDefaultLogFormat(t *testing.T) { t.Errorf("Unable to create parser: %s", err.Error()) } - fields, err := parser.MakeFields("foo=bar|baz=bay") + date := "20211002" + hour := "07" + minute := "23" + second := "42" + time := fmt.Sprintf("%s-%s%s%s", date, hour, minute, second) - if err != nil { - t.Errorf("Unable to parse: %s", err.Error()) + inputs := []string{ + fmt.Sprintf("INFO|%s|1|default_test.go:0|8|14|7|0.21|471h0m21s|MAPREDUCE:STATS|foo=bar|bar=foo", time), + fmt.Sprintf("INFO|%s|1|default_test.go:0|8|14|7|0.21|471h0m21s|MAPREDUCE:STATS|bar=foo|foo=bar", time), } - if bar, ok := fields["foo"]; !ok { - t.Errorf("Expected field 'foo', but no such field there\n") - } else if bar != "bar" { - t.Errorf("Expected 'bar' stored in field 'foo', but got '%s'\n", bar) - } + for _, input := range inputs { + fields, err := parser.MakeFields(input) + + if err != nil { + t.Errorf("Parser unable to make fields: %s", err.Error()) + } + + if val, ok := fields["$severity"]; !ok { + t.Errorf("Expected field '$severity', but no such field there in '%s'\n", input) + } else if val != "INFO" { + t.Errorf("Expected 'Info' stored in field '$severity', but got '%s' in '%s'\n", + val, input) + } + + if val, ok := fields["$time"]; !ok { + t.Errorf("Expected field '$time', but no such field there in '%s'\n", input) + } else if val != time { + t.Errorf("Expected '%s' stored in field '$time', but got '%s' in '%s'\n", + time, val, input) + } + + if val, ok := fields["$date"]; !ok { + t.Errorf("Expected field '$date', but no such field there in '%s'\n", input) + } else if val != date { + t.Errorf("Expected '%s' stored in field '$date', but got '%s' in '%s'\n", + date, val, input) + } + + if val, ok := fields["$hour"]; !ok { + t.Errorf("Expected field '$hour', but no such field there in '%s'\n", input) + } else if val != hour { + t.Errorf("Expected '%s' stored in field '$hour', but got '%s' in '%s'\n", + hour, val, input) + } + + if val, ok := fields["$minute"]; !ok { + t.Errorf("Expected field '$minute', but no such field there in '%s'\n", input) + } else if val != minute { + t.Errorf("Expected '%s' stored in field '$minute', but got '%s' in '%s'\n", + minute, val, input) + } + + if val, ok := fields["$second"]; !ok { + t.Errorf("Expected field '$second', but no such field there in '%s'\n", input) + } else if val != second { + t.Errorf("Expected '%s' stored in field '$second', but got '%s' in '%s'\n", + second, val, input) + } + + if val, ok := fields["foo"]; !ok { + t.Errorf("Expected field 'foo', but no such field there in '%s'\n", input) + } else if val != "bar" { + t.Errorf("Expected 'bar' stored in field 'foo', but got '%s' in '%s'\n", + val, input) + } - if bay, ok := fields["baz"]; !ok { - t.Errorf("Expected field 'baz', but no such field there\n") - } else if bay != "bay" { - t.Errorf("Expected 'bay' stored in field 'baz', but got '%s'\n", bay) + if val, ok := fields["bar"]; !ok { + t.Errorf("Expected field 'bar', but no such field there in '%s'\n", input) + } else if val != "foo" { + t.Errorf("Expected 'foo' stored in field 'bar', but got '%s' in '%s'\n", + val, input) + } } - _, err = parser.MakeFields("foo=bar|bazbay") - if err == nil { - t.Errorf("Expected error but didn't: %s", err.Error()) + fields, err := parser.MakeFields("foozoo=bar|bazbay") + if _, ok := fields["foo"]; ok { + t.Errorf("Expected fiending field 'foo', but found it\n") } } diff --git a/internal/mapr/logformat/generickv.go b/internal/mapr/logformat/generickv.go new file mode 100644 index 0000000..433eb5f --- /dev/null +++ b/internal/mapr/logformat/generickv.go @@ -0,0 +1,31 @@ +package logformat + +import ( + "strings" + + "github.com/mimecast/dtail/internal/protocol" +) + +// MakeFieldsGENERIGKV is the generic key-value logfile parser. +func (p *Parser) MakeFieldsGENERIGKV(maprLine string) (map[string]string, error) { + splitted := strings.Split(maprLine, protocol.FieldDelimiter) + fields := make(map[string]string, len(splitted)) + + fields["*"] = "*" + fields["$line"] = maprLine + fields["$empty"] = "" + fields["$hostname"] = p.hostname + fields["$timezone"] = p.timeZoneName + fields["$timeoffset"] = p.timeZoneOffset + + for _, kv := range splitted[0:] { + keyAndValue := strings.SplitN(kv, "=", 2) + if len(keyAndValue) != 2 { + //dlog.Common.Debug("Unable to parse key-value token, ignoring it", kv) + continue + } + fields[strings.ToLower(keyAndValue[0])] = keyAndValue[1] + } + + return fields, nil +} diff --git a/internal/mapr/logformat/parser.go b/internal/mapr/logformat/parser.go index c53729a..129081d 100644 --- a/internal/mapr/logformat/parser.go +++ b/internal/mapr/logformat/parser.go @@ -8,10 +8,12 @@ import ( "strings" "time" - "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr" ) +// ErrIgnoreFields indicates that the fields should be ignored. +var ErrIgnoreFields error = errors.New("Ignore this field set") + // Parser is used to parse the mapreduce information from the server log files. type Parser struct { hostname string @@ -25,11 +27,9 @@ type Parser struct { // NewParser returns a new log parser. func NewParser(logFormatName string, query *mapr.Query) (*Parser, error) { hostname, err := os.Hostname() - if err != nil { return nil, err } - now := time.Now() zone, offset := now.Zone() @@ -43,7 +43,6 @@ func NewParser(logFormatName string, query *mapr.Query) (*Parser, error) { if err != nil { return nil, err } - return &p, nil } @@ -52,7 +51,6 @@ func NewParser(logFormatName string, query *mapr.Query) (*Parser, error) { // Parser. Whereas MODULENAME must be a upeprcase string. func (p *Parser) reflectLogFormat(logFormatName string) error { methodName := fmt.Sprintf("MakeFields%s", strings.ToUpper(logFormatName)) - rt := reflect.TypeOf(p) method, ok := rt.MethodByName(methodName) if !ok { @@ -61,7 +59,6 @@ func (p *Parser) reflectLogFormat(logFormatName string) error { p.makeFieldsFunc = method.Func p.makeFieldsReceiver = reflect.ValueOf(p) - return nil } @@ -69,17 +66,11 @@ func (p *Parser) reflectLogFormat(logFormatName string) error { func (p *Parser) MakeFields(maprLine string) (fields map[string]string, err error) { inputValues := []reflect.Value{p.makeFieldsReceiver, reflect.ValueOf(maprLine)} returnValues := p.makeFieldsFunc.Call(inputValues) - errInterface := returnValues[1].Interface() - if errInterface == nil { fields, err = returnValues[0].Interface().(map[string]string), nil - logger.Trace("parser.MakeFields", fields, err) return } - fields, err = returnValues[0].Interface().(map[string]string), errInterface.(error) - logger.Trace("parser.MakeFields", fields, err) - return } diff --git a/internal/mapr/query.go b/internal/mapr/query.go index 01852da..d7c32bd 100644 --- a/internal/mapr/query.go +++ b/internal/mapr/query.go @@ -6,8 +6,6 @@ import ( "strconv" "strings" "time" - - "github.com/mimecast/dtail/internal/io/logger" ) const ( @@ -34,7 +32,9 @@ type Query struct { } func (q Query) String() string { - return fmt.Sprintf("Query(Select:%v,Table:%s,Where:%v,Set:%vGroupBy:%v,GroupKey:%s,OrderBy:%v,ReverseOrder:%v,Interval:%v,Limit:%d,Outfile:%s,RawQuery:%s,tokens:%v,LogFormat:%s)", + return fmt.Sprintf("Query(Select:%v,Table:%s,Where:%v,Set:%vGroupBy:%v,"+ + "GroupKey:%s,OrderBy:%v,ReverseOrder:%v,Interval:%v,Limit:%d,Outfile:%s,"+ + "RawQuery:%s,tokens:%v,LogFormat:%s)", q.Select, q.Table, q.Where, @@ -56,20 +56,14 @@ func NewQuery(queryStr string) (*Query, error) { if queryStr == "" { return nil, nil } - tokens := tokenize(queryStr) - q := Query{ RawQuery: queryStr, tokens: tokens, Interval: time.Second * 5, Limit: -1, } - - err := q.parse(tokens) - - logger.Debug(q) - return &q, err + return &q, q.parse(tokens) } // HasOutfile returns true if query result will be written to a CVS output file. @@ -178,13 +172,13 @@ func (q *Query) parse(tokens []token) error { } if len(q.Select) < 1 { - return errors.New(invalidQuery + "Expected at least one field in 'select' clause but got none") + return errors.New(invalidQuery + "Expected at least one field in 'select' " + + "clause but got none") } if len(q.GroupBy) == 0 { field := q.Select[0].Field q.GroupBy = append(q.GroupBy, field) } - if q.OrderBy != "" { var orderFieldIsValid bool for _, sc := range q.Select { @@ -194,7 +188,8 @@ func (q *Query) parse(tokens []token) error { } } if !orderFieldIsValid { - return errors.New(invalidQuery + fmt.Sprintf("Can not '(r)order by' '%s', must be present in 'select' clause", q.OrderBy)) + return errors.New(invalidQuery + fmt.Sprintf("Can not '(r)order by' '%s',"+ + "must be present in 'select' clause", q.OrderBy)) } } diff --git a/internal/mapr/query_test.go b/internal/mapr/query_test.go index b0b6c3a..88f7387 100644 --- a/internal/mapr/query_test.go +++ b/internal/mapr/query_test.go @@ -13,18 +13,25 @@ func TestParseQuerySimple(t *testing.T) { "select foo from bar where baz <", "select foo from bar where baz < 100 bay eq 12 group", "select foo from bar where baz < 100 bay eq 12 group by foo order by", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit set foo = bar;", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " + + "order by foo limit", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " + + "order by foo limit set foo = bar;", } okQueries := []string{"select foo from bar", "select foo from bar where", "select foo from bar where baz < 100 bay eq 12", "select foo from bar where baz < 100, bay eq 12", "select foo from bar where baz < 100 and bay eq 12", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit 23", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit 23 outfile \"result.csv\"", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit 23 outfile \"result.csv\" set $foo = maskdigits(bar), $baz = 12, $bay = $foo;", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " + + "order by foo", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " + + "order by foo limit 23", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " + + "order by foo limit 23 outfile \"result.csv\"", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " + + "order by foo limit 23 outfile \"result.csv\" " + + "set $foo = maskdigits(bar), $baz = 12, $bay = $foo;", } for _, queryStr := range errorQueries { @@ -46,8 +53,13 @@ func TestParseQuerySimple(t *testing.T) { func TestParseQueryDeep(t *testing.T) { dialects := []string{ - "select s1, `from`, count(s3) from table where w1 == 2 and w2 eq \"free beer\" group by g1, g2 order by count(s3) interval 10 limit 23 set $foo = maskdigits(bar), $baz = 12, $bay = $foo logformat generic", - "SELECT s1, `from`, COUNT(s3) FROM table WHERE w1 == 2 AND w2 eq \"free beer\" GROUP g1, g2 ORDER count(s3) INTERVAL 10 LIMIT 23 SET $foo = maskdigits(bar), $baz = 12, $bay = $foo logformat generic", + "select s1, `from`, count(s3) from table where w1 == 2 and w2 eq " + + "\"free beer\" group by g1, g2 order by count(s3) interval 10 limit 23 " + + "set $foo = maskdigits(bar), $baz = 12, $bay = $foo logformat generic", + + "SELECT s1, `from`, COUNT(s3) FROM table WHERE w1 == 2 AND w2 eq " + + "\"free beer\" GROUP g1, g2 ORDER count(s3) INTERVAL 10 LIMIT 23 " + + "SET $foo = maskdigits(bar), $baz = 12, $bay = $foo logformat generic", } for _, queryStr := range dialects { @@ -55,119 +67,144 @@ func TestParseQueryDeep(t *testing.T) { if err != nil { t.Errorf("%s: %s", err.Error(), queryStr) } - t.Log(q) // 'select' clause if len(q.Select) != 3 { - t.Errorf("Expected three elements in 'select' clause but got '%v': %s\n%v", q.Select, queryStr, q) + t.Errorf("Expected three elements in 'select' clause but got '%v': %s\n%v", + q.Select, queryStr, q) } - if q.Select[0].Field != "s1" { - t.Errorf("Expected 's1' as first element in 'select' clause but got '%v': %s\n%v", q.Select[0].Field, queryStr, q) + t.Errorf("Expected 's1' as first element in 'select' clause but got '%v': %s\n%v", + q.Select[0].Field, queryStr, q) } if q.Select[0].Operation != Last { - t.Errorf("Expected 'last' as aggregation function of first element in 'select' clause but got '%v': %s\n%v", q.Select[0].Operation, queryStr, q) + t.Errorf("Expected 'last' as aggregation function of first element in "+ + "'select' clause but got '%v': %s\n%v", q.Select[0].Operation, queryStr, q) } - if q.Select[1].Field != "from" { - t.Errorf("Expected 'from' as second element in 'select' clause but got '%v': %s\n%v", q.Select[1].Field, queryStr, q) + t.Errorf("Expected 'from' as second element in 'select' clause but got "+ + "'%v': %s\n%v", q.Select[1].Field, queryStr, q) } if q.Select[1].Operation != Last { - t.Errorf("Expected 'last' as aggregation function of second element in 'select' clause but got '%v': %s\n%v", q.Select[1].Operation, queryStr, q) + t.Errorf("Expected 'last' as aggregation function of second element in "+ + "'select' clause but got '%v': %s\n%v", q.Select[1].Operation, queryStr, q) } - if q.Select[2].Field != "s3" { - t.Errorf("Expected 's3' as third element in 'select' clause but got '%v': %s\n%v", q.Select[2].Field, queryStr, q) + t.Errorf("Expected 's3' as third element in 'select' clause but got "+ + "'%v': %s\n%v", q.Select[2].Field, queryStr, q) } if q.Select[2].Operation != Count { - t.Errorf("Expected 'count' as aggregation function of third element in 'select' clause but got '%v': %s\n%v", q.Select[2].Operation, queryStr, q) + t.Errorf("Expected 'count' as aggregation function of third element in "+ + "'select' clause but got '%v': %s\n%v", q.Select[2].Operation, queryStr, q) } if q.Select[2].FieldStorage != "count(s3)" { - t.Errorf("Expected 'count(s3)' as third element's storage in 'select' clause but got '%v': %s\n%v", q.Select[2].FieldStorage, queryStr, q) + t.Errorf("Expected 'count(s3)' as third element's storage in 'select' "+ + "clause but got '%v': %s\n%v", q.Select[2].FieldStorage, queryStr, q) } // 'from' clause if q.Table != "TABLE" { - t.Errorf("Expected 'TABLE' in 'from' clause but got '%v': %s\n%v", q.Table, queryStr, q) + t.Errorf("Expected 'TABLE' in 'from' clause but got '%v': %s\n%v", + q.Table, queryStr, q) } // 'where' clause if len(q.Where) != 2 { - t.Errorf("Expected two elements in 'where' clause but got '%v': %s\n%v", q.Where, queryStr, q) + t.Errorf("Expected two elements in 'where' clause but got '%v': %s\n%v", + q.Where, queryStr, q) } if q.Where[0].lString != "w1" { - t.Errorf("Expected w1 as first element in 'where' clause but got '%v': %s\n%v", q.Where[0].lString, queryStr, q) + t.Errorf("Expected w1 as first element in 'where' clause but got '%v': %s\n%v", + q.Where[0].lString, queryStr, q) } if q.Where[0].Operation != FloatEq { - t.Errorf("Expected FloatEq operation in first 'where' condition but got '%v': %s\n%v", q.Where[0].Operation, queryStr, q) + t.Errorf("Expected FloatEq operation in first 'where' condition but got "+ + "'%v': %s\n%v", q.Where[0].Operation, queryStr, q) } if q.Where[0].rFloat != 2 { - t.Errorf("Expected '2' as float argument in first 'where' condition but got '%v': %s\n%v", q.Where[0].rFloat, queryStr, q) + t.Errorf("Expected '2' as float argument in first 'where' condition but "+ + "got '%v': %s\n%v", q.Where[0].rFloat, queryStr, q) } if q.Where[1].lString != "w2" { - t.Errorf("Expected w2 as second element in 'where' clause but got '%v': %s\n%v", q.Where[1].lString, queryStr, q) + t.Errorf("Expected w2 as second element in 'where' clause but got '%v': "+ + "%s\n%v", q.Where[1].lString, queryStr, q) } if q.Where[1].Operation != StringEq { - t.Errorf("Expected StringEq operation in second 'where' condition but got '%v': %s\n%v", q.Where[0].Operation, queryStr, q) + t.Errorf("Expected StringEq operation in second 'where' condition but got "+ + "'%v': %s\n%v", q.Where[0].Operation, queryStr, q) } if q.Where[1].rString != "free beer" { - t.Errorf("Expected 'free beer' as string argument in second 'where' condition but got '%v': %s\n%v", q.Where[0].rString, queryStr, q) + t.Errorf("Expected 'free beer' as string argument in second 'where' "+ + "condition but got '%v': %s\n%v", q.Where[0].rString, queryStr, q) } // 'group by' clause if len(q.GroupBy) != 2 { - t.Errorf("Expected two elements in 'group by' clause but got '%v': %s\n%v", q.GroupBy, queryStr, q) + t.Errorf("Expected two elements in 'group by' clause but got '%v': %s\n%v", + q.GroupBy, queryStr, q) } if q.GroupBy[0] != "g1" { - t.Errorf("Expected 'g1' as first element in 'group by' clause but got '%v': %s\n%v", q.GroupBy[0], queryStr, q) + t.Errorf("Expected 'g1' as first element in 'group by' clause but got "+ + "'%v': %s\n%v", q.GroupBy[0], queryStr, q) } if q.GroupBy[1] != "g2" { - t.Errorf("Expected 'g2' as second element in 'group by' clause but got '%v': %s\n%v", q.GroupBy[1], queryStr, q) + t.Errorf("Expected 'g2' as second element in 'group by' clause but got "+ + "'%v': %s\n%v", q.GroupBy[1], queryStr, q) } if q.GroupKey != "g1,g2" { - t.Errorf("Expected 'g1,g2' as group key in 'group by' clause but got '%v': %s\n%v", q.GroupKey, queryStr, q) + t.Errorf("Expected 'g1,g2' as group key in 'group by' clause but got "+ + "'%v': %s\n%v", q.GroupKey, queryStr, q) } // 'order by' clause if q.OrderBy != "count(s3)" { - t.Errorf("Expected 'count(s3)' as element in 'order by' clause but got '%v': %s\n%v", q.OrderBy, queryStr, q) + t.Errorf("Expected 'count(s3)' as element in 'order by' clause but got "+ + "'%v': %s\n%v", q.OrderBy, queryStr, q) } // 'interval' clause if q.Interval != time.Second*time.Duration(10) { - t.Errorf("Expected '10s' as duration 'interval' clause but got '%v': %s\n%v", q.Interval, queryStr, q) + t.Errorf("Expected '10s' as duration 'interval' clause but got '%v': %s\n%v", + q.Interval, queryStr, q) } // 'limit' clause if q.Limit != 23 { - t.Errorf("Expected '23' as limit in 'limit' clause but got '%v': %s\n%v", q.Limit, queryStr, q) + t.Errorf("Expected '23' as limit in 'limit' clause but got '%v': %s\n%v", + q.Limit, queryStr, q) } // 'set' clause if q.Set[0].lString != "$foo" { - t.Errorf("Expected '$foo' lvalue in first 'set' condition clause but got '%v': %s\n%v", q.Set[0].lString, queryStr, q) + t.Errorf("Expected '$foo' lvalue in first 'set' condition clause but got "+ + "'%v': %s\n%v", q.Set[0].lString, queryStr, q) } if q.Set[0].rString != "bar" { - t.Errorf("Expected 'bar' rvalue in first 'set' condition clause but got '%v': %s\n%v", q.Set[0].rString, queryStr, q) + t.Errorf("Expected 'bar' rvalue in first 'set' condition clause but got "+ + "'%v': %s\n%v", q.Set[0].rString, queryStr, q) } - if q.Set[1].lString != "$baz" { - t.Errorf("Expected '$baz' lvalue in second 'set' condition clause but got '%v': %s\n%v", q.Set[1].lString, queryStr, q) + t.Errorf("Expected '$baz' lvalue in second 'set' condition clause but got "+ + "'%v': %s\n%v", q.Set[1].lString, queryStr, q) } if q.Set[1].rString != "12" { - t.Errorf("Expected '12' rvalue in second 'set' condition clause but got '%v': %s\n%v", q.Set[1].rString, queryStr, q) + t.Errorf("Expected '12' rvalue in second 'set' condition clause but got "+ + "'%v': %s\n%v", q.Set[1].rString, queryStr, q) } - if q.Set[2].lString != "$bay" { - t.Errorf("Expected '$bay' lvalue in third 'set' condition clause but got '%v': %s\n%v", q.Set[2].lString, queryStr, q) + t.Errorf("Expected '$bay' lvalue in third 'set' condition clause but got "+ + "'%v': %s\n%v", q.Set[2].lString, queryStr, q) } if q.Set[2].rString != "$foo" { - t.Errorf("Expected '$foo' rvalue in third 'set' condition clause but got '%v': %s\n%v", q.Set[2].rString, queryStr, q) + t.Errorf("Expected '$foo' rvalue in third 'set' condition clause but got "+ + "'%v': %s\n%v", q.Set[2].rString, queryStr, q) } + // 'logformat' clause if q.LogFormat != "generic" { - t.Errorf("Expected 'generic' logformat got '%v': %s\n%v", q.LogFormat, queryStr, q) + t.Errorf("Expected 'generic' logformat got '%v': %s\n%v", + q.LogFormat, queryStr, q) } } } diff --git a/internal/mapr/selectcondition.go b/internal/mapr/selectcondition.go index d6aa0d4..5cfb8c7 100644 --- a/internal/mapr/selectcondition.go +++ b/internal/mapr/selectcondition.go @@ -37,7 +37,6 @@ func (sc selectCondition) String() string { func makeSelectConditions(tokens []token) ([]selectCondition, error) { var sel []selectCondition - // Parse select aggregation, e.g. sum(foo) parse := func(token token) (selectCondition, error) { var sc selectCondition @@ -52,13 +51,15 @@ func makeSelectConditions(tokens []token) ([]selectCondition, error) { a := strings.Split(tokenStr, "(") if len(a) != 2 { - return sc, errors.New(invalidQuery + "Can't parse 'select' aggregation: " + token.str) + return sc, errors.New(invalidQuery + "Can't parse 'select' aggregation: " + + token.str) } agg := a[0] // Aggregation, e.g. 'sum' b := strings.Split(a[1], ")") if len(b) != 2 { - return sc, errors.New(invalidQuery + "Can't parse 'select' field name from aggregation: " + token.str) + return sc, errors.New(invalidQuery + "Can't parse 'select' field name " + + "from aggregation: " + token.str) } sc.Field = b[0] // Field name, e.g. 'foo' sc.FieldStorage = tokenStr // e.g. 'sum(foo)' @@ -79,9 +80,9 @@ func makeSelectConditions(tokens []token) ([]selectCondition, error) { case "len": sc.Operation = Len default: - return sc, errors.New(invalidQuery + "Unknown aggregation in 'select' clause: " + agg) + return sc, errors.New(invalidQuery + + "Unknown aggregation in 'select' clause: " + agg) } - return sc, nil } @@ -92,6 +93,5 @@ func makeSelectConditions(tokens []token) ([]selectCondition, error) { } sel = append(sel, sc) } - return sel, nil } diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 28bb074..97fee11 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -8,25 +8,22 @@ import ( "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" - "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/logformat" + "github.com/mimecast/dtail/internal/protocol" ) // Aggregate is for aggregating mapreduce data on the DTail server side. type Aggregate struct { done *internal.Done - // Log lines to process (parsing MAPREDUCE lines). - Lines chan line.Line + // NextLinesCh can be used to use a new line ch. + NextLinesCh chan chan line.Line // Hostname of the current server (used to populate $hostname field). hostname string // Signals to serialize data. serialize chan struct{} - // Signals to flush data. - flush chan struct{} - // Signals that data has been flushed - flushed chan struct{} // The mapr query query *mapr.Query // The mapr log format parser @@ -42,7 +39,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) { fqdn, err := os.Hostname() if err != nil { - logger.Error(err) + dlog.Common.Error(err) } s := strings.Split(fqdn, ".") @@ -57,38 +54,32 @@ func NewAggregate(queryStr string) (*Aggregate, error) { parserName = query.LogFormat } - logger.Info("Creating log format parser", parserName) + dlog.Common.Info("Creating log format parser", parserName) logParser, err := logformat.NewParser(parserName, query) if err != nil { - logger.Error("Could not create log format parser. Falling back to 'generic'", err) + dlog.Common.Error("Could not create log format parser. Falling back to 'generic'", err) if logParser, err = logformat.NewParser("generic", query); err != nil { - logger.FatalExit("Could not create log format parser", err) + dlog.Common.FatalPanic("Could not create log format parser", err) } } - a := Aggregate{ - done: internal.NewDone(), - Lines: make(chan line.Line, 100), - serialize: make(chan struct{}), - flush: make(chan struct{}), - flushed: make(chan struct{}), - hostname: s[0], - query: query, - parser: logParser, - } - - return &a, nil + return &Aggregate{ + done: internal.NewDone(), + NextLinesCh: make(chan chan line.Line, 10), + serialize: make(chan struct{}), + hostname: s[0], + query: query, + parser: logParser, + }, nil } // Shutdown the aggregation engine. func (a *Aggregate) Shutdown() { - a.Flush() a.done.Shutdown() } // Start an aggregation. -func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) { - +func (a *Aggregate) Start(ctx context.Context, maprMessages chan<- string) { myCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -101,15 +92,14 @@ func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) { } }() - fieldsCh := a.makeFields(myCtx) - + fieldsCh := a.fieldsFromLines(myCtx) // Add fields (e.g. via 'set' clause) if len(a.query.Set) > 0 { - fieldsCh = a.addFields(myCtx, fieldsCh) + fieldsCh = a.setAdditionalFields(myCtx, fieldsCh) } - + // Periodically pre-aggregate data every a.query.Interval seconds. go a.aggregateTimer(myCtx) - a.makeMaprLines(myCtx, fieldsCh, maprLines) + a.aggregateAndSerialize(myCtx, fieldsCh, maprMessages) } func (a *Aggregate) aggregateTimer(ctx context.Context) { @@ -123,25 +113,46 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) { } } -func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string { - ch := make(chan map[string]string) +func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]string { + fieldsCh := make(chan map[string]string) go func() { - defer close(ch) + defer close(fieldsCh) + var lines chan line.Line + + // Gather first lines channel (first input file) + select { + case lines = <-a.NextLinesCh: + case <-ctx.Done(): + return + } for { select { - case line, ok := <-a.Lines: + case line, ok := <-lines: if !ok { - return + select { + case lines = <-a.NextLinesCh: + // Have a new lines channel (e.g. new input file) + case <-ctx.Done(): + default: + // No new lines channel found. + return + } } - maprLine := strings.TrimSpace(string(line.Content)) + maprLine := strings.TrimSpace(line.Content.String()) fields, err := a.parser.MakeFields(maprLine) - logger.Debug(fields, err) + // Can't recycle it here yet, as field slices are still + // TODO: Add unit test reading from multiple mapreduce files lines. + // TODO: Add capability to recycle this bytes buffer. + //pool.RecycleBytesBuffer(line.Content) if err != nil { - logger.Error(err) + // Should fields be ignored anyway? + if err != logformat.ErrIgnoreFields { + dlog.Common.Error(fields, err) + } continue } if !a.query.WhereClause(fields) { @@ -149,7 +160,7 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string { } select { - case ch <- fields: + case fieldsCh <- fields: case <-ctx.Done(): } case <-ctx.Done(): @@ -158,45 +169,42 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string { } }() - return ch + return fieldsCh } -func (a *Aggregate) addFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string { - ch := make(chan map[string]string) +func (a *Aggregate) setAdditionalFields(ctx context.Context, + fieldsCh <-chan map[string]string) <-chan map[string]string { + newFieldsCh := make(chan map[string]string) go func() { - defer close(ch) - + defer close(newFieldsCh) for { - // fieldsCh will be closed via 'makeFields' if ctx is done fields, ok := <-fieldsCh if !ok { return } if err := a.query.SetClause(fields); err != nil { - logger.Error(err) + dlog.Common.Error(err) } select { - case ch <- fields: + case newFieldsCh <- fields: case <-ctx.Done(): } } }() - - return ch + return newFieldsCh } -func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) { - group := mapr.NewGroupSet() +func (a *Aggregate) aggregateAndSerialize(ctx context.Context, + fieldsCh <-chan map[string]string, maprMessages chan<- string) { + group := mapr.NewGroupSet() serialize := func() { - logger.Info("Serializing mapreduce result") - group.Serialize(ctx, maprLines) + dlog.Common.Info("Serializing mapreduce result") + group.Serialize(ctx, maprMessages) group = mapr.NewGroupSet() - logger.Info("Done serializing mapreduce result") } - for { select { case fields, ok := <-fieldsCh: @@ -207,9 +215,6 @@ func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[strin a.aggregate(group, fields) case <-a.serialize: serialize() - case <-a.flush: - serialize() - a.flushed <- struct{}{} case <-ctx.Done(): return } @@ -217,12 +222,10 @@ func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[strin } func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { - //logger.Trace("Aggregating", group, fields) var sb strings.Builder - for i, field := range a.query.GroupBy { if i > 0 { - sb.WriteString(" ") + sb.WriteString(protocol.AggregateGroupKeyCombinator) } if val, ok := fields[field]; ok { sb.WriteString(val) @@ -235,7 +238,7 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { for _, sc := range a.query.Select { if val, ok := fields[sc.Field]; ok { if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil { - logger.Error(err) + dlog.Common.Error(err) continue } addedSample = true @@ -246,8 +249,7 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { set.Samples++ return } - - logger.Trace("Aggregated data locally without adding new samples") + dlog.Common.Trace("Aggregated data locally without adding new samples") } // Serialize all the aggregated data. @@ -255,28 +257,7 @@ func (a *Aggregate) Serialize(ctx context.Context) { select { case a.serialize <- struct{}{}: case <-time.After(time.Minute): - logger.Warn("Starting to serialize mapredice data takes over a minute") + dlog.Common.Warn("Starting to serialize mapredice data takes over a minute") case <-ctx.Done(): } } - -// Flush all data. -func (a *Aggregate) Flush() { - select { - case a.flush <- struct{}{}: - logger.Info("Flushing mapreduce data") - case <-time.After(time.Minute): - logger.Warn("Starting to flush mapreduce data takes over a minute") - return - case <-a.done.Done(): - return - } - - select { - case <-a.flushed: - logger.Info("Done flushing") - case <-time.After(time.Minute): - logger.Warn("Waiting for data to be flushed takes over a minute") - case <-a.done.Done(): - } -} diff --git a/internal/mapr/setclause.go b/internal/mapr/setclause.go index b4c2f73..1843d31 100644 --- a/internal/mapr/setclause.go +++ b/internal/mapr/setclause.go @@ -7,7 +7,6 @@ func (q *Query) SetClause(fields map[string]string) error { if !ok { continue } - switch sc.rType { case FunctionStack: fields[sc.lString] = sc.functionStack.Call(value) @@ -15,6 +14,5 @@ func (q *Query) SetClause(fields map[string]string) error { fields[sc.lString] = value } } - return nil } diff --git a/internal/mapr/setcondition.go b/internal/mapr/setcondition.go index 8c5cfc9..92b21f4 100644 --- a/internal/mapr/setcondition.go +++ b/internal/mapr/setcondition.go @@ -39,20 +39,22 @@ func makeSetConditions(tokens []token) (set []setCondition, err error) { switch setOp { case "=": default: - return sc, nil, errors.New(invalidQuery + "Unknown operation in 'set' clause: " + setOp) + return sc, nil, errors.New(invalidQuery + "Unknown operation in 'set' " + + "clause: " + setOp) } if !tokens[0].isBareword { - return sc, nil, errors.New(invalidQuery + "Expected bareword at 'set' clause's lValue: " + tokens[0].str) + return sc, nil, errors.New(invalidQuery + "Expected bareword at 'set' " + + "clause's lValue: " + tokens[0].str) } - sc.lString = tokens[0].str if !strings.HasPrefix(sc.lString, "$") { - return sc, nil, errors.New(invalidQuery + "Expected field variable name (starting with $) at 'set' clause's lValue: " + tokens[0].str) + return sc, nil, errors.New(invalidQuery + "Expected field variable name " + + "(starting with $) at 'set' clause's lValue: " + tokens[0].str) } sc.rType = Field - rString := tokens[2].str + // Seems like a function call? if strings.HasSuffix(rString, ")") { functionStack, functionArg, err := funcs.NewFunctionStack(tokens[2].str) @@ -72,7 +74,6 @@ func makeSetConditions(tokens []token) (set []setCondition, err error) { } else { sc.rType = Field } - return sc, tokens[3:], nil } @@ -84,10 +85,8 @@ func makeSetConditions(tokens []token) (set []setCondition, err error) { if err != nil { return nil, err } - set = append(set, sc) tokens = tokensConsumeOptional(tokens, ",") } - return } diff --git a/internal/mapr/token.go b/internal/mapr/token.go index 8972188..6ac7631 100644 --- a/internal/mapr/token.go +++ b/internal/mapr/token.go @@ -4,7 +4,8 @@ import ( "strings" ) -var keywords = [...]string{"select", "from", "where", "set", "group", "rorder", "order", "interval", "limit", "outfile", "logformat"} +var keywords = [...]string{"select", "from", "where", "set", "group", "rorder", + "order", "interval", "limit", "outfile", "logformat"} // Represents a parsed token, used to parse the mapr query. type token struct { @@ -16,13 +17,11 @@ func (t token) isKeyword() bool { if !t.isBareword { return false } - for _, keyword := range keywords { if strings.ToLower(t.str) == keyword { return true } } - return false } @@ -32,7 +31,6 @@ func (t token) String() string { func tokenize(queryStr string) []token { var tokens []token - for i, part := range strings.Split(queryStr, "\"") { // Even i, means that it is not a quoted string if i%2 == 0 { @@ -53,17 +51,15 @@ func tokenize(queryStr string) []token { } tokens = append(tokens, token) } - return tokens } func tokensConsume(tokens []token) ([]token, []token) { - //logger.Trace("=====================") + //dlog.Common.Trace("=====================") var consumed []token - for i, t := range tokens { if t.isKeyword() { - //logger.Trace("keyword", t) + //dlog.Common.Trace("keyword", t) return tokens[i:], consumed } // strip escapes, such as ` from `foo`, this allows to use keywords as field names @@ -73,7 +69,7 @@ func tokensConsume(tokens []token) ([]token, []token) { } if t.str[0] == '`' && t.str[length-1] == '`' { stripped := t.str[1 : length-1] - //logger.Trace("stripped", stripped) + //dlog.Common.Trace("stripped", stripped) t := token{ str: stripped, isBareword: t.isBareword, @@ -81,11 +77,10 @@ func tokensConsume(tokens []token) ([]token, []token) { consumed = append(consumed, t) continue } - //logger.Trace("bare", token) + //dlog.Common.Trace("bare", token) consumed = append(consumed, t) } - - //logger.Trace("result", consumed) + //dlog.Common.Trace("result", consumed) return nil, consumed } @@ -95,7 +90,6 @@ func tokensConsumeStr(tokens []token) ([]token, []string) { for _, token := range found { strings = append(strings, token.str) } - return tokens, strings } @@ -106,6 +100,5 @@ func tokensConsumeOptional(tokens []token, optional string) []token { if strings.ToLower(tokens[0].str) == strings.ToLower(optional) { return tokens[1:] } - return tokens } diff --git a/internal/mapr/whereclause.go b/internal/mapr/whereclause.go index cc1c164..d9f32eb 100644 --- a/internal/mapr/whereclause.go +++ b/internal/mapr/whereclause.go @@ -3,14 +3,13 @@ package mapr import ( "strconv" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" ) // WhereClause interprets the where clause of the mapreduce query. func (q *Query) WhereClause(fields map[string]string) bool { for _, wc := range q.Where { var ok bool - if wc.Operation > FloatOperation { var lValue, rValue float64 if lValue, ok = whereClauseFloatValue(fields, wc.lString, wc.lFloat, wc.lType); !ok { @@ -36,11 +35,12 @@ func (q *Query) WhereClause(fields map[string]string) bool { return false } } - return true } -func whereClauseFloatValue(fields map[string]string, str string, float float64, t fieldType) (float64, bool) { +func whereClauseFloatValue(fields map[string]string, str string, float float64, + t fieldType) (float64, bool) { + switch t { case Float: return float, true @@ -55,12 +55,14 @@ func whereClauseFloatValue(fields map[string]string, str string, float float64, } return f, true default: - logger.Error("Unexpected argument in 'where' clause", str, float, t) + dlog.Common.Error("Unexpected argument in 'where' clause", str, float, t) return 0, false } } -func whereClauseStringValue(fields map[string]string, str string, t fieldType) (string, bool) { +func whereClauseStringValue(fields map[string]string, str string, + t fieldType) (string, bool) { + switch t { case Field: value, ok := fields[str] @@ -71,7 +73,7 @@ func whereClauseStringValue(fields map[string]string, str string, t fieldType) ( case String: return str, true default: - logger.Error("Unexpected argument in 'where' clause", str, t) + dlog.Common.Error("Unexpected argument in 'where' clause", str, t) return str, false } } diff --git a/internal/mapr/wherecondition.go b/internal/mapr/wherecondition.go index 7a60dba..280dcfb 100644 --- a/internal/mapr/wherecondition.go +++ b/internal/mapr/wherecondition.go @@ -6,7 +6,7 @@ import ( "strconv" "strings" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" ) // QueryOperation determines the mapreduce operation. @@ -46,15 +46,18 @@ type whereCondition struct { } func (wc *whereCondition) String() string { - return fmt.Sprintf("whereCondition(Operation:%v,lString:%s,lFloat:%v,lType:%s,rString:%s,rFloat:%v,rType:%s)", - wc.Operation, wc.lString, wc.lFloat, wc.lType.String(), wc.rString, wc.rFloat, wc.rType.String()) + return fmt.Sprintf("whereCondition(Operation:%v,lString:%s,lFloat:%v,"+ + "lType:%s,rString:%s,rFloat:%v,rType:%s)", + wc.Operation, wc.lString, wc.lFloat, wc.lType.String(), wc.rString, + wc.rFloat, wc.rType.String()) } func makeWhereConditions(tokens []token) (where []whereCondition, err error) { parse := func(tokens []token) (whereCondition, []token, error) { var wc whereCondition if len(tokens) < 3 { - return wc, nil, errors.New(invalidQuery + "Not enough arguments in 'where' clause") + err := errors.New(invalidQuery + "Not enough arguments in 'where' clause") + return wc, nil, err } whereOp := strings.ToLower(tokens[1].str) @@ -94,7 +97,8 @@ func makeWhereConditions(tokens []token) (where []whereCondition, err error) { case "nhassuffix": wc.Operation = StringNotHasSuffix default: - return wc, nil, errors.New(invalidQuery + "Unknown operation in 'where' clause: " + whereOp) + return wc, nil, errors.New(invalidQuery + + "Unknown operation in 'where' clause: " + whereOp) } wc.lString = tokens[0].str @@ -102,7 +106,8 @@ func makeWhereConditions(tokens []token) (where []whereCondition, err error) { if wc.Operation > FloatOperation { if !tokens[0].isBareword { - return wc, nil, errors.New(invalidQuery + "Expected bareword at 'where' clause's lValue: " + tokens[0].str) + return wc, nil, errors.New(invalidQuery + + "Expected bareword at 'where' clause's lValue: " + tokens[0].str) } if f, err := strconv.ParseFloat(wc.lString, 64); err == nil { wc.lFloat = f @@ -112,7 +117,8 @@ func makeWhereConditions(tokens []token) (where []whereCondition, err error) { } if !tokens[2].isBareword { - return wc, nil, errors.New(invalidQuery + "Expected bareword at 'where' clause's rValue: " + tokens[2].str) + return wc, nil, errors.New(invalidQuery + + "Expected bareword at 'where' clause's rValue: " + tokens[2].str) } if f, err := strconv.ParseFloat(wc.rString, 64); err == nil { wc.rFloat = f @@ -133,23 +139,19 @@ func makeWhereConditions(tokens []token) (where []whereCondition, err error) { } else { wc.rType = String } - return wc, tokens[3:], nil } for len(tokens) > 0 { var wc whereCondition var err error - wc, tokens, err = parse(tokens) if err != nil { return nil, err } - where = append(where, wc) tokens = tokensConsumeOptional(tokens, "and") } - return } @@ -168,9 +170,8 @@ func (wc *whereCondition) floatClause(lValue float64, rValue float64) bool { case FloatGe: return lValue >= rValue default: - logger.Error("Unknown float operation", lValue, wc.Operation, rValue) + dlog.Common.Error("Unknown float operation", lValue, wc.Operation, rValue) } - return false } @@ -193,8 +194,7 @@ func (wc *whereCondition) stringClause(lValue string, rValue string) bool { case StringNotHasSuffix: return !strings.HasSuffix(lValue, rValue) default: - logger.Error("Unknown string operation", lValue, wc.Operation, rValue) + dlog.Common.Error("Unknown string operation", lValue, wc.Operation, rValue) } - return false } |
