summaryrefslogtreecommitdiff
path: root/mapr
diff options
context:
space:
mode:
Diffstat (limited to 'mapr')
-rw-r--r--mapr/aggregateset.go185
-rw-r--r--mapr/client/aggregate.go100
-rw-r--r--mapr/globalgroupset.go100
-rw-r--r--mapr/groupset.go178
-rw-r--r--mapr/logformat/default.go23
-rw-r--r--mapr/logformat/default_test.go35
-rw-r--r--mapr/logformat/parser.go75
-rw-r--r--mapr/query.go245
-rw-r--r--mapr/query_test.go149
-rw-r--r--mapr/selectcondition.go96
-rw-r--r--mapr/server/aggregate.go170
-rw-r--r--mapr/token.go108
-rw-r--r--mapr/wherecondition.go193
13 files changed, 1657 insertions, 0 deletions
diff --git a/mapr/aggregateset.go b/mapr/aggregateset.go
new file mode 100644
index 0000000..2096c3c
--- /dev/null
+++ b/mapr/aggregateset.go
@@ -0,0 +1,185 @@
+package mapr
+
+import (
+ "fmt"
+ "strconv"
+ "strings"
+)
+
+// AggregateSet represents aggregated key/value pairs from the
+// MAPREDUCE log lines. These could be either string values or float
+// values.
+type AggregateSet struct {
+ // Samples is the sample counter; Merge adds the counter of the merged set to it.
+ Samples int
+ // FValues holds the float aggregates, keyed by field storage name.
+ FValues map[string]float64
+ // SValues holds the string aggregates, keyed by field storage name.
+ SValues map[string]string
+}
+
+// NewAggregateSet returns an aggregate set whose float and string value
+// maps are allocated and empty.
+func NewAggregateSet() *AggregateSet {
+	set := new(AggregateSet)
+	set.FValues = map[string]float64{}
+	set.SValues = map[string]string{}
+	return set
+}
+
+// String representation of aggregate set.
+func (s *AggregateSet) String() string {
+ return fmt.Sprintf("AggregateSet(Samples:%d,FValues:%v,SValues:%v)",
+ s.Samples, s.FValues, s.SValues)
+}
+
+// Merge folds the aggregated values of set into s, following the
+// aggregation operation of every select condition of query.
+//
+// The sample counter of set is always added. Count, Sum and Avg values are
+// summed, Min and Max keep the smaller respectively larger value, Last
+// takes over the string value of set, and Len takes over both the string
+// and the float value of set. An unknown operation aborts the merge with
+// an error.
+//
+// Min, Max, Last and Len are only merged when set actually holds a value
+// for the field. Reading a missing map entry yields 0 (or ""), which would
+// otherwise be mistaken for real data and clobber a valid minimum, maximum
+// or last value already present in s.
+func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error {
+	s.Samples += set.Samples
+
+	for _, sc := range query.Select {
+		storage := sc.FieldStorage
+		switch sc.Operation {
+		case Count, Sum, Avg:
+			// Adding the zero value of a missing entry is harmless here.
+			s.addFloat(storage, set.FValues[storage])
+		case Min:
+			if value, ok := set.FValues[storage]; ok {
+				s.addFloatMin(storage, value)
+			}
+		case Max:
+			if value, ok := set.FValues[storage]; ok {
+				s.addFloatMax(storage, value)
+			}
+		case Last:
+			if value, ok := set.SValues[storage]; ok {
+				s.setString(storage, value)
+			}
+		case Len:
+			if value, ok := set.SValues[storage]; ok {
+				s.setString(storage, value)
+			}
+			if value, ok := set.FValues[storage]; ok {
+				s.setFloat(storage, value)
+			}
+		default:
+			return fmt.Errorf("Unknown aggregation method '%v'", sc.Operation)
+		}
+	}
+	return nil
+}
+
+// Serialize writes the set as one line of the form
+// "groupKey|samples|key=value|...|" (float values first, then string
+// values) and sends it to ch. The send is abandoned when a receive from
+// stop succeeds first.
+func (s *AggregateSet) Serialize(groupKey string, ch chan<- string, stop chan struct{}) {
+	var line strings.Builder
+
+	fmt.Fprintf(&line, "%s|%d|", groupKey, s.Samples)
+
+	for name, number := range s.FValues {
+		fmt.Fprintf(&line, "%s=%v|", name, number)
+	}
+
+	for name, text := range s.SValues {
+		line.WriteString(name + "=" + text + "|")
+	}
+
+	payload := line.String()
+	select {
+	case <-stop:
+	case ch <- payload:
+	}
+}
+
+// addFloat adds value to the float stored under key, storing value as is
+// when the key has not been seen before.
+func (s *AggregateSet) addFloat(key string, value float64) {
+	if current, seen := s.FValues[key]; seen {
+		s.FValues[key] = current + value
+		return
+	}
+	s.FValues[key] = value
+}
+
+// addFloatMin stores value under key when the key is new or when value is
+// smaller than the float stored so far.
+func (s *AggregateSet) addFloatMin(key string, value float64) {
+	current, seen := s.FValues[key]
+	if !seen || current > value {
+		s.FValues[key] = value
+	}
+}
+
+// addFloatMax stores value under key when the key is new or when value is
+// larger than the float stored so far.
+func (s *AggregateSet) addFloatMax(key string, value float64) {
+	current, seen := s.FValues[key]
+	if !seen || current < value {
+		s.FValues[key] = value
+	}
+}
+
+// setString stores value under key in the string values, replacing any
+// previous entry.
+func (s *AggregateSet) setString(key, value string) {
+	strs := s.SValues
+	strs[key] = value
+}
+
+// setFloat stores value under key in the float values, replacing any
+// previous entry.
+func (s *AggregateSet) setFloat(key string, value float64) {
+	floats := s.FValues
+	floats[key] = value
+}
+
+// Aggregate data to the aggregate set.
+func (s *AggregateSet) Aggregate(key string, agg AggregateOperation, value string, clientAggregation bool) (err error) {
+ var f float64
+
+ // First check if we can aggregate anything without converting value to float.
+ switch agg {
+ case Count:
+ if clientAggregation {
+ f, err = strconv.ParseFloat(value, 64)
+ if err != nil {
+ return
+ }
+ s.addFloat(key, f)
+ return
+ }
+ s.addFloat(key, 1)
+ return
+ case Last:
+ s.setString(key, value)
+ return
+ case Len:
+ s.setString(key, value)
+ s.setFloat(key, float64(len(value)))
+ return
+ default:
+ }
+
+ // No, we have to convert to float.
+ f, err = strconv.ParseFloat(value, 64)
+ if err != nil {
+ return
+ }
+
+ switch agg {
+ case Sum:
+ fallthrough
+ case Avg:
+ s.addFloat(key, f)
+ case Min:
+ s.addFloatMin(key, f)
+ case Max:
+ s.addFloatMax(key, f)
+ default:
+ err = fmt.Errorf("Unknown aggregation method '%v'", agg)
+ }
+ return
+}
diff --git a/mapr/client/aggregate.go b/mapr/client/aggregate.go
new file mode 100644
index 0000000..b9443bc
--- /dev/null
+++ b/mapr/client/aggregate.go
@@ -0,0 +1,100 @@
+package client
+
+import (
+ "dtail/logger"
+ "dtail/mapr"
+ "strconv"
+ "strings"
+)
+
+// Aggregate aggregates mapreduce data on the DTail client side.
+type Aggregate struct {
+ // This is the mapr query specified on the command line.
+ query *mapr.Query
+ // This represents aggregated data of a single remote server.
+ group *mapr.GroupSet
+ // This represents the merged aggregated data of all servers.
+ globalGroup *mapr.GlobalGroupSet
+ // Closed by Stop; once closed, Aggregate refuses to process new data.
+ stop chan struct{}
+ // The server we aggregate the data for (logging and debugging purposes only)
+ server string
+}
+
+// NewAggregate creates a client side aggregator for the given server. It
+// starts with an empty local group set and merges into globalGroup.
+func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *Aggregate {
+	a := new(Aggregate)
+	a.server = server
+	a.query = query
+	a.globalGroup = globalGroup
+	a.group = mapr.NewGroupSet()
+	a.stop = make(chan struct{})
+	return a
+}
+
+// Aggregate processes one serialized aggregate received from the server
+// and adds it to the local group set, which is then merged into the
+// global group set if that is possible without blocking.
+//
+// parts is expected to hold the group key, the sample count and then any
+// number of "key=value" fields. Input with fewer than two elements is
+// logged and ignored instead of causing an index out of range panic.
+// Nothing is processed any more once Stop has been called.
+func (a *Aggregate) Aggregate(parts []string) {
+	select {
+	case <-a.stop:
+		logger.Error("Client aggregator stopped for server, not processing new data", a.server)
+		return
+	default:
+	}
+
+	if len(parts) < 2 {
+		logger.Error("Ignoring malformed mapreduce data, expected group key and sample count", a.server, parts)
+		return
+	}
+
+	groupKey := parts[0]
+	samples, err := strconv.Atoi(parts[1])
+	if err != nil {
+		logger.FatalExit(parts, err)
+	}
+	fields := a.makeFields(parts[2:])
+	set := a.group.GetSet(groupKey)
+
+	// The samples are only counted if at least one selected field could be
+	// aggregated.
+	var addedSamples bool
+	for _, sc := range a.query.Select {
+		if val, ok := fields[sc.FieldStorage]; ok {
+			if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, true); err != nil {
+				logger.Error(err)
+				continue
+			}
+			addedSamples = true
+		}
+	}
+	if addedSamples {
+		set.Samples += samples
+	}
+
+	// Merge data from group into global group.
+	isMerged, err := a.globalGroup.MergeNoblock(a.query, a.group)
+	if err != nil {
+		panic(err)
+	}
+	if isMerged {
+		// Re-init local group (make it empty again).
+		a.group.InitSet()
+	}
+}
+
+// makeFields creates a map of key-value pairs from a part list such as
+// ["foo=bar", "bar=baz"].
+//
+// Each part is split at its first "=" only, so a value may itself contain
+// "=" characters (e.g. "msg=a=b" yields the value "a=b"). Parts without
+// any "=" are skipped.
+func (a *Aggregate) makeFields(parts []string) map[string]string {
+	fields := make(map[string]string, len(parts))
+
+	for _, part := range parts {
+		kv := strings.SplitN(part, "=", 2)
+		if len(kv) != 2 {
+			continue
+		}
+		fields[kv[0]] = kv[1]
+	}
+
+	return fields
+}
+
+// Stop closes the stop channel, so that no new data gets processed, and
+// merges what is left in the local group set into the global group set
+// (blocking). A merge error results in a panic.
+func (a *Aggregate) Stop() {
+	logger.Debug("Stopping client mapreduce aggregator")
+	close(a.stop)
+
+	if err := a.globalGroup.Merge(a.query, a.group); err != nil {
+		panic(err)
+	}
+}
diff --git a/mapr/globalgroupset.go b/mapr/globalgroupset.go
new file mode 100644
index 0000000..cfab506
--- /dev/null
+++ b/mapr/globalgroupset.go
@@ -0,0 +1,100 @@
+package mapr
+
+import (
+ "fmt"
+)
+
+// GlobalGroupSet is used on the dtail client to merge multiple group sets
+// (one group set per remote server) to one single global group set.
+type GlobalGroupSet struct {
+ GroupSet
+ // semaphore is used as a lock: the methods of this type send to it
+ // before accessing the embedded GroupSet and receive from it afterwards.
+ semaphore chan struct{}
+}
+
+// NewGlobalGroupSet returns an empty global group set with its semaphore
+// (a channel of capacity one) ready for use.
+func NewGlobalGroupSet() *GlobalGroupSet {
+	g := new(GlobalGroupSet)
+	g.semaphore = make(chan struct{}, 1)
+	g.InitSet()
+	return g
+}
+
+// String wraps the string representation of the embedded group set.
+func (g *GlobalGroupSet) String() string {
+	inner := g.GroupSet.String()
+	return fmt.Sprintf("GlobalGroupSet(%s)", inner)
+}
+
+// Merge merges group into the global group set. It blocks until the
+// semaphore could be taken and releases it again when done.
+func (g *GlobalGroupSet) Merge(query *Query, group *GroupSet) error {
+	release := func() { <-g.semaphore }
+
+	g.semaphore <- struct{}{}
+	defer release()
+
+	return g.merge(query, group)
+}
+
+// MergeNoblock merges group into the global group set only if the
+// semaphore can be taken right away. It reports whether the merge was
+// attempted, together with the error of that merge.
+func (g *GlobalGroupSet) MergeNoblock(query *Query, group *GroupSet) (bool, error) {
+	select {
+	case g.semaphore <- struct{}{}:
+	default:
+		// Somebody else holds the semaphore, try again later.
+		return false, nil
+	}
+
+	err := g.merge(query, group)
+	<-g.semaphore
+	return true, err
+}
+
+// merge merges every aggregate set of group into the set with the same
+// group key of the global group set, stopping at the first error. It does
+// not touch the semaphore itself.
+func (g *GlobalGroupSet) merge(query *Query, group *GroupSet) error {
+	for key, source := range group.sets {
+		target := g.GetSet(key)
+		err := target.Merge(query, source)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// IsEmpty reports whether the global group set holds no sets at all.
+func (g *GlobalGroupSet) IsEmpty() bool {
+	if g.NumSets() != 0 {
+		return false
+	}
+	return true
+}
+
+// NumSets returns the number of sets, read while holding the semaphore.
+func (g *GlobalGroupSet) NumSets() int {
+	release := func() { <-g.semaphore }
+
+	g.semaphore <- struct{}{}
+	defer release()
+
+	count := len(g.sets)
+	return count
+}
+
+// SwapOut returns the sets collected so far as a new group set and
+// re-initializes the global group set, so that it is empty again and can
+// aggregate new data. Both happens while holding the semaphore.
+func (g *GlobalGroupSet) SwapOut() *GroupSet {
+	release := func() { <-g.semaphore }
+
+	g.semaphore <- struct{}{}
+	defer release()
+
+	swapped := new(GroupSet)
+	swapped.sets = g.sets
+	g.InitSet()
+
+	return swapped
+}
+
+// WriteResult calls WriteResult of the embedded group set while holding
+// the semaphore.
+func (g *GlobalGroupSet) WriteResult(query *Query) error {
+	release := func() { <-g.semaphore }
+
+	g.semaphore <- struct{}{}
+	defer release()
+
+	return g.GroupSet.WriteResult(query)
+}
+
+// Result calls Result of the embedded group set while holding the
+// semaphore and passes its three return values through.
+func (g *GlobalGroupSet) Result(query *Query) (string, int, error) {
+	release := func() { <-g.semaphore }
+
+	g.semaphore <- struct{}{}
+	defer release()
+
+	return g.GroupSet.Result(query)
+}
diff --git a/mapr/groupset.go b/mapr/groupset.go
new file mode 100644
index 0000000..d8f9379
--- /dev/null
+++ b/mapr/groupset.go
@@ -0,0 +1,178 @@
+package mapr
+
+import (
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "sort"
+ "strconv"
+ "strings"
+)
+
+// GroupSet represents a map of aggregate sets. The group sets
+// are required by the "group by" mapr clause, whereas the
+// group set map keys are the values of the "group by" arguments.
+// E.g. "group by $cid" would create one aggregate set and one map
+// entry per customer id.
+type GroupSet struct {
+ // sets maps a group key to its aggregate set; GetSet adds missing entries.
+ sets map[string]*AggregateSet
+}
+
+// NewGroupSet returns a group set with an initialized, empty set map.
+func NewGroupSet() *GroupSet {
+	g := new(GroupSet)
+	g.InitSet()
+	return g
+}
+
+// String representation of the group set.
+func (g *GroupSet) String() string {
+ return fmt.Sprintf("GroupSet(%v)", g.sets)
+}
+
+// InitSet replaces the set map by a new empty one. Sets stored before are
+// no longer reachable through the group set afterwards.
+func (g *GroupSet) InitSet() {
+	g.sets = map[string]*AggregateSet{}
+}
+
+// GetSet returns the aggregate set stored under groupKey. A new empty
+// aggregate set is created and stored first if there is none yet.
+func (g *GroupSet) GetSet(groupKey string) *AggregateSet {
+	if existing, found := g.sets[groupKey]; found {
+		return existing
+	}
+
+	created := NewAggregateSet()
+	g.sets[groupKey] = created
+	return created
+}
+
+// Serialize serializes every aggregate set of the group set, together
+// with its group key, to ch (e.g. to send it over the wire).
+func (g *GroupSet) Serialize(ch chan<- string, stop chan struct{}) {
+	for key := range g.sets {
+		g.sets[key].Serialize(key, ch, stop)
+	}
+}
+
+// Result returns a nicely formated result of the query from the group set.
+func (g *GroupSet) Result(query *Query) (string, int, error) {
+ return g.limitedResult(query, query.Limit, "\t", " ", false)
+}
+
+// WriteResult writes the complete, comma separated result including a
+// header line to query.Outfile (file mode 0644). It fails if the query
+// names no outfile.
+func (g *GroupSet) WriteResult(query *Query) error {
+	if len(query.Outfile) == 0 {
+		return errors.New("No outfile specified")
+	}
+
+	// A limit of -1 includes all data sets.
+	content, _, err := g.limitedResult(query, -1, "", ",", true)
+	if err == nil {
+		err = ioutil.WriteFile(query.Outfile, []byte(content), 0644)
+	}
+	return err
+}
+
+// limitedResult formats the group set as text, one line per group.
+//
+// Every line starts with lineStarter and holds one value per select
+// condition of query, separated by fieldSeparator. Count is printed as an
+// integer, Len, Sum, Min and Max as floats, Avg as the stored float divided
+// by the sample count, and Last as the stored string. If query.OrderBy is
+// set, the lines are sorted by that field: descending by default and
+// ascending with query.ReverseOrder. With addHeader a header line with the
+// field storage names comes first. At most limit data lines are written; a
+// negative limit writes all of them.
+//
+// It returns the text, the total number of groups (regardless of limit)
+// and an error for an unknown aggregation operation.
+func (g *GroupSet) limitedResult(query *Query, limit int, lineStarter, fieldSeparator string, addHeader bool) (string, int, error) {
+	type row struct {
+		groupKey string
+		text     string
+		orderBy  float64
+	}
+
+	var rows []row
+
+	for groupKey, set := range g.sets {
+		var line strings.Builder
+		current := row{groupKey: groupKey}
+
+		for i, sc := range query.Select {
+			if i > 0 {
+				line.WriteString(fieldSeparator)
+			}
+
+			storage := sc.FieldStorage
+			isOrderField := storage == query.OrderBy
+
+			switch sc.Operation {
+			case Count:
+				number := set.FValues[storage]
+				fmt.Fprintf(&line, "%d", int(number))
+				if isOrderField {
+					current.orderBy = number
+				}
+			case Len, Sum, Min, Max:
+				number := set.FValues[storage]
+				fmt.Fprintf(&line, "%f", number)
+				if isOrderField {
+					current.orderBy = number
+				}
+			case Avg:
+				number := set.FValues[storage] / float64(set.Samples)
+				fmt.Fprintf(&line, "%f", number)
+				if isOrderField {
+					current.orderBy = number
+				}
+			case Last:
+				text := set.SValues[storage]
+				if isOrderField {
+					// Strings can only be ordered by if they are numeric.
+					if number, err := strconv.ParseFloat(text, 64); err == nil {
+						current.orderBy = number
+					}
+				}
+				line.WriteString(text)
+			default:
+				return "", 0, fmt.Errorf("Unknown aggregation method '%v'", sc.Operation)
+			}
+		}
+
+		current.text = line.String()
+		rows = append(rows, current)
+	}
+
+	if query.OrderBy != "" {
+		less := func(i, j int) bool { return rows[i].orderBy > rows[j].orderBy }
+		if query.ReverseOrder {
+			less = func(i, j int) bool { return rows[i].orderBy < rows[j].orderBy }
+		}
+		sort.SliceStable(rows, less)
+	}
+
+	var out strings.Builder
+
+	// The header comes first.
+	if addHeader {
+		out.WriteString(lineStarter)
+		for i, sc := range query.Select {
+			if i > 0 {
+				out.WriteString(fieldSeparator)
+			}
+			out.WriteString(sc.FieldStorage)
+		}
+		out.WriteString("\n")
+	}
+
+	// Followed by the data lines.
+	for i := range rows {
+		if i == limit {
+			break
+		}
+		out.WriteString(lineStarter)
+		out.WriteString(rows[i].text)
+		out.WriteString("\n")
+	}
+
+	return out.String(), len(rows), nil
+}
diff --git a/mapr/logformat/default.go b/mapr/logformat/default.go
new file mode 100644
index 0000000..f0df5bc
--- /dev/null
+++ b/mapr/logformat/default.go
@@ -0,0 +1,23 @@
+package logformat
+
+import (
+ "errors"
+ "strings"
+)
+
+// MakeFieldsDEFAULT is the default log file mapreduce parser. It expects
+// a line of "key=value" tokens separated by "|" and returns them as a map
+// with lowercased keys, plus the field "$hostname" set to the parser's
+// hostname. A token without "=" aborts parsing with an error; the fields
+// collected so far are returned alongside.
+func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) {
+	fields := make(map[string]string, 20)
+	fields["$hostname"] = p.hostname
+
+	for _, pair := range strings.Split(maprLine, "|") {
+		parts := strings.SplitN(pair, "=", 2)
+		if len(parts) != 2 {
+			return fields, errors.New("Error parsing mapr token: " + pair)
+		}
+		key, value := strings.ToLower(parts[0]), parts[1]
+		fields[key] = value
+	}
+	return fields, nil
+}
diff --git a/mapr/logformat/default_test.go b/mapr/logformat/default_test.go
new file mode 100644
index 0000000..a3c47fb
--- /dev/null
+++ b/mapr/logformat/default_test.go
@@ -0,0 +1,35 @@
+package logformat
+
+import (
+ "testing"
+)
+
+// TestDefaultLogFormat checks that the default log format parser returns
+// the key-value pairs of a well-formed line and reports an error for a
+// token without "=".
+func TestDefaultLogFormat(t *testing.T) {
+	parser, err := NewParser("default")
+	if err != nil {
+		// Without a parser none of the checks below can run.
+		t.Fatalf("Unable to create parser: %s", err.Error())
+	}
+
+	fields, err := parser.MakeFields("foo=bar|baz=bay")
+
+	if err != nil {
+		t.Errorf("Unable to parse: %s", err.Error())
+	}
+
+	if bar, ok := fields["foo"]; !ok {
+		t.Errorf("Expected field 'foo', but no such field there\n")
+	} else if bar != "bar" {
+		t.Errorf("Expected 'bar' stored in field 'foo', but got '%s'\n", bar)
+	}
+
+	if bay, ok := fields["baz"]; !ok {
+		t.Errorf("Expected field 'baz', but no such field there\n")
+	} else if bay != "bay" {
+		t.Errorf("Expected 'bay' stored in field 'baz', but got '%s'\n", bay)
+	}
+
+	// err is nil in the failure case, so it must not be dereferenced for
+	// the message.
+	if _, err = parser.MakeFields("foo=bar|bazbay"); err == nil {
+		t.Errorf("Expected error for token 'bazbay' but got none")
+	}
+}
diff --git a/mapr/logformat/parser.go b/mapr/logformat/parser.go
new file mode 100644
index 0000000..b7c8c5c
--- /dev/null
+++ b/mapr/logformat/parser.go
@@ -0,0 +1,75 @@
+package logformat
+
+import (
+ "dtail/logger"
+ "errors"
+ "fmt"
+ "os"
+ "reflect"
+ "strings"
+)
+
+// Parser is used to parse the mapreduce information from the server log files.
+type Parser struct {
+ // hostname is the local hostname as determined by NewParser.
+ hostname string
+ // NOTE(review): logFormatName is not assigned by the code visible here.
+ logFormatName string
+ // makeFieldsFunc is the log format's MakeFields method found via reflection.
+ makeFieldsFunc reflect.Value
+ // makeFieldsReceiver is passed as first argument when calling makeFieldsFunc.
+ makeFieldsReceiver reflect.Value
+}
+
+// NewParser returns a log parser for the given log format. It fails if
+// the hostname can not be determined or if there is no parsing method for
+// the log format.
+func NewParser(logFormatName string) (*Parser, error) {
+	hostname, err := os.Hostname()
+	if err != nil {
+		return nil, err
+	}
+
+	p := &Parser{hostname: hostname}
+	if err := p.reflectLogFormat(logFormatName); err != nil {
+		return nil, err
+	}
+
+	return p, nil
+}
+
+// reflectLogFormat looks up the parsing method of the given log format
+// via reflection and remembers it for MakeFields.
+//
+// The aim of this is that everyone can plug in their own mapr log format
+// parsing method to DTail: just add a method MakeFieldsMODULENAME to type
+// Parser, whereas MODULENAME must be an uppercase string (the log format
+// name gets uppercased for the lookup).
+func (p *Parser) reflectLogFormat(logFormatName string) error {
+	methodName := fmt.Sprintf("MakeFields%s", strings.ToUpper(logFormatName))
+
+	method, found := reflect.TypeOf(p).MethodByName(methodName)
+	if !found {
+		return errors.New("No such mapr log format module: " + methodName)
+	}
+
+	p.makeFieldsReceiver = reflect.ValueOf(p)
+	p.makeFieldsFunc = method.Func
+
+	return nil
+}
+
+// MakeFields returns the fields of a given log line. It calls the parsing
+// method selected by reflectLogFormat via reflection, converts its two
+// return values back to a field map and an error, and traces both.
+func (p *Parser) MakeFields(maprLine string) (fields map[string]string, err error) {
+	args := []reflect.Value{p.makeFieldsReceiver, reflect.ValueOf(maprLine)}
+	results := p.makeFieldsFunc.Call(args)
+
+	// A nil error comes back as a nil interface and can not be type asserted.
+	if raw := results[1].Interface(); raw != nil {
+		err = raw.(error)
+	}
+	fields = results[0].Interface().(map[string]string)
+
+	logger.Trace("parser.MakeFields", fields, err)
+	return fields, err
+}
diff --git a/mapr/query.go b/mapr/query.go
new file mode 100644
index 0000000..8ed3c67
--- /dev/null
+++ b/mapr/query.go
@@ -0,0 +1,245 @@
+package mapr
+
+import (
+ "dtail/logger"
+ "errors"
+ "fmt"
+ "strconv"
+ "strings"
+ "time"
+)
+
+// Building blocks of the error messages returned for unparsable queries.
+const (
+	// invalidQuery is the prefix of every query parse error.
+	invalidQuery = "Invalid query: "
+	// unexpectedEnd is reported when a clause lacks its argument.
+	unexpectedEnd = "Unexpected end of query"
+)
+
+// Query represents a parsed mapr query.
+type Query struct {
+ // Select holds the parsed fields of the 'select' clause.
+ Select []selectCondition
+ // Table is the uppercased argument of the 'from' clause.
+ Table string
+ // Where holds the parsed conditions of the 'where' clause.
+ Where []whereCondition
+ // GroupBy holds the 'group by' fields; defaults to the first select field.
+ GroupBy []string
+ // OrderBy is the field storage name given to '(r)order by', if any.
+ OrderBy string
+ // ReverseOrder is set when 'rorder' was used.
+ ReverseOrder bool
+ // GroupKey is the comma joined list of the 'group by' fields.
+ GroupKey string
+ // Interval is given in seconds by the 'interval' clause; defaults to 5s.
+ Interval time.Duration
+ // Limit is the argument of the 'limit' clause; defaults to -1.
+ Limit int
+ // Outfile is the argument of the 'outfile' clause.
+ Outfile string
+ // RawQuery is the query string as passed to NewQuery.
+ RawQuery string
+ // tokens are the tokens the query string was split into.
+ tokens []token
+}
+
+func (q Query) String() string {
+ return fmt.Sprintf("Query(Select:%v,Table:%s,Where:%v,GroupBy:%v,GroupKey:%s,OrderBy:%v,ReverseOrder:%v,Interval:%v,Limit:%d,Outfile:%s,RawQuery:%s,tokens:%v)",
+ q.Select,
+ q.Table,
+ q.Where,
+ q.GroupBy,
+ q.GroupKey,
+ q.OrderBy,
+ q.ReverseOrder,
+ q.Interval,
+ q.Limit,
+ q.Outfile,
+ q.RawQuery,
+ q.tokens)
+}
+
+// NewQuery parses queryStr into a new mapreduce query. The interval
+// defaults to five seconds and the limit to -1. An empty query string
+// yields neither a query nor an error. In case of a parse error the
+// partially parsed query is returned together with the error.
+func NewQuery(queryStr string) (*Query, error) {
+	if len(queryStr) == 0 {
+		return nil, nil
+	}
+
+	q := &Query{
+		RawQuery: queryStr,
+		tokens:   tokenize(queryStr),
+		Interval: 5 * time.Second,
+		Limit:    -1,
+	}
+
+	err := q.parse(q.tokens)
+
+	logger.Debug(*q)
+	return q, err
+}
+
+// parse fills the query from its tokens. The clauses 'select', 'from',
+// 'where', 'group [by]', 'order [by]', 'rorder [by]', 'interval', 'limit'
+// and 'outfile' may come in any order; keywords are case insensitive.
+//
+// After all tokens are consumed the query is validated: a table and at
+// least one select field are required, the group by fields default to the
+// first select field, and an '(r)order by' field must be present in the
+// 'select' clause.
+func (q *Query) parse(tokens []token) error {
+	var (
+		found []token
+		err   error
+	)
+	unexpected := func() error {
+		return errors.New(invalidQuery + unexpectedEnd)
+	}
+
+	for len(tokens) > 0 {
+		keyword := strings.ToLower(tokens[0].str)
+
+		switch keyword {
+		case "select":
+			tokens, found = tokensConsume(tokens[1:])
+			if q.Select, err = makeSelectConditions(found); err != nil {
+				return err
+			}
+		case "from":
+			tokens, found = tokensConsume(tokens[1:])
+			if len(found) > 0 {
+				q.Table = strings.ToUpper(found[0].str)
+			}
+		case "where":
+			tokens, found = tokensConsume(tokens[1:])
+			if q.Where, err = makeWhereConditions(found); err != nil {
+				return err
+			}
+		case "group":
+			tokens = tokensConsumeOptional(tokens[1:], "by")
+			if len(tokens) < 1 {
+				return unexpected()
+			}
+			tokens, q.GroupBy = tokensConsumeStr(tokens)
+			q.GroupKey = strings.Join(q.GroupBy, ",")
+		case "order", "rorder":
+			tokens = tokensConsumeOptional(tokens[1:], "by")
+			if len(tokens) < 1 {
+				return unexpected()
+			}
+			tokens, found = tokensConsume(tokens)
+			if len(found) == 0 {
+				return unexpected()
+			}
+			q.OrderBy = found[0].str
+			if keyword == "rorder" {
+				q.ReverseOrder = true
+			}
+		case "interval":
+			tokens, found = tokensConsume(tokens[1:])
+			if len(found) > 0 {
+				seconds, convErr := strconv.Atoi(found[0].str)
+				if convErr != nil {
+					return errors.New(invalidQuery + convErr.Error())
+				}
+				q.Interval = time.Second * time.Duration(seconds)
+			}
+		case "limit":
+			tokens, found = tokensConsume(tokens[1:])
+			if len(found) == 0 {
+				return unexpected()
+			}
+			limit, convErr := strconv.Atoi(found[0].str)
+			if convErr != nil {
+				return errors.New(invalidQuery + convErr.Error())
+			}
+			q.Limit = limit
+		case "outfile":
+			tokens, found = tokensConsume(tokens[1:])
+			if len(found) == 0 {
+				return unexpected()
+			}
+			q.Outfile = found[0].str
+		default:
+			return errors.New(invalidQuery + "Unexpected keyword " + tokens[0].str)
+		}
+	}
+
+	if q.Table == "" {
+		return errors.New(invalidQuery + "Empty table specified in 'from' clause")
+	}
+	if len(q.Select) < 1 {
+		return errors.New(invalidQuery + "Expected at least one field in 'select' clause but got none")
+	}
+	if len(q.GroupBy) == 0 {
+		// Without a 'group by' clause group by the first selected field.
+		q.GroupBy = append(q.GroupBy, q.Select[0].Field)
+	}
+
+	if q.OrderBy == "" {
+		return nil
+	}
+	for _, sc := range q.Select {
+		if sc.FieldStorage == q.OrderBy {
+			return nil
+		}
+	}
+	return errors.New(invalidQuery + fmt.Sprintf("Can not '(r)order by' '%s', must be present in 'select' clause", q.OrderBy))
+}
+
+// WhereClause interprets the where clause of the mapreduce query.
+func (q *Query) WhereClause(fields map[string]string) bool {
+ floatValue := func(str string, float float64, t whereType) (float64, bool) {
+ switch t {
+ case Float:
+ return float, true
+ case Field:
+ value, ok := fields[str]
+ if !ok {
+ return 0, false
+ }
+ f, err := strconv.ParseFloat(value, 64)
+ if err != nil {
+ return 0, false
+ }
+ return f, true
+ default:
+ logger.Error("Unexpected argument in 'where' clause", str, float, t)
+ return 0, false
+ }
+ }
+
+ stringValue := func(str string, t whereType) (string, bool) {
+ switch t {
+ case Field:
+ value, ok := fields[str]
+ if !ok {
+ return str, false
+ }
+ return value, true
+ case String:
+ return str, true
+ default:
+ logger.Error("Unexpected argument in 'where' clause", str, t)
+ return str, false
+ }
+ }
+
+ for _, wc := range q.Where {
+ var ok bool
+
+ if wc.Operation > FloatOperation {
+ var lValue, rValue float64
+ if lValue, ok = floatValue(wc.lString, wc.lFloat, wc.lType); !ok {
+ return false
+ }
+ if rValue, ok = floatValue(wc.rString, wc.rFloat, wc.rType); !ok {
+ return false
+ }
+ if ok = wc.floatClause(lValue, rValue); !ok {
+ return false
+ }
+ continue
+ }
+
+ var lValue, rValue string
+ if lValue, ok = stringValue(wc.lString, wc.lType); !ok {
+ return false
+ }
+ if rValue, ok = stringValue(wc.rString, wc.rType); !ok {
+ return false
+ }
+ if ok = wc.stringClause(lValue, rValue); !ok {
+ return false
+ }
+ }
+
+ return true
+}
diff --git a/mapr/query_test.go b/mapr/query_test.go
new file mode 100644
index 0000000..6176461
--- /dev/null
+++ b/mapr/query_test.go
@@ -0,0 +1,149 @@
+package mapr
+
+import (
+ "testing"
+ "time"
+)
+
+// TestParseQuerySimple checks that incomplete queries are rejected and
+// that complete queries are accepted by NewQuery.
+func TestParseQuerySimple(t *testing.T) {
+	mustFail := []string{
+		"select",
+		"select foo",
+		"select foo from",
+		"select foo from bar where baz",
+		"select foo from bar where baz <",
+		"select foo from bar where baz < 100 bay eq 12 group",
+		"select foo from bar where baz < 100 bay eq 12 group by foo order by",
+		"select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit",
+	}
+	mustParse := []string{
+		"select foo from bar",
+		"select foo from bar where",
+		"select foo from bar where baz < 100 bay eq 12",
+		"select foo from bar where baz < 100, bay eq 12",
+		"select foo from bar where baz < 100 and bay eq 12",
+		"select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo",
+		"select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit 23",
+		"select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit 23 outfile \"result.csv\"",
+	}
+
+	for _, queryStr := range mustFail {
+		if q, err := NewQuery(queryStr); err == nil {
+			t.Errorf("Expected a parse error: %s\n%v", queryStr, q)
+		}
+	}
+
+	for _, queryStr := range mustParse {
+		if _, err := NewQuery(queryStr); err != nil {
+			t.Errorf("%s: %s", err.Error(), queryStr)
+		}
+	}
+}
+
+// TestParseQueryDeep parses the same query written in several dialects
+// (differing in case, commas, optional 'by'/'and' keywords and clause
+// order) and checks every part of the parsed result.
+//
+// A dialect is skipped after a parse error or an unexpected number of
+// elements, as indexing into the parsed slices could panic otherwise.
+func TestParseQueryDeep(t *testing.T) {
+	dialects := []string{
+		"select s1, `from`, count(s3) from table where w1 == 2 and w2 eq \"free beer\" group by g1, g2 order by count(s3) interval 10 limit 23",
+		"SELECT s1, `from` COUNT(s3) FROM table WHERE w1 == 2 AND w2 eq \"free beer\" GROUP g1, g2 ORDER count(s3) INTERVAL 10 LIMIT 23",
+		"select s1, `from` count(s3) from table where w1 == 2 and w2 eq \"free beer\" group by g1, g2 order by count(s3) interval 10 limit 23",
+		"sElEct s1, `from` coUnt(s3) from taBle where w1 == 2 aNd w2 eq \"free beer\" Group By g1, g2 order bY count(s3) intervaL 10 LiMiT 23",
+		"SELECT s1 `from` COUNT(s3) FROM table WHERE w1 == 2 AND w2 eq \"free beer\" GROUP BY g1 g2 ORDER BY count(s3) INTERVAL 10 LIMIT 23",
+		"select s1 `from` count(s3) from table where w1 == 2 w2 eq \"free beer\" group g1 g2 order count(s3) interval 10 limit 23",
+		"limit 23 interval 10 order count(s3) group g1 g2 where w1 == 2 w2 eq \"free beer\" from table select s1 `from` count(s3)",
+	}
+
+	for _, queryStr := range dialects {
+		q, err := NewQuery(queryStr)
+		if err != nil {
+			t.Errorf("%s: %s", err.Error(), queryStr)
+			continue
+		}
+
+		// 'select' clause
+		if len(q.Select) != 3 {
+			t.Errorf("Expected three elements in 'select' clause but got '%v': %s\n%v", q.Select, queryStr, q)
+			continue
+		}
+
+		if q.Select[0].Field != "s1" {
+			t.Errorf("Expected 's1' as first element in 'select' clause but got '%v': %s\n%v", q.Select[0].Field, queryStr, q)
+		}
+		if q.Select[0].Operation != Last {
+			t.Errorf("Expected 'last' as aggregation function of first element in 'select' clause but got '%v': %s\n%v", q.Select[0].Operation, queryStr, q)
+		}
+
+		if q.Select[1].Field != "from" {
+			t.Errorf("Expected 'from' as second element in 'select' clause but got '%v': %s\n%v", q.Select[1].Field, queryStr, q)
+		}
+		if q.Select[1].Operation != Last {
+			t.Errorf("Expected 'last' as aggregation function of second element in 'select' clause but got '%v': %s\n%v", q.Select[1].Operation, queryStr, q)
+		}
+
+		if q.Select[2].Field != "s3" {
+			t.Errorf("Expected 's3' as third element in 'select' clause but got '%v': %s\n%v", q.Select[2].Field, queryStr, q)
+		}
+		if q.Select[2].Operation != Count {
+			t.Errorf("Expected 'count' as aggregation function of third element in 'select' clause but got '%v': %s\n%v", q.Select[2].Operation, queryStr, q)
+		}
+		if q.Select[2].FieldStorage != "count(s3)" {
+			t.Errorf("Expected 'count(s3)' as third element's storage in 'select' clause but got '%v': %s\n%v", q.Select[2].FieldStorage, queryStr, q)
+		}
+
+		// 'from' clause
+		if q.Table != "TABLE" {
+			t.Errorf("Expected 'TABLE' in 'from' clause but got '%v': %s\n%v", q.Table, queryStr, q)
+		}
+
+		// 'where' clause
+		if len(q.Where) != 2 {
+			t.Errorf("Expected two elements in 'where' clause but got '%v': %s\n%v", q.Where, queryStr, q)
+			continue
+		}
+		if q.Where[0].lString != "w1" {
+			t.Errorf("Expected w1 as first element in 'where' clause but got '%v': %s\n%v", q.Where[0].lString, queryStr, q)
+		}
+		if q.Where[0].Operation != FloatEq {
+			t.Errorf("Expected FloatEq operation in first 'where' condition but got '%v': %s\n%v", q.Where[0].Operation, queryStr, q)
+		}
+		if q.Where[0].rFloat != 2 {
+			t.Errorf("Expected '2' as float argument in first 'where' condition but got '%v': %s\n%v", q.Where[0].rFloat, queryStr, q)
+		}
+		if q.Where[1].lString != "w2" {
+			t.Errorf("Expected w2 as second element in 'where' clause but got '%v': %s\n%v", q.Where[1].lString, queryStr, q)
+		}
+		if q.Where[1].Operation != StringEq {
+			t.Errorf("Expected StringEq operation in second 'where' condition but got '%v': %s\n%v", q.Where[1].Operation, queryStr, q)
+		}
+		if q.Where[1].rString != "free beer" {
+			t.Errorf("Expected 'free beer' as string argument in second 'where' condition but got '%v': %s\n%v", q.Where[1].rString, queryStr, q)
+		}
+
+		// 'group by' clause
+		if len(q.GroupBy) != 2 {
+			t.Errorf("Expected two elements in 'group by' clause but got '%v': %s\n%v", q.GroupBy, queryStr, q)
+			continue
+		}
+		if q.GroupBy[0] != "g1" {
+			t.Errorf("Expected 'g1' as first element in 'group by' clause but got '%v': %s\n%v", q.GroupBy[0], queryStr, q)
+		}
+		if q.GroupBy[1] != "g2" {
+			t.Errorf("Expected 'g2' as second element in 'group by' clause but got '%v': %s\n%v", q.GroupBy[1], queryStr, q)
+		}
+		if q.GroupKey != "g1,g2" {
+			t.Errorf("Expected 'g1,g2' as group key in 'group by' clause but got '%v': %s\n%v", q.GroupKey, queryStr, q)
+		}
+
+		// 'order by' clause
+		if q.OrderBy != "count(s3)" {
+			t.Errorf("Expected 'count(s3)' as element in 'order by' clause but got '%v': %s\n%v", q.OrderBy, queryStr, q)
+		}
+
+		// 'interval' clause
+		if q.Interval != time.Second*time.Duration(10) {
+			t.Errorf("Expected '10s' as duration 'interval' clause but got '%v': %s\n%v", q.Interval, queryStr, q)
+		}
+
+		// 'limit' clause
+		if q.Limit != 23 {
+			t.Errorf("Expected '23' as limit in 'limit' clause but got '%v': %s\n%v", q.Limit, queryStr, q)
+		}
+	}
+}
diff --git a/mapr/selectcondition.go b/mapr/selectcondition.go
new file mode 100644
index 0000000..1882b7e
--- /dev/null
+++ b/mapr/selectcondition.go
@@ -0,0 +1,96 @@
+package mapr
+
+import (
+ "errors"
+ "fmt"
+ "strings"
+)
+
+// AggregateOperation specifies the type of an aggregate operation.
+type AggregateOperation int
+
+// The aggregate operation types, numbered consecutively starting with
+// zero for the undefined operation.
+const (
+	UndefAggregateOperation AggregateOperation = iota
+	Count
+	Sum
+	Min
+	Max
+	Last
+	Avg
+	Len
+)
+
+// selectCondition represents one parsed field of the "select" clause,
+// used by mapr.Query.
+type selectCondition struct {
+ // Field is the lowercased field name, e.g. 'foo'.
+ Field string
+ // FieldStorage is the whole lowercased select token, e.g. 'sum(foo)'.
+ FieldStorage string
+ // Operation is the aggregation to apply; Last for a plain field name.
+ Operation AggregateOperation
+}
+
+func (sc selectCondition) String() string {
+ return fmt.Sprintf("selectCondition(Field:%s,FieldStorage:%s,Operation:%v)",
+ sc.Field,
+ sc.FieldStorage,
+ sc.Operation)
+}
+
+// makeSelectConditions converts the tokens of a 'select' clause into
+// select conditions. All tokens are lowercased. A token without any
+// parenthesis is a plain field aggregated with Last. Any other token must
+// have the form 'aggregation(field)', e.g. 'sum(foo)', with one of the
+// aggregations count, sum, min, max, last, avg or len. The first token
+// which can not be parsed aborts with an error.
+func makeSelectConditions(tokens []token) ([]selectCondition, error) {
+	operations := map[string]AggregateOperation{
+		"count": Count,
+		"sum":   Sum,
+		"min":   Min,
+		"max":   Max,
+		"last":  Last,
+		"avg":   Avg,
+		"len":   Len,
+	}
+
+	// Parses a single select token.
+	parseOne := func(tok token) (selectCondition, error) {
+		var sc selectCondition
+		lowered := strings.ToLower(tok.str)
+
+		if !strings.ContainsAny(lowered, "()") {
+			sc.Field, sc.FieldStorage, sc.Operation = lowered, lowered, Last
+			return sc, nil
+		}
+
+		// outer[0] is the aggregation, e.g. 'sum'.
+		outer := strings.Split(lowered, "(")
+		if len(outer) != 2 {
+			return sc, errors.New(invalidQuery + "Can't parse 'select' aggregation: " + tok.str)
+		}
+
+		// inner[0] is the field name, e.g. 'foo'.
+		inner := strings.Split(outer[1], ")")
+		if len(inner) != 2 {
+			return sc, errors.New(invalidQuery + "Can't parse 'select' field name from aggregation: " + tok.str)
+		}
+		sc.Field = inner[0]
+		sc.FieldStorage = lowered // e.g. 'sum(foo)'
+
+		operation, known := operations[outer[0]]
+		if !known {
+			return sc, errors.New(invalidQuery + "Unknown aggregation in 'select' clause: " + outer[0])
+		}
+		sc.Operation = operation
+
+		return sc, nil
+	}
+
+	var conditions []selectCondition
+	for _, tok := range tokens {
+		sc, err := parseOne(tok)
+		if err != nil {
+			return nil, err
+		}
+		conditions = append(conditions, sc)
+	}
+	return conditions, nil
+}
diff --git a/mapr/server/aggregate.go b/mapr/server/aggregate.go
new file mode 100644
index 0000000..316da67
--- /dev/null
+++ b/mapr/server/aggregate.go
@@ -0,0 +1,170 @@
+package server
+
+import (
+ "dtail/config"
+ "dtail/fs"
+ "dtail/logger"
+ "dtail/mapr"
+ "dtail/mapr/logformat"
+ "os"
+ "strings"
+ "time"
+)
+
+// Aggregate aggregates mapreduce data on the DTail server side.
+type Aggregate struct {
+	// Lines receives the log lines to process (MAPREDUCE lines to parse).
+	Lines chan fs.LineRead
+	// hostname is the short name of the current server (per the original
+	// comment it is used to populate the $hostname field).
+	hostname string
+	// stop is closed to make the aggregator's goroutines exit; serialize
+	// carries the requests to serialize the aggregated data.
+	stop, serialize chan struct{}
+	// query is the parsed mapr query.
+	query *mapr.Query
+	// parser is the mapr log format parser turning a log line into fields.
+	parser *logformat.Parser
+}
+
+// NewAggregate returns a new server side aggregator for the given mapr query.
+//
+// It parses queryStr, creates the log format parser configured in
+// config.Server.MapreduceLogFormat and starts the goroutines which read log
+// lines from Lines, aggregate them and periodically serialize the result to
+// maprLines. The goroutines are stopped by Close.
+//
+// An error is returned if the query can't be parsed or if the log format
+// parser can't be created.
+func NewAggregate(maprLines chan<- string, queryStr string) (*Aggregate, error) {
+	query, err := mapr.NewQuery(queryStr)
+	if err != nil {
+		return nil, err
+	}
+
+	// A failed host name lookup is only logged; the host name is then
+	// whatever os.Hostname returned alongside the error.
+	fqdn, err := os.Hostname()
+	if err != nil {
+		logger.Error(err)
+	}
+	// Only the part before the first dot is used as host name.
+	s := strings.Split(fqdn, ".")
+
+	logger.Info("Creating mapr log format parser", config.Server.MapreduceLogFormat)
+	logParser, err := logformat.NewParser(config.Server.MapreduceLogFormat)
+	if err != nil {
+		// Hand the problem to the caller through the error return instead
+		// of exiting from inside a constructor.
+		logger.Error("Could not create mapr log format parser", err)
+		return nil, err
+	}
+
+	a := Aggregate{
+		Lines:     make(chan fs.LineRead, 100),
+		stop:      make(chan struct{}),
+		serialize: make(chan struct{}),
+		hostname:  s[0],
+		query:     query,
+		parser:    logParser,
+	}
+
+	go a.periodicAggregateTimer()
+
+	fieldsCh := make(chan map[string]string)
+	go a.readFields(fieldsCh, maprLines)
+	go a.readLines(fieldsCh)
+
+	return &a, nil
+}
+
+// periodicAggregateTimer requests a serialization of the aggregated data each
+// time the query's interval has elapsed, until the aggregator is stopped. The
+// interval restarts after Serialize has returned.
+func (a *Aggregate) periodicAggregateTimer() {
+	// One timer is reused for all rounds and stopped on exit, so no pending
+	// timer is left behind once the aggregator has been stopped.
+	timer := time.NewTimer(a.query.Interval)
+	defer timer.Stop()
+
+	for {
+		select {
+		case <-timer.C:
+			a.Serialize()
+			// The timer has fired and its channel is drained, so it is
+			// safe to re-arm it right away.
+			timer.Reset(a.query.Interval)
+		case <-a.stop:
+			return
+		}
+	}
+}
+
+// readFields owns the current group set. It aggregates every field map
+// received from fieldsCh into it and, whenever a serialization is requested,
+// serializes the set to maprLines and starts over with an empty one. It
+// returns once the aggregator is stopped.
+func (a *Aggregate) readFields(fieldsCh <-chan map[string]string, maprLines chan<- string) {
+	current := mapr.NewGroupSet()
+
+	for {
+		select {
+		case <-a.stop:
+			return
+		case <-a.serialize:
+			logger.Info("Serializing mapreduce result")
+			current.Serialize(maprLines, a.stop)
+			logger.Info("Done serializing mapreduce result")
+			// The serialized data is dropped; aggregation starts from scratch.
+			current = mapr.NewGroupSet()
+		case fields := <-fieldsCh:
+			a.aggregate(current, fields)
+		}
+	}
+}
+
+// readLines reads log lines from Lines, parses each of them into fields and
+// forwards the fields matching the query's where clause to fieldsCh. Lines
+// which can't be parsed are logged and skipped. It returns when Lines is
+// closed or the aggregator is stopped.
+func (a *Aggregate) readLines(fieldsCh chan<- map[string]string) {
+	for {
+		select {
+		case line, ok := <-a.Lines:
+			if !ok {
+				return
+			}
+
+			maprLine := strings.TrimSpace(string(line.Content))
+			fields, err := a.parser.MakeFields(maprLine)
+			if err != nil {
+				logger.Error(err)
+				continue
+			}
+			if !a.query.WhereClause(fields) {
+				continue
+			}
+
+			select {
+			case fieldsCh <- fields:
+			case <-a.stop:
+				// Stopped while handing over the fields: exit right away
+				// instead of going on to parse further lines.
+				return
+			}
+		case <-a.stop:
+			return
+		}
+	}
+}
+
+// aggregate adds the fields of one log line to the matching aggregate set of
+// group. The set is selected by the values of the query's "group by" fields,
+// joined by single spaces. The set's sample counter is incremented if at
+// least one selected field was aggregated successfully.
+func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
+	var key strings.Builder
+	for i, field := range a.query.GroupBy {
+		if i > 0 {
+			key.WriteString(" ")
+		}
+		// A field missing from the line contributes the empty string.
+		key.WriteString(fields[field])
+	}
+	set := group.GetSet(key.String())
+
+	added := false
+	for _, sc := range a.query.Select {
+		val, ok := fields[sc.Field]
+		if !ok {
+			continue
+		}
+		if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil {
+			logger.Error(err)
+			continue
+		}
+		added = true
+	}
+
+	if !added {
+		logger.Trace("Aggregated data locally without adding new samples")
+		return
+	}
+	set.Samples++
+}
+
+// Serialize requests that all data aggregated so far is serialized. It blocks
+// until the request has been picked up or the aggregator has been stopped.
+func (a *Aggregate) Serialize() {
+	select {
+	case <-a.stop:
+	case a.serialize <- struct{}{}:
+	}
+}
+
+// Close stops the aggregator by closing its stop channel, which makes the
+// goroutines selecting on that channel return. It must be called at most
+// once: closing the already closed channel again panics.
+func (a *Aggregate) Close() {
+ close(a.stop)
+}
diff --git a/mapr/token.go b/mapr/token.go
new file mode 100644
index 0000000..b8be4da
--- /dev/null
+++ b/mapr/token.go
@@ -0,0 +1,108 @@
+package mapr
+
+import (
+ "strings"
+)
+
+// keywords are the reserved words of the mapr query language. A bareword
+// equal to one of them (after lower-casing) is a keyword, see token.isKeyword.
+var keywords = [...]string{"select", "from", "where", "group", "rorder", "order", "interval", "limit", "outfile"}
+
+// token represents a parsed token, used to parse the mapr query.
+type token struct {
+ // str is the text of the token.
+ str string
+ // isBareword is true for a token found outside of double quotes and false
+ // for a double-quoted string (see tokenize).
+ isBareword bool
+}
+
+// isKeyword reports whether the token is a keyword of the query language.
+// Only barewords can be keywords; the token text is lower-cased before it is
+// compared.
+func (t token) isKeyword() bool {
+	if !t.isBareword {
+		return false
+	}
+
+	// Lower-case the token once instead of once per keyword.
+	lower := strings.ToLower(t.str)
+	for _, keyword := range keywords {
+		if lower == keyword {
+			return true
+		}
+	}
+	return false
+}
+
+// String returns the text of the token.
+func (t token) String() string {
+ return t.str
+}
+
+// tokenize splits a query string into tokens. Text outside of double quotes
+// is split at white space and commas into bareword tokens; text between two
+// double quotes becomes a single non-bareword token, without the quotes.
+func tokenize(queryStr string) []token {
+	var tokens []token
+
+	for i, part := range strings.Split(queryStr, "\"") {
+		if i%2 != 0 {
+			// Odd parts follow an opening quote: keep them as one token.
+			tokens = append(tokens, token{str: part, isBareword: false})
+			continue
+		}
+
+		// Even parts are unquoted; commas separate tokens like white space.
+		unquoted := strings.Replace(part, ",", " ", -1)
+		for _, word := range strings.Fields(unquoted) {
+			tokens = append(tokens, token{str: word, isBareword: true})
+		}
+	}
+
+	return tokens
+}
+
+// tokensConsume consumes tokens up to, but not including, the next keyword.
+//
+// It returns the remaining tokens, starting with that keyword, and the
+// consumed tokens. If there is no keyword, all tokens are consumed and the
+// remaining tokens are nil. Empty tokens are dropped. A token enclosed in
+// backticks has them stripped, which allows keywords to be used as field
+// names (e.g. `from`).
+func tokensConsume(tokens []token) ([]token, []token) {
+	var consumed []token
+
+	for i, t := range tokens {
+		if t.isKeyword() {
+			return tokens[i:], consumed
+		}
+		if len(t.str) == 0 {
+			continue
+		}
+		// An enclosing pair needs at least two characters. A token that is
+		// just a single backtick would otherwise be sliced as str[1:0],
+		// which panics; it is kept as it is instead.
+		if n := len(t.str); n >= 2 && t.str[0] == '`' && t.str[n-1] == '`' {
+			t.str = t.str[1 : n-1]
+		}
+		consumed = append(consumed, t)
+	}
+
+	return nil, consumed
+}
+
+// tokensConsumeStr works like tokensConsume, but returns the consumed tokens
+// as plain strings.
+func tokensConsumeStr(tokens []token) ([]token, []string) {
+	rest, found := tokensConsume(tokens)
+
+	var strs []string
+	for i := range found {
+		strs = append(strs, found[i].str)
+	}
+	return rest, strs
+}
+
+// tokensConsumeOptional skips the first token if its lower-cased text equals
+// the lower-cased optional word. Otherwise the tokens are returned unchanged.
+func tokensConsumeOptional(tokens []token, optional string) []token {
+	if len(tokens) > 0 && strings.ToLower(tokens[0].str) == strings.ToLower(optional) {
+		return tokens[1:]
+	}
+	return tokens
+}
diff --git a/mapr/wherecondition.go b/mapr/wherecondition.go
new file mode 100644
index 0000000..515c8ad
--- /dev/null
+++ b/mapr/wherecondition.go
@@ -0,0 +1,193 @@
+package mapr
+
+import (
+ "dtail/logger"
+ "errors"
+ "fmt"
+ "strconv"
+ "strings"
+)
+
+// QueryOperation determines the comparison operation of a mapr query's
+// "where" condition.
+type QueryOperation int
+
+// The possible mapreduce operations. The order matters: every operation
+// declared after FloatOperation is a float comparison (makeWhereConditions
+// checks for Operation > FloatOperation), the ones before it compare strings.
+const (
+ UndefQueryOperation QueryOperation = iota
+ StringEq QueryOperation = iota
+ StringNe QueryOperation = iota
+ StringContains QueryOperation = iota
+ // FloatOperation is only a marker separating string from float operations.
+ FloatOperation QueryOperation = iota
+ FloatEq QueryOperation = iota
+ FloatNe QueryOperation = iota
+ FloatLt QueryOperation = iota
+ FloatLe QueryOperation = iota
+ FloatGt QueryOperation = iota
+ FloatGe QueryOperation = iota
+)
+
+// whereType is the kind of an operand (lValue or rValue) of a "where"
+// condition.
+type whereType int
+
+// The possible operand types.
+const (
+ // UndefWhereType is the zero value.
+ UndefWhereType whereType = iota
+ // Field is a bareword operand that is not a number literal.
+ Field whereType = iota
+ // String is a quoted string literal.
+ String whereType = iota
+ // Float is a bareword that parsed as a float literal.
+ Float whereType = iota
+)
+
+// String returns the name of the operand type; any unknown value is reported
+// as "UndefWhereType".
+func (w whereType) String() string {
+	// Plain string literals: formatting a constant without arguments through
+	// fmt.Sprintf yields the very same string.
+	switch w {
+	case Field:
+		return "Field"
+	case String:
+		return "String"
+	case Float:
+		return "Float"
+	default:
+		return "UndefWhereType"
+	}
+}
+
+// whereCondition is one parsed condition of a "where" clause, used by
+// mapr.Query. It consists of a left operand, an operation and a right operand.
+type whereCondition struct {
+	// Left operand: its raw text, its float value (set only if lType is
+	// Float) and its type.
+	lString string
+	lFloat  float64
+	lType   whereType
+
+	// Operation is the comparison applied to both operands.
+	Operation QueryOperation
+
+	// Right operand: its raw text, its float value (set only if rType is
+	// Float) and its type.
+	rString string
+	rFloat  float64
+	rType   whereType
+}
+
+// String returns a debug representation of the where condition.
+func (wc *whereCondition) String() string {
+	const layout = "whereCondition(Operation:%v,lString:%s,lFloat:%v,lType:%s,rString:%s,rFloat:%v,rType:%s)"
+	return fmt.Sprintf(layout,
+		wc.Operation,
+		wc.lString, wc.lFloat, wc.lType.String(),
+		wc.rString, wc.rFloat, wc.rType.String())
+}
+
+// makeWhereConditions parses the tokens of a "where" clause into where
+// conditions.
+//
+// Every condition consists of exactly three tokens: lValue, operator and
+// rValue. Conditions may be separated by an optional "and". The operators ==,
+// !=, <, <= (or =<), > and >= (or =>) compare floats and require barewords on
+// both sides; an operand which parses as a float is a number literal, any
+// other one a field name. The operators eq, ne and contains compare strings;
+// a bareword operand is a field name, a quoted operand a string literal.
+func makeWhereConditions(tokens []token) (where []whereCondition, err error) {
+	// floatOperand classifies the operand of a float comparison.
+	floatOperand := func(s string) (float64, whereType) {
+		f, convErr := strconv.ParseFloat(s, 64)
+		if convErr != nil {
+			return 0, Field
+		}
+		return f, Float
+	}
+
+	// stringOperand classifies the operand of a string comparison.
+	stringOperand := func(t token) whereType {
+		if t.isBareword {
+			return Field
+		}
+		return String
+	}
+
+	// parse reads one condition from the front of the tokens and returns it
+	// together with the tokens following it.
+	parse := func(tokens []token) (whereCondition, []token, error) {
+		var wc whereCondition
+		if len(tokens) < 3 {
+			return wc, nil, errors.New(invalidQuery + "Not enough arguments in 'where' clause")
+		}
+		lhs, rhs, rest := tokens[0], tokens[2], tokens[3:]
+
+		whereOp := strings.ToLower(tokens[1].str)
+		switch whereOp {
+		case "==":
+			wc.Operation = FloatEq
+		case "!=":
+			wc.Operation = FloatNe
+		case "<":
+			wc.Operation = FloatLt
+		case "<=", "=<":
+			wc.Operation = FloatLe
+		case ">":
+			wc.Operation = FloatGt
+		case ">=", "=>":
+			wc.Operation = FloatGe
+		case "eq":
+			wc.Operation = StringEq
+		case "ne":
+			wc.Operation = StringNe
+		case "contains":
+			wc.Operation = StringContains
+		default:
+			return wc, nil, errors.New(invalidQuery + "Unknown operation in 'where' clause: " + whereOp)
+		}
+
+		wc.lString = lhs.str
+		wc.rString = rhs.str
+
+		// String comparison: the operand type follows from the quoting only.
+		if wc.Operation <= FloatOperation {
+			wc.lType = stringOperand(lhs)
+			wc.rType = stringOperand(rhs)
+			return wc, rest, nil
+		}
+
+		// Float comparison: quoted strings are not allowed on either side.
+		if !lhs.isBareword {
+			return wc, nil, errors.New(invalidQuery + "Expected bareword at 'where' clause's lValue: " + lhs.str)
+		}
+		wc.lFloat, wc.lType = floatOperand(lhs.str)
+
+		if !rhs.isBareword {
+			return wc, nil, errors.New(invalidQuery + "Expected bareword at 'where' clause's rValue: " + rhs.str)
+		}
+		wc.rFloat, wc.rType = floatOperand(rhs.str)
+
+		return wc, rest, nil
+	}
+
+	for len(tokens) > 0 {
+		wc, rest, parseErr := parse(tokens)
+		if parseErr != nil {
+			return nil, parseErr
+		}
+
+		where = append(where, wc)
+		tokens = tokensConsumeOptional(rest, "and")
+	}
+
+	return where, nil
+}
+
+// floatClause evaluates the condition's float operation on the two given
+// values. A condition which doesn't hold a float operation is logged as an
+// error and evaluates to false.
+func (wc *whereCondition) floatClause(lValue float64, rValue float64) bool {
+	switch op := wc.Operation; op {
+	case FloatEq:
+		return lValue == rValue
+	case FloatNe:
+		return lValue != rValue
+	case FloatLt:
+		return lValue < rValue
+	case FloatLe:
+		return lValue <= rValue
+	case FloatGt:
+		return lValue > rValue
+	case FloatGe:
+		return lValue >= rValue
+	default:
+		logger.Error("Unknown float operation", lValue, op, rValue)
+		return false
+	}
+}
+
+// stringClause evaluates the condition's string operation on the two given
+// values. A condition which doesn't hold a string operation is logged as an
+// error and evaluates to false.
+func (wc *whereCondition) stringClause(lValue string, rValue string) bool {
+	switch op := wc.Operation; op {
+	case StringEq:
+		return lValue == rValue
+	case StringNe:
+		return lValue != rValue
+	case StringContains:
+		return strings.Contains(lValue, rValue)
+	default:
+		logger.Error("Unknown string operation", lValue, op, rValue)
+		return false
+	}
+}