diff options
| author | Paul Buetow <paul@dionysus> | 2020-09-19 19:15:08 +0100 |
|---|---|---|
| committer | Paul Buetow <paul@dionysus> | 2020-09-19 19:15:08 +0100 |
| commit | df2ff83897cde61d04b12958c6f6d458c69502f4 (patch) | |
| tree | 8e6d9f697fe9a5c70f200d54745bb5daecac6bde | |
| parent | bad8e04bb4410b94b8e875ccde287f74ab94121a (diff) | |
refactor to have no context.Context in server mapreduce aggregate structs
| -rw-r--r-- | internal/mapr/server/aggregate.go | 80 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 2 |
2 files changed, 44 insertions, 38 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 1028943..cd59b63 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" @@ -15,6 +16,7 @@ import ( // Aggregate is for aggregating mapreduce data on the DTail server side. type Aggregate struct { + done *internal.Done // Log lines to process (parsing MAPREDUCE lines). Lines chan line.Line // Hostname of the current server (used to populate $hostname field). @@ -23,12 +25,12 @@ type Aggregate struct { serialize chan struct{} // Signals to flush data. flush chan struct{} + // Signals that data has been flushed + flushed chan struct{} // The mapr query query *mapr.Query // The mapr log format parser parser *logformat.Parser - cancel context.CancelFunc - ctx context.Context } // NewAggregate return a new server side aggregator. @@ -64,56 +66,63 @@ func NewAggregate(queryStr string) (*Aggregate, error) { } } - ctx, cancel := context.WithCancel(context.Background()) - a := Aggregate{ + done: internal.NewDone(), Lines: make(chan line.Line, 100), serialize: make(chan struct{}), flush: make(chan struct{}), + flushed: make(chan struct{}), hostname: s[0], query: query, parser: logParser, - ctx: ctx, - cancel: cancel, } return &a, nil } +func (a *Aggregate) Shutdown() { + a.Flush() + a.done.Shutdown() +} + // Start an aggregation. func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) { - defer a.cancel() - fieldsCh := a.linesToFields(ctx) + myCtx, cancel := context.WithCancel(ctx) + defer cancel() + + go func() { + select { + case <-myCtx.Done(): + a.done.Shutdown() + case <-a.done.Done(): + cancel() + } + }() + + fieldsCh := a.makeFields(myCtx) // Add fields (e.g. via 'set' clause) if len(a.query.Set) > 0 { - fieldsCh = a.addMoreFields(ctx, fieldsCh) + fieldsCh = a.addFields(myCtx, fieldsCh) } - go a.fieldsToMaprLines(ctx, fieldsCh, maprLines) - a.periodicAggregateTimer(ctx) + go a.aggregateTimer(myCtx) + a.makeMaprLines(myCtx, fieldsCh, maprLines) } -// Cancel the aggregation. -func (a *Aggregate) Cancel() { - a.cancel() -} - -func (a *Aggregate) periodicAggregateTimer(ctx context.Context) { +func (a *Aggregate) aggregateTimer(ctx context.Context) { for { select { case <-time.After(a.query.Interval): a.Serialize(ctx) case <-ctx.Done(): return - case <-a.ctx.Done(): - return } } } -func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string { +func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string { ch := make(chan map[string]string) go func() { @@ -144,8 +153,6 @@ func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string } case <-ctx.Done(): return - case <-a.ctx.Done(): - return } } }() @@ -153,14 +160,14 @@ func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string return ch } -func (a *Aggregate) addMoreFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string { +func (a *Aggregate) addFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string { ch := make(chan map[string]string) go func() { defer close(ch) for { - // fieldsCh will be closed via 'linesToFields' if ctx is done + // fieldsCh will be closed via 'makeFields' if ctx is done fields, ok := <-fieldsCh if !ok { return @@ -179,7 +186,7 @@ func (a *Aggregate) addMoreFields(ctx context.Context, fieldsCh <-chan map[strin return ch } -func (a *Aggregate) fieldsToMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) { +func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) { group := mapr.NewGroupSet() serialize := func() { @@ -200,18 +207,10 @@ func (a *Aggregate) fieldsToMaprLines(ctx context.Context, fieldsCh <-chan map[s case <-a.serialize: serialize() case <-a.flush: - logger.Info("Flushing mapreduce result") serialize() - a.flush <- struct{}{} - logger.Info("Done flushing mapreduce result") + a.flushed <- struct{}{} case <-ctx.Done(): return - case <-a.ctx.Done(): - logger.Info("Flushing mapreduce result") - serialize() - a.flush <- struct{}{} - logger.Info("Done flushing mapreduce result") - return } } } @@ -254,6 +253,8 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { func (a *Aggregate) Serialize(ctx context.Context) { select { case a.serialize <- struct{}{}: + case <-time.After(time.Minute): + logger.Warn("Starting to serialize mapredice data takes over a minute") case <-ctx.Done(): } } @@ -261,15 +262,20 @@ func (a *Aggregate) Serialize(ctx context.Context) { // Flush all data. func (a *Aggregate) Flush() { select { - case <-a.ctx.Done(): - return case a.flush <- struct{}{}: + logger.Info("Flushing mapreduce data") case <-time.After(time.Minute): + logger.Warn("Starting to flush mapreduce data takes over a minute") + return + case <-a.done.Done(): return } select { - case <-a.flush: + case <-a.flushed: + logger.Info("Done flushing") case <-time.After(time.Minute): + logger.Warn("Waiting for data to be flushed takes over a minute") + case <-a.done.Done(): } } diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 5af3ebb..164a280 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -266,7 +266,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] if h.aggregate == nil { return } - h.aggregate.Cancel() + h.aggregate.Shutdown() } } |
