summaryrefslogtreecommitdiff
path: root/internal/mapr/server/aggregate.go
diff options
context:
space:
mode:
authorPaul Buetow <35781042+pbuetow@users.noreply.github.com>2021-10-24 18:05:47 +0300
committerGitHub <noreply@github.com>2021-10-24 18:05:47 +0300
commit3d24204754aff155de21b01e9e3d82eb460fb87f (patch)
tree093fb4bff0bdf086188df86ca5d13dc7f8a34e4f /internal/mapr/server/aggregate.go
parent6edea198188172c603e10201aa2302a28b7b722f (diff)
parent6cfc4e161f94ab159d4b1ea491ffe6f166fa6204 (diff)
Merge pull request #24 from snonux/develop
Bugfixes around integration tests
Diffstat (limited to 'internal/mapr/server/aggregate.go')
-rw-r--r--internal/mapr/server/aggregate.go96
1 files changed, 61 insertions, 35 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 97fee11..4162828 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -2,7 +2,6 @@ package server
import (
"context"
- "os"
"strings"
"time"
@@ -20,6 +19,7 @@ type Aggregate struct {
done *internal.Done
// NextLinesCh can be used to use a new line ch.
NextLinesCh chan chan line.Line
+ linesCh chan line.Line
// Hostname of the current server (used to populate $hostname field).
hostname string
// Signals to serialize data.
@@ -37,7 +37,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
return nil, err
}
- fqdn, err := os.Hostname()
+ fqdn, err := config.Hostname()
if err != nil {
dlog.Common.Error(err)
}
@@ -113,58 +113,84 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) {
}
}
+func (a *Aggregate) nextLine() (line line.Line, ok bool, noMoreChannels bool) {
+
+ dlog.Common.Trace("nextLine", "entry", line, ok, noMoreChannels)
+ select {
+ case line, ok = <-a.linesCh:
+ if !ok {
+ // Channel is closed, go to next channel.
+ select {
+ case a.linesCh = <-a.NextLinesCh:
+ default:
+ noMoreChannels = true
+ }
+ }
+ default:
+ // No new line from current lines channel. Try next one.
+ select {
+ case newLinesCh := <-a.NextLinesCh:
+ oldLinesCh := a.linesCh
+ go func() { a.NextLinesCh <- oldLinesCh }()
+ a.linesCh = newLinesCh
+ default:
+ // No new lines channel found.
+ }
+ }
+ dlog.Common.Trace("nextLine", "exit", line, ok, noMoreChannels)
+
+ return
+}
+
func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]string {
fieldsCh := make(chan map[string]string)
go func() {
defer close(fieldsCh)
- var lines chan line.Line
// Gather first lines channel (first input file)
select {
- case lines = <-a.NextLinesCh:
+ case a.linesCh = <-a.NextLinesCh:
case <-ctx.Done():
return
}
for {
select {
- case line, ok := <-lines:
- if !ok {
- select {
- case lines = <-a.NextLinesCh:
- // Have a new lines channel (e.g. new input file)
- case <-ctx.Done():
- default:
- // No new lines channel found.
- return
- }
- }
+ case <-ctx.Done():
+ return
+ default:
+ }
- maprLine := strings.TrimSpace(line.Content.String())
- fields, err := a.parser.MakeFields(maprLine)
- // Can't recycle it here yet, as field slices are still
- // TODO: Add unit test reading from multiple mapreduce files lines.
- // TODO: Add capability to recycle this bytes buffer.
- //pool.RecycleBytesBuffer(line.Content)
-
- if err != nil {
- // Should fields be ignored anyway?
- if err != logformat.ErrIgnoreFields {
- dlog.Common.Error(fields, err)
- }
- continue
- }
- if !a.query.WhereClause(fields) {
- continue
+ // Gather first lines channel (first input file)
+ line, ok, noMoreChannels := a.nextLine()
+ if !ok {
+ if noMoreChannels {
+ break
}
+ time.Sleep(time.Millisecond * 100)
+ }
+
+ maprLine := strings.TrimSpace(line.Content.String())
+ fields, err := a.parser.MakeFields(maprLine)
+ // Can't recycle it here yet, as field slices are still
+ // MAYBETODO: Add capability to recycle this bytes buffer.
+ //pool.RecycleBytesBuffer(line.Content)
- select {
- case fieldsCh <- fields:
- case <-ctx.Done():
+ if err != nil {
+ // Should fields be ignored anyway?
+ if err != logformat.ErrIgnoreFields {
+ dlog.Common.Error(fields, err)
}
+ continue
+ }
+ if !a.query.WhereClause(fields) {
+ continue
+ }
+
+ select {
+ case fieldsCh <- fields:
case <-ctx.Done():
- return
}
}
}()