summaryrefslogtreecommitdiff
path: root/internal/mapr/server/aggregate.go
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2021-10-21 21:28:49 +0300
committerPaul Buetow <pbuetow@mimecast.com>2021-10-21 21:28:49 +0300
commitf4207a55f71bfbcfdc532d5cdd3befaa3474a157 (patch)
treeea5e4a2d2a67035f645bdee496ae55a52034178a /internal/mapr/server/aggregate.go
parentd80d6070557e3a800e3a54967af9eced518f116b (diff)
parent739205206d63bf42f4e843b39d04d4c8cd8207c3 (diff)
merge develop
Diffstat (limited to 'internal/mapr/server/aggregate.go')
-rw-r--r--internal/mapr/server/aggregate.go155
1 files changed, 68 insertions, 87 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 28bb074..97fee11 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -8,25 +8,22 @@ import (
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
- "github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/logformat"
+ "github.com/mimecast/dtail/internal/protocol"
)
// Aggregate is for aggregating mapreduce data on the DTail server side.
type Aggregate struct {
done *internal.Done
- // Log lines to process (parsing MAPREDUCE lines).
- Lines chan line.Line
+ // NextLinesCh delivers the next lines channel to read from (e.g. one per input file).
+ NextLinesCh chan chan line.Line
// Hostname of the current server (used to populate $hostname field).
hostname string
// Signals to serialize data.
serialize chan struct{}
- // Signals to flush data.
- flush chan struct{}
- // Signals that data has been flushed
- flushed chan struct{}
// The mapr query
query *mapr.Query
// The mapr log format parser
@@ -42,7 +39,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
fqdn, err := os.Hostname()
if err != nil {
- logger.Error(err)
+ dlog.Common.Error(err)
}
s := strings.Split(fqdn, ".")
@@ -57,38 +54,32 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
parserName = query.LogFormat
}
- logger.Info("Creating log format parser", parserName)
+ dlog.Common.Info("Creating log format parser", parserName)
logParser, err := logformat.NewParser(parserName, query)
if err != nil {
- logger.Error("Could not create log format parser. Falling back to 'generic'", err)
+ dlog.Common.Error("Could not create log format parser. Falling back to 'generic'", err)
if logParser, err = logformat.NewParser("generic", query); err != nil {
- logger.FatalExit("Could not create log format parser", err)
+ dlog.Common.FatalPanic("Could not create log format parser", err)
}
}
- a := Aggregate{
- done: internal.NewDone(),
- Lines: make(chan line.Line, 100),
- serialize: make(chan struct{}),
- flush: make(chan struct{}),
- flushed: make(chan struct{}),
- hostname: s[0],
- query: query,
- parser: logParser,
- }
-
- return &a, nil
+ return &Aggregate{
+ done: internal.NewDone(),
+ NextLinesCh: make(chan chan line.Line, 10),
+ serialize: make(chan struct{}),
+ hostname: s[0],
+ query: query,
+ parser: logParser,
+ }, nil
}
// Shutdown the aggregation engine.
func (a *Aggregate) Shutdown() {
- a.Flush()
a.done.Shutdown()
}
// Start an aggregation.
-func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) {
-
+func (a *Aggregate) Start(ctx context.Context, maprMessages chan<- string) {
myCtx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -101,15 +92,14 @@ func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) {
}
}()
- fieldsCh := a.makeFields(myCtx)
-
+ fieldsCh := a.fieldsFromLines(myCtx)
// Add fields (e.g. via 'set' clause)
if len(a.query.Set) > 0 {
- fieldsCh = a.addFields(myCtx, fieldsCh)
+ fieldsCh = a.setAdditionalFields(myCtx, fieldsCh)
}
-
+ // Periodically pre-aggregate data every a.query.Interval seconds.
go a.aggregateTimer(myCtx)
- a.makeMaprLines(myCtx, fieldsCh, maprLines)
+ a.aggregateAndSerialize(myCtx, fieldsCh, maprMessages)
}
func (a *Aggregate) aggregateTimer(ctx context.Context) {
@@ -123,25 +113,46 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) {
}
}
-func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string {
- ch := make(chan map[string]string)
+func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]string {
+ fieldsCh := make(chan map[string]string)
go func() {
- defer close(ch)
+ defer close(fieldsCh)
+ var lines chan line.Line
+
+ // Gather first lines channel (first input file)
+ select {
+ case lines = <-a.NextLinesCh:
+ case <-ctx.Done():
+ return
+ }
for {
select {
- case line, ok := <-a.Lines:
+ case line, ok := <-lines:
if !ok {
- return
+ select {
+ case lines = <-a.NextLinesCh:
+ // Have a new lines channel (e.g. new input file)
+ case <-ctx.Done():
+ default:
+ // No new lines channel found.
+ return
+ }
}
- maprLine := strings.TrimSpace(string(line.Content))
+ maprLine := strings.TrimSpace(line.Content.String())
fields, err := a.parser.MakeFields(maprLine)
- logger.Debug(fields, err)
+ // Can't recycle it here yet, as field slices may still reference the line's bytes buffer.
+ // TODO: Add unit test reading lines from multiple mapreduce files.
+ // TODO: Add capability to recycle this bytes buffer.
+ //pool.RecycleBytesBuffer(line.Content)
if err != nil {
- logger.Error(err)
+ // ErrIgnoreFields is not logged as an error; the line is simply skipped.
+ if err != logformat.ErrIgnoreFields {
+ dlog.Common.Error(fields, err)
+ }
continue
}
if !a.query.WhereClause(fields) {
@@ -149,7 +160,7 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string {
}
select {
- case ch <- fields:
+ case fieldsCh <- fields:
case <-ctx.Done():
}
case <-ctx.Done():
@@ -158,45 +169,42 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string {
}
}()
- return ch
+ return fieldsCh
}
-func (a *Aggregate) addFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string {
- ch := make(chan map[string]string)
+func (a *Aggregate) setAdditionalFields(ctx context.Context,
+ fieldsCh <-chan map[string]string) <-chan map[string]string {
+ newFieldsCh := make(chan map[string]string)
go func() {
- defer close(ch)
-
+ defer close(newFieldsCh)
for {
- // fieldsCh will be closed via 'makeFields' if ctx is done
fields, ok := <-fieldsCh
if !ok {
return
}
if err := a.query.SetClause(fields); err != nil {
- logger.Error(err)
+ dlog.Common.Error(err)
}
select {
- case ch <- fields:
+ case newFieldsCh <- fields:
case <-ctx.Done():
}
}
}()
-
- return ch
+ return newFieldsCh
}
-func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) {
- group := mapr.NewGroupSet()
+func (a *Aggregate) aggregateAndSerialize(ctx context.Context,
+ fieldsCh <-chan map[string]string, maprMessages chan<- string) {
+ group := mapr.NewGroupSet()
serialize := func() {
- logger.Info("Serializing mapreduce result")
- group.Serialize(ctx, maprLines)
+ dlog.Common.Info("Serializing mapreduce result")
+ group.Serialize(ctx, maprMessages)
group = mapr.NewGroupSet()
- logger.Info("Done serializing mapreduce result")
}
-
for {
select {
case fields, ok := <-fieldsCh:
@@ -207,9 +215,6 @@ func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[strin
a.aggregate(group, fields)
case <-a.serialize:
serialize()
- case <-a.flush:
- serialize()
- a.flushed <- struct{}{}
case <-ctx.Done():
return
}
@@ -217,12 +222,10 @@ func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[strin
}
func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
- //logger.Trace("Aggregating", group, fields)
var sb strings.Builder
-
for i, field := range a.query.GroupBy {
if i > 0 {
- sb.WriteString(" ")
+ sb.WriteString(protocol.AggregateGroupKeyCombinator)
}
if val, ok := fields[field]; ok {
sb.WriteString(val)
@@ -235,7 +238,7 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
for _, sc := range a.query.Select {
if val, ok := fields[sc.Field]; ok {
if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil {
- logger.Error(err)
+ dlog.Common.Error(err)
continue
}
addedSample = true
@@ -246,8 +249,7 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
set.Samples++
return
}
-
- logger.Trace("Aggregated data locally without adding new samples")
+ dlog.Common.Trace("Aggregated data locally without adding new samples")
}
// Serialize all the aggregated data.
@@ -255,28 +257,7 @@ func (a *Aggregate) Serialize(ctx context.Context) {
select {
case a.serialize <- struct{}{}:
case <-time.After(time.Minute):
- logger.Warn("Starting to serialize mapredice data takes over a minute")
+ dlog.Common.Warn("Starting to serialize mapredice data takes over a minute")
case <-ctx.Done():
}
}
-
-// Flush all data.
-func (a *Aggregate) Flush() {
- select {
- case a.flush <- struct{}{}:
- logger.Info("Flushing mapreduce data")
- case <-time.After(time.Minute):
- logger.Warn("Starting to flush mapreduce data takes over a minute")
- return
- case <-a.done.Done():
- return
- }
-
- select {
- case <-a.flushed:
- logger.Info("Done flushing")
- case <-time.After(time.Minute):
- logger.Warn("Waiting for data to be flushed takes over a minute")
- case <-a.done.Done():
- }
-}