diff options
Diffstat (limited to 'mapr')
| -rw-r--r-- | mapr/aggregateset.go | 185 | ||||
| -rw-r--r-- | mapr/client/aggregate.go | 100 | ||||
| -rw-r--r-- | mapr/globalgroupset.go | 100 | ||||
| -rw-r--r-- | mapr/groupset.go | 178 | ||||
| -rw-r--r-- | mapr/logformat/default.go | 23 | ||||
| -rw-r--r-- | mapr/logformat/default_test.go | 35 | ||||
| -rw-r--r-- | mapr/logformat/parser.go | 75 | ||||
| -rw-r--r-- | mapr/query.go | 245 | ||||
| -rw-r--r-- | mapr/query_test.go | 149 | ||||
| -rw-r--r-- | mapr/selectcondition.go | 96 | ||||
| -rw-r--r-- | mapr/server/aggregate.go | 170 | ||||
| -rw-r--r-- | mapr/token.go | 108 | ||||
| -rw-r--r-- | mapr/wherecondition.go | 193 |
13 files changed, 1657 insertions, 0 deletions
diff --git a/mapr/aggregateset.go b/mapr/aggregateset.go new file mode 100644 index 0000000..2096c3c --- /dev/null +++ b/mapr/aggregateset.go @@ -0,0 +1,185 @@ +package mapr + +import ( + "fmt" + "strconv" + "strings" +) + +// AggregateSet represents aggregated key/value pairs from the +// MAPREDUCE log lines. These could be either string values or float +// values. +type AggregateSet struct { + Samples int + FValues map[string]float64 + SValues map[string]string +} + +// NewAggregateSet creates a new empty aggregate set. +func NewAggregateSet() *AggregateSet { + return &AggregateSet{ + FValues: make(map[string]float64), + SValues: make(map[string]string), + } +} + +// String representation of aggregate set. +func (s *AggregateSet) String() string { + return fmt.Sprintf("AggregateSet(Samples:%d,FValues:%v,SValues:%v)", + s.Samples, s.FValues, s.SValues) +} + +// Merge one aggregate set into this one. +func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error { + s.Samples += set.Samples + //logger.Trace("Merge", set) + + for _, sc := range query.Select { + storage := sc.FieldStorage + switch sc.Operation { + case Count: + fallthrough + case Sum: + fallthrough + case Avg: + value := set.FValues[storage] + s.addFloat(storage, value) + case Min: + value := set.FValues[storage] + s.addFloatMin(storage, value) + case Max: + value := set.FValues[storage] + s.addFloatMax(storage, value) + case Last: + value := set.SValues[storage] + s.setString(storage, value) + case Len: + s.setString(storage, set.SValues[storage]) + s.setFloat(storage, set.FValues[storage]) + default: + return fmt.Errorf("Unknown aggregation method '%v'", sc.Operation) + } + } + return nil +} + +// Serialize the aggregate set so it can be sent over the wire. 
+func (s *AggregateSet) Serialize(groupKey string, ch chan<- string, stop chan struct{}) { + //logger.Trace("Serialising mapr.AggregateSet", s) + var sb strings.Builder + + sb.WriteString(groupKey) + sb.WriteString("|") + sb.WriteString(fmt.Sprintf("%d|", s.Samples)) + + for k, v := range s.FValues { + sb.WriteString(k) + sb.WriteString("=") + sb.WriteString(fmt.Sprintf("%v|", v)) + } + + for k, v := range s.SValues { + sb.WriteString(k) + sb.WriteString("=") + sb.WriteString(v) + sb.WriteString("|") + } + + select { + case ch <- sb.String(): + case <-stop: + } +} + +// Add a float value. +func (s *AggregateSet) addFloat(key string, value float64) { + if _, ok := s.FValues[key]; !ok { + s.FValues[key] = value + return + } + s.FValues[key] += value +} + +// Add a float minimum value. +func (s *AggregateSet) addFloatMin(key string, value float64) { + f, ok := s.FValues[key] + if !ok { + s.FValues[key] = value + return + } + + if f > value { + s.FValues[key] = value + } +} + +// Add a float maximum value. +func (s *AggregateSet) addFloatMax(key string, value float64) { + f, ok := s.FValues[key] + if !ok { + s.FValues[key] = value + return + } + + if f < value { + s.FValues[key] = value + } +} + +// Set a string. +func (s *AggregateSet) setString(key, value string) { + s.SValues[key] = value +} + +// Set a float. +func (s *AggregateSet) setFloat(key string, value float64) { + s.FValues[key] = value +} + +// Aggregate data to the aggregate set. +func (s *AggregateSet) Aggregate(key string, agg AggregateOperation, value string, clientAggregation bool) (err error) { + var f float64 + + // First check if we can aggregate anything without converting value to float. 
+ switch agg { + case Count: + if clientAggregation { + f, err = strconv.ParseFloat(value, 64) + if err != nil { + return + } + s.addFloat(key, f) + return + } + s.addFloat(key, 1) + return + case Last: + s.setString(key, value) + return + case Len: + s.setString(key, value) + s.setFloat(key, float64(len(value))) + return + default: + } + + // No, we have to convert to float. + f, err = strconv.ParseFloat(value, 64) + if err != nil { + return + } + + switch agg { + case Sum: + fallthrough + case Avg: + s.addFloat(key, f) + case Min: + s.addFloatMin(key, f) + case Max: + s.addFloatMax(key, f) + default: + err = fmt.Errorf("Unknown aggregation method '%v'", agg) + } + return +} diff --git a/mapr/client/aggregate.go b/mapr/client/aggregate.go new file mode 100644 index 0000000..b9443bc --- /dev/null +++ b/mapr/client/aggregate.go @@ -0,0 +1,100 @@ +package client + +import ( + "dtail/logger" + "dtail/mapr" + "strconv" + "strings" +) + +// Aggregate mapreduce data on the DTail client side. +type Aggregate struct { + // This is the mapr query specified on the command line. + query *mapr.Query + // This represents aggregated data of a single remote server. + group *mapr.GroupSet + // This represents the merged aggregated data of all servers. + globalGroup *mapr.GlobalGroupSet + stop chan struct{} + // The server we aggregate the data for (logging and debugging purposes only) + server string +} + +// NewAggregate create new client aggregator. +func NewAggregate(server string, query *mapr.Query, globalGroup *mapr.GlobalGroupSet) *Aggregate { + return &Aggregate{ + query: query, + group: mapr.NewGroupSet(), + globalGroup: globalGroup, + stop: make(chan struct{}), + server: server, + } +} + +// Aggregate data from mapr log line into local (and global) group sets. 
+func (a *Aggregate) Aggregate(parts []string) { + select { + case <-a.stop: + logger.Error("Client aggregator stopped for server, not processing new data", a.server) + return + default: + } + + groupKey := parts[0] + samples, err := strconv.Atoi(parts[1]) + if err != nil { + logger.FatalExit(parts, err) + } + fields := a.makeFields(parts[2:]) + set := a.group.GetSet(groupKey) + + var addedSamples bool + for _, sc := range a.query.Select { + if val, ok := fields[sc.FieldStorage]; ok { + if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, true); err != nil { + logger.Error(err) + continue + } + addedSamples = true + } + } + if addedSamples { + set.Samples += samples + } + + // Merge data from group into global group. + isMerged, err := a.globalGroup.MergeNoblock(a.query, a.group) + if err != nil { + panic(err) + } + if isMerged { + // Re-init local group (make it empty again). + a.group.InitSet() + } +} + +// Create a map of key-value pairs from a part list such as ["foo=bar", "bar=baz"]. +func (a *Aggregate) makeFields(parts []string) map[string]string { + fields := make(map[string]string, len(parts)) + + for _, part := range parts { + kv := strings.Split(part, "=") + if len(kv) != 2 { + continue + } + fields[kv[0]] = kv[1] + } + + return fields +} + +// Stop the client side mapreduce aggregator. +func (a *Aggregate) Stop() { + logger.Debug("Stopping client mapreduce aggregator") + close(a.stop) + + err := a.globalGroup.Merge(a.query, a.group) + if err != nil { + panic(err) + } +} diff --git a/mapr/globalgroupset.go b/mapr/globalgroupset.go new file mode 100644 index 0000000..cfab506 --- /dev/null +++ b/mapr/globalgroupset.go @@ -0,0 +1,100 @@ +package mapr + +import ( + "fmt" +) + +// GlobalGroupSet is used on the dtail client to merge multiple group sets +// (one group set per remote server) to one single global group set. +type GlobalGroupSet struct { + GroupSet + semaphore chan struct{} +} + +// NewGlobalGroupSet creates a new empty global group set. 
+func NewGlobalGroupSet() *GlobalGroupSet { + g := GlobalGroupSet{ + semaphore: make(chan struct{}, 1), + } + g.InitSet() + + return &g +} + +// String representation of the global group set. +func (g *GlobalGroupSet) String() string { + return fmt.Sprintf("GlobalGroupSet(%s)", g.GroupSet.String()) +} + +// Merge (blocking) a group set into the global group set. +func (g *GlobalGroupSet) Merge(query *Query, group *GroupSet) error { + g.semaphore <- struct{}{} + defer func() { <-g.semaphore }() + + return g.merge(query, group) +} + +// MergeNoblock merges (non-blocking) a group set into the global group set. +func (g *GlobalGroupSet) MergeNoblock(query *Query, group *GroupSet) (bool, error) { + select { + case g.semaphore <- struct{}{}: + err := g.merge(query, group) + <-g.semaphore + return true, err + default: + return false, nil + } +} + +// Merge a group set into the global group set. +func (g *GlobalGroupSet) merge(query *Query, group *GroupSet) error { + for groupKey, set := range group.sets { + s := g.GetSet(groupKey) + if err := s.Merge(query, set); err != nil { + return err + } + } + + return nil +} + +// IsEmpty determines whether the global group set has any data in it. +func (g *GlobalGroupSet) IsEmpty() bool { + return g.NumSets() == 0 +} + +// NumSets determines the number of sets. +func (g *GlobalGroupSet) NumSets() int { + g.semaphore <- struct{}{} + defer func() { <-g.semaphore }() + + return len(g.sets) +} + +// SwapOut teturn the underlying group set and create a new empty one, so +// that the global group set is empty again and can aggregate new data. +func (g *GlobalGroupSet) SwapOut() *GroupSet { + g.semaphore <- struct{}{} + defer func() { <-g.semaphore }() + + set := &GroupSet{sets: g.sets} + g.InitSet() + + return set +} + +// WriteResult writes the result of a mapreduce aggregation to an outfile. 
+func (g *GlobalGroupSet) WriteResult(query *Query) error { + g.semaphore <- struct{}{} + defer func() { <-g.semaphore }() + + return g.GroupSet.WriteResult(query) +} + +// Result returns the result of the mapreduce aggregation as a string. +func (g *GlobalGroupSet) Result(query *Query) (string, int, error) { + g.semaphore <- struct{}{} + defer func() { <-g.semaphore }() + + return g.GroupSet.Result(query) +} diff --git a/mapr/groupset.go b/mapr/groupset.go new file mode 100644 index 0000000..d8f9379 --- /dev/null +++ b/mapr/groupset.go @@ -0,0 +1,178 @@ +package mapr + +import ( + "errors" + "fmt" + "io/ioutil" + "sort" + "strconv" + "strings" +) + +// GroupSet represents a map of aggregate sets. The group sets +// are requierd by the "group by" mapr clause, whereas the +// group set map keys are the values of the "group by" arguments. +// E.g. "group by $cid" would create one aggregate set and one map +// entry per customer id. +type GroupSet struct { + sets map[string]*AggregateSet +} + +// NewGroupSet returns a new empty group set. +func NewGroupSet() *GroupSet { + g := GroupSet{} + g.InitSet() + return &g +} + +// String representation of the group set. +func (g *GroupSet) String() string { + return fmt.Sprintf("GroupSet(%v)", g.sets) +} + +// InitSet makes the group set empty (initialize). +func (g *GroupSet) InitSet() { + g.sets = make(map[string]*AggregateSet) +} + +// GetSet gets a specific aggregate set from the group set. +func (g *GroupSet) GetSet(groupKey string) *AggregateSet { + set, ok := g.sets[groupKey] + if !ok { + set = NewAggregateSet() + g.sets[groupKey] = set + } + return set +} + +// Serialize the group set (e.g. to send it over the wire). +func (g *GroupSet) Serialize(ch chan<- string, stop chan struct{}) { + for groupKey, set := range g.sets { + set.Serialize(groupKey, ch, stop) + } +} + +// Result returns a nicely formated result of the query from the group set. 
+func (g *GroupSet) Result(query *Query) (string, int, error) { + return g.limitedResult(query, query.Limit, "\t", " ", false) +} + +// WriteResult writes the result to an outfile. +func (g *GroupSet) WriteResult(query *Query) error { + if query.Outfile == "" { + return errors.New("No outfile specified") + } + + // -1: Don't limit the result, include all data sets + result, _, err := g.limitedResult(query, -1, "", ",", true) + if err != nil { + return err + } + + return ioutil.WriteFile(query.Outfile, []byte(result), 0644) +} + +// Return a nicely formated result of the query from the group set. +func (g *GroupSet) limitedResult(query *Query, limit int, lineStarter, fieldSeparator string, addHeader bool) (string, int, error) { + type result struct { + groupKey string + resultStr string + orderBy float64 + } + + var resultSlice []result + + for groupKey, set := range g.sets { + var sb strings.Builder + r := result{groupKey: groupKey} + + lastIndex := len(query.Select) - 1 + for i, sc := range query.Select { + storage := sc.FieldStorage + orderByThis := storage == query.OrderBy + + switch sc.Operation { + case Count: + value := set.FValues[storage] + sb.WriteString(fmt.Sprintf("%d", int(value))) + if orderByThis { + r.orderBy = value + } + case Len: + fallthrough + case Sum: + fallthrough + case Min: + fallthrough + case Max: + value := set.FValues[storage] + sb.WriteString(fmt.Sprintf("%f", value)) + if orderByThis { + r.orderBy = value + } + case Last: + value := set.SValues[storage] + if orderByThis { + f, err := strconv.ParseFloat(value, 64) + if err == nil { + r.orderBy = f + } + } + sb.WriteString(value) + case Avg: + value := set.FValues[storage] / float64(set.Samples) + sb.WriteString(fmt.Sprintf("%f", value)) + if orderByThis { + r.orderBy = value + } + default: + return "", 0, fmt.Errorf("Unknown aggregation method '%v'", sc.Operation) + } + if i != lastIndex { + sb.WriteString(fieldSeparator) + } + } + + r.resultStr = sb.String() + resultSlice = 
append(resultSlice, r) + } + + if query.OrderBy != "" { + if query.ReverseOrder { + sort.SliceStable(resultSlice, func(i, j int) bool { + return resultSlice[i].orderBy < resultSlice[j].orderBy + }) + } else { + sort.SliceStable(resultSlice, func(i, j int) bool { + return resultSlice[i].orderBy > resultSlice[j].orderBy + }) + } + } + + var sb strings.Builder + + // Write header first + if addHeader { + lastIndex := len(query.Select) - 1 + sb.WriteString(lineStarter) + for i, sc := range query.Select { + sb.WriteString(sc.FieldStorage) + if i != lastIndex { + sb.WriteString(fieldSeparator) + } + } + sb.WriteString("\n") + } + + // And now write the data + for i, r := range resultSlice { + if i == limit { + break + } + sb.WriteString(lineStarter) + sb.WriteString(r.resultStr) + sb.WriteString("\n") + } + + return sb.String(), len(resultSlice), nil +} diff --git a/mapr/logformat/default.go b/mapr/logformat/default.go new file mode 100644 index 0000000..f0df5bc --- /dev/null +++ b/mapr/logformat/default.go @@ -0,0 +1,23 @@ +package logformat + +import ( + "errors" + "strings" +) + +// MakeFieldsDEFAULT is the default log file mapreduce parser. 
+func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) { + fields := make(map[string]string, 20) + splitted := strings.Split(maprLine, "|") + + fields["$hostname"] = p.hostname + + for _, kv := range splitted { + keyAndValue := strings.SplitN(kv, "=", 2) + if len(keyAndValue) != 2 { + return fields, errors.New("Error parsing mapr token: " + kv) + } + fields[strings.ToLower(keyAndValue[0])] = keyAndValue[1] + } + return fields, nil +} diff --git a/mapr/logformat/default_test.go b/mapr/logformat/default_test.go new file mode 100644 index 0000000..a3c47fb --- /dev/null +++ b/mapr/logformat/default_test.go @@ -0,0 +1,35 @@ +package logformat + +import ( + "testing" +) + +func TestDefaultLogFormat(t *testing.T) { + parser, err := NewParser("default") + if err != nil { + t.Errorf("Unable to create parser: %s", err.Error()) + } + + fields, err := parser.MakeFields("foo=bar|baz=bay") + + if err != nil { + t.Errorf("Unable to parse: %s", err.Error()) + } + + if bar, ok := fields["foo"]; !ok { + t.Errorf("Expected field 'foo', but no such field there\n") + } else if bar != "bar" { + t.Errorf("Expected 'bar' stored in field 'foo', but got '%s'\n", bar) + } + + if bay, ok := fields["baz"]; !ok { + t.Errorf("Expected field 'baz', but no such field there\n") + } else if bay != "bay" { + t.Errorf("Expected 'bay' stored in field 'baz', but got '%s'\n", bay) + } + + fields, err = parser.MakeFields("foo=bar|bazbay") + if err == nil { + t.Errorf("Expected error but didn't: %s", err.Error()) + } +} diff --git a/mapr/logformat/parser.go b/mapr/logformat/parser.go new file mode 100644 index 0000000..b7c8c5c --- /dev/null +++ b/mapr/logformat/parser.go @@ -0,0 +1,75 @@ +package logformat + +import ( + "dtail/logger" + "errors" + "fmt" + "os" + "reflect" + "strings" +) + +// Parser is used to parse the mapreduce information from the server log files. 
type Parser struct {
	// Local host name as reported by os.Hostname.
	hostname string
	// Name of the log format this parser was created for.
	logFormatName string
	// The reflected MakeFields<FORMAT> method.
	makeFieldsFunc reflect.Value
	// The receiver (this parser) handed to makeFieldsFunc on every call.
	makeFieldsReceiver reflect.Value
}

