summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@dionysus>2020-09-19 19:15:08 +0100
committerPaul Buetow <paul@dionysus>2020-09-19 19:15:08 +0100
commitdf2ff83897cde61d04b12958c6f6d458c69502f4 (patch)
tree8e6d9f697fe9a5c70f200d54745bb5daecac6bde
parentbad8e04bb4410b94b8e875ccde287f74ab94121a (diff)
refactor to have no context.Context in server mapreduce aggregate structs
-rw-r--r--internal/mapr/server/aggregate.go80
-rw-r--r--internal/server/handlers/serverhandler.go2
2 files changed, 44 insertions, 38 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 1028943..cd59b63 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -6,6 +6,7 @@ import (
"strings"
"time"
+ "github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/logger"
@@ -15,6 +16,7 @@ import (
// Aggregate is for aggregating mapreduce data on the DTail server side.
type Aggregate struct {
+ done *internal.Done
// Log lines to process (parsing MAPREDUCE lines).
Lines chan line.Line
// Hostname of the current server (used to populate $hostname field).
@@ -23,12 +25,12 @@ type Aggregate struct {
serialize chan struct{}
// Signals to flush data.
flush chan struct{}
+ // Signals that data has been flushed
+ flushed chan struct{}
// The mapr query
query *mapr.Query
// The mapr log format parser
parser *logformat.Parser
- cancel context.CancelFunc
- ctx context.Context
}
// NewAggregate return a new server side aggregator.
@@ -64,56 +66,63 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
}
}
- ctx, cancel := context.WithCancel(context.Background())
-
a := Aggregate{
+ done: internal.NewDone(),
Lines: make(chan line.Line, 100),
serialize: make(chan struct{}),
flush: make(chan struct{}),
+ flushed: make(chan struct{}),
hostname: s[0],
query: query,
parser: logParser,
- ctx: ctx,
- cancel: cancel,
}
return &a, nil
}
+func (a *Aggregate) Shutdown() {
+ a.Flush()
+ a.done.Shutdown()
+}
+
// Start an aggregation.
func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) {
- defer a.cancel()
- fieldsCh := a.linesToFields(ctx)
+ myCtx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ go func() {
+ select {
+ case <-myCtx.Done():
+ a.done.Shutdown()
+ case <-a.done.Done():
+ cancel()
+ }
+ }()
+
+ fieldsCh := a.makeFields(myCtx)
// Add fields (e.g. via 'set' clause)
if len(a.query.Set) > 0 {
- fieldsCh = a.addMoreFields(ctx, fieldsCh)
+ fieldsCh = a.addFields(myCtx, fieldsCh)
}
- go a.fieldsToMaprLines(ctx, fieldsCh, maprLines)
- a.periodicAggregateTimer(ctx)
+ go a.aggregateTimer(myCtx)
+ a.makeMaprLines(myCtx, fieldsCh, maprLines)
}
-// Cancel the aggregation.
-func (a *Aggregate) Cancel() {
- a.cancel()
-}
-
-func (a *Aggregate) periodicAggregateTimer(ctx context.Context) {
+func (a *Aggregate) aggregateTimer(ctx context.Context) {
for {
select {
case <-time.After(a.query.Interval):
a.Serialize(ctx)
case <-ctx.Done():
return
- case <-a.ctx.Done():
- return
}
}
}
-func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string {
+func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string {
ch := make(chan map[string]string)
go func() {
@@ -144,8 +153,6 @@ func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string
}
case <-ctx.Done():
return
- case <-a.ctx.Done():
- return
}
}
}()
@@ -153,14 +160,14 @@ func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string
return ch
}
-func (a *Aggregate) addMoreFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string {
+func (a *Aggregate) addFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string {
ch := make(chan map[string]string)
go func() {
defer close(ch)
for {
- // fieldsCh will be closed via 'linesToFields' if ctx is done
+ // fieldsCh will be closed via 'makeFields' if ctx is done
fields, ok := <-fieldsCh
if !ok {
return
@@ -179,7 +186,7 @@ func (a *Aggregate) addMoreFields(ctx context.Context, fieldsCh <-chan map[strin
return ch
}
-func (a *Aggregate) fieldsToMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) {
+func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) {
group := mapr.NewGroupSet()
serialize := func() {
@@ -200,18 +207,10 @@ func (a *Aggregate) fieldsToMaprLines(ctx context.Context, fieldsCh <-chan map[s
case <-a.serialize:
serialize()
case <-a.flush:
- logger.Info("Flushing mapreduce result")
serialize()
- a.flush <- struct{}{}
- logger.Info("Done flushing mapreduce result")
+ a.flushed <- struct{}{}
case <-ctx.Done():
return
- case <-a.ctx.Done():
- logger.Info("Flushing mapreduce result")
- serialize()
- a.flush <- struct{}{}
- logger.Info("Done flushing mapreduce result")
- return
}
}
}
@@ -254,6 +253,8 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
func (a *Aggregate) Serialize(ctx context.Context) {
select {
case a.serialize <- struct{}{}:
+ case <-time.After(time.Minute):
+ logger.Warn("Starting to serialize mapredice data takes over a minute")
case <-ctx.Done():
}
}
@@ -261,15 +262,20 @@ func (a *Aggregate) Serialize(ctx context.Context) {
// Flush all data.
func (a *Aggregate) Flush() {
select {
- case <-a.ctx.Done():
- return
case a.flush <- struct{}{}:
+ logger.Info("Flushing mapreduce data")
case <-time.After(time.Minute):
+ logger.Warn("Starting to flush mapreduce data takes over a minute")
+ return
+ case <-a.done.Done():
return
}
select {
- case <-a.flush:
+ case <-a.flushed:
+ logger.Info("Done flushing")
case <-time.After(time.Minute):
+ logger.Warn("Waiting for data to be flushed takes over a minute")
+ case <-a.done.Done():
}
}
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 5af3ebb..164a280 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -266,7 +266,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
if h.aggregate == nil {
return
}
- h.aggregate.Cancel()
+ h.aggregate.Shutdown()
}
}