summaryrefslogtreecommitdiff
path: root/internal/mapr/query.go
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2020-07-03 14:13:13 +0100
committerPaul Buetow <pbuetow@mimecast.com>2020-08-13 11:37:24 +0100
commitc5a0ba7d29da7effa0ae18bffa10fc0be359b8e7 (patch)
treede4874740a5ddeb6eb29c887f6e121c61a1f8f3c /internal/mapr/query.go
parent8f9f9766cecec4a42ffb4d14ba9b7efc2ed204ad (diff)
bump up version to 3.0.0. can run continuous background mapreduce queries, useful for log file monitorig for example. breaking protocol change which allows to mapreduce aggreate messages containing the default field separator |. add of more unit tests. add logformat mapreduce query keyword. add set mapreduce clause support and support to evaluate built-in functions such as md5sum() and maskdigits().v3.0.0
Diffstat (limited to 'internal/mapr/query.go')
-rw-r--r--internal/mapr/query.go103
1 files changed, 29 insertions, 74 deletions
diff --git a/internal/mapr/query.go b/internal/mapr/query.go
index 6dff792..7f6b63c 100644
--- a/internal/mapr/query.go
+++ b/internal/mapr/query.go
@@ -20,6 +20,7 @@ type Query struct {
Select []selectCondition
Table string
Where []whereCondition
+ Set []setCondition
GroupBy []string
OrderBy string
ReverseOrder bool
@@ -29,13 +30,15 @@ type Query struct {
Outfile string
RawQuery string
tokens []token
+ LogFormat string
}
func (q Query) String() string {
- return fmt.Sprintf("Query(Select:%v,Table:%s,Where:%v,GroupBy:%v,GroupKey:%s,OrderBy:%v,ReverseOrder:%v,Interval:%v,Limit:%d,Outfile:%s,RawQuery:%s,tokens:%v)",
+ return fmt.Sprintf("Query(Select:%v,Table:%s,Where:%v,Set:%vGroupBy:%v,GroupKey:%s,OrderBy:%v,ReverseOrder:%v,Interval:%v,Limit:%d,Outfile:%s,RawQuery:%s,tokens:%v,LogFormat:%s)",
q.Select,
q.Table,
q.Where,
+ q.Set,
q.GroupBy,
q.GroupKey,
q.OrderBy,
@@ -44,7 +47,8 @@ func (q Query) String() string {
q.Limit,
q.Outfile,
q.RawQuery,
- q.tokens)
+ q.tokens,
+ q.LogFormat)
}
// NewQuery returns a new mapreduce query.
@@ -68,10 +72,16 @@ func NewQuery(queryStr string) (*Query, error) {
return &q, err
}
+// HasOutfile returns true if query result will be written to a CVS output file.
func (q *Query) HasOutfile() bool {
return q.Outfile != ""
}
+// Has is a helper to determine whether a query contains a substring
+func (q *Query) Has(what string) bool {
+ return strings.Contains(q.RawQuery, what)
+}
+
func (q *Query) parse(tokens []token) error {
var found []token
var err error
@@ -86,14 +96,23 @@ func (q *Query) parse(tokens []token) error {
}
case "from":
tokens, found = tokensConsume(tokens[1:])
- if len(found) > 0 {
- q.Table = strings.ToUpper(found[0].str)
+ if len(found) == 0 {
+ return errors.New(invalidQuery + "expected table name after 'from'")
+ }
+ if len(found) > 1 {
+ return errors.New(invalidQuery + "expected only one table name after 'from'")
}
+ q.Table = strings.ToUpper(found[0].str)
case "where":
tokens, found = tokensConsume(tokens[1:])
if q.Where, err = makeWhereConditions(found); err != nil {
return err
}
+ case "set":
+ tokens, found = tokensConsume(tokens[1:])
+ if q.Set, err = makeSetConditions(found); err != nil {
+ return err
+ }
case "group":
tokens = tokensConsumeOptional(tokens[1:], "by")
if tokens == nil || len(tokens) < 1 {
@@ -147,6 +166,12 @@ func (q *Query) parse(tokens []token) error {
return errors.New(invalidQuery + unexpectedEnd)
}
q.Outfile = found[0].str
+ case "logformat":
+ tokens, found = tokensConsume(tokens[1:])
+ if len(found) == 0 {
+ return errors.New(invalidQuery + unexpectedEnd)
+ }
+ q.LogFormat = found[0].str
default:
return errors.New(invalidQuery + "Unexpected keyword " + tokens[0].str)
}
@@ -181,73 +206,3 @@ func (q *Query) parse(tokens []token) error {
return nil
}
-
-// WhereClause interprets the where clause of the mapreduce query.
-func (q *Query) WhereClause(fields map[string]string) bool {
- floatValue := func(str string, float float64, t whereType) (float64, bool) {
- switch t {
- case Float:
- return float, true
- case Field:
- value, ok := fields[str]
- if !ok {
- return 0, false
- }
- f, err := strconv.ParseFloat(value, 64)
- if err != nil {
- return 0, false
- }
- return f, true
- default:
- logger.Error("Unexpected argument in 'where' clause", str, float, t)
- return 0, false
- }
- }
-
- stringValue := func(str string, t whereType) (string, bool) {
- switch t {
- case Field:
- value, ok := fields[str]
- if !ok {
- return str, false
- }
- return value, true
- case String:
- return str, true
- default:
- logger.Error("Unexpected argument in 'where' clause", str, t)
- return str, false
- }
- }
-
- for _, wc := range q.Where {
- var ok bool
-
- if wc.Operation > FloatOperation {
- var lValue, rValue float64
- if lValue, ok = floatValue(wc.lString, wc.lFloat, wc.lType); !ok {
- return false
- }
- if rValue, ok = floatValue(wc.rString, wc.rFloat, wc.rType); !ok {
- return false
- }
- if ok = wc.floatClause(lValue, rValue); !ok {
- return false
- }
- continue
- }
-
- var lValue, rValue string
- if lValue, ok = stringValue(wc.lString, wc.lType); !ok {
- return false
- }
- if rValue, ok = stringValue(wc.rString, wc.rType); !ok {
- return false
- }
- if ok = wc.stringClause(lValue, rValue); !ok {
- return false
- }
- }
-
- return true
-}