// NewParser returns a new log parser for the given log format name. It
// fails if the host name can not be determined or if there is no usable
// parsing method for that log format.
func NewParser(logFormatName string) (*Parser, error) {
	hostname, err := os.Hostname()
	if err != nil {
		return nil, err
	}

	p := Parser{hostname: hostname}
	if err := p.reflectLogFormat(logFormatName); err != nil {
		return nil, err
	}

	return &p, nil
}

// The aim of this is that everyone can plug in their own mapr log format
// parsing method to DTail. Just add a method MakeFieldsMODULENAME to type
// Parser. Whereas MODULENAME must be an uppercase string. The method must
// have the signature func(string) (map[string]string, error).
func (p *Parser) reflectLogFormat(logFormatName string) error {
	if logFormatName == "" {
		// Without a module name the lookup below would resolve to the
		// MakeFields dispatcher itself, which would then call itself
		// endlessly.
		return errors.New("No mapr log format module specified")
	}
	methodName := fmt.Sprintf("MakeFields%s", strings.ToUpper(logFormatName))

	method, ok := reflect.TypeOf(p).MethodByName(methodName)
	if !ok {
		return errors.New("No such mapr log format module: " + methodName)
	}
	// Reject mis-declared plug-in methods right away instead of panicking
	// later when the method is invoked through reflection.
	if !isMakeFieldsSignature(method.Type) {
		return errors.New("Invalid signature of mapr log format module: " + methodName)
	}

	p.logFormatName = logFormatName
	p.makeFieldsFunc = method.Func
	p.makeFieldsReceiver = reflect.ValueOf(p)

	return nil
}

