diff options
Diffstat (limited to 'internal/mapr/server')
| -rw-r--r-- | internal/mapr/server/aggregate.go | 141 |
1 files changed, 83 insertions, 58 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 900756e..922dcbd 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -1,26 +1,28 @@ package server import ( - "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/fs" - "github.com/mimecast/dtail/internal/logger" - "github.com/mimecast/dtail/internal/mapr" - "github.com/mimecast/dtail/internal/mapr/logformat" + "context" "os" "strings" "time" + + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/line" + "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/mapr" + "github.com/mimecast/dtail/internal/mapr/logformat" ) // Aggregate is for aggregating mapreduce data on the DTail server side. type Aggregate struct { // Log lines to process (parsing MAPREDUCE lines). - Lines chan fs.LineRead + Lines chan line.Line // Hostname of the current server (used to populate $hostname field). hostname string - // Signals to exit goroutine. - stop chan struct{} // Signals to serialize data. serialize chan struct{} + // Signals to flush data. + flush chan struct{} // The mapr query query *mapr.Query // The mapr log format parser @@ -28,7 +30,7 @@ type Aggregate struct { } // NewAggregate return a new server side aggregator. -func NewAggregate(maprLines chan<- string, queryStr string) (*Aggregate, error) { +func NewAggregate(queryStr string) (*Aggregate, error) { query, err := mapr.NewQuery(queryStr) if err != nil { return nil, err @@ -47,76 +49,98 @@ func NewAggregate(maprLines chan<- string, queryStr string) (*Aggregate, error) } a := Aggregate{ - Lines: make(chan fs.LineRead, 100), - stop: make(chan struct{}), + Lines: make(chan line.Line, 100), serialize: make(chan struct{}), + flush: make(chan struct{}), hostname: s[0], query: query, parser: logParser, } - go a.periodicAggregateTimer() - - fieldsCh := make(chan map[string]string) - go a.readFields(fieldsCh, maprLines) - go a.readLines(fieldsCh) - return &a, nil } -func (a *Aggregate) periodicAggregateTimer() { +// Start an aggregation run. +func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) { + fieldsCh := a.linesToFields(ctx) + go a.fieldsToMaprLines(ctx, fieldsCh, maprLines) + a.periodicAggregateTimer(ctx) +} + +func (a *Aggregate) periodicAggregateTimer(ctx context.Context) { for { select { case <-time.After(a.query.Interval): - a.Serialize() - case <-a.stop: + a.Serialize(ctx) + case <-ctx.Done(): return } } } -func (a *Aggregate) readFields(fieldsCh <-chan map[string]string, maprLines chan<- string) { - group := mapr.NewGroupSet() +func (a *Aggregate) linesToFields(ctx context.Context) <-chan map[string]string { + fieldsCh := make(chan map[string]string) - for { - select { - case fields := <-fieldsCh: - a.aggregate(group, fields) - case <-a.serialize: - logger.Info("Serializing mapreduce result") - group.Serialize(maprLines, a.stop) - logger.Info("Done serializing mapreduce result") - group = mapr.NewGroupSet() - case <-a.stop: - return + go func() { + defer close(fieldsCh) + + for { + select { + case line, ok := <-a.Lines: + if !ok { + return + } + + maprLine := strings.TrimSpace(string(line.Content)) + fields, err := a.parser.MakeFields(maprLine) + + if err != nil { + logger.Error(err) + continue + } + if !a.query.WhereClause(fields) { + continue + } + + select { + case fieldsCh <- fields: + case <-ctx.Done(): + } + case <-ctx.Done(): + return + } } - } + }() + + return fieldsCh } -func (a *Aggregate) readLines(fieldsCh chan<- map[string]string) { +func (a *Aggregate) fieldsToMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) { + group := mapr.NewGroupSet() + for { select { - case line, ok := <-a.Lines: + case fields, ok := <-fieldsCh: if !ok { + logger.Info("Serializing mapreduce result (final)") + group.Serialize(ctx, maprLines) + group = mapr.NewGroupSet() + logger.Info("Done serializing mapreduce result (final)") return } - - maprLine := strings.TrimSpace(string(line.Content)) - fields, err := a.parser.MakeFields(maprLine) - - if err != nil { - logger.Error(err) - continue - } - if !a.query.WhereClause(fields) { - continue - } - - select { - case fieldsCh <- fields: - case <-a.stop: - } - case <-a.stop: + a.aggregate(group, fields) + case <-a.serialize: + logger.Info("Serializing mapreduce result") + group.Serialize(ctx, maprLines) + group = mapr.NewGroupSet() + logger.Info("Done serializing mapreduce result") + case <-a.flush: + logger.Info("Flushing mapreduce result") + group.Serialize(ctx, maprLines) + group = mapr.NewGroupSet() + a.flush <- struct{}{} + logger.Info("Done flushing mapreduce result") + case <-ctx.Done(): return } } @@ -157,14 +181,15 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { } // Serialize all the aggregated data. -func (a *Aggregate) Serialize() { +func (a *Aggregate) Serialize(ctx context.Context) { select { case a.serialize <- struct{}{}: - case <-a.stop: + case <-ctx.Done(): } } -// Close the aggregator. -func (a *Aggregate) Close() { - close(a.stop) +// Flush all data. +func (a *Aggregate) Flush() { + a.flush <- struct{}{} + <-a.flush } |
