summaryrefslogtreecommitdiff
path: root/mapr
diff options
context:
space:
mode:
authorPaul Bütow <pbuetow@mimecast.com>2020-01-20 18:41:05 +0000
committerPaul Bütow <pbuetow@mimecast.com>2020-01-21 14:35:23 +0000
commitc128865c4c7411c29a59fca9a3a2f95537686d7b (patch)
tree193bccc70d942c8b70cc93fae2670263701e43aa /mapr
parent3755a9911ecb05886577095f2b8cc8b9e4066a3a (diff)
Move commands to cmd/ and move internal dependencies to internal/
Diffstat (limited to 'mapr')
-rw-r--r--mapr/aggregateset.go185
-rw-r--r--mapr/client/aggregate.go100
-rw-r--r--mapr/globalgroupset.go100
-rw-r--r--mapr/groupset.go178
-rw-r--r--mapr/logformat/default.go23
-rw-r--r--mapr/logformat/default_test.go35
-rw-r--r--mapr/logformat/parser.go75
-rw-r--r--mapr/query.go245
-rw-r--r--mapr/query_test.go149
-rw-r--r--mapr/selectcondition.go96
-rw-r--r--mapr/server/aggregate.go170
-rw-r--r--mapr/token.go108
-rw-r--r--mapr/wherecondition.go193
13 files changed, 0 insertions, 1657 deletions
diff --git a/mapr/aggregateset.go b/mapr/aggregateset.go
deleted file mode 100644
index 2096c3c..0000000
--- a/mapr/aggregateset.go
+++ /dev/null
@@ -1,185 +0,0 @@
-package mapr
-
-import (
- "fmt"
- "strconv"
- "strings"
-)
-
-// AggregateSet represents aggregated key/value pairs from the
-// MAPREDUCE log lines. These could be either string values or float
-// values.
-type AggregateSet struct {
- Samples int
- FValues map[string]float64
- SValues map[string]string
-}
-
-// NewAggregateSet creates a new empty aggregate set.
-func NewAggregateSet() *AggregateSet {
- return &AggregateSet{
- FValues: make(map[string]float64),
- SValues: make(map[string]string),
- }
-}
-
-// String representation of aggregate set.
-func (s *AggregateSet) String() string {
- return fmt.Sprintf("AggregateSet(Samples:%d,FValues:%v,SValues:%v)",
- s.Samples, s.FValues, s.SValues)
-}
-
-// Merge one aggregate set into this one.
-func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error {
- s.Samples += set.Samples
- //logger.Trace("Merge", set)
-
- for _, sc := range query.Select {
- storage := sc.FieldStorage
- switch sc.Operation {
- case Count:
- fallthrough
- case Sum:
- fallthrough
- case Avg:
- value := set.FValues[storage]
- s.addFloat(storage, value)
- case Min:
- value := set.FValues[storage]
- s.addFloatMin(storage, value)
- case Max:
- value := set.FValues[storage]
- s.addFloatMax(storage, value)
- case Last:
- value := set.SValues[storage]
- s.setString(storage, value)
- case Len:
- s.setString(storage, set.SValues[storage])
- s.setFloat(storage, set.FValues[storage])
- default:
- return fmt.Errorf("Unknown aggregation method '%v'", sc.Operation)
- }
- }
- return nil
-}
-
-// Serialize the aggregate set so it can be sent over the wire.
-func (s *AggregateSet) Serialize(groupKey string, ch chan<- string, stop chan struct{}) {
- //logger.Trace("Serialising mapr.AggregateSet", s)
- var sb strings.Builder
-
- sb.WriteString(groupKey)
- sb.WriteString("|")
- sb.WriteString(fmt.Sprintf("%d|", s.Samples))
-
- for k, v := range s.FValues {
- sb.WriteString(k)
- sb.WriteString("=")
- sb.WriteString(fmt.Sprintf("%v|", v))
- }
-
- for k, v := range s.SValues {
- sb.WriteString(k)
- sb.WriteString("=")
- sb.WriteString(v)
- sb.WriteString("|")
- }
-
- select {
- case ch <- sb.String():
- case <-stop:
- }
-}
-
-// Add a float value.
-func (s *AggregateSet) addFloat(key string, value float64) {
- if _, ok := s.FValues[key]; !ok {
- s.FValues[key] = value
- return
- }
- s.FValues[key] += value
-}
-
-// Add a float minimum value.
-func (s *AggregateSet) addFloatMin(key string, value float64) {
- f, ok := s.FValues[key]
- if !ok {
- s.FValues[key] = value
- return
- }
-
- if f > value {
- s.FValues[key] = value
- }
-}
-
-// Add a float maximum value.
-func (s *AggregateSet) addFloatMax(key string, value float64) {
- f, ok := s.FValues[key]
- if !ok {
- s.FValues[key] = value
- return
- }
-
- if f < value {
- s.FValues[key] = value
- }
-}
-
-// Set a string.
-func (s *AggregateSet) setString(key, value string) {
- s.SValues[key] = value
-}
-
-// Set a float.
-func (s *AggregateSet) setFloat(key string, value float64) {
- s.FValues[key] = value
-}
-
-// Aggregate data to the aggregate set.
-func (s *AggregateSet) Aggregate(key string, agg AggregateOperation, value string, clientAggregation bool) (err error) {
- var f float64
-
- // First check if we can aggregate anything without converting value to float.
- switch agg {
- case Count:
- if clientAggregation {
- f, err = strconv.ParseFloat(value, 64)
- if err != nil {
- return
- }
- s.addFloat(key, f)
- return
- }
- s.addFloat(key, 1)
- return
- case Last:
- s.setString(key, value)
- return
- case Len:
- s.setString(key, value)
- s.setFloat(key, float64(len(value)))
- return
- default:
- }
-
- // No, we have to convert to float.
- f, err = strconv.ParseFloat(value, 64)
- if err != nil {
- return
- }
-
- switch agg {
- case Sum:
- fallthrough
- case Avg:
- s.addFloat(key, f)
- case Min:
- s.addFloatMin(key, f)
- case Max:
- s.addFloatMax(key, f)
- default:
- err = fmt.Errorf("Unknown aggregation method '%v'", agg)
- }
- return
-}
diff --git a/mapr/client/aggregate.go b/mapr/client/aggregate.go
deleted file mode 100644
index b9443bc..0000000
--- a/mapr/client/aggregate.go
+++ /dev/null
@@ -1,100 +0,0 @@
-package client
-
-import (
- "dtail/logger"
- "dtail/mapr"
- "strconv"
- "strings"
-)
-
-// Aggregate mapreduce data on the DTail client side.
-type Aggregate struct {
- // This is the mapr query specified on the command line.
- query *mapr.Query
- // This represents aggregated data of a single remote server.
- group *mapr.GroupSet
- // This represents the merged aggregated data of all servers.
- globalGroup *mapr.GlobalGroupSet
- stop chan struct{}
- // The server we aggregate the data for (logging and debugging purposes only)
- server string
-}
-
-// NewAggregate create new client aggregator.
-func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *Aggregate {
- return &Aggregate{
- query: query,
- group: mapr.NewGroupSet(),
- globalGroup: globalGroup,
- stop: make(chan struct{}),
- server: server,
- }
-}
-
-// Aggregate data from mapr log line into local (and global) group sets.
-func (a *Aggregate) Aggregate(parts []string) {
- select {
- case <-a.stop:
- logger.Error("Client aggregator stopped for server, not processing new data", a.server)
- return
- default:
- }
-
- groupKey := parts[0]
- samples, err := strconv.Atoi(parts[1])
- if err != nil {
- logger.FatalExit(parts, err)
- }
- fields := a.makeFields(parts[2:])
- set := a.group.GetSet(groupKey)
-
- var addedSamples bool
- for _, sc := range a.query.Select {
- if val, ok := fields[sc.FieldStorage]; ok {
- if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, true); err != nil {
- logger.Error(err)
- continue
- }
- addedSamples = true
- }
- }
- if addedSamples {
- set.Samples += samples
- }
-
- // Merge data from group into global group.
- isMerged, err := a.globalGroup.MergeNoblock(a.query, a.group)
- if err != nil {
- panic(err)
- }
- if isMerged {
- // Re-init local group (make it empty again).
- a.group.InitSet()
- }
-}
-
-// Create a map of key-value pairs from a part list such as ["foo=bar", "bar=baz"].
-func (a *Aggregate) makeFields(parts []string) map[string]string {
- fields := make(map[string]string, len(parts))
-
- for _, part := range parts {
- kv := strings.Split(part, "=")
- if len(kv) != 2 {
- continue
- }
- fields[kv[0]] = kv[1]
- }
-
- return fields
-}
-
-// Stop the client side mapreduce aggregator.
-func (a *Aggregate) Stop() {
- logger.Debug("Stopping client mapreduce aggregator")
- close(a.stop)
-
- err := a.globalGroup.Merge(a.query, a.group)
- if err != nil {
- panic(err)
- }
-}
diff --git a/mapr/globalgroupset.go b/mapr/globalgroupset.go
deleted file mode 100644
index cfab506..0000000
--- a/mapr/globalgroupset.go
+++ /dev/null
@@ -1,100 +0,0 @@
-package mapr
-
-import (
- "fmt"
-)
-
-// GlobalGroupSet is used on the dtail client to merge multiple group sets
-// (one group set per remote server) to one single global group set.
-type GlobalGroupSet struct {
- GroupSet
- semaphore chan struct{}
-}
-
-// NewGlobalGroupSet creates a new empty global group set.
-func NewGlobalGroupSet() *GlobalGroupSet {
- g := GlobalGroupSet{
- semaphore: make(chan struct{}, 1),
- }
- g.InitSet()
-
- return &g
-}
-
-// String representation of the global group set.
-func (g *GlobalGroupSet) String() string {
- return fmt.Sprintf("GlobalGroupSet(%s)", g.GroupSet.String())
-}
-
-// Merge (blocking) a group set into the global group set.
-func (g *GlobalGroupSet) Merge(query *Query, group *GroupSet) error {
- g.semaphore <- struct{}{}
- defer func() { <-g.semaphore }()
-
- return g.merge(query, group)
-}
-
-// MergeNoblock merges (non-blocking) a group set into the global group set.
-func (g *GlobalGroupSet) MergeNoblock(query *Query, group *GroupSet) (bool, error) {
- select {
- case g.semaphore <- struct{}{}:
- err := g.merge(query, group)
- <-g.semaphore
- return true, err
- default:
- return false, nil
- }
-}
-
-// Merge a group set into the global group set.
-func (g *GlobalGroupSet) merge(query *Query, group *GroupSet) error {
- for groupKey, set := range group.sets {
- s := g.GetSet(groupKey)
- if err := s.Merge(query, set); err != nil {
- return err
- }
- }
-
- return nil
-}
-
-// IsEmpty determines whether the global group set has any data in it.
-func (g *GlobalGroupSet) IsEmpty() bool {
- return g.NumSets() == 0
-}
-
-// NumSets determines the number of sets.
-func (g *GlobalGroupSet) NumSets() int {
- g.semaphore <- struct{}{}
- defer func() { <-g.semaphore }()
-
- return len(g.sets)
-}
-
-// SwapOut teturn the underlying group set and create a new empty one, so
-// that the global group set is empty again and can aggregate new data.
-func (g *GlobalGroupSet) SwapOut() *GroupSet {
- g.semaphore <- struct{}{}
- defer func() { <-g.semaphore }()
-
- set := &GroupSet{sets: g.sets}
- g.InitSet()
-
- return set
-}
-
-// WriteResult writes the result of a mapreduce aggregation to an outfile.
-func (g *GlobalGroupSet) WriteResult(query *Query) error {
- g.semaphore <- struct{}{}
- defer func() { <-g.semaphore }()
-
- return g.GroupSet.WriteResult(query)
-}
-
-// Result returns the result of the mapreduce aggregation as a string.
-func (g *GlobalGroupSet) Result(query *Query) (string, int, error) {
- g.semaphore <- struct{}{}
- defer func() { <-g.semaphore }()
-
- return g.GroupSet.Result(query)
-}
diff --git a/mapr/groupset.go b/mapr/groupset.go
deleted file mode 100644
index d8f9379..0000000
--- a/mapr/groupset.go
+++ /dev/null
@@ -1,178 +0,0 @@
-package mapr
-
-import (
- "errors"
- "fmt"
- "io/ioutil"
- "sort"
- "strconv"
- "strings"
-)
-
-// GroupSet represents a map of aggregate sets. The group sets
-// are requierd by the "group by" mapr clause, whereas the
-// group set map keys are the values of the "group by" arguments.
-// E.g. "group by $cid" would create one aggregate set and one map
-// entry per customer id.
-type GroupSet struct {
- sets map[string]*AggregateSet
-}
-
-// NewGroupSet returns a new empty group set.
-func NewGroupSet() *GroupSet {
- g := GroupSet{}
- g.InitSet()
- return &g
-}
-
-// String representation of the group set.
-func (g *GroupSet) String() string {
- return fmt.Sprintf("GroupSet(%v)", g.sets)
-}
-
-// InitSet makes the group set empty (initialize).
-func (g *GroupSet) InitSet() {
- g.sets = make(map[string]*AggregateSet)
-}
-
-// GetSet gets a specific aggregate set from the group set.
-func (g *GroupSet) GetSet(groupKey string) *AggregateSet {
- set, ok := g.sets[groupKey]
- if !ok {
- set = NewAggregateSet()
- g.sets[groupKey] = set
- }
- return set
-}
-
-// Serialize the group set (e.g. to send it over the wire).
-func (g *GroupSet) Serialize(ch chan<- string, stop chan struct{}) {
- for groupKey, set := range g.sets {
- set.Serialize(groupKey, ch, stop)
- }
-}
-
-// Result returns a nicely formated result of the query from the group set.
-func (g *GroupSet) Result(query *Query) (string, int, error) {
- return g.limitedResult(query, query.Limit, "\t", " ", false)
-}
-
-// WriteResult writes the result to an outfile.
-func (g *GroupSet) WriteResult(query *Query) error {
- if query.Outfile == "" {
- return errors.New("No outfile specified")
- }
-
- // -1: Don't limit the result, include all data sets
- result, _, err := g.limitedResult(query, -1, "", ",", true)
- if err != nil {
- return err
- }
-
- return ioutil.WriteFile(query.Outfile, []byte(result), 0644)
-}
-
-// Return a nicely formated result of the query from the group set.
-func (g *GroupSet) limitedResult(query *Query, limit int, lineStarter, fieldSeparator string, addHeader bool) (string, int, error) {
- type result struct {
- groupKey string
- resultStr string
- orderBy float64
- }
-
- var resultSlice []result
-
- for groupKey, set := range g.sets {
- var sb strings.Builder
- r := result{groupKey: groupKey}
-
- lastIndex := len(query.Select) - 1
- for i, sc := range query.Select {
- storage := sc.FieldStorage
- orderByThis := storage == query.OrderBy
-
- switch sc.Operation {
- case Count:
- value := set.FValues[storage]
- sb.WriteString(fmt.Sprintf("%d", int(value)))
- if orderByThis {
- r.orderBy = value
- }
- case Len:
- fallthrough
- case Sum:
- fallthrough
- case Min:
- fallthrough
- case Max:
- value := set.FValues[storage]
- sb.WriteString(fmt.Sprintf("%f", value))
- if orderByThis {
- r.orderBy = value
- }
- case Last:
- value := set.SValues[storage]
- if orderByThis {
- f, err := strconv.ParseFloat(value, 64)
- if err == nil {
- r.orderBy = f
- }
- }
- sb.WriteString(value)
- case Avg:
- value := set.FValues[storage] / float64(set.Samples)
- sb.WriteString(fmt.Sprintf("%f", value))
- if orderByThis {
- r.orderBy = value
- }
- default:
- return "", 0, fmt.Errorf("Unknown aggregation method '%v'", sc.Operation)
- }
- if i != lastIndex {
- sb.WriteString(fieldSeparator)
- }
- }
-
- r.resultStr = sb.String()
- resultSlice = append(resultSlice, r)
- }
-
- if query.OrderBy != "" {
- if query.ReverseOrder {
- sort.SliceStable(resultSlice, func(i, j int) bool {
- return resultSlice[i].orderBy < resultSlice[j].orderBy
- })
- } else {
- sort.SliceStable(resultSlice, func(i, j int) bool {
- return resultSlice[i].orderBy > resultSlice[j].orderBy
- })
- }
- }
-
- var sb strings.Builder
-
- // Write header first
- if addHeader {
- lastIndex := len(query.Select) - 1
- sb.WriteString(lineStarter)
- for i, sc := range query.Select {
- sb.WriteString(sc.FieldStorage)
- if i != lastIndex {
- sb.WriteString(fieldSeparator)
- }
- }
- sb.WriteString("\n")
- }
-
- // And now write the data
- for i, r := range resultSlice {
- if i == limit {
- break
- }
- sb.WriteString(lineStarter)
- sb.WriteString(r.resultStr)
- sb.WriteString("\n")
- }
-
- return sb.String(), len(resultSlice), nil
-}
diff --git a/mapr/logformat/default.go b/mapr/logformat/default.go
deleted file mode 100644
index f0df5bc..0000000
--- a/mapr/logformat/default.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package logformat
-
-import (
- "errors"
- "strings"
-)
-
-// MakeFieldsDEFAULT is the default log file mapreduce parser.
-func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) {
- fields := make(map[string]string, 20)
- splitted := strings.Split(maprLine, "|")
-
- fields["$hostname"] = p.hostname
-
- for _, kv := range splitted {
- keyAndValue := strings.SplitN(kv, "=", 2)
- if len(keyAndValue) != 2 {
- return fields, errors.New("Error parsing mapr token: " + kv)
- }
- fields[strings.ToLower(keyAndValue[0])] = keyAndValue[1]
- }
- return fields, nil
-}
diff --git a/mapr/logformat/default_test.go b/mapr/logformat/default_test.go
deleted file mode 100644
index a3c47fb..0000000
--- a/mapr/logformat/default_test.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package logformat
-
-import (
- "testing"
-)
-
-func TestDefaultLogFormat(t *testing.T) {
- parser, err := NewParser("default")
- if err != nil {
- t.Errorf("Unable to create parser: %s", err.Error())
- }
-
- fields, err := parser.MakeFields("foo=bar|baz=bay")
-
- if err != nil {
- t.Errorf("Unable to parse: %s", err.Error())
- }
-
- if bar, ok := fields["foo"]; !ok {
- t.Errorf("Expected field 'foo', but no such field there\n")
- } else if bar != "bar" {
- t.Errorf("Expected 'bar' stored in field 'foo', but got '%s'\n", bar)
- }
-
- if bay, ok := fields["baz"]; !ok {
- t.Errorf("Expected field 'baz', but no such field there\n")
- } else if bay != "bay" {
- t.Errorf("Expected 'bay' stored in field 'baz', but got '%s'\n", bay)
- }
-
- fields, err = parser.MakeFields("foo=bar|bazbay")
- if err == nil {
- t.Errorf("Expected error but didn't: %s", err.Error())
- }
-}
diff --git a/mapr/logformat/parser.go b/mapr/logformat/parser.go
deleted file mode 100644
index b7c8c5c..0000000
--- a/mapr/logformat/parser.go
+++ /dev/null
@@ -1,75 +0,0 @@
-package logformat
-
-import (
- "dtail/logger"
- "errors"
- "fmt"
- "os"
- "reflect"
- "strings"
-)
-
-// Parser is used to parse the mapreduce information from the server log files.
-type Parser struct {
- hostname string
- logFormatName string
- makeFieldsFunc reflect.Value
- makeFieldsReceiver reflect.Value
-}
-
-// NewParser returns a new log parser.
-func NewParser(logFormatName string) (*Parser, error) {
- hostname, err := os.Hostname()
-
- if err != nil {
- return nil, err
- }
-
- p := Parser{
- hostname: hostname,
- }
-
- err = p.reflectLogFormat(logFormatName)
- if err != nil {
- return nil, err
- }
-
- return &p, nil
-}
-
-// The aim of this is that everyone can plug in their own mapr log format
-// parsing method to DTail. Just add a method MakeFieldsMODULENAME to type
-// Parser. Whereas MODULENAME must be a upeprcase string.
-func (p *Parser) reflectLogFormat(logFormatName string) error {
- methodName := fmt.Sprintf("MakeFields%s", strings.ToUpper(logFormatName))
-
- rt := reflect.TypeOf(p)
- method, ok := rt.MethodByName(methodName)
- if !ok {
- return errors.New("No such mapr log format module: " + methodName)
- }
-
- p.makeFieldsFunc = method.Func
- p.makeFieldsReceiver = reflect.ValueOf(p)
-
- return nil
-}
-
-// MakeFields is for returning the fields from a given log line.
-func (p *Parser) MakeFields(maprLine string) (fields map[string]string, err error) {
- inputValues := []reflect.Value{p.makeFieldsReceiver, reflect.ValueOf(maprLine)}
- returnValues := p.makeFieldsFunc.Call(inputValues)
-
- errInterface := returnValues[1].Interface()
-
- if errInterface == nil {
- fields, err = returnValues[0].Interface().(map[string]string), nil
- logger.Trace("parser.MakeFields", fields, err)
- return
- }
-
- fields, err = returnValues[0].Interface().(map[string]string), errInterface.(error)
- logger.Trace("parser.MakeFields", fields, err)
-
- return
-}
diff --git a/mapr/query.go b/mapr/query.go
deleted file mode 100644
index 8ed3c67..0000000
--- a/mapr/query.go
+++ /dev/null
@@ -1,245 +0,0 @@
-package mapr
-
-import (
- "dtail/logger"
- "errors"
- "fmt"
- "strconv"
- "strings"
- "time"
-)
-
-const (
- invalidQuery string = "Invalid query: "
- unexpectedEnd string = "Unexpected end of query"
-)
-
-// Query represents a parsed mapr query.
-type Query struct {
- Select []selectCondition
- Table string
- Where []whereCondition
- GroupBy []string
- OrderBy string
- ReverseOrder bool
- GroupKey string
- Interval time.Duration
- Limit int
- Outfile string
- RawQuery string
- tokens []token
-}
-
-func (q Query) String() string {
- return fmt.Sprintf("Query(Select:%v,Table:%s,Where:%v,GroupBy:%v,GroupKey:%s,OrderBy:%v,ReverseOrder:%v,Interval:%v,Limit:%d,Outfile:%s,RawQuery:%s,tokens:%v)",
- q.Select,
- q.Table,
- q.Where,
- q.GroupBy,
- q.GroupKey,
- q.OrderBy,
- q.ReverseOrder,
- q.Interval,
- q.Limit,
- q.Outfile,
- q.RawQuery,
- q.tokens)
-}
-
-// NewQuery returns a new mapreduce query.
-func NewQuery(queryStr string) (*Query, error) {
- if queryStr == "" {
- return nil, nil
- }
-
- tokens := tokenize(queryStr)
-
- q := Query{
- RawQuery: queryStr,
- tokens: tokens,
- Interval: time.Second * 5,
- Limit: -1,
- }
-
- err := q.parse(tokens)
-
- logger.Debug(q)
- return &q, err
-}
-
-func (q *Query) parse(tokens []token) error {
- var found []token
- var err error
-
- for tokens != nil && len(tokens) > 0 {
- switch strings.ToLower(tokens[0].str) {
- case "select":
- tokens, found = tokensConsume(tokens[1:])
- q.Select, err = makeSelectConditions(found)
- if err != nil {
- return err
- }
- case "from":
- tokens, found = tokensConsume(tokens[1:])
- if len(found) > 0 {
- q.Table = strings.ToUpper(found[0].str)
- }
- case "where":
- tokens, found = tokensConsume(tokens[1:])
- if q.Where, err = makeWhereConditions(found); err != nil {
- return err
- }
- case "group":
- tokens = tokensConsumeOptional(tokens[1:], "by")
- if tokens == nil || len(tokens) < 1 {
- return errors.New(invalidQuery + unexpectedEnd)
- }
- tokens, q.GroupBy = tokensConsumeStr(tokens)
- q.GroupKey = strings.Join(q.GroupBy, ",")
- case "rorder":
- tokens = tokensConsumeOptional(tokens[1:], "by")
- if tokens == nil || len(tokens) < 1 {
- return errors.New(invalidQuery + unexpectedEnd)
- }
- tokens, found = tokensConsume(tokens)
- if len(found) == 0 {
- return errors.New(invalidQuery + unexpectedEnd)
- }
- q.OrderBy = found[0].str
- q.ReverseOrder = true
- case "order":
- tokens = tokensConsumeOptional(tokens[1:], "by")
- if tokens == nil || len(tokens) < 1 {
- return errors.New(invalidQuery + unexpectedEnd)
- }
- tokens, found = tokensConsume(tokens)
- if len(found) == 0 {
- return errors.New(invalidQuery + unexpectedEnd)
- }
- q.OrderBy = found[0].str
- case "interval":
- tokens, found = tokensConsume(tokens[1:])
- if len(found) > 0 {
- i, err := strconv.Atoi(found[0].str)
- if err != nil {
- return errors.New(invalidQuery + err.Error())
- }
- q.Interval = time.Second * time.Duration(i)
- }
- case "limit":
- tokens, found = tokensConsume(tokens[1:])
- if len(found) == 0 {
- return errors.New(invalidQuery + unexpectedEnd)
- }
- i, err := strconv.Atoi(found[0].str)
- if err != nil {
- return errors.New(invalidQuery + err.Error())
- }
- q.Limit = i
- case "outfile":
- tokens, found = tokensConsume(tokens[1:])
- if len(found) == 0 {
- return errors.New(invalidQuery + unexpectedEnd)
- }
- q.Outfile = found[0].str
- default:
- return errors.New(invalidQuery + "Unexpected keyword " + tokens[0].str)
- }
- }
-
- if q.Table == "" {
- return errors.New(invalidQuery + "Empty table specified in 'from' clause")
- }
- if len(q.Select) < 1 {
- return errors.New(invalidQuery + "Expected at least one field in 'select' clause but got none")
- }
- if len(q.GroupBy) == 0 {
- field := q.Select[0].Field
- q.GroupBy = append(q.GroupBy, field)
- }
-
- if q.OrderBy != "" {
- var orderFieldIsValid bool
- for _, sc := range q.Select {
- if q.OrderBy == sc.FieldStorage {
- orderFieldIsValid = true
- break
- }
- }
- if !orderFieldIsValid {
- return errors.New(invalidQuery + fmt.Sprintf("Can not '(r)order by' '%s', must be present in 'select' clause", q.OrderBy))
- }
- }
-
- return nil
-}
-
-// WhereClause interprets the where clause of the mapreduce query.
-func (q *Query) WhereClause(fields map[string]string) bool {
- floatValue := func(str string, float float64, t whereType) (float64, bool) {
- switch t {
- case Float:
- return float, true
- case Field:
- value, ok := fields[str]
- if !ok {
- return 0, false
- }
- f, err := strconv.ParseFloat(value, 64)
- if err != nil {
- return 0, false
- }
- return f, true
- default:
- logger.Error("Unexpected argument in 'where' clause", str, float, t)
- return 0, false
- }
- }
-
- stringValue := func(str string, t whereType) (string, bool) {
- switch t {
- case Field:
- value, ok := fields[str]
- if !ok {
- return str, false
- }
- return value, true
- case String:
- return str, true
- default:
- logger.Error("Unexpected argument in 'where' clause", str, t)
- return str, false
- }
- }
-
- for _, wc := range q.Where {
- var ok bool
-
- if wc.Operation > FloatOperation {
- var lValue, rValue float64
- if lValue, ok = floatValue(wc.lString, wc.lFloat, wc.lType); !ok {
- return false
- }
- if rValue, ok = floatValue(wc.rString, wc.rFloat, wc.rType); !ok {
- return false
- }
- if ok = wc.floatClause(lValue, rValue); !ok {
- return false
- }
- continue
- }
-
- var lValue, rValue string
- if lValue, ok = stringValue(wc.lString, wc.lType); !ok {
- return false
- }
- if rValue, ok = stringValue(wc.rString, wc.rType); !ok {
- return false
- }
- if ok = wc.stringClause(lValue, rValue); !ok {
- return false
- }
- }
-
- return true
-}
diff --git a/mapr/query_test.go b/mapr/query_test.go
deleted file mode 100644
index 6176461..0000000
--- a/mapr/query_test.go
+++ /dev/null
@@ -1,149 +0,0 @@
-package mapr
-
-import (
- "testing"
- "time"
-)
-
-func TestParseQuerySimple(t *testing.T) {
- errorQueries := []string{
- "select",
- "select foo",
- "select foo from",
- "select foo from bar where baz",
- "select foo from bar where baz <",
- "select foo from bar where baz < 100 bay eq 12 group",
- "select foo from bar where baz < 100 bay eq 12 group by foo order by",
- "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit",
- }
- okQueries := []string{"select foo from bar",
- "select foo from bar where",
- "select foo from bar where baz < 100 bay eq 12",
- "select foo from bar where baz < 100, bay eq 12",
- "select foo from bar where baz < 100 and bay eq 12",
- "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo",
- "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit 23",
- "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit 23 outfile \"result.csv\"",
- }
-
- for _, queryStr := range errorQueries {
- q, err := NewQuery(queryStr)
- if err == nil {
- t.Errorf("Expected a parse error: %s\n%v", queryStr, q)
- continue
- }
- }
-
- for _, queryStr := range okQueries {
- _, err := NewQuery(queryStr)
- if err != nil {
- t.Errorf("%s: %s", err.Error(), queryStr)
- continue
- }
- }
-}
-
-func TestParseQueryDeep(t *testing.T) {
- dialects := []string{
- "select s1, `from`, count(s3) from table where w1 == 2 and w2 eq \"free beer\" group by g1, g2 order by count(s3) interval 10 limit 23",
- "SELECT s1, `from` COUNT(s3) FROM table WHERE w1 == 2 AND w2 eq \"free beer\" GROUP g1, g2 ORDER count(s3) INTERVAL 10 LIMIT 23",
- "select s1, `from` count(s3) from table where w1 == 2 and w2 eq \"free beer\" group by g1, g2 order by count(s3) interval 10 limit 23",
- "sElEct s1, `from` coUnt(s3) from taBle where w1 == 2 aNd w2 eq \"free beer\" Group By g1, g2 order bY count(s3) intervaL 10 LiMiT 23",
- "SELECT s1 `from` COUNT(s3) FROM table WHERE w1 == 2 AND w2 eq \"free beer\" GROUP BY g1 g2 ORDER BY count(s3) INTERVAL 10 LIMIT 23",
- "select s1 `from` count(s3) from table where w1 == 2 w2 eq \"free beer\" group g1 g2 order count(s3) interval 10 limit 23",
- "limit 23 interval 10 order count(s3) group g1 g2 where w1 == 2 w2 eq \"free beer\" from table select s1 `from` count(s3)",
- }
-
- for _, queryStr := range dialects {
- q, err := NewQuery(queryStr)
- if err != nil {
- t.Errorf("%s: %s", err.Error(), queryStr)
- }
-
- // 'select' clause
- if len(q.Select) != 3 {
- t.Errorf("Expected three elements in 'select' clause but got '%v': %s\n%v", q.Select, queryStr, q)
- }
-
- if q.Select[0].Field != "s1" {
- t.Errorf("Expected 's1' as first element in 'select' clause but got '%v': %s\n%v", q.Select[0].Field, queryStr, q)
- }
- if q.Select[0].Operation != Last {
- t.Errorf("Expected 'last' as aggregation function of first element in 'select' clause but got '%v': %s\n%v", q.Select[0].Operation, queryStr, q)
- }
-
- if q.Select[1].Field != "from" {
- t.Errorf("Expected 'from' as second element in 'select' clause but got '%v': %s\n%v", q.Select[1].Field, queryStr, q)
- }
- if q.Select[1].Operation != Last {
- t.Errorf("Expected 'last' as aggregation function of second element in 'select' clause but got '%v': %s\n%v", q.Select[1].Operation, queryStr, q)
- }
-
- if q.Select[2].Field != "s3" {
- t.Errorf("Expected 's3' as third element in 'select' clause but got '%v': %s\n%v", q.Select[2].Field, queryStr, q)
- }
- if q.Select[2].Operation != Count {
- t.Errorf("Expected 'count' as aggregation function of third element in 'select' clause but got '%v': %s\n%v", q.Select[2].Operation, queryStr, q)
- }
- if q.Select[2].FieldStorage != "count(s3)" {
- t.Errorf("Expected 'count(s3)' as third element's storage in 'select' clause but got '%v': %s\n%v", q.Select[2].FieldStorage, queryStr, q)
- }
-
- // 'from' clause
- if q.Table != "TABLE" {
- t.Errorf("Expected 'TABLE' in 'from' clause but got '%v': %s\n%v", q.Table, queryStr, q)
- }
-
- // 'where' clause
- if len(q.Where) != 2 {
- t.Errorf("Expected two elements in 'where' clause but got '%v': %s\n%v", q.Where, queryStr, q)
- }
- if q.Where[0].lString != "w1" {
- t.Errorf("Expected w1 as first element in 'where' clause but got '%v': %s\n%v", q.Where[0].lString, queryStr, q)
- }
- if q.Where[0].Operation != FloatEq {
- t.Errorf("Expected FloatEq operation in first 'where' condition but got '%v': %s\n%v", q.Where[0].Operation, queryStr, q)
- }
- if q.Where[0].rFloat != 2 {
- t.Errorf("Expected '2' as float argument in first 'where' condition but got '%v': %s\n%v", q.Where[0].rFloat, queryStr, q)
- }
- if q.Where[1].lString != "w2" {
- t.Errorf("Expected w2 as second element in 'where' clause but got '%v': %s\n%v", q.Where[1].lString, queryStr, q)
- }
- if q.Where[1].Operation != StringEq {
- t.Errorf("Expected StringEq operation in second 'where' condition but got '%v': %s\n%v", q.Where[0].Operation, queryStr, q)
- }
- if q.Where[1].rString != "free beer" {
- t.Errorf("Expected 'free beer' as string argument in second 'where' condition but got '%v': %s\n%v", q.Where[0].rString, queryStr, q)
- }
-
- // 'group by' clause
- if len(q.GroupBy) != 2 {
- t.Errorf("Expected two elements in 'group by' clause but got '%v': %s\n%v", q.GroupBy, queryStr, q)
- }
- if q.GroupBy[0] != "g1" {
- t.Errorf("Expected 'g1' as first element in 'group by' clause but got '%v': %s\n%v", q.GroupBy[0], queryStr, q)
- }
- if q.GroupBy[1] != "g2" {
- t.Errorf("Expected 'g2' as second element in 'group by' clause but got '%v': %s\n%v", q.GroupBy[1], queryStr, q)
- }
- if q.GroupKey != "g1,g2" {
- t.Errorf("Expected 'g1,g2' as group key in 'group by' clause but got '%v': %s\n%v", q.GroupKey, queryStr, q)
- }
-
- // 'order by' clause
- if q.OrderBy != "count(s3)" {
- t.Errorf("Expected 'count(s3)' as element in 'order by' clause but got '%v': %s\n%v", q.OrderBy, queryStr, q)
- }
-
- // 'interval' clause
- if q.Interval != time.Second*time.Duration(10) {
- t.Errorf("Expected '10s' as duration 'interval' clause but got '%v': %s\n%v", q.Interval, queryStr, q)
- }
-
- // 'limit' clause
- if q.Limit != 23 {
- t.Errorf("Expected '23' as limit in 'limit' clause but got '%v': %s\n%v", q.Limit, queryStr, q)
- }
- }
-}
diff --git a/mapr/selectcondition.go b/mapr/selectcondition.go
deleted file mode 100644
index 1882b7e..0000000
--- a/mapr/selectcondition.go
+++ /dev/null
@@ -1,96 +0,0 @@
-package mapr
-
-import (
- "errors"
- "fmt"
- "strings"
-)
-
-// AggregateOperation is to specify the aggregate operation type.
-type AggregateOperation int
-
-// Aggregate operation types
-const (
- UndefAggregateOperation AggregateOperation = iota
- Count AggregateOperation = iota
- Sum AggregateOperation = iota
- Min AggregateOperation = iota
- Max AggregateOperation = iota
- Last AggregateOperation = iota
- Avg AggregateOperation = iota
- Len AggregateOperation = iota
-)
-
-// Represents a parsed "select" clause, used by mapr.Query.
-type selectCondition struct {
- Field string
- FieldStorage string
- Operation AggregateOperation
-}
-
-func (sc selectCondition) String() string {
- return fmt.Sprintf("selectCondition(Field:%s,FieldStorage:%s,Operation:%v)",
- sc.Field,
- sc.FieldStorage,
- sc.Operation)
-}
-
-func makeSelectConditions(tokens []token) ([]selectCondition, error) {
- var sel []selectCondition
-
- // Parse select aggregation, e.g. sum(foo)
- parse := func(token token) (selectCondition, error) {
- var sc selectCondition
- tokenStr := strings.ToLower(token.str)
-
- if !strings.Contains(tokenStr, "(") && !strings.Contains(tokenStr, ")") {
- sc.Field = tokenStr
- sc.FieldStorage = tokenStr
- sc.Operation = Last
- return sc, nil
- }
-
- a := strings.Split(tokenStr, "(")
- if len(a) != 2 {
- return sc, errors.New(invalidQuery + "Can't parse 'select' aggregation: " + token.str)
- }
- agg := a[0] // Aggregation, e.g. 'sum'
-
- b := strings.Split(a[1], ")")
- if len(b) != 2 {
- return sc, errors.New(invalidQuery + "Can't parse 'select' field name from aggregation: " + token.str)
- }
- sc.Field = b[0] // Field name, e.g. 'foo'
- sc.FieldStorage = tokenStr // e.g. 'sum(foo)'
-
- switch agg {
- case "count":
- sc.Operation = Count
- case "sum":
- sc.Operation = Sum
- case "min":
- sc.Operation = Min
- case "max":
- sc.Operation = Max
- case "last":
- sc.Operation = Last
- case "avg":
- sc.Operation = Avg
- case "len":
- sc.Operation = Len
- default:
- return sc, errors.New(invalidQuery + "Unknown aggregation in 'select' clause: " + agg)
- }
-
- return sc, nil
- }
-
- for _, token := range tokens {
- sc, err := parse(token)
- if err != nil {
- return nil, err
- }
- sel = append(sel, sc)
- }
- return sel, nil
-}
diff --git a/mapr/server/aggregate.go b/mapr/server/aggregate.go
deleted file mode 100644
index 316da67..0000000
--- a/mapr/server/aggregate.go
+++ /dev/null
@@ -1,170 +0,0 @@
-package server
-
-import (
- "dtail/config"
- "dtail/fs"
- "dtail/logger"
- "dtail/mapr"
- "dtail/mapr/logformat"
- "os"
- "strings"
- "time"
-)
-
-// Aggregate is for aggregating mapreduce data on the DTail server side.
-type Aggregate struct {
- // Log lines to process (parsing MAPREDUCE lines).
- Lines chan fs.LineRead
- // Hostname of the current server (used to populate $hostname field).
- hostname string
- // Signals to exit goroutine.
- stop chan struct{}
- // Signals to serialize data.
- serialize chan struct{}
- // The mapr query
- query *mapr.Query
- // The mapr log format parser
- parser *logformat.Parser
-}
-
-// NewAggregate return a new server side aggregator.
-func NewAggregate(maprLines chan<- string, queryStr string) (*Aggregate, error) {
- query, err := mapr.NewQuery(queryStr)
- if err != nil {
- return nil, err
- }
-
- fqdn, err := os.Hostname()
- if err != nil {
- logger.Error(err)
- }
- s := strings.Split(fqdn, ".")
-
- logger.Info("Creating mapr log format parser", config.Server.MapreduceLogFormat)
- logParser, err := logformat.NewParser(config.Server.MapreduceLogFormat)
- if err != nil {
- logger.FatalExit("Could not create mapr log format parser", err)
- }
-
- a := Aggregate{
- Lines: make(chan fs.LineRead, 100),
- stop: make(chan struct{}),
- serialize: make(chan struct{}),
- hostname: s[0],
- query: query,
- parser: logParser,
- }
-
- go a.periodicAggregateTimer()
-
- fieldsCh := make(chan map[string]string)
- go a.readFields(fieldsCh, maprLines)
- go a.readLines(fieldsCh)
-
- return &a, nil
-}
-
-func (a *Aggregate) periodicAggregateTimer() {
- for {
- select {
- case <-time.After(a.query.Interval):
- a.Serialize()
- case <-a.stop:
- return
- }
- }
-}
-
-func (a *Aggregate) readFields(fieldsCh <-chan map[string]string, maprLines chan<- string) {
- group := mapr.NewGroupSet()
-
- for {
- select {
- case fields := <-fieldsCh:
- a.aggregate(group, fields)
- case <-a.serialize:
- logger.Info("Serializing mapreduce result")
- group.Serialize(maprLines, a.stop)
- logger.Info("Done serializing mapreduce result")
- group = mapr.NewGroupSet()
- case <-a.stop:
- return
- }
- }
-}
-
-func (a *Aggregate) readLines(fieldsCh chan<- map[string]string) {
- for {
- select {
- case line, ok := <-a.Lines:
- if !ok {
- return
- }
-
- maprLine := strings.TrimSpace(string(line.Content))
- fields, err := a.parser.MakeFields(maprLine)
-
- if err != nil {
- logger.Error(err)
- continue
- }
- if !a.query.WhereClause(fields) {
- continue
- }
-
- select {
- case fieldsCh <- fields:
- case <-a.stop:
- }
- case <-a.stop:
- return
- }
- }
-}
-
-func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
- //logger.Trace("Aggregating", group, fields)
- var sb strings.Builder
-
- for i, field := range a.query.GroupBy {
- if i > 0 {
- sb.WriteString(" ")
- }
- if val, ok := fields[field]; ok {
- sb.WriteString(val)
- }
- }
- groupKey := sb.String()
- set := group.GetSet(groupKey)
-
- var addedSample bool
- for _, sc := range a.query.Select {
- if val, ok := fields[sc.Field]; ok {
- if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil {
- logger.Error(err)
- continue
- }
- addedSample = true
- }
- }
-
- if addedSample {
- set.Samples++
- return
- }
-
- logger.Trace("Aggregated data locally without adding new samples")
-}
-
-// Serialize all the aggregated data.
-func (a *Aggregate) Serialize() {
- select {
- case a.serialize <- struct{}{}:
- case <-a.stop:
- }
-}
-
-// Close the aggregator.
-func (a *Aggregate) Close() {
- close(a.stop)
-}
diff --git a/mapr/token.go b/mapr/token.go
deleted file mode 100644
index b8be4da..0000000
--- a/mapr/token.go
+++ /dev/null
@@ -1,108 +0,0 @@
-package mapr
-
-import (
- "strings"
-)
-
-var keywords = [...]string{"select", "from", "where", "group", "rorder", "order", "interval", "limit", "outfile"}
-
-// Represents a parsed token, used to parse the mapr query.
-type token struct {
- str string
- isBareword bool
-}
-
-func (t token) isKeyword() bool {
- if !t.isBareword {
- return false
- }
-
- for _, keyword := range keywords {
- if strings.ToLower(t.str) == keyword {
- return true
- }
- }
- return false
-}
-
-func (t token) String() string {
- return t.str
-}
-
-func tokenize(queryStr string) []token {
- var tokens []token
-
- for i, part := range strings.Split(queryStr, "\"") {
- // Even i, means that it is not a quoted string
- if i%2 == 0 {
- commasStripped := strings.Replace(part, ",", " ", -1)
- for _, tokenStr := range strings.Fields(commasStripped) {
- token := token{
- str: tokenStr,
- isBareword: true,
- }
- tokens = append(tokens, token)
- }
- continue
- }
- // Add whole quoted string as a token
- token := token{
- str: part,
- isBareword: false,
- }
- tokens = append(tokens, token)
- }
-
- return tokens
-}
-
-func tokensConsume(tokens []token) ([]token, []token) {
- //logger.Trace("=====================")
- var consumed []token
-
- for i, t := range tokens {
- if t.isKeyword() {
- //logger.Trace("keyword", t)
- return tokens[i:], consumed
- }
- // strip escapes, such as ` from `foo`, this allows to use keywords as field names
- length := len(t.str)
- if length == 0 {
- continue
- }
- if t.str[0] == '`' && t.str[length-1] == '`' {
- stripped := t.str[1 : length-1]
- //logger.Trace("stripped", stripped)
- t := token{
- str: stripped,
- isBareword: t.isBareword,
- }
- consumed = append(consumed, t)
- continue
- }
- //logger.Trace("bare", token)
- consumed = append(consumed, t)
- }
-
- //logger.Trace("result", consumed)
- return nil, consumed
-}
-
-func tokensConsumeStr(tokens []token) ([]token, []string) {
- var strings []string
- tokens, found := tokensConsume(tokens)
- for _, token := range found {
- strings = append(strings, token.str)
- }
- return tokens, strings
-}
-
-func tokensConsumeOptional(tokens []token, optional string) []token {
- if len(tokens) < 1 {
- return tokens
- }
- if strings.ToLower(tokens[0].str) == strings.ToLower(optional) {
- return tokens[1:]
- }
- return tokens
-}
diff --git a/mapr/wherecondition.go b/mapr/wherecondition.go
deleted file mode 100644
index 515c8ad..0000000
--- a/mapr/wherecondition.go
+++ /dev/null
@@ -1,193 +0,0 @@
-package mapr
-
-import (
- "dtail/logger"
- "errors"
- "fmt"
- "strconv"
- "strings"
-)
-
-// QueryOperation determines the mapreduce operation.
-type QueryOperation int
-
-// The possible mapreduce operation.s
-const (
- UndefQueryOperation QueryOperation = iota
- StringEq QueryOperation = iota
- StringNe QueryOperation = iota
- StringContains QueryOperation = iota
- FloatOperation QueryOperation = iota
- FloatEq QueryOperation = iota
- FloatNe QueryOperation = iota
- FloatLt QueryOperation = iota
- FloatLe QueryOperation = iota
- FloatGt QueryOperation = iota
- FloatGe QueryOperation = iota
-)
-
-type whereType int
-
-// The possible field types.
-const (
- UndefWhereType whereType = iota
- Field whereType = iota
- String whereType = iota
- Float whereType = iota
-)
-
-func (w whereType) String() string {
- switch w {
- case Field:
- return fmt.Sprintf("Field")
- case String:
- return fmt.Sprintf("String")
- case Float:
- return fmt.Sprintf("Float")
- default:
- return fmt.Sprintf("UndefWhereType")
- }
-}
-
-// Represent a parsed "where" clause, used by mapr.Query
-type whereCondition struct {
- lString string
- lFloat float64
- lType whereType
-
- Operation QueryOperation
-
- rString string
- rFloat float64
- rType whereType
-}
-
-func (wc *whereCondition) String() string {
- return fmt.Sprintf("whereCondition(Operation:%v,lString:%s,lFloat:%v,lType:%s,rString:%s,rFloat:%v,rType:%s)",
- wc.Operation, wc.lString, wc.lFloat, wc.lType.String(), wc.rString, wc.rFloat, wc.rType.String())
-}
-
-func makeWhereConditions(tokens []token) (where []whereCondition, err error) {
- parse := func(tokens []token) (whereCondition, []token, error) {
- var wc whereCondition
- if len(tokens) < 3 {
- return wc, nil, errors.New(invalidQuery + "Not enough arguments in 'where' clause")
- }
-
- whereOp := strings.ToLower(tokens[1].str)
- switch whereOp {
- case "==":
- wc.Operation = FloatEq
- case "!=":
- wc.Operation = FloatNe
- case "<":
- wc.Operation = FloatLt
- case "<=":
- wc.Operation = FloatLe
- case "=<":
- wc.Operation = FloatLe
- case ">":
- wc.Operation = FloatGt
- case ">=":
- wc.Operation = FloatGe
- case "=>":
- wc.Operation = FloatGe
- case "eq":
- wc.Operation = StringEq
- case "ne":
- wc.Operation = StringNe
- case "contains":
- wc.Operation = StringContains
- default:
- return wc, nil, errors.New(invalidQuery + "Unknown operation in 'where' clause: " + whereOp)
- }
-
- wc.lString = tokens[0].str
- wc.rString = tokens[2].str
-
- if wc.Operation > FloatOperation {
- if !tokens[0].isBareword {
- return wc, nil, errors.New(invalidQuery + "Expected bareword at 'where' clause's lValue: " + tokens[0].str)
- }
- if f, err := strconv.ParseFloat(wc.lString, 64); err == nil {
- wc.lFloat = f
- wc.lType = Float
- } else {
- wc.lType = Field
- }
-
- if !tokens[2].isBareword {
- return wc, nil, errors.New(invalidQuery + "Expected bareword at 'where' clause's rValue: " + tokens[2].str)
- }
- if f, err := strconv.ParseFloat(wc.rString, 64); err == nil {
- wc.rFloat = f
- wc.rType = Float
- } else {
- wc.rType = Field
- }
- return wc, tokens[3:], nil
- }
-
- if tokens[0].isBareword {
- wc.lType = Field
- } else {
- wc.lType = String
- }
- if tokens[2].isBareword {
- wc.rType = Field
- } else {
- wc.rType = String
- }
-
- return wc, tokens[3:], nil
- }
-
- for len(tokens) > 0 {
- var wc whereCondition
- var err error
-
- wc, tokens, err = parse(tokens)
- if err != nil {
- return nil, err
- }
-
- where = append(where, wc)
- tokens = tokensConsumeOptional(tokens, "and")
- }
-
- return
-}
-
-func (wc *whereCondition) floatClause(lValue float64, rValue float64) bool {
- switch wc.Operation {
- case FloatEq:
- return lValue == rValue
- case FloatNe:
- return lValue != rValue
- case FloatLt:
- return lValue < rValue
- case FloatLe:
- return lValue <= rValue
- case FloatGt:
- return lValue > rValue
- case FloatGe:
- return lValue >= rValue
- default:
- logger.Error("Unknown float operation", lValue, wc.Operation, rValue)
- }
- return false
-}
-
-func (wc *whereCondition) stringClause(lValue string, rValue string) bool {
- switch wc.Operation {
- case StringEq:
- return lValue == rValue
- case StringNe:
- return lValue != rValue
- case StringContains:
- return strings.Contains(lValue, rValue)
- default:
- logger.Error("Unknown string operation", lValue, wc.Operation, rValue)
- }
- return false
-}