diff options
| author | Paul Bütow <pbuetow@mimecast.com> | 2020-01-09 20:30:15 +0000 |
|---|---|---|
| committer | Paul Bütow <pbuetow@mimecast.com> | 2020-01-09 20:30:15 +0000 |
| commit | 3755a9911ecb05886577095f2b8cc8b9e4066a3a (patch) | |
| tree | 86e24bc466986cb5c9c6d167a918e6064defeafc /mapr/server | |
Release of DTail v1.0.0v1.0.0
Diffstat (limited to 'mapr/server')
| -rw-r--r-- | mapr/server/aggregate.go | 170 |
1 files changed, 170 insertions, 0 deletions
diff --git a/mapr/server/aggregate.go b/mapr/server/aggregate.go new file mode 100644 index 0000000..316da67 --- /dev/null +++ b/mapr/server/aggregate.go @@ -0,0 +1,170 @@ +package server + +import ( + "dtail/config" + "dtail/fs" + "dtail/logger" + "dtail/mapr" + "dtail/mapr/logformat" + "os" + "strings" + "time" +) + +// Aggregate is for aggregating mapreduce data on the DTail server side. +type Aggregate struct { + // Log lines to process (parsing MAPREDUCE lines). + Lines chan fs.LineRead + // Hostname of the current server (used to populate $hostname field). + hostname string + // Signals to exit goroutine. + stop chan struct{} + // Signals to serialize data. + serialize chan struct{} + // The mapr query + query *mapr.Query + // The mapr log format parser + parser *logformat.Parser +} + +// NewAggregate return a new server side aggregator. +func NewAggregate(maprLines chan<- string, queryStr string) (*Aggregate, error) { + query, err := mapr.NewQuery(queryStr) + if err != nil { + return nil, err + } + + fqdn, err := os.Hostname() + if err != nil { + logger.Error(err) + } + s := strings.Split(fqdn, ".") + + logger.Info("Creating mapr log format parser", config.Server.MapreduceLogFormat) + logParser, err := logformat.NewParser(config.Server.MapreduceLogFormat) + if err != nil { + logger.FatalExit("Could not create mapr log format parser", err) + } + + a := Aggregate{ + Lines: make(chan fs.LineRead, 100), + stop: make(chan struct{}), + serialize: make(chan struct{}), + hostname: s[0], + query: query, + parser: logParser, + } + + go a.periodicAggregateTimer() + + fieldsCh := make(chan map[string]string) + go a.readFields(fieldsCh, maprLines) + go a.readLines(fieldsCh) + + return &a, nil +} + +func (a *Aggregate) periodicAggregateTimer() { + for { + select { + case <-time.After(a.query.Interval): + a.Serialize() + case <-a.stop: + return + } + } +} + +func (a *Aggregate) readFields(fieldsCh <-chan map[string]string, maprLines chan<- string) { + group := mapr.NewGroupSet() + + for { + select { + case fields := <-fieldsCh: + a.aggregate(group, fields) + case <-a.serialize: + logger.Info("Serializing mapreduce result") + group.Serialize(maprLines, a.stop) + logger.Info("Done serializing mapreduce result") + group = mapr.NewGroupSet() + case <-a.stop: + return + } + } +} + +func (a *Aggregate) readLines(fieldsCh chan<- map[string]string) { + for { + select { + case line, ok := <-a.Lines: + if !ok { + return + } + + maprLine := strings.TrimSpace(string(line.Content)) + fields, err := a.parser.MakeFields(maprLine) + + if err != nil { + logger.Error(err) + continue + } + if !a.query.WhereClause(fields) { + continue + } + + select { + case fieldsCh <- fields: + case <-a.stop: + } + case <-a.stop: + return + } + } +} + +func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { + //logger.Trace("Aggregating", group, fields) + var sb strings.Builder + + for i, field := range a.query.GroupBy { + if i > 0 { + sb.WriteString(" ") + } + if val, ok := fields[field]; ok { + sb.WriteString(val) + } + } + groupKey := sb.String() + set := group.GetSet(groupKey) + + var addedSample bool + for _, sc := range a.query.Select { + if val, ok := fields[sc.Field]; ok { + if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil { + logger.Error(err) + continue + } + addedSample = true + } + } + + if addedSample { + set.Samples++ + return + } + + logger.Trace("Aggregated data locally without adding new samples") +} + +// Serialize all the aggregated data. +func (a *Aggregate) Serialize() { + select { + case a.serialize <- struct{}{}: + case <-a.stop: + } +} + +// Close the aggregator. +func (a *Aggregate) Close() { + close(a.stop) +} |
