summaryrefslogtreecommitdiff
path: root/internal/mapr
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2021-12-05 10:37:20 +0000
committerPaul Buetow <pbuetow@mimecast.com>2021-12-05 10:37:20 +0000
commit7ec5c5b144866c392e3676778041a2ae6aa9d360 (patch)
tree55538e624912e24b94d6faa0ae5f0060b824d1bf /internal/mapr
parent6c12fc4b33049111ad6ddc3f62bf979f843fad73 (diff)
buffer line.Line for performance
Diffstat (limited to 'internal/mapr')
-rw-r--r--internal/mapr/server/aggregate.go18
1 files changed, 8 insertions, 10 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index cb0da2b..9e3f68e 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -18,8 +18,8 @@ import (
type Aggregate struct {
done *internal.Done
// NextLinesCh can be used to use a new line ch.
- NextLinesCh chan chan line.Line
- linesCh chan line.Line
+ NextLinesCh chan chan *line.Line
+ linesCh chan *line.Line
// Hostname of the current server (used to populate $hostname field).
hostname string
// Signals to serialize data.
@@ -65,7 +65,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
return &Aggregate{
done: internal.NewDone(),
- NextLinesCh: make(chan chan line.Line, 10),
+ NextLinesCh: make(chan chan *line.Line, 100),
serialize: make(chan struct{}),
hostname: s[0],
query: query,
@@ -113,9 +113,9 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) {
}
}
-func (a *Aggregate) nextLine() (line line.Line, ok bool, noMoreChannels bool) {
+func (a *Aggregate) nextLine() (line *line.Line, ok bool, noMoreChannels bool) {
+ dlog.Server.Trace("nextLine.enter", line, ok, noMoreChannels)
- dlog.Server.Trace("nextLine", "entry", line, ok, noMoreChannels)
select {
case line, ok = <-a.linesCh:
if !ok {
@@ -137,8 +137,7 @@ func (a *Aggregate) nextLine() (line line.Line, ok bool, noMoreChannels bool) {
// No new lines channel found.
}
}
- dlog.Server.Trace("nextLine", "exit", line, ok, noMoreChannels)
-
+ dlog.Server.Trace("nextLine.exit", line, ok, noMoreChannels)
return
}
@@ -169,13 +168,12 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin
break
}
time.Sleep(time.Millisecond * 100)
+ continue
}
maprLine := strings.TrimSpace(line.Content.String())
+ line.Recycle() // after this, don't use line object anymore!!!
fields, err := a.parser.MakeFields(maprLine)
- // Can't recycle it here yet, as field slices are still
- // MAYBETODO: Add capability to recycle this bytes buffer.
- //pool.RecycleBytesBuffer(line.Content)
if err != nil {
// Should fields be ignored anyway?