diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2021-10-21 21:28:49 +0300 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2021-10-21 21:28:49 +0300 |
| commit | f4207a55f71bfbcfdc532d5cdd3befaa3474a157 (patch) | |
| tree | ea5e4a2d2a67035f645bdee496ae55a52034178a /internal/mapr/server | |
| parent | d80d6070557e3a800e3a54967af9eced518f116b (diff) | |
| parent | 739205206d63bf42f4e843b39d04d4c8cd8207c3 (diff) | |
merge develop
Diffstat (limited to 'internal/mapr/server')
| -rw-r--r-- | internal/mapr/server/aggregate.go | 155 |
1 files changed, 68 insertions, 87 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 28bb074..97fee11 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -8,25 +8,22 @@ import ( "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" - "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/logformat" + "github.com/mimecast/dtail/internal/protocol" ) // Aggregate is for aggregating mapreduce data on the DTail server side. type Aggregate struct { done *internal.Done - // Log lines to process (parsing MAPREDUCE lines). - Lines chan line.Line + // NextLinesCh can be used to use a new line ch. + NextLinesCh chan chan line.Line // Hostname of the current server (used to populate $hostname field). hostname string // Signals to serialize data. serialize chan struct{} - // Signals to flush data. - flush chan struct{} - // Signals that data has been flushed - flushed chan struct{} // The mapr query query *mapr.Query // The mapr log format parser @@ -42,7 +39,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) { fqdn, err := os.Hostname() if err != nil { - logger.Error(err) + dlog.Common.Error(err) } s := strings.Split(fqdn, ".") @@ -57,38 +54,32 @@ func NewAggregate(queryStr string) (*Aggregate, error) { parserName = query.LogFormat } - logger.Info("Creating log format parser", parserName) + dlog.Common.Info("Creating log format parser", parserName) logParser, err := logformat.NewParser(parserName, query) if err != nil { - logger.Error("Could not create log format parser. Falling back to 'generic'", err) + dlog.Common.Error("Could not create log format parser. Falling back to 'generic'", err) if logParser, err = logformat.NewParser("generic", query); err != nil { - logger.FatalExit("Could not create log format parser", err) + dlog.Common.FatalPanic("Could not create log format parser", err) } } - a := Aggregate{ - done: internal.NewDone(), - Lines: make(chan line.Line, 100), - serialize: make(chan struct{}), - flush: make(chan struct{}), - flushed: make(chan struct{}), - hostname: s[0], - query: query, - parser: logParser, - } - - return &a, nil + return &Aggregate{ + done: internal.NewDone(), + NextLinesCh: make(chan chan line.Line, 10), + serialize: make(chan struct{}), + hostname: s[0], + query: query, + parser: logParser, + }, nil } // Shutdown the aggregation engine. func (a *Aggregate) Shutdown() { - a.Flush() a.done.Shutdown() } // Start an aggregation. -func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) { - +func (a *Aggregate) Start(ctx context.Context, maprMessages chan<- string) { myCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -101,15 +92,14 @@ func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) { } }() - fieldsCh := a.makeFields(myCtx) - + fieldsCh := a.fieldsFromLines(myCtx) // Add fields (e.g. via 'set' clause) if len(a.query.Set) > 0 { - fieldsCh = a.addFields(myCtx, fieldsCh) + fieldsCh = a.setAdditionalFields(myCtx, fieldsCh) } - + // Periodically pre-aggregate data every a.query.Interval seconds. go a.aggregateTimer(myCtx) - a.makeMaprLines(myCtx, fieldsCh, maprLines) + a.aggregateAndSerialize(myCtx, fieldsCh, maprMessages) } func (a *Aggregate) aggregateTimer(ctx context.Context) { @@ -123,25 +113,46 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) { } } -func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string { - ch := make(chan map[string]string) +func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]string { + fieldsCh := make(chan map[string]string) go func() { - defer close(ch) + defer close(fieldsCh) + var lines chan line.Line + + // Gather first lines channel (first input file) + select { + case lines = <-a.NextLinesCh: + case <-ctx.Done(): + return + } for { select { - case line, ok := <-a.Lines: + case line, ok := <-lines: if !ok { - return + select { + case lines = <-a.NextLinesCh: + // Have a new lines channel (e.g. new input file) + case <-ctx.Done(): + default: + // No new lines channel found. + return + } } - maprLine := strings.TrimSpace(string(line.Content)) + maprLine := strings.TrimSpace(line.Content.String()) fields, err := a.parser.MakeFields(maprLine) - logger.Debug(fields, err) + // Can't recycle it here yet, as field slices are still + // TODO: Add unit test reading from multiple mapreduce files lines. + // TODO: Add capability to recycle this bytes buffer. + //pool.RecycleBytesBuffer(line.Content) if err != nil { - logger.Error(err) + // Should fields be ignored anyway? + if err != logformat.ErrIgnoreFields { + dlog.Common.Error(fields, err) + } continue } if !a.query.WhereClause(fields) { @@ -149,7 +160,7 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string { } select { - case ch <- fields: + case fieldsCh <- fields: case <-ctx.Done(): } case <-ctx.Done(): @@ -158,45 +169,42 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string { } }() - return ch + return fieldsCh } -func (a *Aggregate) addFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string { - ch := make(chan map[string]string) +func (a *Aggregate) setAdditionalFields(ctx context.Context, + fieldsCh <-chan map[string]string) <-chan map[string]string { + newFieldsCh := make(chan map[string]string) go func() { - defer close(ch) - + defer close(newFieldsCh) for { - // fieldsCh will be closed via 'makeFields' if ctx is done fields, ok := <-fieldsCh if !ok { return } if err := a.query.SetClause(fields); err != nil { - logger.Error(err) + dlog.Common.Error(err) } select { - case ch <- fields: + case newFieldsCh <- fields: case <-ctx.Done(): } } }() - - return ch + return newFieldsCh } -func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) { - group := mapr.NewGroupSet() +func (a *Aggregate) aggregateAndSerialize(ctx context.Context, + fieldsCh <-chan map[string]string, maprMessages chan<- string) { + group := mapr.NewGroupSet() serialize := func() { - logger.Info("Serializing mapreduce result") - group.Serialize(ctx, maprLines) + dlog.Common.Info("Serializing mapreduce result") + group.Serialize(ctx, maprMessages) group = mapr.NewGroupSet() - logger.Info("Done serializing mapreduce result") } - for { select { case fields, ok := <-fieldsCh: @@ -207,9 +215,6 @@ func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[strin a.aggregate(group, fields) case <-a.serialize: serialize() - case <-a.flush: - serialize() - a.flushed <- struct{}{} case <-ctx.Done(): return } @@ -217,12 +222,10 @@ func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[strin } func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { - //logger.Trace("Aggregating", group, fields) var sb strings.Builder - for i, field := range a.query.GroupBy { if i > 0 { - sb.WriteString(" ") + sb.WriteString(protocol.AggregateGroupKeyCombinator) } if val, ok := fields[field]; ok { sb.WriteString(val) @@ -235,7 +238,7 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { for _, sc := range a.query.Select { if val, ok := fields[sc.Field]; ok { if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil { - logger.Error(err) + dlog.Common.Error(err) continue } addedSample = true @@ -246,8 +249,7 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) { set.Samples++ return } - - logger.Trace("Aggregated data locally without adding new samples") + dlog.Common.Trace("Aggregated data locally without adding new samples") } // Serialize all the aggregated data. @@ -255,28 +257,7 @@ func (a *Aggregate) Serialize(ctx context.Context) { select { case a.serialize <- struct{}{}: case <-time.After(time.Minute): - logger.Warn("Starting to serialize mapredice data takes over a minute") + dlog.Common.Warn("Starting to serialize mapredice data takes over a minute") case <-ctx.Done(): } } - -// Flush all data. -func (a *Aggregate) Flush() { - select { - case a.flush <- struct{}{}: - logger.Info("Flushing mapreduce data") - case <-time.After(time.Minute): - logger.Warn("Starting to flush mapreduce data takes over a minute") - return - case <-a.done.Done(): - return - } - - select { - case <-a.flushed: - logger.Info("Done flushing") - case <-time.After(time.Minute): - logger.Warn("Waiting for data to be flushed takes over a minute") - case <-a.done.Done(): - } -} |
