summary | refs | log | tree | commit | diff
path: root/internal/mapr/server/aggregate.go
diff options
context:
space:
mode:
author: Paul Bütow <pbuetow@mimecast.com> 2020-01-26 11:26:53 +0000
committer: Paul Bütow <pbuetow@mimecast.com> 2020-02-07 13:31:15 +0000
commit: 0945da8dfefcbb723eecea0e5f4eafff63398253 (patch)
tree: f06dab4d2bf21d25d176b23d5baeca588d27f5d7 /internal/mapr/server/aggregate.go
parent: 2a8e5de265a0e0a31a5834909d6879f5c9941467 (diff)
Introduce drun command, refactor code to use context package
Diffstat (limited to 'internal/mapr/server/aggregate.go')
-rw-r--r-- internal/mapr/server/aggregate.go 141
1 files changed, 83 insertions, 58 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 900756e..922dcbd 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -1,26 +1,28 @@
package server
import (
- "github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/fs"
- "github.com/mimecast/dtail/internal/logger"
- "github.com/mimecast/dtail/internal/mapr"
- "github.com/mimecast/dtail/internal/mapr/logformat"
+ "context"
"os"
"strings"
"time"
+
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/line"
+ "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/mapr"
+ "github.com/mimecast/dtail/internal/mapr/logformat"
)
// Aggregate is for aggregating mapreduce data on the DTail server side.
type Aggregate struct {
// Log lines to process (parsing MAPREDUCE lines).
- Lines chan fs.LineRead
+ Lines chan line.Line
// Hostname of the current server (used to populate $hostname field).
hostname string
- // Signals to exit goroutine.
- stop chan struct{}
// Signals to serialize data.
serialize chan struct{}
+ // Signals to flush data.
+ flush chan struct{}
// The mapr query
query *mapr.Query
// The mapr log format parser
@@ -28,7 +30,7 @@ type Aggregate struct {
}
// NewAggregate return a new server side aggregator.
-func NewAggregate(maprLines chan<- string, queryStr string) (*Aggregate, error) {
+func NewAggregate(queryStr string) (*Aggregate, error) {
query, err := mapr.NewQuery(queryStr)
if err != nil {
return nil, err
@@ -47,76 +49,98 @@ func NewAggregate(maprLines chan<- string, queryStr string) (*Aggregate, error)
}
a := Aggregate{
- Lines: make(chan fs.LineRead, 100),
- stop: make(chan struct{}),
+ Lines: make(chan line.Line, 100),
serialize: make(chan struct{}),
+ flush: make(chan struct{}),
hostname: s[0],
query: query,
parser: logParser,
}
- go a.periodicAggregateTimer()
-
- fieldsCh := make(chan map[string]string)
- go a.readFields(fieldsCh, maprLines)
- go a.readLines(fieldsCh)
-
return &a, nil
}
-func (a *Aggregate) periodicAggregateTimer() {
+// Start an aggregation run.
+func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) {
+ fieldsCh := a.linesToFields(ctx)
+ go a.fieldsToMaprLines(ctx, fieldsCh, maprLines)
+ a.periodicAggregateTimer(ctx)
+}
+
+func (a *Aggregate) periodicAggregateTimer(ctx context.Context) {
for {
select {
case <-time.After(a.query.Interval):
- a.Serialize()
- case <-a.stop:
+ a.Serialize(ctx)
+ case <-ctx.Done():
return
}
}
}
-func (a *Aggregate) readFields(fieldsCh <-chan map[string]string, maprLines chan<- string) {
- group := mapr.NewGroupSet()
+func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string {
+ fieldsCh := make(chan map[string]string)
- for {
- select {
- case fields := <-fieldsCh:
- a.aggregate(group, fields)
- case <-a.serialize:
- logger.Info("Serializing mapreduce result")
- group.Serialize(maprLines, a.stop)
- logger.Info("Done serializing mapreduce result")
- group = mapr.NewGroupSet()
- case <-a.stop:
- return
+ go func() {
+ defer close(fieldsCh)
+
+ for {
+ select {
+ case line, ok := <-a.Lines:
+ if !ok {
+ return
+ }
+
+ maprLine := strings.TrimSpace(string(line.Content))
+ fields, err := a.parser.MakeFields(maprLine)
+
+ if err != nil {
+ logger.Error(err)
+ continue
+ }
+ if !a.query.WhereClause(fields) {
+ continue
+ }
+
+ select {
+ case fieldsCh <- fields:
+ case <-ctx.Done():
+ }
+ case <-ctx.Done():
+ return
+ }
}
- }
+ }()
+
+ return fieldsCh
}
-func (a *Aggregate) readLines(fieldsCh chan<- map[string]string) {
+func (a *Aggregate) fieldsToMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) {
+ group := mapr.NewGroupSet()
+
for {
select {
- case line, ok := <-a.Lines:
+ case fields, ok := <-fieldsCh:
if !ok {
+ logger.Info("Serializing mapreduce result (final)")
+ group.Serialize(ctx, maprLines)
+ group = mapr.NewGroupSet()
+ logger.Info("Done serializing mapreduce result (final)")
return
}
-
- maprLine := strings.TrimSpace(string(line.Content))
- fields, err := a.parser.MakeFields(maprLine)
-
- if err != nil {
- logger.Error(err)
- continue
- }
- if !a.query.WhereClause(fields) {
- continue
- }
-
- select {
- case fieldsCh <- fields:
- case <-a.stop:
- }
- case <-a.stop:
+ a.aggregate(group, fields)
+ case <-a.serialize:
+ logger.Info("Serializing mapreduce result")
+ group.Serialize(ctx, maprLines)
+ group = mapr.NewGroupSet()
+ logger.Info("Done serializing mapreduce result")
+ case <-a.flush:
+ logger.Info("Flushing mapreduce result")
+ group.Serialize(ctx, maprLines)
+ group = mapr.NewGroupSet()
+ a.flush <- struct{}{}
+ logger.Info("Done flushing mapreduce result")
+ case <-ctx.Done():
return
}
}
@@ -157,14 +181,15 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
}
// Serialize all the aggregated data.
-func (a *Aggregate) Serialize() {
+func (a *Aggregate) Serialize(ctx context.Context) {
select {
case a.serialize <- struct{}{}:
- case <-a.stop:
+ case <-ctx.Done():
}
}
-// Close the aggregator.
-func (a *Aggregate) Close() {
- close(a.stop)
+// Flush all data.
+func (a *Aggregate) Flush() {
+ a.flush <- struct{}{}
+ <-a.flush
}