summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2021-12-10 09:53:48 +0000
committerPaul Buetow <pbuetow@mimecast.com>2021-12-10 09:53:48 +0000
commit88e999a0bbb0f34ed456ba565f67dce45590a9f5 (patch)
tree3e17295c0151db50ea0531d859cd2d6276cdb19c
parentf9cae7ef40bb517f961d046669f0a2006b2c4ffc (diff)
Refactor
-rw-r--r--internal/mapr/server/aggregate.go50
1 files changed, 31 insertions, 19 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 9e3f68e..ed32f8f 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -165,30 +165,14 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin
line, ok, noMoreChannels := a.nextLine()
if !ok {
if noMoreChannels {
- break
+ return
}
time.Sleep(time.Millisecond * 100)
continue
}
- maprLine := strings.TrimSpace(line.Content.String())
- line.Recycle() // after this, don't use line object anymore!!!
- fields, err := a.parser.MakeFields(maprLine)
-
- if err != nil {
- // Should fields be ignored anyway?
- if err != logformat.ErrIgnoreFields {
- dlog.Server.Error(fields, err)
- }
- continue
- }
- if !a.query.WhereClause(fields) {
- continue
- }
-
- select {
- case fieldsCh <- fields:
- case <-ctx.Done():
+ if err := a.fieldFromLine(ctx, line, fieldsCh); err != nil {
+ dlog.Server.Error(err)
}
}
}()
@@ -196,6 +180,34 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin
return fieldsCh
}
+func (a *Aggregate) fieldFromLine(ctx context.Context, line *line.Line,
+ fieldsCh chan<- map[string]string) error {
+
+ maprLine := strings.TrimSpace(line.Content.String())
+
+ // after recycling it, don't use line object anymore!!!
+ line.Recycle()
+ fields, err := a.parser.MakeFields(maprLine)
+
+ if err != nil {
+ // Should fields be ignored anyway?
+ if err != logformat.ErrIgnoreFields {
+ return err
+ }
+ return nil
+ }
+ if !a.query.WhereClause(fields) {
+ return nil
+ }
+
+ select {
+ case fieldsCh <- fields:
+ case <-ctx.Done():
+ }
+
+ return nil
+}
+
func (a *Aggregate) setAdditionalFields(ctx context.Context,
fieldsCh <-chan map[string]string) <-chan map[string]string {