summaryrefslogtreecommitdiff
path: root/internal/mapr
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2020-12-08 14:49:41 +0000
committerPaul Buetow <pbuetow@mimecast.com>2020-12-08 14:49:41 +0000
commit799b9b69ba08b898e13026b7ecab9f9f58580a82 (patch)
tree34bc0e5e539aed99dd1f13e7489e9d3111ba050f /internal/mapr
parent6b2d8539a66f1b36ffd55c56723376b9b068a5dc (diff)
merge develop
Diffstat (limited to 'internal/mapr')
-rw-r--r--internal/mapr/server/aggregate.go81
1 files changed, 44 insertions, 37 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 1028943..28bb074 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -6,6 +6,7 @@ import (
"strings"
"time"
+ "github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/logger"
@@ -15,6 +16,7 @@ import (
// Aggregate is for aggregating mapreduce data on the DTail server side.
type Aggregate struct {
+ done *internal.Done
// Log lines to process (parsing MAPREDUCE lines).
Lines chan line.Line
// Hostname of the current server (used to populate $hostname field).
@@ -23,12 +25,12 @@ type Aggregate struct {
serialize chan struct{}
// Signals to flush data.
flush chan struct{}
+ // Signals that data has been flushed
+ flushed chan struct{}
// The mapr query
query *mapr.Query
// The mapr log format parser
parser *logformat.Parser
- cancel context.CancelFunc
- ctx context.Context
}
// NewAggregate return a new server side aggregator.
@@ -64,56 +66,64 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
}
}
- ctx, cancel := context.WithCancel(context.Background())
-
a := Aggregate{
+ done: internal.NewDone(),
Lines: make(chan line.Line, 100),
serialize: make(chan struct{}),
flush: make(chan struct{}),
+ flushed: make(chan struct{}),
hostname: s[0],
query: query,
parser: logParser,
- ctx: ctx,
- cancel: cancel,
}
return &a, nil
}
+// Shutdown the aggregation engine.
+func (a *Aggregate) Shutdown() {
+ a.Flush()
+ a.done.Shutdown()
+}
+
// Start an aggregation.
func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) {
- defer a.cancel()
- fieldsCh := a.linesToFields(ctx)
+ myCtx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ go func() {
+ select {
+ case <-myCtx.Done():
+ a.done.Shutdown()
+ case <-a.done.Done():
+ cancel()
+ }
+ }()
+
+ fieldsCh := a.makeFields(myCtx)
// Add fields (e.g. via 'set' clause)
if len(a.query.Set) > 0 {
- fieldsCh = a.addMoreFields(ctx, fieldsCh)
+ fieldsCh = a.addFields(myCtx, fieldsCh)
}
- go a.fieldsToMaprLines(ctx, fieldsCh, maprLines)
- a.periodicAggregateTimer(ctx)
+ go a.aggregateTimer(myCtx)
+ a.makeMaprLines(myCtx, fieldsCh, maprLines)
}
-// Cancel the aggregation.
-func (a *Aggregate) Cancel() {
- a.cancel()
-}
-
-func (a *Aggregate) periodicAggregateTimer(ctx context.Context) {
+func (a *Aggregate) aggregateTimer(ctx context.Context) {
for {
select {
case <-time.After(a.query.Interval):
a.Serialize(ctx)
case <-ctx.Done():
return
- case <-a.ctx.Done():
- return
}
}
}
-func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string {
+func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string {
ch := make(chan map[string]string)
go func() {
@@ -144,8 +154,6 @@ func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string
}
case <-ctx.Done():
return
- case <-a.ctx.Done():
- return
}
}
}()
@@ -153,14 +161,14 @@ func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string
return ch
}
-func (a *Aggregate) addMoreFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string {
+func (a *Aggregate) addFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string {
ch := make(chan map[string]string)
go func() {
defer close(ch)
for {
- // fieldsCh will be closed via 'linesToFields' if ctx is done
+ // fieldsCh will be closed via 'makeFields' if ctx is done
fields, ok := <-fieldsCh
if !ok {
return
@@ -179,7 +187,7 @@ func (a *Aggregate) addMoreFields(ctx context.Context, fieldsCh <-chan map[strin
return ch
}
-func (a *Aggregate) fieldsToMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) {
+func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) {
group := mapr.NewGroupSet()
serialize := func() {
@@ -200,18 +208,10 @@ func (a *Aggregate) fieldsToMaprLines(ctx context.Context, fieldsCh <-chan map[s
case <-a.serialize:
serialize()
case <-a.flush:
- logger.Info("Flushing mapreduce result")
serialize()
- a.flush <- struct{}{}
- logger.Info("Done flushing mapreduce result")
+ a.flushed <- struct{}{}
case <-ctx.Done():
return
- case <-a.ctx.Done():
- logger.Info("Flushing mapreduce result")
- serialize()
- a.flush <- struct{}{}
- logger.Info("Done flushing mapreduce result")
- return
}
}
}
@@ -254,6 +254,8 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
func (a *Aggregate) Serialize(ctx context.Context) {
select {
case a.serialize <- struct{}{}:
+ case <-time.After(time.Minute):
+ logger.Warn("Starting to serialize mapredice data takes over a minute")
case <-ctx.Done():
}
}
@@ -261,15 +263,20 @@ func (a *Aggregate) Serialize(ctx context.Context) {
// Flush all data.
func (a *Aggregate) Flush() {
select {
- case <-a.ctx.Done():
- return
case a.flush <- struct{}{}:
+ logger.Info("Flushing mapreduce data")
case <-time.After(time.Minute):
+ logger.Warn("Starting to flush mapreduce data takes over a minute")
+ return
+ case <-a.done.Done():
return
}
select {
- case <-a.flush:
+ case <-a.flushed:
+ logger.Info("Done flushing")
case <-time.After(time.Minute):
+ logger.Warn("Waiting for data to be flushed takes over a minute")
+ case <-a.done.Done():
}
}