diff options
| author | Paul Buetow <35781042+pbuetow@users.noreply.github.com> | 2021-10-24 18:05:47 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-10-24 18:05:47 +0300 |
| commit | 3d24204754aff155de21b01e9e3d82eb460fb87f (patch) | |
| tree | 093fb4bff0bdf086188df86ca5d13dc7f8a34e4f /internal/mapr/server/aggregate.go | |
| parent | 6edea198188172c603e10201aa2302a28b7b722f (diff) | |
| parent | 6cfc4e161f94ab159d4b1ea491ffe6f166fa6204 (diff) | |
Merge pull request #24 from snonux/develop
Bugfixes around integration tests
Diffstat (limited to 'internal/mapr/server/aggregate.go')
| -rw-r--r-- | internal/mapr/server/aggregate.go | 96 |
1 files changed, 61 insertions, 35 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 97fee11..4162828 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -2,7 +2,6 @@ package server import ( "context" - "os" "strings" "time" @@ -20,6 +19,7 @@ type Aggregate struct { done *internal.Done // NextLinesCh can be used to use a new line ch. NextLinesCh chan chan line.Line + linesCh chan line.Line // Hostname of the current server (used to populate $hostname field). hostname string // Signals to serialize data. @@ -37,7 +37,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) { return nil, err } - fqdn, err := os.Hostname() + fqdn, err := config.Hostname() if err != nil { dlog.Common.Error(err) } @@ -113,58 +113,84 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) { } } +func (a *Aggregate) nextLine() (line line.Line, ok bool, noMoreChannels bool) { + + dlog.Common.Trace("nextLine", "entry", line, ok, noMoreChannels) + select { + case line, ok = <-a.linesCh: + if !ok { + // Channel is closed, go to next channel. + select { + case a.linesCh = <-a.NextLinesCh: + default: + noMoreChannels = true + } + } + default: + // No new line from current lines channel. Try next one. + select { + case newLinesCh := <-a.NextLinesCh: + oldLinesCh := a.linesCh + go func() { a.NextLinesCh <- oldLinesCh }() + a.linesCh = newLinesCh + default: + // No new lines channel found. + } + } + dlog.Common.Trace("nextLine", "exit", line, ok, noMoreChannels) + + return +} + func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]string { fieldsCh := make(chan map[string]string) go func() { defer close(fieldsCh) - var lines chan line.Line // Gather first lines channel (first input file) select { - case lines = <-a.NextLinesCh: + case a.linesCh = <-a.NextLinesCh: case <-ctx.Done(): return } for { select { - case line, ok := <-lines: - if !ok { - select { - case lines = <-a.NextLinesCh: - // Have a new lines channel (e.g. new input file) - case <-ctx.Done(): - default: - // No new lines channel found. - return - } - } + case <-ctx.Done(): + return + default: + } - maprLine := strings.TrimSpace(line.Content.String()) - fields, err := a.parser.MakeFields(maprLine) - // Can't recycle it here yet, as field slices are still - // TODO: Add unit test reading from multiple mapreduce files lines. - // TODO: Add capability to recycle this bytes buffer. - //pool.RecycleBytesBuffer(line.Content) - - if err != nil { - // Should fields be ignored anyway? - if err != logformat.ErrIgnoreFields { - dlog.Common.Error(fields, err) - } - continue - } - if !a.query.WhereClause(fields) { - continue + // Gather first lines channel (first input file) + line, ok, noMoreChannels := a.nextLine() + if !ok { + if noMoreChannels { + break } + time.Sleep(time.Millisecond * 100) + } + + maprLine := strings.TrimSpace(line.Content.String()) + fields, err := a.parser.MakeFields(maprLine) + // Can't recycle it here yet, as field slices are still + // MAYBETODO: Add capability to recycle this bytes buffer. + //pool.RecycleBytesBuffer(line.Content) - select { - case fieldsCh <- fields: - case <-ctx.Done(): + if err != nil { + // Should fields be ignored anyway? + if err != logformat.ErrIgnoreFields { + dlog.Common.Error(fields, err) } + continue + } + if !a.query.WhereClause(fields) { + continue + } + + select { + case fieldsCh <- fields: case <-ctx.Done(): - return } } }() |
