diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2020-03-04 16:32:27 +0000 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2020-03-04 16:32:27 +0000 |
| commit | 238dd3930e9c58397a86f649c77912ee32e4d7f0 (patch) | |
| tree | b4cda0b8c677188b600478522471628b5d4efea4 /internal/mapr/server | |
| parent | 89d3ebfc4e0c947977e5f455ee76f3ce29cc92c7 (diff) | |
can tail probe with a given timeout and then write a mapreduce result
Diffstat (limited to 'internal/mapr/server')
| -rw-r--r-- | internal/mapr/server/aggregate.go | 59 |
1 files changed, 46 insertions, 13 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 922dcbd..fade689 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -27,6 +27,8 @@ type Aggregate struct { query *mapr.Query // The mapr log format parser parser *logformat.Parser + cancel context.CancelFunc + ctx context.Context } // NewAggregate return a new server side aggregator. @@ -48,6 +50,8 @@ func NewAggregate(queryStr string) (*Aggregate, error) { logger.FatalExit("Could not create mapr log format parser", err) } + ctx, cancel := context.WithCancel(context.Background()) + a := Aggregate{ Lines: make(chan line.Line, 100), serialize: make(chan struct{}), @@ -55,18 +59,27 @@ func NewAggregate(queryStr string) (*Aggregate, error) { hostname: s[0], query: query, parser: logParser, + ctx: ctx, + cancel: cancel, } return &a, nil } -// Start an aggregation run. +// Start an aggregation. func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) { + defer a.cancel() + fieldsCh := a.linesToFields(ctx) go a.fieldsToMaprLines(ctx, fieldsCh, maprLines) a.periodicAggregateTimer(ctx) } +// Cancel the aggregation. +func (a *Aggregate) Cancel() { + a.cancel() +} + func (a *Aggregate) periodicAggregateTimer(ctx context.Context) { for { select { @@ -74,6 +87,8 @@ func (a *Aggregate) periodicAggregateTimer(ctx context.Context) { a.Serialize(ctx) case <-ctx.Done(): return + case <-a.ctx.Done(): + return } } } @@ -108,6 +123,8 @@ func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string } case <-ctx.Done(): return + case <-a.ctx.Done(): + return } } }() @@ -118,30 +135,36 @@ func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string func (a *Aggregate) fieldsToMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) { group := mapr.NewGroupSet() + serialize := func() { + logger.Info("Serializing mapreduce result") + group.Serialize(ctx, maprLines) + group = mapr.NewGroupSet() + logger.Info("Done serializing mapreduce result") + } + for { select { case fields, ok := <-fieldsCh: if !ok { - logger.Info("Serializing mapreduce result (final)") - group.Serialize(ctx, maprLines) - group = mapr.NewGroupSet() - logger.Info("Done serializing mapreduce result (final)") + serialize() return } a.aggregate(group, fields) case <-a.serialize: - logger.Info("Serializing mapreduce result") - group.Serialize(ctx, maprLines) - group = mapr.NewGroupSet() - logger.Info("Done serializing mapreduce result") + serialize() case <-a.flush: logger.Info("Flushing mapreduce result") - group.Serialize(ctx, maprLines) - group = mapr.NewGroupSet() + serialize() a.flush <- struct{}{} logger.Info("Done flushing mapreduce result") case <-ctx.Done(): return + case <-a.ctx.Done(): + logger.Info("Flushing mapreduce result") + serialize() + a.flush <- struct{}{} + logger.Info("Done flushing mapreduce result") + return } } } @@ -190,6 +213,16 @@ func (a *Aggregate) Serialize(ctx context.Context) { // Flush all data. func (a *Aggregate) Flush() { - a.flush <- struct{}{} - <-a.flush + select { + case <-a.ctx.Done(): + return + case a.flush <- struct{}{}: + case <-time.After(time.Minute): + return + } + + select { + case <-a.flush: + case <-time.After(time.Minute): + } } |
