summaryrefslogtreecommitdiff
path: root/internal/mapr/server
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2020-03-04 16:32:27 +0000
committerPaul Buetow <pbuetow@mimecast.com>2020-03-04 16:32:27 +0000
commit238dd3930e9c58397a86f649c77912ee32e4d7f0 (patch)
treeb4cda0b8c677188b600478522471628b5d4efea4 /internal/mapr/server
parent89d3ebfc4e0c947977e5f455ee76f3ce29cc92c7 (diff)
can tail probe with a given timeout and then write a mapreduce result
Diffstat (limited to 'internal/mapr/server')
-rw-r--r--internal/mapr/server/aggregate.go59
1 files changed, 46 insertions, 13 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 922dcbd..fade689 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -27,6 +27,8 @@ type Aggregate struct {
query *mapr.Query
// The mapr log format parser
parser *logformat.Parser
+ cancel context.CancelFunc
+ ctx context.Context
}
// NewAggregate return a new server side aggregator.
@@ -48,6 +50,8 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
logger.FatalExit("Could not create mapr log format parser", err)
}
+ ctx, cancel := context.WithCancel(context.Background())
+
a := Aggregate{
Lines: make(chan line.Line, 100),
serialize: make(chan struct{}),
@@ -55,18 +59,27 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
hostname: s[0],
query: query,
parser: logParser,
+ ctx: ctx,
+ cancel: cancel,
}
return &a, nil
}
-// Start an aggregation run.
+// Start an aggregation.
func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) {
+ defer a.cancel()
+
fieldsCh := a.linesToFields(ctx)
go a.fieldsToMaprLines(ctx, fieldsCh, maprLines)
a.periodicAggregateTimer(ctx)
}
+// Cancel the aggregation.
+func (a *Aggregate) Cancel() {
+ a.cancel()
+}
+
func (a *Aggregate) periodicAggregateTimer(ctx context.Context) {
for {
select {
@@ -74,6 +87,8 @@ func (a *Aggregate) periodicAggregateTimer(ctx context.Context) {
a.Serialize(ctx)
case <-ctx.Done():
return
+ case <-a.ctx.Done():
+ return
}
}
}
@@ -108,6 +123,8 @@ func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string
}
case <-ctx.Done():
return
+ case <-a.ctx.Done():
+ return
}
}
}()
@@ -118,30 +135,36 @@ func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string
func (a *Aggregate) fieldsToMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) {
group := mapr.NewGroupSet()
+ serialize := func() {
+ logger.Info("Serializing mapreduce result")
+ group.Serialize(ctx, maprLines)
+ group = mapr.NewGroupSet()
+ logger.Info("Done serializing mapreduce result")
+ }
+
for {
select {
case fields, ok := <-fieldsCh:
if !ok {
- logger.Info("Serializing mapreduce result (final)")
- group.Serialize(ctx, maprLines)
- group = mapr.NewGroupSet()
- logger.Info("Done serializing mapreduce result (final)")
+ serialize()
return
}
a.aggregate(group, fields)
case <-a.serialize:
- logger.Info("Serializing mapreduce result")
- group.Serialize(ctx, maprLines)
- group = mapr.NewGroupSet()
- logger.Info("Done serializing mapreduce result")
+ serialize()
case <-a.flush:
logger.Info("Flushing mapreduce result")
- group.Serialize(ctx, maprLines)
- group = mapr.NewGroupSet()
+ serialize()
a.flush <- struct{}{}
logger.Info("Done flushing mapreduce result")
case <-ctx.Done():
return
+ case <-a.ctx.Done():
+ logger.Info("Flushing mapreduce result")
+ serialize()
+ a.flush <- struct{}{}
+ logger.Info("Done flushing mapreduce result")
+ return
}
}
}
@@ -190,6 +213,16 @@ func (a *Aggregate) Serialize(ctx context.Context) {
// Flush all data.
func (a *Aggregate) Flush() {
- a.flush <- struct{}{}
- <-a.flush
+ select {
+ case <-a.ctx.Done():
+ return
+ case a.flush <- struct{}{}:
+ case <-time.After(time.Minute):
+ return
+ }
+
+ select {
+ case <-a.flush:
+ case <-time.After(time.Minute):
+ }
}