diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2021-12-05 10:37:20 +0000 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2021-12-05 10:37:20 +0000 |
| commit | 7ec5c5b144866c392e3676778041a2ae6aa9d360 (patch) | |
| tree | 55538e624912e24b94d6faa0ae5f0060b824d1bf /internal/mapr | |
| parent | 6c12fc4b33049111ad6ddc3f62bf979f843fad73 (diff) | |
buffer line.Line for performance
Diffstat (limited to 'internal/mapr')
| -rw-r--r-- | internal/mapr/server/aggregate.go | 18 |
1 files changed, 8 insertions, 10 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index cb0da2b..9e3f68e 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -18,8 +18,8 @@ import ( type Aggregate struct { done *internal.Done // NextLinesCh can be used to use a new line ch. - NextLinesCh chan chan line.Line - linesCh chan line.Line + NextLinesCh chan chan *line.Line + linesCh chan *line.Line // Hostname of the current server (used to populate $hostname field). hostname string // Signals to serialize data. @@ -65,7 +65,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) { return &Aggregate{ done: internal.NewDone(), - NextLinesCh: make(chan chan line.Line, 10), + NextLinesCh: make(chan chan *line.Line, 100), serialize: make(chan struct{}), hostname: s[0], query: query, @@ -113,9 +113,9 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) { } } -func (a *Aggregate) nextLine() (line line.Line, ok bool, noMoreChannels bool) { +func (a *Aggregate) nextLine() (line *line.Line, ok bool, noMoreChannels bool) { + dlog.Server.Trace("nextLine.enter", line, ok, noMoreChannels) - dlog.Server.Trace("nextLine", "entry", line, ok, noMoreChannels) select { case line, ok = <-a.linesCh: if !ok { @@ -137,8 +137,7 @@ func (a *Aggregate) nextLine() (line line.Line, ok bool, noMoreChannels bool) { // No new lines channel found. } } - dlog.Server.Trace("nextLine", "exit", line, ok, noMoreChannels) - + dlog.Server.Trace("nextLine.exit", line, ok, noMoreChannels) return } @@ -169,13 +168,12 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin break } time.Sleep(time.Millisecond * 100) + continue } maprLine := strings.TrimSpace(line.Content.String()) + line.Recycle() // after this, don't use line object anymore!!! fields, err := a.parser.MakeFields(maprLine) - // Can't recycle it here yet, as field slices are still - // MAYBETODO: Add capability to recycle this bytes buffer. - //pool.RecycleBytesBuffer(line.Content) if err != nil { // Should fields be ignored anyway? |
