diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2021-12-10 09:53:48 +0000 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2021-12-10 09:53:48 +0000 |
| commit | 88e999a0bbb0f34ed456ba565f67dce45590a9f5 (patch) | |
| tree | 3e17295c0151db50ea0531d859cd2d6276cdb19c | |
| parent | f9cae7ef40bb517f961d046669f0a2006b2c4ffc (diff) | |
Refactor
| -rw-r--r-- | internal/mapr/server/aggregate.go | 50 |
1 files changed, 31 insertions, 19 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 9e3f68e..ed32f8f 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -165,30 +165,14 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin line, ok, noMoreChannels := a.nextLine() if !ok { if noMoreChannels { - break + return } time.Sleep(time.Millisecond * 100) continue } - maprLine := strings.TrimSpace(line.Content.String()) - line.Recycle() // after this, don't use line object anymore!!! - fields, err := a.parser.MakeFields(maprLine) - - if err != nil { - // Should fields be ignored anyway? - if err != logformat.ErrIgnoreFields { - dlog.Server.Error(fields, err) - } - continue - } - if !a.query.WhereClause(fields) { - continue - } - - select { - case fieldsCh <- fields: - case <-ctx.Done(): + if err := a.fieldFromLine(ctx, line, fieldsCh); err != nil { + dlog.Server.Error(err) } } }() @@ -196,6 +180,34 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin return fieldsCh } +func (a *Aggregate) fieldFromLine(ctx context.Context, line *line.Line, + fieldsCh chan<- map[string]string) error { + + maprLine := strings.TrimSpace(line.Content.String()) + + // after recycling it, don't use line object anymore!!! + line.Recycle() + fields, err := a.parser.MakeFields(maprLine) + + if err != nil { + // Should fields be ignored anyway? + if err != logformat.ErrIgnoreFields { + return err + } + return nil + } + if !a.query.WhereClause(fields) { + return nil + } + + select { + case fieldsCh <- fields: + case <-ctx.Done(): + } + + return nil +} + func (a *Aggregate) setAdditionalFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string { |