// isMakeFieldsSignature reports whether the method type t (receiver
// included as first argument) is func(receiver, string) (map[string]string, error).
func isMakeFieldsSignature(t reflect.Type) bool {
	if t.NumIn() != 2 || t.NumOut() != 2 {
		return false
	}

	stringType := reflect.TypeOf("")
	fieldsType := reflect.TypeOf(map[string]string(nil))
	errorType := reflect.TypeOf((*error)(nil)).Elem()

	return t.In(1) == stringType && t.Out(0) == fieldsType && t.Out(1) == errorType
}

// MakeFields is for returning the fields from a given log line.
+func (p *Parser) MakeFields(maprLine string) (fields map[string]string, err error) { + inputValues := []reflect.Value{p.makeFieldsReceiver, reflect.ValueOf(maprLine)} + returnValues := p.makeFieldsFunc.Call(inputValues) + + errInterface := returnValues[1].Interface() + + if errInterface == nil { + fields, err = returnValues[0].Interface().(map[string]string), nil + logger.Trace("parser.MakeFields", fields, err) + return + } + + fields, err = returnValues[0].Interface().(map[string]string), errInterface.(error) + logger.Trace("parser.MakeFields", fields, err) + + return +} diff --git a/mapr/query.go b/mapr/query.go new file mode 100644 index 0000000..8ed3c67 --- /dev/null +++ b/mapr/query.go @@ -0,0 +1,245 @@ +package mapr + +import ( + "dtail/logger" + "errors" + "fmt" + "strconv" + "strings" + "time" +) + +const ( + invalidQuery string = "Invalid query: " + unexpectedEnd string = "Unexpected end of query" +) + +// Query represents a parsed mapr query. +type Query struct { + Select []selectCondition + Table string + Where []whereCondition + GroupBy []string + OrderBy string + ReverseOrder bool + GroupKey string + Interval time.Duration + Limit int + Outfile string + RawQuery string + tokens []token +} + +func (q Query) String() string { + return fmt.Sprintf("Query(Select:%v,Table:%s,Where:%v,GroupBy:%v,GroupKey:%s,OrderBy:%v,ReverseOrder:%v,Interval:%v,Limit:%d,Outfile:%s,RawQuery:%s,tokens:%v)", + q.Select, + q.Table, + q.Where, + q.GroupBy, + q.GroupKey, + q.OrderBy, + q.ReverseOrder, + q.Interval, + q.Limit, + q.Outfile, + q.RawQuery, + q.tokens) +} + +// NewQuery returns a new mapreduce query. 
+func NewQuery(queryStr string) (*Query, error) { + if queryStr == "" { + return nil, nil + } + + tokens := tokenize(queryStr) + + q := Query{ + RawQuery: queryStr, + tokens: tokens, + Interval: time.Second * 5, + Limit: -1, + } + + err := q.parse(tokens) + + logger.Debug(q) + return &q, err +} + +func (q *Query) parse(tokens []token) error { + var found []token + var err error + + for tokens != nil && len(tokens) > 0 { + switch strings.ToLower(tokens[0].str) { + case "select": + tokens, found = tokensConsume(tokens[1:]) + q.Select, err = makeSelectConditions(found) + if err != nil { + return err + } + case "from": + tokens, found = tokensConsume(tokens[1:]) + if len(found) > 0 { + q.Table = strings.ToUpper(found[0].str) + } + case "where": + tokens, found = tokensConsume(tokens[1:]) + if q.Where, err = makeWhereConditions(found); err != nil { + return err + } + case "group": + tokens = tokensConsumeOptional(tokens[1:], "by") + if tokens == nil || len(tokens) < 1 { + return errors.New(invalidQuery + unexpectedEnd) + } + tokens, q.GroupBy = tokensConsumeStr(tokens) + q.GroupKey = strings.Join(q.GroupBy, ",") + case "rorder": + tokens = tokensConsumeOptional(tokens[1:], "by") + if tokens == nil || len(tokens) < 1 { + return errors.New(invalidQuery + unexpectedEnd) + } + tokens, found = tokensConsume(tokens) + if len(found) == 0 { + return errors.New(invalidQuery + unexpectedEnd) + } + q.OrderBy = found[0].str + q.ReverseOrder = true + case "order": + tokens = tokensConsumeOptional(tokens[1:], "by") + if tokens == nil || len(tokens) < 1 { + return errors.New(invalidQuery + unexpectedEnd) + } + tokens, found = tokensConsume(tokens) + if len(found) == 0 { + return errors.New(invalidQuery + unexpectedEnd) + } + q.OrderBy = found[0].str + case "interval": + tokens, found = tokensConsume(tokens[1:]) + if len(found) > 0 { + i, err := strconv.Atoi(found[0].str) + if err != nil { + return errors.New(invalidQuery + err.Error()) + } + q.Interval = time.Second * time.Duration(i) 
+ } + case "limit": + tokens, found = tokensConsume(tokens[1:]) + if len(found) == 0 { + return errors.New(invalidQuery + unexpectedEnd) + } + i, err := strconv.Atoi(found[0].str) + if err != nil { + return errors.New(invalidQuery + err.Error()) + } + q.Limit = i + case "outfile": + tokens, found = tokensConsume(tokens[1:]) + if len(found) == 0 { + return errors.New(invalidQuery + unexpectedEnd) + } + q.Outfile = found[0].str + default: + return errors.New(invalidQuery + "Unexpected keyword " + tokens[0].str) + } + } + + if q.Table == "" { + return errors.New(invalidQuery + "Empty table specified in 'from' clause") + } + if len(q.Select) < 1 { + return errors.New(invalidQuery + "Expected at least one field in 'select' clause but got none") + } + if len(q.GroupBy) == 0 { + field := q.Select[0].Field + q.GroupBy = append(q.GroupBy, field) + } + + if q.OrderBy != "" { + var orderFieldIsValid bool + for _, sc := range q.Select { + if q.OrderBy == sc.FieldStorage { + orderFieldIsValid = true + break + } + } + if !orderFieldIsValid { + return errors.New(invalidQuery + fmt.Sprintf("Can not '(r)order by' '%s', must be present in 'select' clause", q.OrderBy)) + } + } + + return nil +} + +// WhereClause interprets the where clause of the mapreduce query. 
+func (q *Query) WhereClause(fields map[string]string) bool { + floatValue := func(str string, float float64, t whereType) (float64, bool) { + switch t { + case Float: + return float, true + case Field: + value, ok := fields[str] + if !ok { + return 0, false + } + f, err := strconv.ParseFloat(value, 64) + if err != nil { + return 0, false + } + return f, true + default: + logger.Error("Unexpected argument in 'where' clause", str, float, t) + return 0, false + } + } + + stringValue := func(str string, t whereType) (string, bool) { + switch t { + case Field: + value, ok := fields[str] + if !ok { + return str, false + } + return value, true + case String: + return str, true + default: + logger.Error("Unexpected argument in 'where' clause", str, t) + return str, false + } + } + + for _, wc := range q.Where { + var ok bool + + if wc.Operation > FloatOperation { + var lValue, rValue float64 + if lValue, ok = floatValue(wc.lString, wc.lFloat, wc.lType); !ok { + return false + } + if rValue, ok = floatValue(wc.rString, wc.rFloat, wc.rType); !ok { + return false + } + if ok = wc.floatClause(lValue, rValue); !ok { + return false + } + continue + } + + var lValue, rValue string + if lValue, ok = stringValue(wc.lString, wc.lType); !ok { + return false + } + if rValue, ok = stringValue(wc.rString, wc.rType); !ok { + return false + } + if ok = wc.stringClause(lValue, rValue); !ok { + return false + } + } + + return true +} diff --git a/mapr/query_test.go b/mapr/query_test.go new file mode 100644 index 0000000..6176461 --- /dev/null +++ b/mapr/query_test.go @@ -0,0 +1,149 @@ +package mapr + +import ( + "testing" + "time" +) + +func TestParseQuerySimple(t *testing.T) { + errorQueries := []string{ + "select", + "select foo", + "select foo from", + "select foo from bar where baz", + "select foo from bar where baz <", + "select foo from bar where baz < 100 bay eq 12 group", + "select foo from bar where baz < 100 bay eq 12 group by foo order by", + "select foo from bar where baz < 100 
bay eq 12 group by foo, bar, baz order by foo limit", + } + okQueries := []string{"select foo from bar", + "select foo from bar where", + "select foo from bar where baz < 100 bay eq 12", + "select foo from bar where baz < 100, bay eq 12", + "select foo from bar where baz < 100 and bay eq 12", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit 23", + "select foo from bar where baz < 100 bay eq 12 group by foo, bar, baz order by foo limit 23 outfile \"result.csv\"", + } + + for _, queryStr := range errorQueries { + q, err := NewQuery(queryStr) + if err == nil { + t.Errorf("Expected a parse error: %s\n%v", queryStr, q) + continue + } + } + + for _, queryStr := range okQueries { + _, err := NewQuery(queryStr) + if err != nil { + t.Errorf("%s: %s", err.Error(), queryStr) + continue + } + } +} + +func TestParseQueryDeep(t *testing.T) { + dialects := []string{ + "select s1, `from`, count(s3) from table where w1 == 2 and w2 eq \"free beer\" group by g1, g2 order by count(s3) interval 10 limit 23", + "SELECT s1, `from` COUNT(s3) FROM table WHERE w1 == 2 AND w2 eq \"free beer\" GROUP g1, g2 ORDER count(s3) INTERVAL 10 LIMIT 23", + "select s1, `from` count(s3) from table where w1 == 2 and w2 eq \"free beer\" group by g1, g2 order by count(s3) interval 10 limit 23", + "sElEct s1, `from` coUnt(s3) from taBle where w1 == 2 aNd w2 eq \"free beer\" Group By g1, g2 order bY count(s3) intervaL 10 LiMiT 23", + "SELECT s1 `from` COUNT(s3) FROM table WHERE w1 == 2 AND w2 eq \"free beer\" GROUP BY g1 g2 ORDER BY count(s3) INTERVAL 10 LIMIT 23", + "select s1 `from` count(s3) from table where w1 == 2 w2 eq \"free beer\" group g1 g2 order count(s3) interval 10 limit 23", + "limit 23 interval 10 order count(s3) group g1 g2 where w1 == 2 w2 eq \"free beer\" from table select s1 `from` count(s3)", + } + + for _, queryStr := range dialects { + q, err := 
NewQuery(queryStr) + if err != nil { + t.Errorf("%s: %s", err.Error(), queryStr) + } + + // 'select' clause + if len(q.Select) != 3 { + t.Errorf("Expected three elements in 'select' clause but got '%v': %s\n%v", q.Select, queryStr, q) + } + + if q.Select[0].Field != "s1" { + t.Errorf("Expected 's1' as first element in 'select' clause but got '%v': %s\n%v", q.Select[0].Field, queryStr, q) + } + if q.Select[0].Operation != Last { + t.Errorf("Expected 'last' as aggregation function of first element in 'select' clause but got '%v': %s\n%v", q.Select[0].Operation, queryStr, q) + } + + if q.Select[1].Field != "from" { + t.Errorf("Expected 'from' as second element in 'select' clause but got '%v': %s\n%v", q.Select[1].Field, queryStr, q) + } + if q.Select[1].Operation != Last { + t.Errorf("Expected 'last' as aggregation function of second element in 'select' clause but got '%v': %s\n%v", q.Select[1].Operation, queryStr, q) + } + + if q.Select[2].Field != "s3" { + t.Errorf("Expected 's3' as third element in 'select' clause but got '%v': %s\n%v", q.Select[2].Field, queryStr, q) + } + if q.Select[2].Operation != Count { + t.Errorf("Expected 'count' as aggregation function of third element in 'select' clause but got '%v': %s\n%v", q.Select[2].Operation, queryStr, q) + } + if q.Select[2].FieldStorage != "count(s3)" { + t.Errorf("Expected 'count(s3)' as third element's storage in 'select' clause but got '%v': %s\n%v", q.Select[2].FieldStorage, queryStr, q) + } + + // 'from' clause + if q.Table != "TABLE" { + t.Errorf("Expected 'TABLE' in 'from' clause but got '%v': %s\n%v", q.Table, queryStr, q) + } + + // 'where' clause + if len(q.Where) != 2 { + t.Errorf("Expected two elements in 'where' clause but got '%v': %s\n%v", q.Where, queryStr, q) + } + if q.Where[0].lString != "w1" { + t.Errorf("Expected w1 as first element in 'where' clause but got '%v': %s\n%v", q.Where[0].lString, queryStr, q) + } + if q.Where[0].Operation != FloatEq { + t.Errorf("Expected FloatEq operation in 
first 'where' condition but got '%v': %s\n%v", q.Where[0].Operation, queryStr, q) + } + if q.Where[0].rFloat != 2 { + t.Errorf("Expected '2' as float argument in first 'where' condition but got '%v': %s\n%v", q.Where[0].rFloat, queryStr, q) + } + if q.Where[1].lString != "w2" { + t.Errorf("Expected w2 as second element in 'where' clause but got '%v': %s\n%v", q.Where[1].lString, queryStr, q) + } + if q.Where[1].Operation != StringEq { + t.Errorf("Expected StringEq operation in second 'where' condition but got '%v': %s\n%v", q.Where[0].Operation, queryStr, q) + } + if q.Where[1].rString != "free beer" { + t.Errorf("Expected 'free beer' as string argument in second 'where' condition but got '%v': %s\n%v", q.Where[0].rString, queryStr, q) + } + + // 'group by' clause + if len(q.GroupBy) != 2 { + t.Errorf("Expected two elements in 'group by' clause but got '%v': %s\n%v", q.GroupBy, queryStr, q) + } + if q.GroupBy[0] != "g1" { + t.Errorf("Expected 'g1' as first element in 'group by' clause but got '%v': %s\n%v", q.GroupBy[0], queryStr, q) + } + if q.GroupBy[1] != "g2" { + t.Errorf("Expected 'g2' as second element in 'group by' clause but got '%v': %s\n%v", q.GroupBy[1], queryStr, q) + } + if q.GroupKey != "g1,g2" { + t.Errorf("Expected 'g1,g2' as group key in 'group by' clause but got '%v': %s\n%v", q.GroupKey, queryStr, q) + } + + // 'order by' clause + if q.OrderBy != "count(s3)" { + t.Errorf("Expected 'count(s3)' as element in 'order by' clause but got '%v': %s\n%v", q.OrderBy, queryStr, q) + } + + // 'interval' clause + if q.Interval != time.Second*time.Duration(10) { + t.Errorf("Expected '10s' as duration 'interval' clause but got '%v': %s\n%v", q.Interval, queryStr, q) + } + + // 'limit' clause + if q.Limit != 23 { + t.Errorf("Expected '23' as limit in 'limit' clause but got '%v': %s\n%v", q.Limit, queryStr, q) + } + } +} diff --git a/mapr/selectcondition.go b/mapr/selectcondition.go new file mode 100644 index 0000000..1882b7e --- /dev/null +++ 
b/mapr/selectcondition.go @@ -0,0 +1,96 @@ +package mapr + +import ( + "errors" + "fmt" + "strings" +) + +// AggregateOperation is to specify the aggregate operation type. +type AggregateOperation int + +// Aggregate operation types +const ( + UndefAggregateOperation AggregateOperation = iota + Count AggregateOperation = iota + Sum AggregateOperation = iota + Min AggregateOperation = iota + Max AggregateOperation = iota + Last AggregateOperation = iota + Avg AggregateOperation = iota + Len AggregateOperation = iota +) + +// Represents a parsed "select" clause, used by mapr.Query. +type selectCondition struct { + Field string + FieldStorage string + Operation AggregateOperation +} + +func (sc selectCondition) String() string { + return fmt.Sprintf("selectCondition(Field:%s,FieldStorage:%s,Operation:%v)", + sc.Field, + sc.FieldStorage, + sc.Operation) +} + +func makeSelectConditions(tokens []token) ([]selectCondition, error) { + var sel []selectCondition + + // Parse select aggregation, e.g. sum(foo) + parse := func(token token) (selectCondition, error) { + var sc selectCondition + tokenStr := strings.ToLower(token.str) + + if !strings.Contains(tokenStr, "(") && !strings.Contains(tokenStr, ")") { + sc.Field = tokenStr + sc.FieldStorage = tokenStr + sc.Operation = Last + return sc, nil + } + + a := strings.Split(tokenStr, "(") + if len(a) != 2 { + return sc, errors.New(invalidQuery + "Can't parse 'select' aggregation: " + token.str) + } + agg := a[0] // Aggregation, e.g. 'sum' + + b := strings.Split(a[1], ")") + if len(b) != 2 { + return sc, errors.New(invalidQuery + "Can't parse 'select' field name from aggregation: " + token.str) + } + sc.Field = b[0] // Field name, e.g. 'foo' + sc.FieldStorage = tokenStr // e.g. 
'sum(foo)' + + switch agg { + case "count": + sc.Operation = Count + case "sum": + sc.Operation = Sum + case "min": + sc.Operation = Min + case "max": + sc.Operation = Max + case "last": + sc.Operation = Last + case "avg": + sc.Operation = Avg + case "len": + sc.Operation = Len + default: + return sc, errors.New(invalidQuery + "Unknown aggregation in 'select' clause: " + agg) + } + + return sc, nil + } + + for _, token := range tokens { + sc, err := parse(token) + if err != nil { + return nil, err + } + sel = append(sel, sc) + } + return sel, nil +} diff --git a/mapr/server/aggregate.go b/mapr/server/aggregate.go new file mode 100644 index 0000000..316da67 --- /dev/null +++ b/mapr/server/aggregate.go @@ -0,0 +1,170 @@ +package server + +import ( + "dtail/config" + "dtail/fs" + "dtail/logger" + "dtail/mapr" + "dtail/mapr/logformat" + "os" + "strings" + "time" +) + +// Aggregate is for aggregating mapreduce data on the DTail server side. +type Aggregate struct { + // Log lines to process (parsing MAPREDUCE lines). + Lines chan fs.LineRead + // Hostname of the current server (used to populate $hostname field). + hostname string + // Signals to exit goroutine. + stop chan struct{} + // Signals to serialize data. + serialize chan struct{} + // The mapr query + query *mapr.Query + // The mapr log format parser + parser *logformat.Parser +} + +// NewAggregate return a new server side aggregator. 
+func NewAggregate(maprLines chan<- string, queryStr string) (*Aggregate, error) { + query, err := mapr.NewQuery(queryStr) + if err != nil { + return nil, err + } + + fqdn, err := os.Hostname() + if err != nil { + logger.Error(err) + } + s := strings.Split(fqdn, ".") + + logger.Info("Creating mapr log format parser", config.Server.MapreduceLogFormat) + logParser, err := logformat.NewParser(config.Server.MapreduceLogFormat) + if err != nil { + logger.FatalExit("Could not create mapr log format parser", err) + } + + a := Aggregate{ + Lines: make(chan fs.LineRead, 100), + stop: make(chan struct{}), + serialize: make(chan struct{}), + hostname: s[0], + query: query, + parser: logParser, + } + + go a.periodicAggregateTimer() + + fieldsCh := make(chan map[string]string) + go a.readFields(fieldsCh, maprLines) + go a.readLines(fieldsCh) + + return &a, nil +} + +func (a *Aggregate) periodicAggregateTimer() { + for { + select { + case <-time.After(a.query.Interval): + a.Serialize() + case <-a.stop: + return + } + } +} + +func (a *Aggregate) readFields(fieldsCh <-chan map[string]string, maprLines chan<- string) { + group := mapr.NewGroupSet() + + for { + select { + case fields := <-fieldsCh: + a.aggregate(group, fields) + case <-a.serialize: + logger.Info("Serializing mapreduce result") + group.Serialize(maprLines, a.stop) + logger.Info("Done serializing mapreduce result") + group = mapr.NewGroupSet() + case <-a.stop: + return + } + } +} + +func (a *Aggregate) readLines(fieldsCh chan<- map[string]string) { + for { + select { + case line, ok := <-a.Lines: + if !ok { + return + } + + maprLine := strings.TrimSpace(string(line.Content)) + fields, err := a.parser.MakeFields(maprLine) + + if err != nil { + logger.Error(err) + continue + } + if !a.query.WhereClause(fields) { + continue + } + + select { + case fieldsCh <- fields: + case <-a.stop: + } + case <-a.stop: + return + } + } +} + +func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { + 
//logger.Trace("Aggregating", group, fields) + var sb strings.Builder + + for i, field := range a.query.GroupBy { + if i > 0 { + sb.WriteString(" ") + } + if val, ok := fields[field]; ok { + sb.WriteString(val) + } + } + groupKey := sb.String() + set := group.GetSet(groupKey) + + var addedSample bool + for _, sc := range a.query.Select { + if val, ok := fields[sc.Field]; ok { + if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil { + logger.Error(err) + continue + } + addedSample = true + } + } + + if addedSample { + set.Samples++ + return + } + + logger.Trace("Aggregated data locally without adding new samples") +} + +// Serialize all the aggregated data. +func (a *Aggregate) Serialize() { + select { + case a.serialize <- struct{}{}: + case <-a.stop: + } +} + +// Close the aggregator. +func (a *Aggregate) Close() { + close(a.stop) +} diff --git a/mapr/token.go b/mapr/token.go new file mode 100644 index 0000000..b8be4da --- /dev/null +++ b/mapr/token.go @@ -0,0 +1,108 @@ +package mapr + +import ( + "strings" +) + +var keywords = [...]string{"select", "from", "where", "group", "rorder", "order", "interval", "limit", "outfile"} + +// Represents a parsed token, used to parse the mapr query. 
// keywords are the reserved words of the mapr query language. A bareword
// token that matches one of them case-insensitively ends the current clause.
var keywords = [...]string{"select", "from", "where", "group", "rorder", "order", "interval", "limit", "outfile"}

// token represents a parsed token, used to parse the mapr query.
type token struct {
	// str is the token text, without the surrounding double quotes.
	str string
	// isBareword is false when the token was enclosed in double quotes.
	isBareword bool
}

// isKeyword reports whether the token is an unquoted query keyword. The
// comparison ignores case.
func (t token) isKeyword() bool {
	if !t.isBareword {
		return false
	}

	// Lowercase once instead of once per keyword.
	lowered := strings.ToLower(t.str)
	for _, keyword := range keywords {
		if lowered == keyword {
			return true
		}
	}
	return false
}

// String returns the token text.
func (t token) String() string {
	return t.str
}

// tokenize splits a query string into tokens. Text between double quotes
// becomes a single non-bareword token; everything else is split on
// whitespace and commas into bareword tokens.
func tokenize(queryStr string) []token {
	var tokens []token

	for i, part := range strings.Split(queryStr, "\"") {
		// Splitting on the quote character puts quoted text at odd indexes.
		if i%2 == 1 {
			tokens = append(tokens, token{str: part, isBareword: false})
			continue
		}
		for _, field := range strings.Fields(strings.ReplaceAll(part, ",", " ")) {
			tokens = append(tokens, token{str: field, isBareword: true})
		}
	}

	return tokens
}

// tokensConsume consumes tokens up to (not including) the next keyword. It
// returns the remaining tokens, starting at that keyword (nil when there is
// none), and the consumed tokens. Empty tokens are dropped. A token wrapped
// in a pair of backticks has them stripped, which allows keywords to be used
// as field names.
func tokensConsume(tokens []token) ([]token, []token) {
	var consumed []token

	for i, t := range tokens {
		if t.isKeyword() {
			return tokens[i:], consumed
		}

		length := len(t.str)
		if length == 0 {
			continue
		}
		// A backtick pair needs at least two bytes. Without the length check a
		// token consisting of a single backtick matches as both first and last
		// byte and slicing [1:0] panics.
		if length >= 2 && t.str[0] == '`' && t.str[length-1] == '`' {
			t.str = t.str[1 : length-1]
		}
		consumed = append(consumed, t)
	}

	return nil, consumed
}

// tokensConsumeStr works like tokensConsume but returns the consumed tokens
// as plain strings.
func tokensConsumeStr(tokens []token) ([]token, []string) {
	rest, found := tokensConsume(tokens)

	var strs []string
	for _, t := range found {
		strs = append(strs, t.str)
	}
	return rest, strs
}

// tokensConsumeOptional skips the first token when it equals optional
// (ignoring case) and returns the tokens unchanged otherwise.
func tokensConsumeOptional(tokens []token, optional string) []token {
	if len(tokens) > 0 && strings.ToLower(tokens[0].str) == strings.ToLower(optional) {
		return tokens[1:]
	}
	return tokens
}
strings.ToLower(tokens[0].str) == strings.ToLower(optional) { + return tokens[1:] + } + return tokens +} diff --git a/mapr/wherecondition.go b/mapr/wherecondition.go new file mode 100644 index 0000000..515c8ad --- /dev/null +++ b/mapr/wherecondition.go @@ -0,0 +1,193 @@ +package mapr + +import ( + "dtail/logger" + "errors" + "fmt" + "strconv" + "strings" +) + +// QueryOperation determines the mapreduce operation. +type QueryOperation int + +// The possible mapreduce operation.s +const ( + UndefQueryOperation QueryOperation = iota + StringEq QueryOperation = iota + StringNe QueryOperation = iota + StringContains QueryOperation = iota + FloatOperation QueryOperation = iota + FloatEq QueryOperation = iota + FloatNe QueryOperation = iota + FloatLt QueryOperation = iota + FloatLe QueryOperation = iota + FloatGt QueryOperation = iota + FloatGe QueryOperation = iota +) + +type whereType int + +// The possible field types. +const ( + UndefWhereType whereType = iota + Field whereType = iota + String whereType = iota + Float whereType = iota +) + +func (w whereType) String() string { + switch w { + case Field: + return fmt.Sprintf("Field") + case String: + return fmt.Sprintf("String") + case Float: + return fmt.Sprintf("Float") + default: + return fmt.Sprintf("UndefWhereType") + } +} + +// Represent a parsed "where" clause, used by mapr.Query +type whereCondition struct { + lString string + lFloat float64 + lType whereType + + Operation QueryOperation + + rString string + rFloat float64 + rType whereType +} + +func (wc *whereCondition) String() string { + return fmt.Sprintf("whereCondition(Operation:%v,lString:%s,lFloat:%v,lType:%s,rString:%s,rFloat:%v,rType:%s)", + wc.Operation, wc.lString, wc.lFloat, wc.lType.String(), wc.rString, wc.rFloat, wc.rType.String()) +} + +func makeWhereConditions(tokens []token) (where []whereCondition, err error) { + parse := func(tokens []token) (whereCondition, []token, error) { + var wc whereCondition + if len(tokens) < 3 { + return wc, nil, 
errors.New(invalidQuery + "Not enough arguments in 'where' clause") + } + + whereOp := strings.ToLower(tokens[1].str) + switch whereOp { + case "==": + wc.Operation = FloatEq + case "!=": + wc.Operation = FloatNe + case "<": + wc.Operation = FloatLt + case "<=": + wc.Operation = FloatLe + case "=<": + wc.Operation = FloatLe + case ">": + wc.Operation = FloatGt + case ">=": + wc.Operation = FloatGe + case "=>": + wc.Operation = FloatGe + case "eq": + wc.Operation = StringEq + case "ne": + wc.Operation = StringNe + case "contains": + wc.Operation = StringContains + default: + return wc, nil, errors.New(invalidQuery + "Unknown operation in 'where' clause: " + whereOp) + } + + wc.lString = tokens[0].str + wc.rString = tokens[2].str + + if wc.Operation > FloatOperation { + if !tokens[0].isBareword { + return wc, nil, errors.New(invalidQuery + "Expected bareword at 'where' clause's lValue: " + tokens[0].str) + } + if f, err := strconv.ParseFloat(wc.lString, 64); err == nil { + wc.lFloat = f + wc.lType = Float + } else { + wc.lType = Field + } + + if !tokens[2].isBareword { + return wc, nil, errors.New(invalidQuery + "Expected bareword at 'where' clause's rValue: " + tokens[2].str) + } + if f, err := strconv.ParseFloat(wc.rString, 64); err == nil { + wc.rFloat = f + wc.rType = Float + } else { + wc.rType = Field + } + return wc, tokens[3:], nil + } + + if tokens[0].isBareword { + wc.lType = Field + } else { + wc.lType = String + } + if tokens[2].isBareword { + wc.rType = Field + } else { + wc.rType = String + } + + return wc, tokens[3:], nil + } + + for len(tokens) > 0 { + var wc whereCondition + var err error + + wc, tokens, err = parse(tokens) + if err != nil { + return nil, err + } + + where = append(where, wc) + tokens = tokensConsumeOptional(tokens, "and") + } + + return +} + +func (wc *whereCondition) floatClause(lValue float64, rValue float64) bool { + switch wc.Operation { + case FloatEq: + return lValue == rValue + case FloatNe: + return lValue != rValue + case 
FloatLt: + return lValue < rValue + case FloatLe: + return lValue <= rValue + case FloatGt: + return lValue > rValue + case FloatGe: + return lValue >= rValue + default: + logger.Error("Unknown float operation", lValue, wc.Operation, rValue) + } + return false +} + +func (wc *whereCondition) stringClause(lValue string, rValue string) bool { + switch wc.Operation { + case StringEq: + return lValue == rValue + case StringNe: + return lValue != rValue + case StringContains: + return strings.Contains(lValue, rValue) + default: + logger.Error("Unknown string operation", lValue, wc.Operation, rValue) + } + return false +} |
