summaryrefslogtreecommitdiff
path: root/mapr/server
diff options
context:
space:
mode:
authorPaul Bütow <pbuetow@mimecast.com>2020-01-09 20:30:15 +0000
committerPaul Bütow <pbuetow@mimecast.com>2020-01-09 20:30:15 +0000
commit3755a9911ecb05886577095f2b8cc8b9e4066a3a (patch)
tree86e24bc466986cb5c9c6d167a918e6064defeafc /mapr/server
Release of DTail v1.0.0v1.0.0
Diffstat (limited to 'mapr/server')
-rw-r--r--mapr/server/aggregate.go170
1 files changed, 170 insertions, 0 deletions
diff --git a/mapr/server/aggregate.go b/mapr/server/aggregate.go
new file mode 100644
index 0000000..316da67
--- /dev/null
+++ b/mapr/server/aggregate.go
@@ -0,0 +1,170 @@
+package server
+
+import (
+ "dtail/config"
+ "dtail/fs"
+ "dtail/logger"
+ "dtail/mapr"
+ "dtail/mapr/logformat"
+ "os"
+ "strings"
+ "time"
+)
+
+// Aggregate is for aggregating mapreduce data on the DTail server side.
+type Aggregate struct {
+ // Log lines to process (parsing MAPREDUCE lines).
+ Lines chan fs.LineRead
+ // Hostname of the current server (used to populate $hostname field).
+ hostname string
+ // Signals to exit goroutine.
+ stop chan struct{}
+ // Signals to serialize data.
+ serialize chan struct{}
+ // The mapr query
+ query *mapr.Query
+ // The mapr log format parser
+ parser *logformat.Parser
+}
+
+// NewAggregate return a new server side aggregator.
+func NewAggregate(maprLines chan<- string, queryStr string) (*Aggregate, error) {
+ query, err := mapr.NewQuery(queryStr)
+ if err != nil {
+ return nil, err
+ }
+
+ fqdn, err := os.Hostname()
+ if err != nil {
+ logger.Error(err)
+ }
+ s := strings.Split(fqdn, ".")
+
+ logger.Info("Creating mapr log format parser", config.Server.MapreduceLogFormat)
+ logParser, err := logformat.NewParser(config.Server.MapreduceLogFormat)
+ if err != nil {
+ logger.FatalExit("Could not create mapr log format parser", err)
+ }
+
+ a := Aggregate{
+ Lines: make(chan fs.LineRead, 100),
+ stop: make(chan struct{}),
+ serialize: make(chan struct{}),
+ hostname: s[0],
+ query: query,
+ parser: logParser,
+ }
+
+ go a.periodicAggregateTimer()
+
+ fieldsCh := make(chan map[string]string)
+ go a.readFields(fieldsCh, maprLines)
+ go a.readLines(fieldsCh)
+
+ return &a, nil
+}
+
+func (a *Aggregate) periodicAggregateTimer() {
+ for {
+ select {
+ case <-time.After(a.query.Interval):
+ a.Serialize()
+ case <-a.stop:
+ return
+ }
+ }
+}
+
+func (a *Aggregate) readFields(fieldsCh <-chan map[string]string, maprLines chan<- string) {
+ group := mapr.NewGroupSet()
+
+ for {
+ select {
+ case fields := <-fieldsCh:
+ a.aggregate(group, fields)
+ case <-a.serialize:
+ logger.Info("Serializing mapreduce result")
+ group.Serialize(maprLines, a.stop)
+ logger.Info("Done serializing mapreduce result")
+ group = mapr.NewGroupSet()
+ case <-a.stop:
+ return
+ }
+ }
+}
+
+func (a *Aggregate) readLines(fieldsCh chan<- map[string]string) {
+ for {
+ select {
+ case line, ok := <-a.Lines:
+ if !ok {
+ return
+ }
+
+ maprLine := strings.TrimSpace(string(line.Content))
+ fields, err := a.parser.MakeFields(maprLine)
+
+ if err != nil {
+ logger.Error(err)
+ continue
+ }
+ if !a.query.WhereClause(fields) {
+ continue
+ }
+
+ select {
+ case fieldsCh <- fields:
+ case <-a.stop:
+ }
+ case <-a.stop:
+ return
+ }
+ }
+}
+
+func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
+ //logger.Trace("Aggregating", group, fields)
+ var sb strings.Builder
+
+ for i, field := range a.query.GroupBy {
+ if i > 0 {
+ sb.WriteString(" ")
+ }
+ if val, ok := fields[field]; ok {
+ sb.WriteString(val)
+ }
+ }
+ groupKey := sb.String()
+ set := group.GetSet(groupKey)
+
+ var addedSample bool
+ for _, sc := range a.query.Select {
+ if val, ok := fields[sc.Field]; ok {
+ if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil {
+ logger.Error(err)
+ continue
+ }
+ addedSample = true
+ }
+ }
+
+ if addedSample {
+ set.Samples++
+ return
+ }
+
+ logger.Trace("Aggregated data locally without adding new samples")
+}
+
+// Serialize all the aggregated data.
+func (a *Aggregate) Serialize() {
+ select {
+ case a.serialize <- struct{}{}:
+ case <-a.stop:
+ }
+}
+
+// Close the aggregator.
+func (a *Aggregate) Close() {
+ close(a.stop)
+}