diff options
| author | Paul Bütow <pbuetow@mimecast.com> | 2020-01-20 18:41:05 +0000 |
|---|---|---|
| committer | Paul Bütow <pbuetow@mimecast.com> | 2020-01-21 14:35:23 +0000 |
| commit | c128865c4c7411c29a59fca9a3a2f95537686d7b (patch) | |
| tree | 193bccc70d942c8b70cc93fae2670263701e43aa /mapr | |
| parent | 3755a9911ecb05886577095f2b8cc8b9e4066a3a (diff) | |
Move commands to cmd/ and move internal dependencies to internal/
Diffstat (limited to 'mapr')
| -rw-r--r-- | mapr/aggregateset.go | 185 | ||||
| -rw-r--r-- | mapr/client/aggregate.go | 100 | ||||
| -rw-r--r-- | mapr/globalgroupset.go | 100 | ||||
| -rw-r--r-- | mapr/groupset.go | 178 | ||||
| -rw-r--r-- | mapr/logformat/default.go | 23 | ||||
| -rw-r--r-- | mapr/logformat/default_test.go | 35 | ||||
| -rw-r--r-- | mapr/logformat/parser.go | 75 | ||||
| -rw-r--r-- | mapr/query.go | 245 | ||||
| -rw-r--r-- | mapr/query_test.go | 149 | ||||
| -rw-r--r-- | mapr/selectcondition.go | 96 | ||||
| -rw-r--r-- | mapr/server/aggregate.go | 170 | ||||
| -rw-r--r-- | mapr/token.go | 108 | ||||
| -rw-r--r-- | mapr/wherecondition.go | 193 |
13 files changed, 0 insertions, 1657 deletions
diff --git a/mapr/aggregateset.go b/mapr/aggregateset.go deleted file mode 100644 index 2096c3c..0000000 --- a/mapr/aggregateset.go +++ /dev/null @@ -1,185 +0,0 @@ -package mapr - -import ( - "fmt" - "strconv" - "strings" -) - -// AggregateSet represents aggregated key/value pairs from the -// MAPREDUCE log lines. These could be either string values or float -// values. -type AggregateSet struct { - Samples int - FValues map[string]float64 - SValues map[string]string -} - -// NewAggregateSet creates a new empty aggregate set. -func NewAggregateSet() *AggregateSet { - return &AggregateSet{ - FValues: make(map[string]float64), - SValues: make(map[string]string), - } -} - -// String representation of aggregate set. -func (s *AggregateSet) String() string { - return fmt.Sprintf("AggregateSet(Samples:%d,FValues:%v,SValues:%v)", - s.Samples, s.FValues, s.SValues) -} - -// Merge one aggregate set into this one. -func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error { - s.Samples += set.Samples - //logger.Trace("Merge", set) - - for _, sc := range query.Select { - storage := sc.FieldStorage - switch sc.Operation { - case Count: - fallthrough - case Sum: - fallthrough - case Avg: - value := set.FValues[storage] - s.addFloat(storage, value) - case Min: - value := set.FValues[storage] - s.addFloatMin(storage, value) - case Max: - value := set.FValues[storage] - s.addFloatMax(storage, value) - case Last: - value := set.SValues[storage] - s.setString(storage, value) - case Len: - s.setString(storage, set.SValues[storage]) - s.setFloat(storage, set.FValues[storage]) - default: - return fmt.Errorf("Unknown aggregation method '%v'", sc.Operation) - } - } - return nil -} - -// Serialize the aggregate set so it can be sent over the wire. -func (s *AggregateSet) Serialize(groupKey string, ch chan<- string, stop chan struct{}) { - //logger.Trace("Serialising mapr.AggregateSet", s) - var sb strings.Builder - - sb.WriteString(groupKey) - sb.WriteString("|") - sb.WriteString(fmt.Sprintf("%d|", s.Samples)) - - for k, v := range s.FValues { - sb.WriteString(k) - sb.WriteString("=") - sb.WriteString(fmt.Sprintf("%v|", v)) - } - - for k, v := range s.SValues { - sb.WriteString(k) - sb.WriteString("=") - sb.WriteString(v) - sb.WriteString("|") - } - - select { - case ch <- sb.String(): - case <-stop: - } -} - -// Add a float value. -func (s *AggregateSet) addFloat(key string, value float64) { - if _, ok := s.FValues[key]; !ok { - s.FValues[key] = value - return - } - s.FValues[key] += value -} - -// Add a float minimum value. -func (s *AggregateSet) addFloatMin(key string, value float64) { - f, ok := s.FValues[key] - if !ok { - s.FValues[key] = value - return - } - - if f > value { - s.FValues[key] = value - } -} - -// Add a float maximum value. -func (s *AggregateSet) addFloatMax(key string, value float64) { - f, ok := s.FValues[key] - if !ok { - s.FValues[key] = value - return - } - - if f < value { - s.FValues[key] = value - } -} - -// Set a string. -func (s *AggregateSet) setString(key, value string) { - s.SValues[key] = value -} - -// Set a float. -func (s *AggregateSet) setFloat(key string, value float64) { - s.FValues[key] = value -} - -// Aggregate data to the aggregate set. -func (s *AggregateSet) Aggregate(key string, agg AggregateOperation, value string, clientAggregation bool) (err error) { - var f float64 - - // First check if we can aggregate anything without converting value to float. - switch agg { - case Count: - if clientAggregation { - f, err = strconv.ParseFloat(value, 64) - if err != nil { - return - } - s.addFloat(key, f) - return - } - s.addFloat(key, 1) - return - case Last: - s.setString(key, value) - return - case Len: - s.setString(key, value) - s.setFloat(key, float64(len(value))) - return - default: - } - - // No, we have to convert to float. - f, err = strconv.ParseFloat(value, 64) - if err != nil { - return - } - - switch agg { - case Sum: - fallthrough - case Avg: - s.addFloat(key, f) - case Min: - s.addFloatMin(key, f) - case Max: - s.addFloatMax(key, f) - default: - err = fmt.Errorf("Unknown aggregation method '%v'", agg) - } - return -} diff --git a/mapr/client/aggregate.go b/mapr/client/aggregate.go deleted file mode 100644 index b9443bc..0000000 --- a/mapr/client/aggregate.go +++ /dev/null @@ -1,100 +0,0 @@ -package client - -import ( - "dtail/logger" - "dtail/mapr" - "strconv" - "strings" -) - -// Aggregate mapreduce data on the DTail client side. -type Aggregate struct { - // This is the mapr query specified on the command line. - query *mapr.Query - // This represents aggregated data of a single remote server. - group *mapr.GroupSet - // This represents the merged aggregated data of all servers. - globalGroup *mapr.GlobalGroupSet - stop chan struct{} - // The server we aggregate the data for (logging and debugging purposes only) - server string -} - -// NewAggregate create new client aggregator. -func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *Aggregate { - return &Aggregate{ - query: query, - group: mapr.NewGroupSet(), - globalGroup: globalGroup, - stop: make(chan struct{}), - server: server, - } -} - -// Aggregate data from mapr log line into local (and global) group sets. -func (a *Aggregate) Aggregate(parts []string) { - select { - case <-a.stop: - logger.Error("Client aggregator stopped for server, not processing new data", a.server) - return - default: - } - - groupKey := parts[0] - samples, err := strconv.Atoi(parts[1]) - if err != nil { - logger.FatalExit(parts, err) - } - fields := a.makeFields(parts[2:]) - set := a.group.GetSet(groupKey) - - var addedSamples bool - for _, sc := range a.query.Select { - if val, ok := fields[sc.FieldStorage]; ok { - if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, true); err != nil { - logger.Error(err) - continue - } - addedSamples = true - } - } - if addedSamples { - set.Samples += samples - } - - // Merge data from group into global group. - isMerged, err := a.globalGroup.MergeNoblock(a.query, a.group) - if err != nil { - panic(err) - } - if isMerged { - // Re-init local group (make it empty again). - a.group.InitSet() - } -} - -// Create a map of key-value pairs from a part list such as ["foo=bar", "bar=baz"]. -func (a *Aggregate) makeFields(parts []string) map[string]string { - fields := make(map[string]string, len(parts)) - - for _, part := range parts { - kv := strings.Split(part, "=") - if len(kv) != 2 { - continue - } - fields[kv[0]] = kv[1] - } - - return fields -} - -// Stop the client side mapreduce aggregator. -func (a *Aggregate) Stop() { - logger.Debug("Stopping client mapreduce aggregator") - close(a.stop) - - err := a.globalGroup.Merge(a.query, a.group) - if err != nil { - panic(err) - } -} diff --git a/mapr/globalgroupset.go b/mapr/globalgroupset.go deleted file mode 100644 index cfab506..0000000 --- a/mapr/globalgroupset.go +++ /dev/null @@ -1,100 +0,0 @@ -package mapr - -import ( - "fmt" -) - -// GlobalGroupSet is used on the dtail client to merge multiple group sets -// (one group set per remote server) to one single global group set. -type GlobalGroupSet struct { - GroupSet - semaphore chan struct{} -} - -// NewGlobalGroupSet creates a new empty global group set. -func NewGlobalGroupSet() *GlobalGroupSet { - g := GlobalGroupSet{ - semaphore: make(chan struct{}, 1), - } - g.InitSet() - - return &g -} - -// String representation of the global group set. -func (g *GlobalGroupSet) String() string { - return fmt.Sprintf("GlobalGroupSet(%s)", g.GroupSet.String()) -} - -// Merge (blocking) a group set into the global group set. -func (g *GlobalGroupSet) Merge(query *Query, group *GroupSet) error { - g.semaphore <- struct{}{} - defer func() { <-g.semaphore }() - - return g.merge(query, group) -} - -// MergeNoblock merges (non-blocking) a group set into the global group set. -func (g *GlobalGroupSet) MergeNoblock(query *Query, group *GroupSet) (bool, error) { - select { - case g.semaphore <- struct{}{}: - err := g.merge(query, group) - <-g.semaphore - return true, err - default: - return false, nil - } -} - -// Merge a group set into the global group set. -func (g *GlobalGroupSet) merge(query *Query, group *GroupSet) error { - for groupKey, set := range group.sets { - s := g.GetSet(groupKey) - if err := s.Merge(query, set); err != nil { - return err - } - } - - return nil -} - -// IsEmpty determines whether the global group set has any data in it. -func (g *GlobalGroupSet) IsEmpty() bool { - return g.NumSets() == 0 -} - -// NumSets determines the number of sets. -func (g *GlobalGroupSet) NumSets() int { - g.semaphore <- struct{}{} - defer func() { <-g.semaphore }() - - return len(g.sets) -} - -// SwapOut teturn the underlying group set and create a new empty one, so -// that the global group set is empty again and can aggregate new data. -func (g *GlobalGroupSet) SwapOut() *GroupSet { - g.semaphore <- struct{}{} - defer func() { <-g.semaphore }() - - set := &GroupSet{sets: g.sets} - g.InitSet() - - return set -} - -// WriteResult writes the result of a mapreduce aggregation to an outfile. -func (g *GlobalGroupSet) WriteResult(query *Query) error { - g.semaphore <- struct{}{} - defer func() { <-g.semaphore }() - - return g.GroupSet.WriteResult(query) -} - -// Result returns the result of the mapreduce aggregation as a string. -func (g *GlobalGroupSet) Result(query *Query) (string, int, error) { - g.semaphore <- struct{}{} - defer func() { <-g.semaphore }() - - return g.GroupSet.Result(query) -} diff --git a/mapr/groupset.go b/mapr/groupset.go deleted file mode 100644 index d8f9379..0000000 --- a/mapr/groupset.go +++ /dev/null @@ -1,178 +0,0 @@ -package mapr - -import ( - "errors" - "fmt" - "io/ioutil" - "sort" - "strconv" - "strings" -) - -// GroupSet represents a map of aggregate sets. The group sets -// are requierd by the "group by" mapr clause, whereas the -// group set map keys are the values of the "group by" arguments. -// E.g. "group by $cid" would create one aggregate set and one map -// entry per customer id. -type GroupSet struct { - sets map[string]*AggregateSet -} - -// NewGroupSet returns a new empty group set. -func NewGroupSet() *GroupSet { - g := GroupSet{} - g.InitSet() - return &g -} - -// String representation of the group set. -func (g *GroupSet) String() string { - return fmt.Sprintf("GroupSet(%v)", g.sets) -} - -// InitSet makes the group set empty (initialize). -func (g *GroupSet) InitSet() { - g.sets = make(map[string]*AggregateSet) -} - -// GetSet gets a specific aggregate set from the group set. -func (g *GroupSet) GetSet(groupKey string) *AggregateSet { - set, ok := g.sets[groupKey] - if !ok { - set = NewAggregateSet() - g.sets[groupKey] = set - } - return set -} - -// Serialize the group set (e.g. to send it over the wire). -func (g *GroupSet) Serialize(ch chan<- string, stop chan struct{}) { - for groupKey, set := range g.sets { - set.Serialize(groupKey, ch, stop) - } -} - -// Result returns a nicely formated result of the query from the group set. -func (g *GroupSet) Result(query *Query) (string, int, error) { - return g.limitedResult(query, query.Limit, "\t", " ", false) -} - -// WriteResult writes the result to an outfile. -func (g *GroupSet) WriteResult(query *Query) error { - if query.Outfile == "" { - return errors.New("No outfile specified") - } - - // -1: Don't limit the result, include all data sets - result, _, err := g.limitedResult(query, -1, "", ",", true) - if err != nil { - return err - } - - return ioutil.WriteFile(query.Outfile, []byte(result), 0644) -} - -// Return a nicely formated result of the query from the group set. -func (g *GroupSet) limitedResult(query *Query, limit int, lineStarter, fieldSeparator string, addHeader bool) (string, int, error) { - type result struct { - groupKey string - resultStr string - orderBy float64 - } - - var resultSlice []result - - for groupKey, set := range g.sets { - var sb strings.Builder - r := result{groupKey: groupKey} - - lastIndex := len(query.Select) - 1 - for i, sc := range query.Select { - storage := sc.FieldStorage - orderByThis := storage == query.OrderBy - - switch sc.Operation { - case Count: - value := set.FValues[storage] - sb.WriteString(fmt.Sprintf("%d", int(value))) - if orderByThis { - r.orderBy = value - } - case Len: - fallthrough - case Sum: - fallthrough - case Min: - fallthrough - case Max: - value := set.FValues[storage] - sb.WriteString(fmt.Sprintf("%f", value)) - if orderByThis { - r.orderBy = value - } - case Last: - value := set.SValues[storage] - if orderByThis { - f, err := strconv.ParseFloat(value, 64) - if err == nil { - r.orderBy = f - } - } - sb.WriteString(value) - case Avg: - value := set.FValues[storage] / float64(set.Samples) - sb.WriteString(fmt.Sprintf("%f", value)) - if orderByThis { - r.orderBy = value - } - default: - return "", 0, fmt.Errorf("Unknown aggregation method '%v'", sc.Operation) - } - if i != lastIndex { - sb.WriteString(fieldSeparator) - } - } - - r.resultStr = sb.String() - resultSlice = append(resultSlice, r) - } - - if query.OrderBy != "" { - if query.ReverseOrder { - sort.SliceStable(resultSlice, func(i, j int) bool { - return resultSlice[i].orderBy < resultSlice[j].orderBy - }) - } else { - sort.SliceStable(resultSlice, func(i, j int) bool { - return resultSlice[i].orderBy > resultSlice[j].orderBy - }) - } - } - - var sb strings.Builder - - // Write header first - if addHeader { - lastIndex := len(query.Select) - 1 - sb.WriteString(lineStarter) - for i, sc := range query.Select { - sb.WriteString(sc.FieldStorage) - if i != lastIndex { - sb.WriteString(fieldSeparator) - } - } - sb.WriteString("\n") - } - - // And now write the data - for i, r := range resultSlice { - if i == limit { - break - } - sb.WriteString(lineStarter) - sb.WriteString(r.resultStr) - sb.WriteString("\n") - } - - return sb.String(), len(resultSlice), nil -} diff --git a/mapr/logformat/default.go b/mapr/logformat/default.go deleted file mode 100644 index f0df5bc..0000000 --- a/mapr/logformat/default.go +++ /dev/null @@ -1,23 +0,0 @@ -package logformat - -import ( - "errors" - "strings" -) - -// MakeFieldsDEFAULT is the default log file mapreduce parser. -func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) { - fields := make(map[string]string, 20) - splitted := strings.Split(maprLine, "|") - - fields["$hostname"] = p.hostname - - for _, kv := range splitted { - keyAndValue := strings.SplitN(kv, "=", 2) - if len(keyAndValue) != 2 { - return fields, errors.New("Error parsing mapr token: " + kv) - } - fields[strings.ToLower(keyAndValue[0])] = keyAndValue[1] - } - return fields, nil -} diff --git a/mapr/logformat/default_test.go b/mapr/logformat/default_test.go deleted file mode 100644 index a3c47fb..0000000 --- a/mapr/logformat/default_test.go +++ /dev/null @@ -1,35 +0,0 @@ -package logformat - -import ( - "testing" -) - -func TestDefaultLogFormat(t *testing.T) { - parser, err := NewParser("default") - if err != nil { - t.Errorf("Unable to create parser: %s", err.Error()) - } - - fields, err := parser.MakeFields("foo=bar|baz=bay") - - if err != nil { - t.Errorf("Unable to parse: %s", err.Error()) - } - - if bar, ok := fields["foo"]; !ok { - t.Errorf("Expected field 'foo', but no such field there\n") - } else if bar != "bar" { - t.Errorf("Expected 'bar' stored in field 'foo', but got '%s'\n", bar) - } - - if bay, ok := fields["baz"]; !ok { - t.Errorf("Expected field 'baz', but no such field there\n") - } else if bay != "bay" { - t.Errorf("Expected 'bay' stored in field 'baz', but got '%s'\n", bay) - } - - fields, err = parser.MakeFields("foo=bar|bazbay") - if err == nil { - t.Errorf("Expected error but didn't: %s", err.Error()) - } -} diff --git a/mapr/logformat/parser.go b/mapr/logformat/parser.go deleted file mode 100644 index b7c8c5c..0000000 --- a/mapr/logformat/parser.go +++ /dev/null @@ -1,75 +0,0 @@ -package logformat - -import ( - "dtail/logger" - "errors" - "fmt" - "os" - "reflect" - "strings" -) - -// Parser is used to parse the mapreduce information from the server log files. -type Parser struct { - hostname string - logFormatName string - makeFieldsFunc reflect.Value - makeFieldsReceiver reflect.Value -} - -// NewParser returns a new log parser. -func NewParser(logFormatName string) (*Parser, error) { - hostname, err := os.Hostname() - - if err != nil { - return nil, err - } - - p := Parser{ - hostname: hostname, - } - - err = p.reflectLogFormat(logFormatName) - if err != nil { - return nil, err - } - - return &p, nil -} - -// The aim of this is that everyone can plug in their own mapr log format -// parsing method to DTail. Just add a method MakeFieldsMODULENAME to type -// Parser. Whereas MODULENAME must be a upeprcase string. -func (p *Parser) reflectLogFormat(logFormatName string) error { - methodName := fmt.Sprintf("MakeFields%s", strings.ToUpper(logFormatName)) - - rt := reflect.TypeOf(p) - method, ok := rt.MethodByName(methodName) - if !ok { - return errors.New("No such mapr log format module: " + methodName) - } - - p.makeFieldsFunc = method.Func - p.makeFieldsReceiver = reflect.ValueOf(p) - - return nil -} - -// MakeFields is for returning the fields from a given log line. -func (p *Parser) MakeFields(maprLine string) (fields map[string]string, err error) { - inputValues := []reflect.Value{p.makeFieldsReceiver, reflect.ValueOf(maprLine)} - returnValues := p.makeFieldsFunc.Call(inputValues) - - errInterface := returnValues[1].Interface() - - if errInterface == nil { - fields, err = returnValues[0].Interface().(map[string]string), nil - logger.Trace("parser.MakeFields", fields, err) - return - } - - fields, err = returnValues[0].Interface().(map[string]string), errInterface.(error) - logger.Trace("parser.MakeFields", fields, err) - - return -} diff --git a/mapr/query.go b/mapr/query.go deleted file mode 100644 index 8ed3c67..0000000 --- a/mapr/query.go +++ /dev/null @@ -1,245 +0,0 @@ -package mapr - -import ( - "dtail/logger" - "errors" - "fmt" - "strconv" - "strings" - "time" -) - -const ( - invalidQuery string = "Invalid query: " - unexpectedEnd string = "Unexpected end of query" -) - -// Query represents a parsed mapr query. -type Query struct { - Select []selectCondition - Table string - Where []whereCondition - GroupBy []string - OrderBy string - ReverseOrder bool - GroupKey string - Interval time.Duration - Limit int - Outfile string - RawQuery string - tokens []token -} - -func (q Query) String() string { - return fmt.Sprintf("Query(Select:%v,Table:%s,Where:%v,GroupBy:%v,GroupKey:%s,OrderBy:%v,ReverseOrder:%v,Interval:%v,Limit:%d,Outfile:%s,RawQuery:%s,tokens:%v)", - q.Select, - q.Table, - q.Where, - q.GroupBy, - q.GroupKey, - q.OrderBy, - q.ReverseOrder, - q.Interval, - q.Limit, - q.Outfile, - q.RawQuery, - q.tokens) -} - -// NewQuery returns a new mapreduce query. -func NewQuery(queryStr string) (*Query, error) { - if queryStr == "" { - return nil, nil - } - - tokens := tokenize(queryStr) - - q := Query{ - RawQuery: queryStr, - tokens: tokens, - Interval: time.Second * 5, - Limit: -1, - } - - err := q.parse(tokens) - - logger.Debug(q) - return &q, err -} - -func (q *Query) parse(tokens []token) error { - var found []token - var err error - - for tokens != nil && len(tokens) > 0 { - switch strings.ToLower(tokens[0].str) { - case "select": - tokens, found = tokensConsume(tokens[1:]) - q.Select, err = makeSelectConditions(found) - if err != nil { - return err - } - case "from": - tokens, found = tokensConsume(tokens[1:]) - if len(found) > 0 { - q.Table = strings.ToUpper(found[0].str) - } - case "where": - tokens, found = tokensConsume(tokens[1:]) - if q.Where, err = makeWhereConditions(found); err != nil { - return err - } - case "group": - tokens = tokensConsumeOptional(tokens[1:], "by") - if tokens == nil || len(tokens) < 1 { - return errors.New(invalidQuery + unexpectedEnd) - } - tokens, q.GroupBy = tokensConsumeStr(tokens) - q.GroupKey = strings.Join(q.GroupBy, ",") - case "rorder": - tokens = tokensConsumeOptional(tokens[1:], "by") - if tokens == nil || len(tokens) < 1 { - return errors.New(invalidQuery + unexpectedEnd) - } - tokens, found = tokensConsume(tokens) - if len(found) == 0 { - return errors.New(invalidQuery + unexpectedEnd) - } - q.OrderBy = found[0].str - q.ReverseOrder = true - case "order": - tokens = tokensConsumeOptional(tokens[1:], "by") - if tokens == nil || len(tokens) < 1 { - return errors.New(invalidQuery + unexpectedEnd) - } - tokens, found = tokensConsume(tokens) - if len(found) == 0 { - return errors.New(invalidQuery + unexpectedEnd) - } - q.OrderBy = found[0].str - case "interval": - tokens, found = tokensConsume(tokens[1:]) - if len(found) > 0 { - i, err := strconv.Atoi(found[0].str) - if err != nil { - return errors.New(invalidQuery + err.Error()) - } - q.Interval = time.Second * time.Duration(i) - } - case "limit": - tokens, found = tokensConsume(tokens[1:]) - if len(found) == 0 { - return errors.New(invalidQuery + unexpectedEnd) - } - i, err := strconv.Atoi(found[0].str) - if err != nil { - return errors.New(invalidQuery + err.Error()) - } - q.Limit = i - case "outfile": - tokens, found = tokensConsume(tokens[1:]) - if len(found) == 0 { - return errors.New(invalidQuery + unexpectedEnd) - } - q.Outfile = found[0].str - default: - return errors.New(invalidQuery + "Unexpected keyword " + tokens[0].str) - } - } - - if q.Table == "" { - return errors.New(invalidQuery + "Empty table specified in 'from' clause") - } - if len(q.Select) < 1 { - return errors.New(invalidQuery + "Expected at least one field in 'select' clause but got none") - } - if len(q.GroupBy) == 0 { - field := q.Select[0].Field - q.GroupBy = append(q.GroupBy, field) - } - - if q.OrderBy != "" { - var orderFieldIsValid bool - for _, sc := range q.Select { - if q.OrderBy == sc.FieldStorage { - orderFieldIsValid = true - break - } - } - if !orderFieldIsValid { - return errors.New(invalidQuery + fmt.Sprintf("Can not '(r)order by' '%s', must be present in 'select' clause", q.OrderBy)) - } - } - - return nil -} - -// WhereClause interprets the where clause of the mapreduce query. -func (q *Query) WhereClause(fields map[string]string) bool { - floatValue := func(str string, float float64, t whereType) (float64, bool) { - switch t { - case Float: - return float, true - case Field: - value, ok := fields[str] - if !ok { - return 0, false - } - f, err := strconv.ParseFloat(value, 64) - if err != nil { - return 0, false - } - return f, true - default: - logger.Error("Unexpected argument in 'where' clause", str, float, t) - return 0, false - } - } - - stringValue := func(str string, t whereType) (string, bool) { - switch t { - case Field: - value, ok := fields[str] - if !ok { - return str, false - } - return value, true - case String: - return str, true - default: - logger.Error("Unexpected argument in 'where' clause", str, t) - return str, false - } - } - - for _, wc := range q.Where { - var ok bool - - if wc.Operation > FloatOperation { - var lValue, rValue float64 - if lValue, ok = floatValue(wc.lString, wc.lFloat, wc.lType); !ok { - return false - } - if rValue, ok = floatValue(wc.rString, wc.rFloat, wc.rType); !ok { - return false - } - if ok = wc.floatClause(lValue, rValue); !ok { - return false - } - continue - } - - var lValue, rValue string - if lValue, ok = stringValue(wc.lString, wc.lType); !ok { - return false - } - if rValue, ok = stringValue(wc.rString, wc.rType); !ok { - return false - } - if ok = wc.stringClause(lValue, rValue); !ok { - return false - } - } - - return true -} diff --git a/mapr/query_test.go b/mapr/query_test.go deleted file mode 100644 index 6176461..0000000 --- a/mapr/query_test.go +++ /dev/null @@ -1,149 +0,0 @@ -package mapr - -import ( - "testing" - "time" -) - -func TestParseQuerySimple(t *testing.T) { - errorQueries := []string{ - "select", - "select foo", - "select foo from", - "select foo from bar where baz", - "select foo from bar where baz <", - "select foo from bar where baz < 100 bay eq 12 group", - "select foo from bar where baz < 100 bay eq 12 group by foo order by", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit", - } - okQueries := []string{"select foo from bar", - "select foo from bar where", - "select foo from bar where baz < 100 bay eq 12", - "select foo from bar where baz < 100, bay eq 12", - "select foo from bar where baz < 100 and bay eq 12", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit 23", - "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit 23 outfile \"result.csv\"", - } - - for _, queryStr := range errorQueries { - q, err := NewQuery(queryStr) - if err == nil { - t.Errorf("Expected a parse error: %s\n%v", queryStr, q) - continue - } - } - - for _, queryStr := range okQueries { - _, err := NewQuery(queryStr) - if err != nil { - t.Errorf("%s: %s", err.Error(), queryStr) - continue - } - } -} - -func TestParseQueryDeep(t *testing.T) { - dialects := []string{ - "select s1, `from`, count(s3) from table where w1 == 2 and w2 eq \"free beer\" group by g1, g2 order by count(s3) interval 10 limit 23", - "SELECT s1, `from` COUNT(s3) FROM table WHERE w1 == 2 AND w2 eq \"free beer\" GROUP g1, g2 ORDER count(s3) INTERVAL 10 LIMIT 23", - "select s1, `from` count(s3) from table where w1 == 2 and w2 eq \"free beer\" group by g1, g2 order by count(s3) interval 10 limit 23", - "sElEct s1, `from` coUnt(s3) from taBle where w1 == 2 aNd w2 eq \"free beer\" Group By g1, g2 order bY count(s3) intervaL 10 LiMiT 23", - "SELECT s1 `from` COUNT(s3) FROM table WHERE w1 == 2 AND w2 eq \"free beer\" GROUP BY g1 g2 ORDER BY count(s3) INTERVAL 10 LIMIT 23", - "select s1 `from` count(s3) from table where w1 == 2 w2 eq \"free beer\" group g1 g2 order count(s3) interval 10 limit 23", - "limit 23 interval 10 order count(s3) group g1 g2 where w1 == 2 w2 eq \"free beer\" from table select s1 `from` count(s3)", - } - - for _, queryStr := range dialects { - q, err := NewQuery(queryStr) - if err != nil { - t.Errorf("%s: %s", err.Error(), queryStr) - } - - // 'select' clause - if len(q.Select) != 3 { - t.Errorf("Expected three elements in 'select' clause but got '%v': %s\n%v", q.Select, queryStr, q) - } - - if q.Select[0].Field != "s1" { - t.Errorf("Expected 's1' as first element in 'select' clause but got '%v': %s\n%v", q.Select[0].Field, queryStr, q) - } - if q.Select[0].Operation != Last { - t.Errorf("Expected 'last' as aggregation function of first element in 'select' clause but got '%v': %s\n%v", q.Select[0].Operation, queryStr, q) - } - - if q.Select[1].Field != "from" { - t.Errorf("Expected 'from' as second element in 'select' clause but got '%v': %s\n%v", q.Select[1].Field, queryStr, q) - } - if q.Select[1].Operation != Last { - t.Errorf("Expected 'last' as aggregation function of second element in 'select' clause but got '%v': %s\n%v", q.Select[1].Operation, queryStr, q) - } - - if q.Select[2].Field != "s3" { - t.Errorf("Expected 's3' as third element in 'select' clause but got '%v': %s\n%v", q.Select[2].Field, queryStr, q) - } - if q.Select[2].Operation != Count { - t.Errorf("Expected 'count' as aggregation function of third element in 'select' clause but got '%v': %s\n%v", q.Select[2].Operation, queryStr, q) - } - if q.Select[2].FieldStorage != "count(s3)" { - t.Errorf("Expected 'count(s3)' as third element's storage in 'select' clause but got '%v': %s\n%v", q.Select[2].FieldStorage, queryStr, q) - } - - // 'from' clause - if q.Table != "TABLE" { - t.Errorf("Expected 'TABLE' in 'from' clause but got '%v': %s\n%v", q.Table, queryStr, q) - } - - // 'where' clause - if len(q.Where) != 2 { - t.Errorf("Expected two elements in 'where' clause but got '%v': %s\n%v", q.Where, queryStr, q) - } - if q.Where[0].lString != "w1" { - t.Errorf("Expected w1 as first element in 'where' clause but got '%v': %s\n%v", q.Where[0].lString, queryStr, q) - } - if q.Where[0].Operation != FloatEq { - t.Errorf("Expected FloatEq operation in first 'where' condition but got '%v': %s\n%v", q.Where[0].Operation, queryStr, q) - } - if q.Where[0].rFloat != 2 { - t.Errorf("Expected '2' as float argument in first 'where' condition but got '%v': %s\n%v", q.Where[0].rFloat, queryStr, q) - } - if q.Where[1].lString != "w2" { - t.Errorf("Expected w2 as second element in 'where' clause but got '%v': %s\n%v", q.Where[1].lString, queryStr, q) - } - if q.Where[1].Operation != StringEq { - t.Errorf("Expected StringEq operation in second 'where' condition but got '%v': %s\n%v", q.Where[0].Operation, queryStr, q) - } - if q.Where[1].rString != "free beer" { - t.Errorf("Expected 'free beer' as string argument in second 'where' condition but got '%v': %s\n%v", q.Where[0].rString, queryStr, q) - } - - // 'group by' clause - if len(q.GroupBy) != 2 { - t.Errorf("Expected two elements in 'group by' clause but got '%v': %s\n%v", q.GroupBy, queryStr, q) - } - if q.GroupBy[0] != "g1" { - t.Errorf("Expected 'g1' as first element in 'group by' clause but got '%v': %s\n%v", q.GroupBy[0], queryStr, q) - } - if q.GroupBy[1] != "g2" { - t.Errorf("Expected 'g2' as second element in 'group by' clause but got '%v': %s\n%v", q.GroupBy[1], queryStr, q) - } - if q.GroupKey != "g1,g2" { - t.Errorf("Expected 'g1,g2' as group key in 'group by' clause but got '%v': %s\n%v", q.GroupKey, queryStr, q) - } - - // 'order by' clause - if q.OrderBy != "count(s3)" { - t.Errorf("Expected 'count(s3)' as element in 'order by' clause but got '%v': %s\n%v", q.OrderBy, queryStr, q) - } - - // 'interval' clause - if q.Interval != time.Second*time.Duration(10) { - t.Errorf("Expected '10s' as duration 'interval' clause but got '%v': %s\n%v", q.Interval, queryStr, q) - } - - // 'limit' clause - if q.Limit != 23 { - t.Errorf("Expected '23' as limit in 'limit' clause but got '%v': %s\n%v", q.Limit, queryStr, q) - } - } -} diff --git a/mapr/selectcondition.go b/mapr/selectcondition.go deleted file mode 100644 index 1882b7e..0000000 --- a/mapr/selectcondition.go +++ /dev/null @@ -1,96 +0,0 @@ -package mapr - -import ( - "errors" - "fmt" - "strings" -) - -// AggregateOperation is to specify the aggregate operation type. -type AggregateOperation int - -// Aggregate operation types -const ( - UndefAggregateOperation AggregateOperation = iota - Count AggregateOperation = iota - Sum AggregateOperation = iota - Min AggregateOperation = iota - Max AggregateOperation = iota - Last AggregateOperation = iota - Avg AggregateOperation = iota - Len AggregateOperation = iota -) - -// Represents a parsed "select" clause, used by mapr.Query. -type selectCondition struct { - Field string - FieldStorage string - Operation AggregateOperation -} - -func (sc selectCondition) String() string { - return fmt.Sprintf("selectCondition(Field:%s,FieldStorage:%s,Operation:%v)", - sc.Field, - sc.FieldStorage, - sc.Operation) -} - -func makeSelectConditions(tokens []token) ([]selectCondition, error) { - var sel []selectCondition - - // Parse select aggregation, e.g. sum(foo) - parse := func(token token) (selectCondition, error) { - var sc selectCondition - tokenStr := strings.ToLower(token.str) - - if !strings.Contains(tokenStr, "(") && !strings.Contains(tokenStr, ")") { - sc.Field = tokenStr - sc.FieldStorage = tokenStr - sc.Operation = Last - return sc, nil - } - - a := strings.Split(tokenStr, "(") - if len(a) != 2 { - return sc, errors.New(invalidQuery + "Can't parse 'select' aggregation: " + token.str) - } - agg := a[0] // Aggregation, e.g. 'sum' - - b := strings.Split(a[1], ")") - if len(b) != 2 { - return sc, errors.New(invalidQuery + "Can't parse 'select' field name from aggregation: " + token.str) - } - sc.Field = b[0] // Field name, e.g. 'foo' - sc.FieldStorage = tokenStr // e.g. 'sum(foo)' - - switch agg { - case "count": - sc.Operation = Count - case "sum": - sc.Operation = Sum - case "min": - sc.Operation = Min - case "max": - sc.Operation = Max - case "last": - sc.Operation = Last - case "avg": - sc.Operation = Avg - case "len": - sc.Operation = Len - default: - return sc, errors.New(invalidQuery + "Unknown aggregation in 'select' clause: " + agg) - } - - return sc, nil - } - - for _, token := range tokens { - sc, err := parse(token) - if err != nil { - return nil, err - } - sel = append(sel, sc) - } - return sel, nil -} diff --git a/mapr/server/aggregate.go b/mapr/server/aggregate.go deleted file mode 100644 index 316da67..0000000 --- a/mapr/server/aggregate.go +++ /dev/null @@ -1,170 +0,0 @@ -package server - -import ( - "dtail/config" - "dtail/fs" - "dtail/logger" - "dtail/mapr" - "dtail/mapr/logformat" - "os" - "strings" - "time" -) - -// Aggregate is for aggregating mapreduce data on the DTail server side. -type Aggregate struct { - // Log lines to process (parsing MAPREDUCE lines). - Lines chan fs.LineRead - // Hostname of the current server (used to populate $hostname field). - hostname string - // Signals to exit goroutine. - stop chan struct{} - // Signals to serialize data. - serialize chan struct{} - // The mapr query - query *mapr.Query - // The mapr log format parser - parser *logformat.Parser -} - -// NewAggregate return a new server side aggregator. -func NewAggregate(maprLines chan<- string, queryStr string) (*Aggregate, error) { - query, err := mapr.NewQuery(queryStr) - if err != nil { - return nil, err - } - - fqdn, err := os.Hostname() - if err != nil { - logger.Error(err) - } - s := strings.Split(fqdn, ".") - - logger.Info("Creating mapr log format parser", config.Server.MapreduceLogFormat) - logParser, err := logformat.NewParser(config.Server.MapreduceLogFormat) - if err != nil { - logger.FatalExit("Could not create mapr log format parser", err) - } - - a := Aggregate{ - Lines: make(chan fs.LineRead, 100), - stop: make(chan struct{}), - serialize: make(chan struct{}), - hostname: s[0], - query: query, - parser: logParser, - } - - go a.periodicAggregateTimer() - - fieldsCh := make(chan map[string]string) - go a.readFields(fieldsCh, maprLines) - go a.readLines(fieldsCh) - - return &a, nil -} - -func (a *Aggregate) periodicAggregateTimer() { - for { - select { - case <-time.After(a.query.Interval): - a.Serialize() - case <-a.stop: - return - } - } -} - -func (a *Aggregate) readFields(fieldsCh <-chan map[string]string, maprLines chan<- string) { - group := mapr.NewGroupSet() - - for { - select { - case fields := <-fieldsCh: - a.aggregate(group, fields) - case <-a.serialize: - logger.Info("Serializing mapreduce result") - group.Serialize(maprLines, a.stop) - logger.Info("Done serializing mapreduce result") - group = mapr.NewGroupSet() - case <-a.stop: - return - } - } -} - -func (a *Aggregate) readLines(fieldsCh chan<- map[string]string) { - for { - select { - case line, ok := <-a.Lines: - if !ok { - return - } - - maprLine := strings.TrimSpace(string(line.Content)) - fields, err := a.parser.MakeFields(maprLine) - - if err != nil { - logger.Error(err) - continue - } - if !a.query.WhereClause(fields) { - continue - } - - select { - case fieldsCh <- fields: - case <-a.stop: - } - case <-a.stop: - return - } - } -} - -func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { - //logger.Trace("Aggregating", group, fields) - var sb strings.Builder - - for i, field := range a.query.GroupBy { - if i > 0 { - sb.WriteString(" ") - } - if val, ok := fields[field]; ok { - sb.WriteString(val) - } - } - groupKey := sb.String() - set := group.GetSet(groupKey) - - var addedSample bool - for _, sc := range a.query.Select { - if val, ok := fields[sc.Field]; ok { - if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil { - logger.Error(err) - continue - } - addedSample = true - } - } - - if addedSample { - set.Samples++ - return - } - - logger.Trace("Aggregated data locally without adding new samples") -} - -// Serialize all the aggregated data. -func (a *Aggregate) Serialize() { - select { - case a.serialize <- struct{}{}: - case <-a.stop: - } -} - -// Close the aggregator. -func (a *Aggregate) Close() { - close(a.stop) -} diff --git a/mapr/token.go b/mapr/token.go deleted file mode 100644 index b8be4da..0000000 --- a/mapr/token.go +++ /dev/null @@ -1,108 +0,0 @@ -package mapr - -import ( - "strings" -) - -var keywords = [...]string{"select", "from", "where", "group", "rorder", "order", "interval", "limit", "outfile"} - -// Represents a parsed token, used to parse the mapr query. -type token struct { - str string - isBareword bool -} - -func (t token) isKeyword() bool { - if !t.isBareword { - return false - } - - for _, keyword := range keywords { - if strings.ToLower(t.str) == keyword { - return true - } - } - return false -} - -func (t token) String() string { - return t.str -} - -func tokenize(queryStr string) []token { - var tokens []token - - for i, part := range strings.Split(queryStr, "\"") { - // Even i, means that it is not a quoted string - if i%2 == 0 { - commasStripped := strings.Replace(part, ",", " ", -1) - for _, tokenStr := range strings.Fields(commasStripped) { - token := token{ - str: tokenStr, - isBareword: true, - } - tokens = append(tokens, token) - } - continue - } - // Add whole quoted string as a token - token := token{ - str: part, - isBareword: false, - } - tokens = append(tokens, token) - } - - return tokens -} - -func tokensConsume(tokens []token) ([]token, []token) { - //logger.Trace("=====================") - var consumed []token - - for i, t := range tokens { - if t.isKeyword() { - //logger.Trace("keyword", t) - return tokens[i:], consumed - } - // strip escapes, such as ` from `foo`, this allows to use keywords as field names - length := len(t.str) - if length == 0 { - continue - } - if t.str[0] == '`' && t.str[length-1] == '`' { - stripped := t.str[1 : length-1] - //logger.Trace("stripped", stripped) - t := token{ - str: stripped, - isBareword: t.isBareword, - } - consumed = append(consumed, t) - continue - } - //logger.Trace("bare", token) - consumed = append(consumed, t) - } - - //logger.Trace("result", consumed) - return nil, consumed -} - -func tokensConsumeStr(tokens []token) ([]token, []string) { - var strings []string - tokens, found := tokensConsume(tokens) - for _, token := range found { - strings = append(strings, token.str) - } - return tokens, strings -} - -func tokensConsumeOptional(tokens []token, optional string) []token { - if len(tokens) < 1 { - return tokens - } - if strings.ToLower(tokens[0].str) == strings.ToLower(optional) { - return tokens[1:] - } - return tokens -} diff --git a/mapr/wherecondition.go b/mapr/wherecondition.go deleted file mode 100644 index 515c8ad..0000000 --- a/mapr/wherecondition.go +++ /dev/null @@ -1,193 +0,0 @@ -package mapr - -import ( - "dtail/logger" - "errors" - "fmt" - "strconv" - "strings" -) - -// QueryOperation determines the mapreduce operation. -type QueryOperation int - -// The possible mapreduce operation.s -const ( - UndefQueryOperation QueryOperation = iota - StringEq QueryOperation = iota - StringNe QueryOperation = iota - StringContains QueryOperation = iota - FloatOperation QueryOperation = iota - FloatEq QueryOperation = iota - FloatNe QueryOperation = iota - FloatLt QueryOperation = iota - FloatLe QueryOperation = iota - FloatGt QueryOperation = iota - FloatGe QueryOperation = iota -) - -type whereType int - -// The possible field types. -const ( - UndefWhereType whereType = iota - Field whereType = iota - String whereType = iota - Float whereType = iota -) - -func (w whereType) String() string { - switch w { - case Field: - return fmt.Sprintf("Field") - case String: - return fmt.Sprintf("String") - case Float: - return fmt.Sprintf("Float") - default: - return fmt.Sprintf("UndefWhereType") - } -} - -// Represent a parsed "where" clause, used by mapr.Query -type whereCondition struct { - lString string - lFloat float64 - lType whereType - - Operation QueryOperation - - rString string - rFloat float64 - rType whereType -} - -func (wc *whereCondition) String() string { - return fmt.Sprintf("whereCondition(Operation:%v,lString:%s,lFloat:%v,lType:%s,rString:%s,rFloat:%v,rType:%s)", - wc.Operation, wc.lString, wc.lFloat, wc.lType.String(), wc.rString, wc.rFloat, wc.rType.String()) -} - -func makeWhereConditions(tokens []token) (where []whereCondition, err error) { - parse := func(tokens []token) (whereCondition, []token, error) { - var wc whereCondition - if len(tokens) < 3 { - return wc, nil, errors.New(invalidQuery + "Not enough arguments in 'where' clause") - } - - whereOp := strings.ToLower(tokens[1].str) - switch whereOp { - case "==": - wc.Operation = FloatEq - case "!=": - wc.Operation = FloatNe - case "<": - wc.Operation = FloatLt - case "<=": - wc.Operation = FloatLe - case "=<": - wc.Operation = FloatLe - case ">": - wc.Operation = FloatGt - case ">=": - wc.Operation = FloatGe - case "=>": - wc.Operation = FloatGe - case "eq": - wc.Operation = StringEq - case "ne": - wc.Operation = StringNe - case "contains": - wc.Operation = StringContains - default: - return wc, nil, errors.New(invalidQuery + "Unknown operation in 'where' clause: " + whereOp) - } - - wc.lString = tokens[0].str - wc.rString = tokens[2].str - - if wc.Operation > FloatOperation { - if !tokens[0].isBareword { - return wc, nil, errors.New(invalidQuery + "Expected bareword at 'where' clause's lValue: " + tokens[0].str) - } - if f, err := strconv.ParseFloat(wc.lString, 64); err == nil { - wc.lFloat = f - wc.lType = Float - } else { - wc.lType = Field - } - - if !tokens[2].isBareword { - return wc, nil, errors.New(invalidQuery + "Expected bareword at 'where' clause's rValue: " + tokens[2].str) - } - if f, err := strconv.ParseFloat(wc.rString, 64); err == nil { - wc.rFloat = f - wc.rType = Float - } else { - wc.rType = Field - } - return wc, tokens[3:], nil - } - - if tokens[0].isBareword { - wc.lType = Field - } else { - wc.lType = String - } - if tokens[2].isBareword { - wc.rType = Field - } else { - wc.rType = String - } - - return wc, tokens[3:], nil - } - - for len(tokens) > 0 { - var wc whereCondition - var err error - - wc, tokens, err = parse(tokens) - if err != nil { - return nil, err - } - - where = append(where, wc) - tokens = tokensConsumeOptional(tokens, "and") - } - - return -} - -func (wc *whereCondition) floatClause(lValue float64, rValue float64) bool { - switch wc.Operation { - case FloatEq: - return lValue == rValue - case FloatNe: - return lValue != rValue - case FloatLt: - return lValue < rValue - case FloatLe: - return lValue <= rValue - case FloatGt: - return lValue > rValue - case FloatGe: - return lValue >= rValue - default: - logger.Error("Unknown float operation", lValue, wc.Operation, rValue) - } - return false -} - -func (wc *whereCondition) stringClause(lValue string, rValue string) bool { - switch wc.Operation { - case StringEq: - return lValue == rValue - case StringNe: - return lValue != rValue - case StringContains: - return strings.Contains(lValue, rValue) - default: - logger.Error("Unknown string operation", lValue, wc.Operation, rValue) - } - return false -} |
