summaryrefslogtreecommitdiff
path: root/internal/mapr
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2021-10-21 21:28:49 +0300
committerPaul Buetow <pbuetow@mimecast.com>2021-10-21 21:28:49 +0300
commitf4207a55f71bfbcfdc532d5cdd3befaa3474a157 (patch)
treeea5e4a2d2a67035f645bdee496ae55a52034178a /internal/mapr
parentd80d6070557e3a800e3a54967af9eced518f116b (diff)
parent739205206d63bf42f4e843b39d04d4c8cd8207c3 (diff)
merge develop
Diffstat (limited to 'internal/mapr')
-rw-r--r--internal/mapr/aggregateset.go29
-rw-r--r--internal/mapr/client/aggregate.go29
-rw-r--r--internal/mapr/funcs/function.go11
-rw-r--r--internal/mapr/funcs/function_test.go21
-rw-r--r--internal/mapr/funcs/maskdigits.go2
-rw-r--r--internal/mapr/globalgroupset.go11
-rw-r--r--internal/mapr/groupset.go291
-rw-r--r--internal/mapr/logformat/default.go41
-rw-r--r--internal/mapr/logformat/default_test.go88
-rw-r--r--internal/mapr/logformat/generickv.go31
-rw-r--r--internal/mapr/logformat/parser.go15
-rw-r--r--internal/mapr/query.go21
-rw-r--r--internal/mapr/query_test.go125
-rw-r--r--internal/mapr/selectcondition.go12
-rw-r--r--internal/mapr/server/aggregate.go155
-rw-r--r--internal/mapr/setclause.go2
-rw-r--r--internal/mapr/setcondition.go15
-rw-r--r--internal/mapr/token.go21
-rw-r--r--internal/mapr/whereclause.go16
-rw-r--r--internal/mapr/wherecondition.go30
20 files changed, 607 insertions, 359 deletions
diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go
index a6cc6eb..c50c7a1 100644
--- a/internal/mapr/aggregateset.go
+++ b/internal/mapr/aggregateset.go
@@ -5,6 +5,10 @@ import (
"fmt"
"strconv"
"strings"
+
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/io/pool"
+ "github.com/mimecast/dtail/internal/protocol"
)
// AggregateSet represents aggregated key/value pairs from the
@@ -33,8 +37,7 @@ func (s *AggregateSet) String() string {
// Merge one aggregate set into this one.
func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error {
s.Samples += set.Samples
- //logger.Trace("Merge", set)
-
+ //dlog.Common.Trace("Merge", set)
for _, sc := range query.Select {
storage := sc.FieldStorage
switch sc.Operation {
@@ -66,24 +69,27 @@ func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error {
// Serialize the aggregate set so it can be sent over the wire.
func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<- string) {
- //logger.Trace("Serialising mapr.AggregateSet", s)
- var sb strings.Builder
+ dlog.Common.Trace("Serialising mapr.AggregateSet", s)
+ sb := pool.BuilderBuffer.Get().(*strings.Builder)
+ defer pool.RecycleBuilderBuffer(sb)
sb.WriteString(groupKey)
- sb.WriteString("āž”")
- sb.WriteString(fmt.Sprintf("%dāž”", s.Samples))
+ sb.WriteString(protocol.AggregateDelimiter)
+ sb.WriteString(fmt.Sprintf("%d", s.Samples))
+ sb.WriteString(protocol.AggregateDelimiter)
for k, v := range s.FValues {
sb.WriteString(k)
- sb.WriteString("=")
- sb.WriteString(fmt.Sprintf("%vāž”", v))
+ sb.WriteString(protocol.AggregateKVDelimiter)
+ sb.WriteString(fmt.Sprintf("%v", v))
+ sb.WriteString(protocol.AggregateDelimiter)
}
for k, v := range s.SValues {
sb.WriteString(k)
- sb.WriteString("=")
+ sb.WriteString(protocol.AggregateKVDelimiter)
sb.WriteString(v)
- sb.WriteString("āž”")
+ sb.WriteString(protocol.AggregateDelimiter)
}
select {
@@ -108,7 +114,6 @@ func (s *AggregateSet) addFloatMin(key string, value float64) {
s.FValues[key] = value
return
}
-
if f > value {
s.FValues[key] = value
}
@@ -121,7 +126,6 @@ func (s *AggregateSet) addFloatMax(key string, value float64) {
s.FValues[key] = value
return
}
-
if f < value {
s.FValues[key] = value
}
@@ -140,7 +144,6 @@ func (s *AggregateSet) setFloat(key string, value float64) {
// Aggregate data to the aggregate set.
func (s *AggregateSet) Aggregate(key string, agg AggregateOperation, value string, clientAggregation bool) (err error) {
var f float64
-
// First check if we can aggregate anything without converting value to float.
switch agg {
case Count:
diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go
index 10b34d4..02a6a5a 100644
--- a/internal/mapr/client/aggregate.go
+++ b/internal/mapr/client/aggregate.go
@@ -1,11 +1,13 @@
package client
import (
+ "fmt"
"strconv"
"strings"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr"
+ "github.com/mimecast/dtail/internal/protocol"
)
// Aggregate mapreduce data on the DTail client side.
@@ -21,7 +23,9 @@ type Aggregate struct {
}
// NewAggregate create new client aggregator.
-func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *Aggregate {
+func NewAggregate(server string, query *mapr.Query,
+ globalGroup *mapr.GlobalGroupSet) *Aggregate {
+
return &Aggregate{
query: query,
group: mapr.NewGroupSet(),
@@ -31,20 +35,26 @@ func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGrou
}
// Aggregate data from mapr log line into local (and global) group sets.
-func (a *Aggregate) Aggregate(parts []string) {
+func (a *Aggregate) Aggregate(message string) error {
+ parts := strings.Split(message, protocol.AggregateDelimiter)
+ if len(parts) < 4 {
+ return fmt.Errorf("aggregate message without any real data")
+ }
+
groupKey := parts[0]
samples, err := strconv.Atoi(parts[1])
if err != nil {
- logger.FatalExit("Unable to parse sample count", parts[1], err, parts)
+ return fmt.Errorf("unable to parse sample count '%s': %v", parts[1], err)
}
+
fields := a.makeFields(parts[2:])
set := a.group.GetSet(groupKey)
-
var addedSamples bool
+
for _, sc := range a.query.Select {
if val, ok := fields[sc.FieldStorage]; ok {
if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, true); err != nil {
- logger.Error(err)
+ dlog.Common.Error(err)
continue
}
addedSamples = true
@@ -63,19 +73,18 @@ func (a *Aggregate) Aggregate(parts []string) {
// Re-init local group (make it empty again).
a.group.InitSet()
}
+ return nil
}
// Create a map of key-value pairs from a part list such as ["foo=bar", "bar=baz"].
func (a *Aggregate) makeFields(parts []string) map[string]string {
fields := make(map[string]string, len(parts))
-
for _, part := range parts {
- kv := strings.SplitN(part, "=", 2)
- if len(kv) < 2 {
+ kv := strings.SplitN(part, protocol.AggregateKVDelimiter, 2)
+ if len(kv) != 2 {
continue
}
fields[kv[0]] = kv[1]
}
-
return fields
}
diff --git a/internal/mapr/funcs/function.go b/internal/mapr/funcs/function.go
index 1a89c3a..418d86f 100644
--- a/internal/mapr/funcs/function.go
+++ b/internal/mapr/funcs/function.go
@@ -19,13 +19,12 @@ type Function struct {
// FunctionStack is a list of functions stacked each other
type FunctionStack []Function
-// NewFunctionStack parses the input string, e.g. foo(bar("arg")) and returns a corresponding function stack.
+// NewFunctionStack parses the input string, e.g. foo(bar("arg")) and returns
+// a corresponding function stack.
func NewFunctionStack(in string) (FunctionStack, string, error) {
var fs FunctionStack
-
getCallback := func(name string) (CallbackFunc, error) {
var cb CallbackFunc
-
switch name {
case "md5sum":
return Md5Sum, nil
@@ -51,17 +50,15 @@ func NewFunctionStack(in string) (FunctionStack, string, error) {
fs = append(fs, Function{name, call})
aux = aux[index+1 : len(aux)-1]
}
-
return fs, aux, nil
}
// Call the function stack.
func (fs FunctionStack) Call(str string) string {
for i := len(fs) - 1; i >= 0; i-- {
- //logger.Debug("Call", fs[i].Name, str)
+ //dlog.Common.Debug("Call", fs[i].Name, str)
str = fs[i].call(str)
- //logger.Debug("Call.result", fs[i].Name, str)
+ //dlog.Common.Debug("Call.result", fs[i].Name, str)
}
-
return str
}
diff --git a/internal/mapr/funcs/function_test.go b/internal/mapr/funcs/function_test.go
index 415683c..8b5d8b7 100644
--- a/internal/mapr/funcs/function_test.go
+++ b/internal/mapr/funcs/function_test.go
@@ -6,16 +6,19 @@ func TestFunction(t *testing.T) {
input := "md5sum($line)"
fs, arg, err := NewFunctionStack(input)
if err != nil {
- t.Errorf("error parsing function input '%s': %s (%v)\n", input, err.Error(), fs)
+ t.Errorf("error parsing function input '%s': %s (%v)\n",
+ input, err.Error(), fs)
}
if arg != "$line" {
- t.Errorf("error parsing function input '%s': expected argument '$line' but got '%s' (%v)\n", input, arg, fs)
+ t.Errorf("error parsing function input '%s': expected argument '$line' but "+
+ "got '%s' (%v)\n", input, arg, fs)
}
t.Log(input, fs, arg)
result := fs.Call(input)
if result != "b38699013d79e50d9d122433753959c1" {
- t.Errorf("error executing function stack '%s': expected result 'b38699013d79e50d9d122433753959c1' but got '%s' (%v)\n", input, result, fs)
+ t.Errorf("error executing function stack '%s': expected result "+
+ "'b38699013d79e50d9d122433753959c1' but got '%s' (%v)\n", input, result, fs)
}
input = "maskdigits(md5sum(maskdigits($line)))"
@@ -24,22 +27,26 @@ func TestFunction(t *testing.T) {
t.Errorf("error parsing function input '%s': %s (%v)\n", input, err.Error(), fs)
}
if arg != "$line" {
- t.Errorf("error parsing function input '%s': expected argument '$line' but got '%s' (%v)\n", input, arg, fs)
+ t.Errorf("error parsing function input '%s': expected argument '$line' but "+
+ "got '%s' (%v)\n", input, arg, fs)
}
t.Log(input, fs, arg)
result = fs.Call(input)
if result != ".fac.bbe..bb.........d...a.c..b." {
- t.Errorf("error executing function stack '%s': expected result '.fac.bbe..bb.........d...a.c..b.' but got '%s' (%v)\n", input, result, fs)
+ t.Errorf("error executing function stack '%s': expected result "+
+ "'.fac.bbe..bb.........d...a.c..b.' but got '%s' (%v)\n", input, result, fs)
}
input = "md5sum$line)"
if fs, _, err := NewFunctionStack(input); err == nil {
- t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n", input, fs)
+ t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n",
+ input, fs)
}
input = "md5sum(makedigits$line))"
if fs, _, err := NewFunctionStack(input); err == nil {
- t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n", input, fs)
+ t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n",
+ input, fs)
}
}
diff --git a/internal/mapr/funcs/maskdigits.go b/internal/mapr/funcs/maskdigits.go
index d51f3d8..925ec4d 100644
--- a/internal/mapr/funcs/maskdigits.go
+++ b/internal/mapr/funcs/maskdigits.go
@@ -3,12 +3,10 @@ package funcs
// MaskDigits masks all digits (replaces them with .)
func MaskDigits(input string) string {
s := []byte(input)
-
for i, b := range s {
if '0' <= b && b <= '9' {
s[i] = '.'
}
}
-
return string(s)
}
diff --git a/internal/mapr/globalgroupset.go b/internal/mapr/globalgroupset.go
index cfab506..2d7f10b 100644
--- a/internal/mapr/globalgroupset.go
+++ b/internal/mapr/globalgroupset.go
@@ -17,7 +17,6 @@ func NewGlobalGroupSet() *GlobalGroupSet {
semaphore: make(chan struct{}, 1),
}
g.InitSet()
-
return &g
}
@@ -30,7 +29,6 @@ func (g *GlobalGroupSet) String() string {
func (g *GlobalGroupSet) Merge(query *Query, group *GroupSet) error {
g.semaphore <- struct{}{}
defer func() { <-g.semaphore }()
-
return g.merge(query, group)
}
@@ -54,7 +52,6 @@ func (g *GlobalGroupSet) merge(query *Query, group *GroupSet) error {
return err
}
}
-
return nil
}
@@ -67,7 +64,6 @@ func (g *GlobalGroupSet) IsEmpty() bool {
func (g *GlobalGroupSet) NumSets() int {
g.semaphore <- struct{}{}
defer func() { <-g.semaphore }()
-
return len(g.sets)
}
@@ -79,7 +75,6 @@ func (g *GlobalGroupSet) SwapOut() *GroupSet {
set := &GroupSet{sets: g.sets}
g.InitSet()
-
return set
}
@@ -87,14 +82,12 @@ func (g *GlobalGroupSet) SwapOut() *GroupSet {
func (g *GlobalGroupSet) WriteResult(query *Query) error {
g.semaphore <- struct{}{}
defer func() { <-g.semaphore }()
-
return g.GroupSet.WriteResult(query)
}
// Result returns the result of the mapreduce aggregation as a string.
-func (g *GlobalGroupSet) Result(query *Query) (string, int, error) {
+func (g *GlobalGroupSet) Result(query *Query, rowsLimit int) (string, int, error) {
g.semaphore <- struct{}{}
defer func() { <-g.semaphore }()
-
- return g.GroupSet.Result(query)
+ return g.GroupSet.Result(query, rowsLimit)
}
diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go
index b5c8a48..6ffc8b9 100644
--- a/internal/mapr/groupset.go
+++ b/internal/mapr/groupset.go
@@ -4,13 +4,16 @@ import (
"context"
"errors"
"fmt"
- "io/ioutil"
"os"
"sort"
"strconv"
"strings"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/color"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/io/pool"
+ "github.com/mimecast/dtail/internal/protocol"
)
// GroupSet represents a map of aggregate sets. The group sets
@@ -22,6 +25,14 @@ type GroupSet struct {
sets map[string]*AggregateSet
}
+// Internal helper type
+type result struct {
+ groupKey string
+ values []string
+ widths []int
+ orderBy float64
+}
+
// NewGroupSet returns a new empty group set.
func NewGroupSet() *GroupSet {
g := GroupSet{}
@@ -57,28 +68,181 @@ func (g *GroupSet) Serialize(ctx context.Context, ch chan<- string) {
}
// Result returns a nicely formated result of the query from the group set.
-func (g *GroupSet) Result(query *Query) (string, int, error) {
- return g.limitedResult(query, query.Limit, "\t", " ", false)
+func (g *GroupSet) Result(query *Query, rowsLimit int) (string, int, error) {
+ rows, widths, err := g.result(query, true)
+ if err != nil {
+ return "", 0, err
+ }
+ if query.Limit != -1 {
+ rowsLimit = query.Limit
+ }
+
+ sb := pool.BuilderBuffer.Get().(*strings.Builder)
+ defer pool.RecycleBuilderBuffer(sb)
+
+ // Generate header now
+ lastIndex := len(query.Select) - 1
+ for i, sc := range query.Select {
+ format := fmt.Sprintf(" %%%ds ", widths[i])
+ str := fmt.Sprintf(format, sc.FieldStorage)
+ if config.Client.TermColorsEnable {
+ attrs := []color.Attribute{config.Client.TermColors.MaprTable.HeaderAttr}
+ if sc.FieldStorage == query.OrderBy {
+ attrs = append(attrs, config.Client.TermColors.MaprTable.HeaderSortKeyAttr)
+ }
+
+ for _, groupBy := range query.GroupBy {
+ if sc.FieldStorage == groupBy {
+ attrs = append(attrs, config.Client.TermColors.MaprTable.HeaderGroupKeyAttr)
+ break
+ }
+ }
+
+ color.PaintWithAttrs(sb, str,
+ config.Client.TermColors.MaprTable.HeaderFg,
+ config.Client.TermColors.MaprTable.HeaderBg,
+ attrs)
+ } else {
+ sb.WriteString(str)
+ }
+
+ if i == lastIndex {
+ continue
+ }
+ if config.Client.TermColorsEnable {
+ color.PaintWithAttr(sb, protocol.FieldDelimiter,
+ config.Client.TermColors.MaprTable.HeaderDelimiterFg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterBg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterAttr)
+ } else {
+ sb.WriteString(protocol.FieldDelimiter)
+ }
+ }
+ sb.WriteString("\n")
+
+ for i := 0; i < len(query.Select); i++ {
+ str := fmt.Sprintf("-%s-", strings.Repeat("-", widths[i]))
+ if config.Client.TermColorsEnable {
+ color.PaintWithAttr(sb, str,
+ config.Client.TermColors.MaprTable.HeaderDelimiterFg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterBg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterAttr)
+ } else {
+ sb.WriteString(str)
+ }
+ if i == lastIndex {
+ continue
+ }
+ if config.Client.TermColorsEnable {
+ color.PaintWithAttr(sb, protocol.FieldDelimiter,
+ config.Client.TermColors.MaprTable.HeaderDelimiterFg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterBg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterAttr)
+ } else {
+ sb.WriteString(protocol.FieldDelimiter)
+ }
+ }
+ sb.WriteString("\n")
+
+ // And now write the data
+ for i, r := range rows {
+ if i == rowsLimit {
+ break
+ }
+ for j, value := range r.values {
+ format := fmt.Sprintf(" %%%ds ", widths[j])
+ str := fmt.Sprintf(format, value)
+ if config.Client.TermColorsEnable {
+ color.PaintWithAttr(sb, str,
+ config.Client.TermColors.MaprTable.DataFg,
+ config.Client.TermColors.MaprTable.DataBg,
+ config.Client.TermColors.MaprTable.DataAttr)
+ } else {
+ sb.WriteString(str)
+ }
+
+ if j == lastIndex {
+ continue
+ }
+ if config.Client.TermColorsEnable {
+ color.PaintWithAttr(sb, protocol.FieldDelimiter,
+ config.Client.TermColors.MaprTable.DelimiterFg,
+ config.Client.TermColors.MaprTable.DelimiterBg,
+ config.Client.TermColors.MaprTable.DelimiterAttr)
+ } else {
+ sb.WriteString(protocol.FieldDelimiter)
+ }
+ }
+ sb.WriteString("\n")
+ }
+
+ return sb.String(), len(rows), nil
}
-// WriteResult writes the result to an outfile.
+func (*GroupSet) writeQueryFile(query *Query) error {
+ queryFile := fmt.Sprintf("%s.query", query.Outfile)
+ tmpQueryFile := fmt.Sprintf("%s.tmp", queryFile)
+ dlog.Common.Debug("Writing query file", queryFile)
+
+ fd, err := os.Create(tmpQueryFile)
+ if err != nil {
+ return err
+ }
+ defer fd.Close()
+
+ fd.WriteString(query.RawQuery)
+ os.Rename(tmpQueryFile, queryFile)
+ return nil
+}
+
+// WriteResult writes the result to an CSV outfile.
func (g *GroupSet) WriteResult(query *Query) error {
if !query.HasOutfile() {
return errors.New("No outfile specified")
}
+ if err := g.writeQueryFile(query); err != nil {
+ return err
+ }
- // -1: Don't limit the result, include all data sets
- result, _, err := g.limitedResult(query, query.Limit, "", ",", true)
+ rows, _, err := g.result(query, false)
if err != nil {
return err
}
- logger.Info("Writing outfile", query.Outfile)
+ dlog.Common.Info("Writing outfile", query.Outfile)
tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile)
- if err := ioutil.WriteFile(tmpOutfile, []byte(result), 0644); err != nil {
+ fd, err := os.Create(tmpOutfile)
+ if err != nil {
return err
}
+ defer fd.Close()
+
+ // Generate header now
+ lastIndex := len(query.Select) - 1
+ for i, sc := range query.Select {
+ fd.WriteString(sc.FieldStorage)
+ if i == lastIndex {
+ continue
+ }
+ fd.WriteString(protocol.CSVDelimiter)
+ }
+ fd.WriteString("\n")
+
+ // And now write the data
+ for i, r := range rows {
+ if i == query.Limit {
+ break
+ }
+ for j, value := range r.values {
+ fd.WriteString(value)
+ if j == lastIndex {
+ continue
+ }
+ fd.WriteString(protocol.CSVDelimiter)
+ }
+ fd.WriteString("\n")
+ }
if err := os.Rename(tmpOutfile, query.Outfile); err != nil {
os.Remove(tmpOutfile)
@@ -88,32 +252,21 @@ func (g *GroupSet) WriteResult(query *Query) error {
return nil
}
-// Return a nicely formated result of the query from the group set.
-func (g *GroupSet) limitedResult(query *Query, limit int, lineStarter, fieldSeparator string, addHeader bool) (string, int, error) {
- type result struct {
- groupKey string
- resultStr string
- orderBy float64
- }
-
- var resultSlice []result
+// Return a sorted result slice of the query from the group set.
+func (g *GroupSet) result(query *Query, gatherWidths bool) ([]result, []int, error) {
+ var rows []result
+ widths := make([]int, len(query.Select))
+ var valueStr string
+ var value float64
for groupKey, set := range g.sets {
- var sb strings.Builder
r := result{groupKey: groupKey}
- lastIndex := len(query.Select) - 1
for i, sc := range query.Select {
- storage := sc.FieldStorage
- orderByThis := storage == query.OrderBy
-
switch sc.Operation {
case Count:
- value := set.FValues[storage]
- sb.WriteString(fmt.Sprintf("%d", int(value)))
- if orderByThis {
- r.orderBy = value
- }
+ value = set.FValues[sc.FieldStorage]
+ valueStr = fmt.Sprintf("%d", int(value))
case Len:
fallthrough
case Sum:
@@ -121,74 +274,48 @@ func (g *GroupSet) limitedResult(query *Query, limit int, lineStarter, fieldSepa
case Min:
fallthrough
case Max:
- value := set.FValues[storage]
- sb.WriteString(fmt.Sprintf("%f", value))
- if orderByThis {
- r.orderBy = value
- }
+ value = set.FValues[sc.FieldStorage]
+ valueStr = fmt.Sprintf("%f", value)
case Last:
- value := set.SValues[storage]
- if orderByThis {
- f, err := strconv.ParseFloat(value, 64)
- if err == nil {
- r.orderBy = f
- }
- }
- sb.WriteString(value)
+ valueStr = set.SValues[sc.FieldStorage]
+ value, _ = strconv.ParseFloat(valueStr, 64)
case Avg:
- value := set.FValues[storage] / float64(set.Samples)
- sb.WriteString(fmt.Sprintf("%f", value))
- if orderByThis {
- r.orderBy = value
- }
+ value = set.FValues[sc.FieldStorage] / float64(set.Samples)
+ valueStr = fmt.Sprintf("%f", value)
default:
- return "", 0, fmt.Errorf("Unknown aggregation method '%v'", sc.Operation)
+ return rows, widths, fmt.Errorf("Unknown aggregation method '%v'",
+ sc.Operation)
}
- if i != lastIndex {
- sb.WriteString(fieldSeparator)
+
+ if sc.FieldStorage == query.OrderBy {
+ r.orderBy = value
}
- }
+ r.values = append(r.values, valueStr)
- r.resultStr = sb.String()
- resultSlice = append(resultSlice, r)
+ if !gatherWidths {
+ continue
+ }
+ if widths[i] < len(sc.FieldStorage) {
+ widths[i] = len(sc.FieldStorage)
+ }
+ if widths[i] < len(valueStr) {
+ widths[i] = len(valueStr)
+ }
+ }
+ rows = append(rows, r)
}
if query.OrderBy != "" {
if query.ReverseOrder {
- sort.SliceStable(resultSlice, func(i, j int) bool {
- return resultSlice[i].orderBy < resultSlice[j].orderBy
+ sort.SliceStable(rows, func(i, j int) bool {
+ return rows[i].orderBy < rows[j].orderBy
})
} else {
- sort.SliceStable(resultSlice, func(i, j int) bool {
- return resultSlice[i].orderBy > resultSlice[j].orderBy
+ sort.SliceStable(rows, func(i, j int) bool {
+ return rows[i].orderBy > rows[j].orderBy
})
}
}
- var sb strings.Builder
-
- // Write header first
- if addHeader {
- lastIndex := len(query.Select) - 1
- sb.WriteString(lineStarter)
- for i, sc := range query.Select {
- sb.WriteString(sc.FieldStorage)
- if i != lastIndex {
- sb.WriteString(fieldSeparator)
- }
- }
- sb.WriteString("\n")
- }
-
- // And now write the data
- for i, r := range resultSlice {
- if i == limit {
- break
- }
- sb.WriteString(lineStarter)
- sb.WriteString(r.resultStr)
- sb.WriteString("\n")
- }
-
- return sb.String(), len(resultSlice), nil
+ return rows, widths, nil
}
diff --git a/internal/mapr/logformat/default.go b/internal/mapr/logformat/default.go
index 44bf558..9b6c855 100644
--- a/internal/mapr/logformat/default.go
+++ b/internal/mapr/logformat/default.go
@@ -1,14 +1,23 @@
package logformat
import (
- "errors"
+ "fmt"
"strings"
+
+ "github.com/mimecast/dtail/internal/protocol"
)
-// MakeFieldsDEFAULT is the default log file mapreduce parser.
+// MakeFieldsDEFAULT is the default DTail log file key-value parser.
func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) {
- fields := make(map[string]string, 20)
- splitted := strings.Split(maprLine, "|")
+ splitted := strings.Split(maprLine, protocol.FieldDelimiter)
+
+ if len(splitted) < 11 || !strings.HasPrefix(splitted[9], "MAPREDUCE:") ||
+ !strings.HasPrefix(splitted[0], "INFO") {
+ // Not a DTail mapreduce log line.
+ return nil, ErrIgnoreFields
+ }
+
+ fields := make(map[string]string, len(splitted)+8)
fields["*"] = "*"
fields["$line"] = maprLine
@@ -17,10 +26,30 @@ func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) {
fields["$timezone"] = p.timeZoneName
fields["$timeoffset"] = p.timeZoneOffset
- for _, kv := range splitted {
+ fields["$severity"] = splitted[0]
+ fields["$loglevel"] = splitted[0]
+
+ time := splitted[1]
+ fields["$time"] = time
+ if len(time) == 15 {
+ // Example: 20211002-071209
+ fields["$date"] = time[0:8]
+ fields["$hour"] = time[9:11]
+ fields["$minute"] = time[11:13]
+ fields["$second"] = time[13:]
+ }
+ fields["$pid"] = splitted[2]
+ fields["$caller"] = splitted[3]
+ fields["$cpus"] = splitted[4]
+ fields["$goroutines"] = splitted[5]
+ fields["$cgocalls"] = splitted[6]
+ fields["$loadavg"] = splitted[7]
+ fields["$uptime"] = splitted[8]
+
+ for _, kv := range splitted[10:] {
keyAndValue := strings.SplitN(kv, "=", 2)
if len(keyAndValue) != 2 {
- return fields, errors.New("Error parsing mapr token: " + kv)
+ return fields, fmt.Errorf("Unable to parse key-value token '%s'", kv)
}
fields[strings.ToLower(keyAndValue[0])] = keyAndValue[1]
}
diff --git a/internal/mapr/logformat/default_test.go b/internal/mapr/logformat/default_test.go
index 10ec8b7..28e1acc 100644
--- a/internal/mapr/logformat/default_test.go
+++ b/internal/mapr/logformat/default_test.go
@@ -1,6 +1,7 @@
package logformat
import (
+ "fmt"
"testing"
)
@@ -10,26 +11,83 @@ func TestDefaultLogFormat(t *testing.T) {
t.Errorf("Unable to create parser: %s", err.Error())
}
- fields, err := parser.MakeFields("foo=bar|baz=bay")
+ date := "20211002"
+ hour := "07"
+ minute := "23"
+ second := "42"
+ time := fmt.Sprintf("%s-%s%s%s", date, hour, minute, second)
- if err != nil {
- t.Errorf("Unable to parse: %s", err.Error())
+ inputs := []string{
+ fmt.Sprintf("INFO|%s|1|default_test.go:0|8|14|7|0.21|471h0m21s|MAPREDUCE:STATS|foo=bar|bar=foo", time),
+ fmt.Sprintf("INFO|%s|1|default_test.go:0|8|14|7|0.21|471h0m21s|MAPREDUCE:STATS|bar=foo|foo=bar", time),
}
- if bar, ok := fields["foo"]; !ok {
- t.Errorf("Expected field 'foo', but no such field there\n")
- } else if bar != "bar" {
- t.Errorf("Expected 'bar' stored in field 'foo', but got '%s'\n", bar)
- }
+ for _, input := range inputs {
+ fields, err := parser.MakeFields(input)
+
+ if err != nil {
+ t.Errorf("Parser unable to make fields: %s", err.Error())
+ }
+
+ if val, ok := fields["$severity"]; !ok {
+ t.Errorf("Expected field '$severity', but no such field there in '%s'\n", input)
+ } else if val != "INFO" {
+ t.Errorf("Expected 'Info' stored in field '$severity', but got '%s' in '%s'\n",
+ val, input)
+ }
+
+ if val, ok := fields["$time"]; !ok {
+ t.Errorf("Expected field '$time', but no such field there in '%s'\n", input)
+ } else if val != time {
+ t.Errorf("Expected '%s' stored in field '$time', but got '%s' in '%s'\n",
+ time, val, input)
+ }
+
+ if val, ok := fields["$date"]; !ok {
+ t.Errorf("Expected field '$date', but no such field there in '%s'\n", input)
+ } else if val != date {
+ t.Errorf("Expected '%s' stored in field '$date', but got '%s' in '%s'\n",
+ date, val, input)
+ }
+
+ if val, ok := fields["$hour"]; !ok {
+ t.Errorf("Expected field '$hour', but no such field there in '%s'\n", input)
+ } else if val != hour {
+ t.Errorf("Expected '%s' stored in field '$hour', but got '%s' in '%s'\n",
+ hour, val, input)
+ }
+
+ if val, ok := fields["$minute"]; !ok {
+ t.Errorf("Expected field '$minute', but no such field there in '%s'\n", input)
+ } else if val != minute {
+ t.Errorf("Expected '%s' stored in field '$minute', but got '%s' in '%s'\n",
+ minute, val, input)
+ }
+
+ if val, ok := fields["$second"]; !ok {
+ t.Errorf("Expected field '$second', but no such field there in '%s'\n", input)
+ } else if val != second {
+ t.Errorf("Expected '%s' stored in field '$second', but got '%s' in '%s'\n",
+ second, val, input)
+ }
+
+ if val, ok := fields["foo"]; !ok {
+ t.Errorf("Expected field 'foo', but no such field there in '%s'\n", input)
+ } else if val != "bar" {
+ t.Errorf("Expected 'bar' stored in field 'foo', but got '%s' in '%s'\n",
+ val, input)
+ }
- if bay, ok := fields["baz"]; !ok {
- t.Errorf("Expected field 'baz', but no such field there\n")
- } else if bay != "bay" {
- t.Errorf("Expected 'bay' stored in field 'baz', but got '%s'\n", bay)
+ if val, ok := fields["bar"]; !ok {
+ t.Errorf("Expected field 'bar', but no such field there in '%s'\n", input)
+ } else if val != "foo" {
+ t.Errorf("Expected 'foo' stored in field 'bar', but got '%s' in '%s'\n",
+ val, input)
+ }
}
- _, err = parser.MakeFields("foo=bar|bazbay")
- if err == nil {
- t.Errorf("Expected error but didn't: %s", err.Error())
+ fields, err := parser.MakeFields("foozoo=bar|bazbay")
+ if _, ok := fields["foo"]; ok {
+ t.Errorf("Expected fiending field 'foo', but found it\n")
}
}
diff --git a/internal/mapr/logformat/generickv.go b/internal/mapr/logformat/generickv.go
new file mode 100644
index 0000000..433eb5f
--- /dev/null
+++ b/internal/mapr/logformat/generickv.go
@@ -0,0 +1,31 @@
+package logformat
+
+import (
+ "strings"
+
+ "github.com/mimecast/dtail/internal/protocol"
+)
+
+// MakeFieldsGENERIGKV is the generic key-value logfile parser.
+func (p *Parser) MakeFieldsGENERIGKV(maprLine string) (map[string]string, error) {
+ splitted := strings.Split(maprLine, protocol.FieldDelimiter)
+ fields := make(map[string]string, len(splitted))
+
+ fields["*"] = "*"
+ fields["$line"] = maprLine
+ fields["$empty"] = ""
+ fields["$hostname"] = p.hostname
+ fields["$timezone"] = p.timeZoneName
+ fields["$timeoffset"] = p.timeZoneOffset
+
+ for _, kv := range splitted[0:] {
+ keyAndValue := strings.SplitN(kv, "=", 2)
+ if len(keyAndValue) != 2 {
+ //dlog.Common.Debug("Unable to parse key-value token, ignoring it", kv)
+ continue
+ }
+ fields[strings.ToLower(keyAndValue[0])] = keyAndValue[1]
+ }
+
+ return fields, nil
+}
diff --git a/internal/mapr/logformat/parser.go b/internal/mapr/logformat/parser.go
index c53729a..129081d 100644
--- a/internal/mapr/logformat/parser.go
+++ b/internal/mapr/logformat/parser.go
@@ -8,10 +8,12 @@ import (
"strings"
"time"
- "github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/mapr"
)
+// ErrIgnoreFields indicates that the fields should be ignored.
+var ErrIgnoreFields error = errors.New("Ignore this field set")
+
// Parser is used to parse the mapreduce information from the server log files.
type Parser struct {
hostname string
@@ -25,11 +27,9 @@ type Parser struct {
// NewParser returns a new log parser.
func NewParser(logFormatName string, query *mapr.Query) (*Parser, error) {
hostname, err := os.Hostname()
-
if err != nil {
return nil, err
}
-
now := time.Now()
zone, offset := now.Zone()
@@ -43,7 +43,6 @@ func NewParser(logFormatName string, query *mapr.Query) (*Parser, error) {
if err != nil {
return nil, err
}
-
return &p, nil
}
@@ -52,7 +51,6 @@ func NewParser(logFormatName string, query *mapr.Query) (*Parser, error) {
// Parser. Whereas MODULENAME must be a upeprcase string.
func (p *Parser) reflectLogFormat(logFormatName string) error {
methodName := fmt.Sprintf("MakeFields%s", strings.ToUpper(logFormatName))
-
rt := reflect.TypeOf(p)
method, ok := rt.MethodByName(methodName)
if !ok {
@@ -61,7 +59,6 @@ func (p *Parser) reflectLogFormat(logFormatName string) error {
p.makeFieldsFunc = method.Func
p.makeFieldsReceiver = reflect.ValueOf(p)
-
return nil
}
@@ -69,17 +66,11 @@ func (p *Parser) reflectLogFormat(logFormatName string) error {
func (p *Parser) MakeFields(maprLine string) (fields map[string]string, err error) {
inputValues := []reflect.Value{p.makeFieldsReceiver, reflect.ValueOf(maprLine)}
returnValues := p.makeFieldsFunc.Call(inputValues)
-
errInterface := returnValues[1].Interface()
-
if errInterface == nil {
fields, err = returnValues[0].Interface().(map[string]string), nil
- logger.Trace("parser.MakeFields", fields, err)
return
}
-
fields, err = returnValues[0].Interface().(map[string]string), errInterface.(error)
- logger.Trace("parser.MakeFields", fields, err)
-
return
}
diff --git a/internal/mapr/query.go b/internal/mapr/query.go
index 01852da..d7c32bd 100644
--- a/internal/mapr/query.go
+++ b/internal/mapr/query.go
@@ -6,8 +6,6 @@ import (
"strconv"
"strings"
"time"
-
- "github.com/mimecast/dtail/internal/io/logger"
)
const (
@@ -34,7 +32,9 @@ type Query struct {
}
func (q Query) String() string {
- return fmt.Sprintf("Query(Select:%v,Table:%s,Where:%v,Set:%vGroupBy:%v,GroupKey:%s,OrderBy:%v,ReverseOrder:%v,Interval:%v,Limit:%d,Outfile:%s,RawQuery:%s,tokens:%v,LogFormat:%s)",
+ return fmt.Sprintf("Query(Select:%v,Table:%s,Where:%v,Set:%vGroupBy:%v,"+
+ "GroupKey:%s,OrderBy:%v,ReverseOrder:%v,Interval:%v,Limit:%d,Outfile:%s,"+
+ "RawQuery:%s,tokens:%v,LogFormat:%s)",
q.Select,
q.Table,
q.Where,
@@ -56,20 +56,14 @@ func NewQuery(queryStr string) (*Query, error) {
if queryStr == "" {
return nil, nil
}
-
tokens := tokenize(queryStr)
-
q := Query{
RawQuery: queryStr,
tokens: tokens,
Interval: time.Second * 5,
Limit: -1,
}
-
- err := q.parse(tokens)
-
- logger.Debug(q)
- return &q, err
+ return &q, q.parse(tokens)
}
// HasOutfile returns true if query result will be written to a CVS output file.
@@ -178,13 +172,13 @@ func (q *Query) parse(tokens []token) error {
}
if len(q.Select) < 1 {
- return errors.New(invalidQuery + "Expected at least one field in 'select' clause but got none")
+ return errors.New(invalidQuery + "Expected at least one field in 'select' " +
+ "clause but got none")
}
if len(q.GroupBy) == 0 {
field := q.Select[0].Field
q.GroupBy = append(q.GroupBy, field)
}
-
if q.OrderBy != "" {
var orderFieldIsValid bool
for _, sc := range q.Select {
@@ -194,7 +188,8 @@ func (q *Query) parse(tokens []token) error {
}
}
if !orderFieldIsValid {
- return errors.New(invalidQuery + fmt.Sprintf("Can not '(r)order by' '%s', must be present in 'select' clause", q.OrderBy))
+ return errors.New(invalidQuery + fmt.Sprintf("Can not '(r)order by' '%s',"+
+ "must be present in 'select' clause", q.OrderBy))
}
}
diff --git a/internal/mapr/query_test.go b/internal/mapr/query_test.go
index b0b6c3a..88f7387 100644
--- a/internal/mapr/query_test.go
+++ b/internal/mapr/query_test.go
@@ -13,18 +13,25 @@ func TestParseQuerySimple(t *testing.T) {
"select foo from bar where baz <",
"select foo from bar where baz < 100 bay eq 12 group",
"select foo from bar where baz < 100 bay eq 12 group by foo order by",
- "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit",
- "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit set foo = bar;",
+ "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " +
+ "order by foo limit",
+ "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " +
+ "order by foo limit set foo = bar;",
}
okQueries := []string{"select foo from bar",
"select foo from bar where",
"select foo from bar where baz < 100 bay eq 12",
"select foo from bar where baz < 100, bay eq 12",
"select foo from bar where baz < 100 and bay eq 12",
- "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo",
- "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit 23",
- "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit 23 outfile \"result.csv\"",
- "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit 23 outfile \"result.csv\" set $foo = maskdigits(bar), $baz = 12, $bay = $foo;",
+ "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " +
+ "order by foo",
+ "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " +
+ "order by foo limit 23",
+ "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " +
+ "order by foo limit 23 outfile \"result.csv\"",
+ "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz " +
+ "order by foo limit 23 outfile \"result.csv\" " +
+ "set $foo = maskdigits(bar), $baz = 12, $bay = $foo;",
}
for _, queryStr := range errorQueries {
@@ -46,8 +53,13 @@ func TestParseQuerySimple(t *testing.T) {
func TestParseQueryDeep(t *testing.T) {
dialects := []string{
- "select s1, `from`, count(s3) from table where w1 == 2 and w2 eq \"free beer\" group by g1, g2 order by count(s3) interval 10 limit 23 set $foo = maskdigits(bar), $baz = 12, $bay = $foo logformat generic",
- "SELECT s1, `from`, COUNT(s3) FROM table WHERE w1 == 2 AND w2 eq \"free beer\" GROUP g1, g2 ORDER count(s3) INTERVAL 10 LIMIT 23 SET $foo = maskdigits(bar), $baz = 12, $bay = $foo logformat generic",
+ "select s1, `from`, count(s3) from table where w1 == 2 and w2 eq " +
+ "\"free beer\" group by g1, g2 order by count(s3) interval 10 limit 23 " +
+ "set $foo = maskdigits(bar), $baz = 12, $bay = $foo logformat generic",
+
+ "SELECT s1, `from`, COUNT(s3) FROM table WHERE w1 == 2 AND w2 eq " +
+ "\"free beer\" GROUP g1, g2 ORDER count(s3) INTERVAL 10 LIMIT 23 " +
+ "SET $foo = maskdigits(bar), $baz = 12, $bay = $foo logformat generic",
}
for _, queryStr := range dialects {
@@ -55,119 +67,144 @@ func TestParseQueryDeep(t *testing.T) {
if err != nil {
t.Errorf("%s: %s", err.Error(), queryStr)
}
-
t.Log(q)
// 'select' clause
if len(q.Select) != 3 {
- t.Errorf("Expected three elements in 'select' clause but got '%v': %s\n%v", q.Select, queryStr, q)
+ t.Errorf("Expected three elements in 'select' clause but got '%v': %s\n%v",
+ q.Select, queryStr, q)
}
-
if q.Select[0].Field != "s1" {
- t.Errorf("Expected 's1' as first element in 'select' clause but got '%v': %s\n%v", q.Select[0].Field, queryStr, q)
+ t.Errorf("Expected 's1' as first element in 'select' clause but got '%v': %s\n%v",
+ q.Select[0].Field, queryStr, q)
}
if q.Select[0].Operation != Last {
- t.Errorf("Expected 'last' as aggregation function of first element in 'select' clause but got '%v': %s\n%v", q.Select[0].Operation, queryStr, q)
+ t.Errorf("Expected 'last' as aggregation function of first element in "+
+ "'select' clause but got '%v': %s\n%v", q.Select[0].Operation, queryStr, q)
}
-
if q.Select[1].Field != "from" {
- t.Errorf("Expected 'from' as second element in 'select' clause but got '%v': %s\n%v", q.Select[1].Field, queryStr, q)
+ t.Errorf("Expected 'from' as second element in 'select' clause but got "+
+ "'%v': %s\n%v", q.Select[1].Field, queryStr, q)
}
if q.Select[1].Operation != Last {
- t.Errorf("Expected 'last' as aggregation function of second element in 'select' clause but got '%v': %s\n%v", q.Select[1].Operation, queryStr, q)
+ t.Errorf("Expected 'last' as aggregation function of second element in "+
+ "'select' clause but got '%v': %s\n%v", q.Select[1].Operation, queryStr, q)
}
-
if q.Select[2].Field != "s3" {
- t.Errorf("Expected 's3' as third element in 'select' clause but got '%v': %s\n%v", q.Select[2].Field, queryStr, q)
+ t.Errorf("Expected 's3' as third element in 'select' clause but got "+
+ "'%v': %s\n%v", q.Select[2].Field, queryStr, q)
}
if q.Select[2].Operation != Count {
- t.Errorf("Expected 'count' as aggregation function of third element in 'select' clause but got '%v': %s\n%v", q.Select[2].Operation, queryStr, q)
+ t.Errorf("Expected 'count' as aggregation function of third element in "+
+ "'select' clause but got '%v': %s\n%v", q.Select[2].Operation, queryStr, q)
}
if q.Select[2].FieldStorage != "count(s3)" {
- t.Errorf("Expected 'count(s3)' as third element's storage in 'select' clause but got '%v': %s\n%v", q.Select[2].FieldStorage, queryStr, q)
+ t.Errorf("Expected 'count(s3)' as third element's storage in 'select' "+
+ "clause but got '%v': %s\n%v", q.Select[2].FieldStorage, queryStr, q)
}
// 'from' clause
if q.Table != "TABLE" {
- t.Errorf("Expected 'TABLE' in 'from' clause but got '%v': %s\n%v", q.Table, queryStr, q)
+ t.Errorf("Expected 'TABLE' in 'from' clause but got '%v': %s\n%v",
+ q.Table, queryStr, q)
}
// 'where' clause
if len(q.Where) != 2 {
- t.Errorf("Expected two elements in 'where' clause but got '%v': %s\n%v", q.Where, queryStr, q)
+ t.Errorf("Expected two elements in 'where' clause but got '%v': %s\n%v",
+ q.Where, queryStr, q)
}
if q.Where[0].lString != "w1" {
- t.Errorf("Expected w1 as first element in 'where' clause but got '%v': %s\n%v", q.Where[0].lString, queryStr, q)
+ t.Errorf("Expected w1 as first element in 'where' clause but got '%v': %s\n%v",
+ q.Where[0].lString, queryStr, q)
}
if q.Where[0].Operation != FloatEq {
- t.Errorf("Expected FloatEq operation in first 'where' condition but got '%v': %s\n%v", q.Where[0].Operation, queryStr, q)
+ t.Errorf("Expected FloatEq operation in first 'where' condition but got "+
+ "'%v': %s\n%v", q.Where[0].Operation, queryStr, q)
}
if q.Where[0].rFloat != 2 {
- t.Errorf("Expected '2' as float argument in first 'where' condition but got '%v': %s\n%v", q.Where[0].rFloat, queryStr, q)
+ t.Errorf("Expected '2' as float argument in first 'where' condition but "+
+ "got '%v': %s\n%v", q.Where[0].rFloat, queryStr, q)
}
if q.Where[1].lString != "w2" {
- t.Errorf("Expected w2 as second element in 'where' clause but got '%v': %s\n%v", q.Where[1].lString, queryStr, q)
+ t.Errorf("Expected w2 as second element in 'where' clause but got '%v': "+
+ "%s\n%v", q.Where[1].lString, queryStr, q)
}
if q.Where[1].Operation != StringEq {
- t.Errorf("Expected StringEq operation in second 'where' condition but got '%v': %s\n%v", q.Where[0].Operation, queryStr, q)
+ t.Errorf("Expected StringEq operation in second 'where' condition but got "+
+ "'%v': %s\n%v", q.Where[0].Operation, queryStr, q)
}
if q.Where[1].rString != "free beer" {
- t.Errorf("Expected 'free beer' as string argument in second 'where' condition but got '%v': %s\n%v", q.Where[0].rString, queryStr, q)
+ t.Errorf("Expected 'free beer' as string argument in second 'where' "+
+ "condition but got '%v': %s\n%v", q.Where[0].rString, queryStr, q)
}
// 'group by' clause
if len(q.GroupBy) != 2 {
- t.Errorf("Expected two elements in 'group by' clause but got '%v': %s\n%v", q.GroupBy, queryStr, q)
+ t.Errorf("Expected two elements in 'group by' clause but got '%v': %s\n%v",
+ q.GroupBy, queryStr, q)
}
if q.GroupBy[0] != "g1" {
- t.Errorf("Expected 'g1' as first element in 'group by' clause but got '%v': %s\n%v", q.GroupBy[0], queryStr, q)
+ t.Errorf("Expected 'g1' as first element in 'group by' clause but got "+
+ "'%v': %s\n%v", q.GroupBy[0], queryStr, q)
}
if q.GroupBy[1] != "g2" {
- t.Errorf("Expected 'g2' as second element in 'group by' clause but got '%v': %s\n%v", q.GroupBy[1], queryStr, q)
+ t.Errorf("Expected 'g2' as second element in 'group by' clause but got "+
+ "'%v': %s\n%v", q.GroupBy[1], queryStr, q)
}
if q.GroupKey != "g1,g2" {
- t.Errorf("Expected 'g1,g2' as group key in 'group by' clause but got '%v': %s\n%v", q.GroupKey, queryStr, q)
+ t.Errorf("Expected 'g1,g2' as group key in 'group by' clause but got "+
+ "'%v': %s\n%v", q.GroupKey, queryStr, q)
}
// 'order by' clause
if q.OrderBy != "count(s3)" {
- t.Errorf("Expected 'count(s3)' as element in 'order by' clause but got '%v': %s\n%v", q.OrderBy, queryStr, q)
+ t.Errorf("Expected 'count(s3)' as element in 'order by' clause but got "+
+ "'%v': %s\n%v", q.OrderBy, queryStr, q)
}
// 'interval' clause
if q.Interval != time.Second*time.Duration(10) {
- t.Errorf("Expected '10s' as duration 'interval' clause but got '%v': %s\n%v", q.Interval, queryStr, q)
+ t.Errorf("Expected '10s' as duration 'interval' clause but got '%v': %s\n%v",
+ q.Interval, queryStr, q)
}
// 'limit' clause
if q.Limit != 23 {
- t.Errorf("Expected '23' as limit in 'limit' clause but got '%v': %s\n%v", q.Limit, queryStr, q)
+ t.Errorf("Expected '23' as limit in 'limit' clause but got '%v': %s\n%v",
+ q.Limit, queryStr, q)
}
// 'set' clause
if q.Set[0].lString != "$foo" {
- t.Errorf("Expected '$foo' lvalue in first 'set' condition clause but got '%v': %s\n%v", q.Set[0].lString, queryStr, q)
+ t.Errorf("Expected '$foo' lvalue in first 'set' condition clause but got "+
+ "'%v': %s\n%v", q.Set[0].lString, queryStr, q)
}
if q.Set[0].rString != "bar" {
- t.Errorf("Expected 'bar' rvalue in first 'set' condition clause but got '%v': %s\n%v", q.Set[0].rString, queryStr, q)
+ t.Errorf("Expected 'bar' rvalue in first 'set' condition clause but got "+
+ "'%v': %s\n%v", q.Set[0].rString, queryStr, q)
}
-
if q.Set[1].lString != "$baz" {
- t.Errorf("Expected '$baz' lvalue in second 'set' condition clause but got '%v': %s\n%v", q.Set[1].lString, queryStr, q)
+ t.Errorf("Expected '$baz' lvalue in second 'set' condition clause but got "+
+ "'%v': %s\n%v", q.Set[1].lString, queryStr, q)
}
if q.Set[1].rString != "12" {
- t.Errorf("Expected '12' rvalue in second 'set' condition clause but got '%v': %s\n%v", q.Set[1].rString, queryStr, q)
+ t.Errorf("Expected '12' rvalue in second 'set' condition clause but got "+
+ "'%v': %s\n%v", q.Set[1].rString, queryStr, q)
}
-
if q.Set[2].lString != "$bay" {
- t.Errorf("Expected '$bay' lvalue in third 'set' condition clause but got '%v': %s\n%v", q.Set[2].lString, queryStr, q)
+ t.Errorf("Expected '$bay' lvalue in third 'set' condition clause but got "+
+ "'%v': %s\n%v", q.Set[2].lString, queryStr, q)
}
if q.Set[2].rString != "$foo" {
- t.Errorf("Expected '$foo' rvalue in third 'set' condition clause but got '%v': %s\n%v", q.Set[2].rString, queryStr, q)
+ t.Errorf("Expected '$foo' rvalue in third 'set' condition clause but got "+
+ "'%v': %s\n%v", q.Set[2].rString, queryStr, q)
}
+ // 'logformat' clause
if q.LogFormat != "generic" {
- t.Errorf("Expected 'generic' logformat got '%v': %s\n%v", q.LogFormat, queryStr, q)
+ t.Errorf("Expected 'generic' logformat got '%v': %s\n%v",
+ q.LogFormat, queryStr, q)
}
}
}
diff --git a/internal/mapr/selectcondition.go b/internal/mapr/selectcondition.go
index d6aa0d4..5cfb8c7 100644
--- a/internal/mapr/selectcondition.go
+++ b/internal/mapr/selectcondition.go
@@ -37,7 +37,6 @@ func (sc selectCondition) String() string {
func makeSelectConditions(tokens []token) ([]selectCondition, error) {
var sel []selectCondition
-
// Parse select aggregation, e.g. sum(foo)
parse := func(token token) (selectCondition, error) {
var sc selectCondition
@@ -52,13 +51,15 @@ func makeSelectConditions(tokens []token) ([]selectCondition, error) {
a := strings.Split(tokenStr, "(")
if len(a) != 2 {
- return sc, errors.New(invalidQuery + "Can't parse 'select' aggregation: " + token.str)
+ return sc, errors.New(invalidQuery + "Can't parse 'select' aggregation: " +
+ token.str)
}
agg := a[0] // Aggregation, e.g. 'sum'
b := strings.Split(a[1], ")")
if len(b) != 2 {
- return sc, errors.New(invalidQuery + "Can't parse 'select' field name from aggregation: " + token.str)
+ return sc, errors.New(invalidQuery + "Can't parse 'select' field name " +
+ "from aggregation: " + token.str)
}
sc.Field = b[0] // Field name, e.g. 'foo'
sc.FieldStorage = tokenStr // e.g. 'sum(foo)'
@@ -79,9 +80,9 @@ func makeSelectConditions(tokens []token) ([]selectCondition, error) {
case "len":
sc.Operation = Len
default:
- return sc, errors.New(invalidQuery + "Unknown aggregation in 'select' clause: " + agg)
+ return sc, errors.New(invalidQuery +
+ "Unknown aggregation in 'select' clause: " + agg)
}
-
return sc, nil
}
@@ -92,6 +93,5 @@ func makeSelectConditions(tokens []token) ([]selectCondition, error) {
}
sel = append(sel, sc)
}
-
return sel, nil
}
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 28bb074..97fee11 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -8,25 +8,22 @@ import (
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
- "github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/logformat"
+ "github.com/mimecast/dtail/internal/protocol"
)
// Aggregate is for aggregating mapreduce data on the DTail server side.
type Aggregate struct {
done *internal.Done
- // Log lines to process (parsing MAPREDUCE lines).
- Lines chan line.Line
+ // NextLinesCh can be used to use a new line ch.
+ NextLinesCh chan chan line.Line
// Hostname of the current server (used to populate $hostname field).
hostname string
// Signals to serialize data.
serialize chan struct{}
- // Signals to flush data.
- flush chan struct{}
- // Signals that data has been flushed
- flushed chan struct{}
// The mapr query
query *mapr.Query
// The mapr log format parser
@@ -42,7 +39,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
fqdn, err := os.Hostname()
if err != nil {
- logger.Error(err)
+ dlog.Common.Error(err)
}
s := strings.Split(fqdn, ".")
@@ -57,38 +54,32 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
parserName = query.LogFormat
}
- logger.Info("Creating log format parser", parserName)
+ dlog.Common.Info("Creating log format parser", parserName)
logParser, err := logformat.NewParser(parserName, query)
if err != nil {
- logger.Error("Could not create log format parser. Falling back to 'generic'", err)
+ dlog.Common.Error("Could not create log format parser. Falling back to 'generic'", err)
if logParser, err = logformat.NewParser("generic", query); err != nil {
- logger.FatalExit("Could not create log format parser", err)
+ dlog.Common.FatalPanic("Could not create log format parser", err)
}
}
- a := Aggregate{
- done: internal.NewDone(),
- Lines: make(chan line.Line, 100),
- serialize: make(chan struct{}),
- flush: make(chan struct{}),
- flushed: make(chan struct{}),
- hostname: s[0],
- query: query,
- parser: logParser,
- }
-
- return &a, nil
+ return &Aggregate{
+ done: internal.NewDone(),
+ NextLinesCh: make(chan chan line.Line, 10),
+ serialize: make(chan struct{}),
+ hostname: s[0],
+ query: query,
+ parser: logParser,
+ }, nil
}
// Shutdown the aggregation engine.
func (a *Aggregate) Shutdown() {
- a.Flush()
a.done.Shutdown()
}
// Start an aggregation.
-func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) {
-
+func (a *Aggregate) Start(ctx context.Context, maprMessages chan<- string) {
myCtx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -101,15 +92,14 @@ func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) {
}
}()
- fieldsCh := a.makeFields(myCtx)
-
+ fieldsCh := a.fieldsFromLines(myCtx)
// Add fields (e.g. via 'set' clause)
if len(a.query.Set) > 0 {
- fieldsCh = a.addFields(myCtx, fieldsCh)
+ fieldsCh = a.setAdditionalFields(myCtx, fieldsCh)
}
-
+ // Periodically pre-aggregate data every a.query.Interval seconds.
go a.aggregateTimer(myCtx)
- a.makeMaprLines(myCtx, fieldsCh, maprLines)
+ a.aggregateAndSerialize(myCtx, fieldsCh, maprMessages)
}
func (a *Aggregate) aggregateTimer(ctx context.Context) {
@@ -123,25 +113,46 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) {
}
}
-func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string {
- ch := make(chan map[string]string)
+func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]string {
+ fieldsCh := make(chan map[string]string)
go func() {
- defer close(ch)
+ defer close(fieldsCh)
+ var lines chan line.Line
+
+ // Gather first lines channel (first input file)
+ select {
+ case lines = <-a.NextLinesCh:
+ case <-ctx.Done():
+ return
+ }
for {
select {
- case line, ok := <-a.Lines:
+ case line, ok := <-lines:
if !ok {
- return
+ select {
+ case lines = <-a.NextLinesCh:
+ // Have a new lines channel (e.g. new input file)
+ case <-ctx.Done():
+ default:
+ // No new lines channel found.
+ return
+ }
}
- maprLine := strings.TrimSpace(string(line.Content))
+ maprLine := strings.TrimSpace(line.Content.String())
fields, err := a.parser.MakeFields(maprLine)
- logger.Debug(fields, err)
+ // Can't recycle it here yet, as field slices are still
+ // TODO: Add unit test reading from multiple mapreduce files lines.
+ // TODO: Add capability to recycle this bytes buffer.
+ //pool.RecycleBytesBuffer(line.Content)
if err != nil {
- logger.Error(err)
+ // Should fields be ignored anyway?
+ if err != logformat.ErrIgnoreFields {
+ dlog.Common.Error(fields, err)
+ }
continue
}
if !a.query.WhereClause(fields) {
@@ -149,7 +160,7 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string {
}
select {
- case ch <- fields:
+ case fieldsCh <- fields:
case <-ctx.Done():
}
case <-ctx.Done():
@@ -158,45 +169,42 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string {
}
}()
- return ch
+ return fieldsCh
}
-func (a *Aggregate) addFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string {
- ch := make(chan map[string]string)
+func (a *Aggregate) setAdditionalFields(ctx context.Context,
+ fieldsCh <-chan map[string]string) <-chan map[string]string {
+ newFieldsCh := make(chan map[string]string)
go func() {
- defer close(ch)
-
+ defer close(newFieldsCh)
for {
- // fieldsCh will be closed via 'makeFields' if ctx is done
fields, ok := <-fieldsCh
if !ok {
return
}
if err := a.query.SetClause(fields); err != nil {
- logger.Error(err)
+ dlog.Common.Error(err)
}
select {
- case ch <- fields:
+ case newFieldsCh <- fields:
case <-ctx.Done():
}
}
}()
-
- return ch
+ return newFieldsCh
}
-func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) {
- group := mapr.NewGroupSet()
+func (a *Aggregate) aggregateAndSerialize(ctx context.Context,
+ fieldsCh <-chan map[string]string, maprMessages chan<- string) {
+ group := mapr.NewGroupSet()
serialize := func() {
- logger.Info("Serializing mapreduce result")
- group.Serialize(ctx, maprLines)
+ dlog.Common.Info("Serializing mapreduce result")
+ group.Serialize(ctx, maprMessages)
group = mapr.NewGroupSet()
- logger.Info("Done serializing mapreduce result")
}
-
for {
select {
case fields, ok := <-fieldsCh:
@@ -207,9 +215,6 @@ func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[strin
a.aggregate(group, fields)
case <-a.serialize:
serialize()
- case <-a.flush:
- serialize()
- a.flushed <- struct{}{}
case <-ctx.Done():
return
}
@@ -217,12 +222,10 @@ func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[strin
}
func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
- //logger.Trace("Aggregating", group, fields)
var sb strings.Builder
-
for i, field := range a.query.GroupBy {
if i > 0 {
- sb.WriteString(" ")
+ sb.WriteString(protocol.AggregateGroupKeyCombinator)
}
if val, ok := fields[field]; ok {
sb.WriteString(val)
@@ -235,7 +238,7 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
for _, sc := range a.query.Select {
if val, ok := fields[sc.Field]; ok {
if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil {
- logger.Error(err)
+ dlog.Common.Error(err)
continue
}
addedSample = true
@@ -246,8 +249,7 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
set.Samples++
return
}
-
- logger.Trace("Aggregated data locally without adding new samples")
+ dlog.Common.Trace("Aggregated data locally without adding new samples")
}
// Serialize all the aggregated data.
@@ -255,28 +257,7 @@ func (a *Aggregate) Serialize(ctx context.Context) {
select {
case a.serialize <- struct{}{}:
case <-time.After(time.Minute):
- logger.Warn("Starting to serialize mapredice data takes over a minute")
+ dlog.Common.Warn("Starting to serialize mapredice data takes over a minute")
case <-ctx.Done():
}
}
-
-// Flush all data.
-func (a *Aggregate) Flush() {
- select {
- case a.flush <- struct{}{}:
- logger.Info("Flushing mapreduce data")
- case <-time.After(time.Minute):
- logger.Warn("Starting to flush mapreduce data takes over a minute")
- return
- case <-a.done.Done():
- return
- }
-
- select {
- case <-a.flushed:
- logger.Info("Done flushing")
- case <-time.After(time.Minute):
- logger.Warn("Waiting for data to be flushed takes over a minute")
- case <-a.done.Done():
- }
-}
diff --git a/internal/mapr/setclause.go b/internal/mapr/setclause.go
index b4c2f73..1843d31 100644
--- a/internal/mapr/setclause.go
+++ b/internal/mapr/setclause.go
@@ -7,7 +7,6 @@ func (q *Query) SetClause(fields map[string]string) error {
if !ok {
continue
}
-
switch sc.rType {
case FunctionStack:
fields[sc.lString] = sc.functionStack.Call(value)
@@ -15,6 +14,5 @@ func (q *Query) SetClause(fields map[string]string) error {
fields[sc.lString] = value
}
}
-
return nil
}
diff --git a/internal/mapr/setcondition.go b/internal/mapr/setcondition.go
index 8c5cfc9..92b21f4 100644
--- a/internal/mapr/setcondition.go
+++ b/internal/mapr/setcondition.go
@@ -39,20 +39,22 @@ func makeSetConditions(tokens []token) (set []setCondition, err error) {
switch setOp {
case "=":
default:
- return sc, nil, errors.New(invalidQuery + "Unknown operation in 'set' clause: " + setOp)
+ return sc, nil, errors.New(invalidQuery + "Unknown operation in 'set' " +
+ "clause: " + setOp)
}
if !tokens[0].isBareword {
- return sc, nil, errors.New(invalidQuery + "Expected bareword at 'set' clause's lValue: " + tokens[0].str)
+ return sc, nil, errors.New(invalidQuery + "Expected bareword at 'set' " +
+ "clause's lValue: " + tokens[0].str)
}
-
sc.lString = tokens[0].str
if !strings.HasPrefix(sc.lString, "$") {
- return sc, nil, errors.New(invalidQuery + "Expected field variable name (starting with $) at 'set' clause's lValue: " + tokens[0].str)
+ return sc, nil, errors.New(invalidQuery + "Expected field variable name " +
+ "(starting with $) at 'set' clause's lValue: " + tokens[0].str)
}
sc.rType = Field
-
rString := tokens[2].str
+
// Seems like a function call?
if strings.HasSuffix(rString, ")") {
functionStack, functionArg, err := funcs.NewFunctionStack(tokens[2].str)
@@ -72,7 +74,6 @@ func makeSetConditions(tokens []token) (set []setCondition, err error) {
} else {
sc.rType = Field
}
-
return sc, tokens[3:], nil
}
@@ -84,10 +85,8 @@ func makeSetConditions(tokens []token) (set []setCondition, err error) {
if err != nil {
return nil, err
}
-
set = append(set, sc)
tokens = tokensConsumeOptional(tokens, ",")
}
-
return
}
diff --git a/internal/mapr/token.go b/internal/mapr/token.go
index 8972188..6ac7631 100644
--- a/internal/mapr/token.go
+++ b/internal/mapr/token.go
@@ -4,7 +4,8 @@ import (
"strings"
)
-var keywords = [...]string{"select", "from", "where", "set", "group", "rorder", "order", "interval", "limit", "outfile", "logformat"}
+var keywords = [...]string{"select", "from", "where", "set", "group", "rorder",
+ "order", "interval", "limit", "outfile", "logformat"}
// Represents a parsed token, used to parse the mapr query.
type token struct {
@@ -16,13 +17,11 @@ func (t token) isKeyword() bool {
if !t.isBareword {
return false
}
-
for _, keyword := range keywords {
if strings.ToLower(t.str) == keyword {
return true
}
}
-
return false
}
@@ -32,7 +31,6 @@ func (t token) String() string {
func tokenize(queryStr string) []token {
var tokens []token
-
for i, part := range strings.Split(queryStr, "\"") {
// Even i, means that it is not a quoted string
if i%2 == 0 {
@@ -53,17 +51,15 @@ func tokenize(queryStr string) []token {
}
tokens = append(tokens, token)
}
-
return tokens
}
func tokensConsume(tokens []token) ([]token, []token) {
- //logger.Trace("=====================")
+ //dlog.Common.Trace("=====================")
var consumed []token
-
for i, t := range tokens {
if t.isKeyword() {
- //logger.Trace("keyword", t)
+ //dlog.Common.Trace("keyword", t)
return tokens[i:], consumed
}
// strip escapes, such as ` from `foo`, this allows to use keywords as field names
@@ -73,7 +69,7 @@ func tokensConsume(tokens []token) ([]token, []token) {
}
if t.str[0] == '`' && t.str[length-1] == '`' {
stripped := t.str[1 : length-1]
- //logger.Trace("stripped", stripped)
+ //dlog.Common.Trace("stripped", stripped)
t := token{
str: stripped,
isBareword: t.isBareword,
@@ -81,11 +77,10 @@ func tokensConsume(tokens []token) ([]token, []token) {
consumed = append(consumed, t)
continue
}
- //logger.Trace("bare", token)
+ //dlog.Common.Trace("bare", token)
consumed = append(consumed, t)
}
-
- //logger.Trace("result", consumed)
+ //dlog.Common.Trace("result", consumed)
return nil, consumed
}
@@ -95,7 +90,6 @@ func tokensConsumeStr(tokens []token) ([]token, []string) {
for _, token := range found {
strings = append(strings, token.str)
}
-
return tokens, strings
}
@@ -106,6 +100,5 @@ func tokensConsumeOptional(tokens []token, optional string) []token {
if strings.ToLower(tokens[0].str) == strings.ToLower(optional) {
return tokens[1:]
}
-
return tokens
}
diff --git a/internal/mapr/whereclause.go b/internal/mapr/whereclause.go
index cc1c164..d9f32eb 100644
--- a/internal/mapr/whereclause.go
+++ b/internal/mapr/whereclause.go
@@ -3,14 +3,13 @@ package mapr
import (
"strconv"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
)
// WhereClause interprets the where clause of the mapreduce query.
func (q *Query) WhereClause(fields map[string]string) bool {
for _, wc := range q.Where {
var ok bool
-
if wc.Operation > FloatOperation {
var lValue, rValue float64
if lValue, ok = whereClauseFloatValue(fields, wc.lString, wc.lFloat, wc.lType); !ok {
@@ -36,11 +35,12 @@ func (q *Query) WhereClause(fields map[string]string) bool {
return false
}
}
-
return true
}
-func whereClauseFloatValue(fields map[string]string, str string, float float64, t fieldType) (float64, bool) {
+func whereClauseFloatValue(fields map[string]string, str string, float float64,
+ t fieldType) (float64, bool) {
+
switch t {
case Float:
return float, true
@@ -55,12 +55,14 @@ func whereClauseFloatValue(fields map[string]string, str string, float float64,
}
return f, true
default:
- logger.Error("Unexpected argument in 'where' clause", str, float, t)
+ dlog.Common.Error("Unexpected argument in 'where' clause", str, float, t)
return 0, false
}
}
-func whereClauseStringValue(fields map[string]string, str string, t fieldType) (string, bool) {
+func whereClauseStringValue(fields map[string]string, str string,
+ t fieldType) (string, bool) {
+
switch t {
case Field:
value, ok := fields[str]
@@ -71,7 +73,7 @@ func whereClauseStringValue(fields map[string]string, str string, t fieldType) (
case String:
return str, true
default:
- logger.Error("Unexpected argument in 'where' clause", str, t)
+ dlog.Common.Error("Unexpected argument in 'where' clause", str, t)
return str, false
}
}
diff --git a/internal/mapr/wherecondition.go b/internal/mapr/wherecondition.go
index 7a60dba..280dcfb 100644
--- a/internal/mapr/wherecondition.go
+++ b/internal/mapr/wherecondition.go
@@ -6,7 +6,7 @@ import (
"strconv"
"strings"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
)
// QueryOperation determines the mapreduce operation.
@@ -46,15 +46,18 @@ type whereCondition struct {
}
func (wc *whereCondition) String() string {
- return fmt.Sprintf("whereCondition(Operation:%v,lString:%s,lFloat:%v,lType:%s,rString:%s,rFloat:%v,rType:%s)",
- wc.Operation, wc.lString, wc.lFloat, wc.lType.String(), wc.rString, wc.rFloat, wc.rType.String())
+ return fmt.Sprintf("whereCondition(Operation:%v,lString:%s,lFloat:%v,"+
+ "lType:%s,rString:%s,rFloat:%v,rType:%s)",
+ wc.Operation, wc.lString, wc.lFloat, wc.lType.String(), wc.rString,
+ wc.rFloat, wc.rType.String())
}
func makeWhereConditions(tokens []token) (where []whereCondition, err error) {
parse := func(tokens []token) (whereCondition, []token, error) {
var wc whereCondition
if len(tokens) < 3 {
- return wc, nil, errors.New(invalidQuery + "Not enough arguments in 'where' clause")
+ err := errors.New(invalidQuery + "Not enough arguments in 'where' clause")
+ return wc, nil, err
}
whereOp := strings.ToLower(tokens[1].str)
@@ -94,7 +97,8 @@ func makeWhereConditions(tokens []token) (where []whereCondition, err error) {
case "nhassuffix":
wc.Operation = StringNotHasSuffix
default:
- return wc, nil, errors.New(invalidQuery + "Unknown operation in 'where' clause: " + whereOp)
+ return wc, nil, errors.New(invalidQuery +
+ "Unknown operation in 'where' clause: " + whereOp)
}
wc.lString = tokens[0].str
@@ -102,7 +106,8 @@ func makeWhereConditions(tokens []token) (where []whereCondition, err error) {
if wc.Operation > FloatOperation {
if !tokens[0].isBareword {
- return wc, nil, errors.New(invalidQuery + "Expected bareword at 'where' clause's lValue: " + tokens[0].str)
+ return wc, nil, errors.New(invalidQuery +
+ "Expected bareword at 'where' clause's lValue: " + tokens[0].str)
}
if f, err := strconv.ParseFloat(wc.lString, 64); err == nil {
wc.lFloat = f
@@ -112,7 +117,8 @@ func makeWhereConditions(tokens []token) (where []whereCondition, err error) {
}
if !tokens[2].isBareword {
- return wc, nil, errors.New(invalidQuery + "Expected bareword at 'where' clause's rValue: " + tokens[2].str)
+ return wc, nil, errors.New(invalidQuery +
+ "Expected bareword at 'where' clause's rValue: " + tokens[2].str)
}
if f, err := strconv.ParseFloat(wc.rString, 64); err == nil {
wc.rFloat = f
@@ -133,23 +139,19 @@ func makeWhereConditions(tokens []token) (where []whereCondition, err error) {
} else {
wc.rType = String
}
-
return wc, tokens[3:], nil
}
for len(tokens) > 0 {
var wc whereCondition
var err error
-
wc, tokens, err = parse(tokens)
if err != nil {
return nil, err
}
-
where = append(where, wc)
tokens = tokensConsumeOptional(tokens, "and")
}
-
return
}
@@ -168,9 +170,8 @@ func (wc *whereCondition) floatClause(lValue float64, rValue float64) bool {
case FloatGe:
return lValue >= rValue
default:
- logger.Error("Unknown float operation", lValue, wc.Operation, rValue)
+ dlog.Common.Error("Unknown float operation", lValue, wc.Operation, rValue)
}
-
return false
}
@@ -193,8 +194,7 @@ func (wc *whereCondition) stringClause(lValue string, rValue string) bool {
case StringNotHasSuffix:
return !strings.HasSuffix(lValue, rValue)
default:
- logger.Error("Unknown string operation", lValue, wc.Operation, rValue)
+ dlog.Common.Error("Unknown string operation", lValue, wc.Operation, rValue)
}
-
return false
}