summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2021-12-05 10:37:20 +0000
committerPaul Buetow <pbuetow@mimecast.com>2021-12-05 10:37:20 +0000
commit7ec5c5b144866c392e3676778041a2ae6aa9d360 (patch)
tree55538e624912e24b94d6faa0ae5f0060b824d1bf /internal
parent6c12fc4b33049111ad6ddc3f62bf979f843fad73 (diff)
buffer line.Line for performance
Diffstat (limited to 'internal')
-rw-r--r--internal/io/fs/filereader.go2
-rw-r--r--internal/io/fs/readfile.go24
-rw-r--r--internal/io/fs/readfilelcontext.go54
-rw-r--r--internal/io/line/line.go41
-rw-r--r--internal/mapr/server/aggregate.go18
-rw-r--r--internal/server/handlers/basehandler.go3
-rw-r--r--internal/server/handlers/healthhandler.go2
-rw-r--r--internal/server/handlers/readcommand.go2
-rw-r--r--internal/server/handlers/serverhandler.go2
9 files changed, 87 insertions, 61 deletions
diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go
index b05fd39..e27d2a7 100644
--- a/internal/io/fs/filereader.go
+++ b/internal/io/fs/filereader.go
@@ -11,7 +11,7 @@ import (
// FileReader is the interface used on the dtail server to read/cat/grep/mapr...
// a file.
type FileReader interface {
- Start(ctx context.Context, ltx lcontext.LContext, lines chan<- line.Line,
+ Start(ctx context.Context, ltx lcontext.LContext, lines chan<- *line.Line,
re regex.Regex) error
FilePath() string
Retry() bool
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index 008111d..f37b07d 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -73,7 +73,7 @@ func (f readFile) Retry() bool {
// Start tailing a log file.
func (f readFile) Start(ctx context.Context, ltx lcontext.LContext,
- lines chan<- line.Line, re regex.Regex) error {
+ lines chan<- *line.Line, re regex.Regex) error {
reader, fd, err := f.makeReader()
if fd != nil {
@@ -203,7 +203,7 @@ func (f *readFile) read(ctx context.Context, fd *os.File, reader *bufio.Reader,
// Filter log lines matching a given regular expression.
func (f *readFile) filter(ctx context.Context, ltx lcontext.LContext,
- rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) {
+ rawLines <-chan *bytes.Buffer, lines chan<- *line.Line, re regex.Regex) {
// Do we have any kind of local context settings? If so then run the more complex
// filterWithLContext method.
@@ -218,31 +218,25 @@ func (f *readFile) filter(ctx context.Context, ltx lcontext.LContext,
f.filterWithoutLContext(ctx, rawLines, lines, re)
}
-func (f *readFile) transmittable(lineBytesBuffer *bytes.Buffer, length, capacity int,
- re regex.Regex) (line.Line, bool) {
+func (f *readFile) transmittable(rawLine *bytes.Buffer, length, capacity int,
+ re regex.Regex) (*line.Line, bool) {
- var read line.Line
- if !re.Match(lineBytesBuffer.Bytes()) {
+ newLine := line.Null()
+ if !re.Match(rawLine.Bytes()) {
f.updateLineNotMatched()
f.updateLineNotTransmitted()
- return read, false
+ return newLine, false
}
f.updateLineMatched()
// Can we actually send more messages, channel capacity reached?
if f.canSkipLines && length >= capacity {
f.updateLineNotTransmitted()
- return read, false
+ return newLine, false
}
f.updateLineTransmitted()
- read = line.Line{
- Content: lineBytesBuffer,
- SourceID: f.globID,
- Count: f.totalLineCount(),
- TransmittedPerc: f.transmittedPerc(),
- }
- return read, true
+ return line.New(rawLine, f.totalLineCount(), f.transmittedPerc(), f.globID), true
}
// Check whether log file is truncated. Returns nil if not.
diff --git a/internal/io/fs/readfilelcontext.go b/internal/io/fs/readfilelcontext.go
index d6212f8..0d41a07 100644
--- a/internal/io/fs/readfilelcontext.go
+++ b/internal/io/fs/readfilelcontext.go
@@ -29,18 +29,18 @@ type ltxState struct {
// We don't have any local grep context, which makes life much simpler and more efficient.
func (f *readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *bytes.Buffer,
- lines chan<- line.Line, re regex.Regex) {
+ lines chan<- *line.Line, re regex.Regex) {
for {
select {
- case line, ok := <-rawLines:
+ case rawLine, ok := <-rawLines:
f.updatePosition()
if !ok {
return
}
- if filteredLine, ok := f.transmittable(line, len(lines), cap(lines), re); ok {
+ if newLine, ok := f.transmittable(rawLine, len(lines), cap(lines), re); ok {
select {
- case lines <- filteredLine:
+ case lines <- newLine:
case <-ctx.Done():
return
}
@@ -51,7 +51,7 @@ func (f *readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *b
// Filter log lines matching a given regular expression, however with local grep context.
func (f *readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext,
- rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) {
+ rawLines <-chan *bytes.Buffer, lines chan<- *line.Line, re regex.Regex) {
var ls ltxState
@@ -73,8 +73,8 @@ func (f *readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext
ls.after = 0
ls.processAfter = ltx.AfterContext > 0
- for lineBytesBuffer := range rawLines {
- status := f.filterLineWithLContext(ctx, &ltx, &ls, rawLines, lines, &re, lineBytesBuffer)
+ for rawLine := range rawLines {
+ status := f.filterLineWithLContext(ctx, &ltx, &ls, rawLines, lines, &re, rawLine)
if status == abortReading {
return
}
@@ -83,21 +83,16 @@ func (f *readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext
// Filter log lines matching a given regular expression, however with local grep context.
func (f *readFile) filterLineWithLContext(ctx context.Context, ltx *lcontext.LContext,
- ls *ltxState, rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re *regex.Regex,
- lineBytesBuffer *bytes.Buffer) readStatus {
+ ls *ltxState, rawLines <-chan *bytes.Buffer, lines chan<- *line.Line, re *regex.Regex,
+ rawLine *bytes.Buffer) readStatus {
f.updatePosition()
- if !re.Match(lineBytesBuffer.Bytes()) {
+ if !re.Match(rawLine.Bytes()) {
f.updateLineNotMatched()
if ls.processAfter && ls.after > 0 {
ls.after--
- myLine := line.Line{
- Content: lineBytesBuffer,
- SourceID: f.globID,
- Count: f.totalLineCount(),
- TransmittedPerc: 100,
- }
+ myLine := line.New(rawLine, f.totalLineCount(), 100, f.globID)
select {
case lines <- myLine:
@@ -108,10 +103,10 @@ func (f *readFile) filterLineWithLContext(ctx context.Context, ltx *lcontext.LCo
} else if ls.processBefore {
// Keep last num BeforeContext raw messages.
select {
- case ls.beforeBuf <- lineBytesBuffer:
+ case ls.beforeBuf <- rawLine:
default:
pool.RecycleBytesBuffer(<-ls.beforeBuf)
- ls.beforeBuf <- lineBytesBuffer
+ ls.beforeBuf <- rawLine
}
}
return continueReading
@@ -130,13 +125,8 @@ func (f *readFile) filterLineWithLContext(ctx context.Context, ltx *lcontext.LCo
i := uint64(len(ls.beforeBuf))
for {
select {
- case lineBytesBuffer := <-ls.beforeBuf:
- myLine := line.Line{
- Content: lineBytesBuffer,
- SourceID: f.globID,
- Count: f.totalLineCount() - i,
- TransmittedPerc: 100,
- }
+ case rawLine := <-ls.beforeBuf:
+ myLine := line.New(rawLine, f.totalLineCount()-i, 100, f.globID)
i--
select {
@@ -153,12 +143,7 @@ func (f *readFile) filterLineWithLContext(ctx context.Context, ltx *lcontext.LCo
}
}
- line := line.Line{
- Content: lineBytesBuffer,
- SourceID: f.globID,
- Count: f.totalLineCount(),
- TransmittedPerc: 100,
- }
+ line := line.New(rawLine, f.totalLineCount(), 100, f.globID)
select {
case lines <- line:
@@ -178,3 +163,10 @@ func (f *readFile) filterLineWithLContext(ctx context.Context, ltx *lcontext.LCo
return continueReading
}
+
+/*
+func (f *readFile) filterLineWithLContextNoMatch(ctx context.Context, ltx *lcontext.LContext,
+ ls *ltxState, rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re *regex.Regex,
+ rawLine *bytes.Buffer) readStatus {
+}
+*/
diff --git a/internal/io/line/line.go b/internal/io/line/line.go
index d306c88..c168274 100644
--- a/internal/io/line/line.go
+++ b/internal/io/line/line.go
@@ -3,8 +3,17 @@ package line
import (
"bytes"
"fmt"
+ "sync"
)
+// lineBuffer is there to optimize memory allocations. DTail otherwise allocates
+// a lot of memory while reading logs.
+var lineBuffer = sync.Pool{
+ New: func() interface{} {
+ return &Line{}
+ },
+}
+
// Line represents a read log line.
type Line struct {
// The content of the log line.
@@ -23,6 +32,22 @@ type Line struct {
SourceID string
}
+func New(content *bytes.Buffer, count uint64, transmittedPerc int, sourceID string) *Line {
+ l := lineBuffer.Get().(*Line)
+ l.Content = content
+ l.Count = count
+ l.TransmittedPerc = transmittedPerc
+ l.SourceID = sourceID
+ return l
+}
+
+// Null returns a new line with all members initialized to their null value.
+func Null() *Line {
+ l := lineBuffer.Get().(*Line)
+ l.NullValues()
+ return l
+}
+
// String returns a human-readable representation of the line.
func (l Line) String() string {
return fmt.Sprintf("Line(Content:%s,TransmittedPerc:%v,Count:%v,SourceID:%s)",
@@ -31,3 +56,19 @@ func (l Line) String() string {
l.Count,
l.SourceID)
}
+
+// Recycle the line. Once done, don't reuse this instance!!!
+func (l *Line) Recycle() {
+ // No explicit reset is required here: New overwrites every member and
+ // Null resets them via NullValues before the line is handed out again.
+ //l.Reset()
+ lineBuffer.Put(l)
+}
+
+// NullValues nulls all line struct members to their default state.
+func (l *Line) NullValues() {
+ l.Content = nil
+ l.Count = 0
+ l.TransmittedPerc = 0
+ l.SourceID = ""
+}
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index cb0da2b..9e3f68e 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -18,8 +18,8 @@ import (
type Aggregate struct {
done *internal.Done
// NextLinesCh can be used to hand over a new lines channel.
- NextLinesCh chan chan line.Line
- linesCh chan line.Line
+ NextLinesCh chan chan *line.Line
+ linesCh chan *line.Line
// Hostname of the current server (used to populate $hostname field).
hostname string
// Signals to serialize data.
@@ -65,7 +65,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
return &Aggregate{
done: internal.NewDone(),
- NextLinesCh: make(chan chan line.Line, 10),
+ NextLinesCh: make(chan chan *line.Line, 100),
serialize: make(chan struct{}),
hostname: s[0],
query: query,
@@ -113,9 +113,9 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) {
}
}
-func (a *Aggregate) nextLine() (line line.Line, ok bool, noMoreChannels bool) {
+func (a *Aggregate) nextLine() (line *line.Line, ok bool, noMoreChannels bool) {
+ dlog.Server.Trace("nextLine.enter", line, ok, noMoreChannels)
- dlog.Server.Trace("nextLine", "entry", line, ok, noMoreChannels)
select {
case line, ok = <-a.linesCh:
if !ok {
@@ -137,8 +137,7 @@ func (a *Aggregate) nextLine() (line line.Line, ok bool, noMoreChannels bool) {
// No new lines channel found.
}
}
- dlog.Server.Trace("nextLine", "exit", line, ok, noMoreChannels)
-
+ dlog.Server.Trace("nextLine.exit", line, ok, noMoreChannels)
return
}
@@ -169,13 +168,12 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin
break
}
time.Sleep(time.Millisecond * 100)
+ continue
}
maprLine := strings.TrimSpace(line.Content.String())
+ line.Recycle() // after this, don't use line object anymore!!!
fields, err := a.parser.MakeFields(maprLine)
- // Can't recycle it here yet, as field slices are still
- // MAYBETODO: Add capability to recycle this bytes buffer.
- //pool.RecycleBytesBuffer(line.Content)
if err != nil {
// Should fields be ignored anyway?
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index f068944..4fd718e 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -29,7 +29,7 @@ type handleCommandCb func(context.Context, lcontext.LContext, int, []string, str
type baseHandler struct {
done *internal.Done
handleCommandCb handleCommandCb
- lines chan line.Line
+ lines chan *line.Line
aggregate *server.Aggregate
maprMessages chan string
serverMessages chan string
@@ -112,6 +112,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
h.readBuf.WriteByte(protocol.MessageDelimiter)
n = copy(p, h.readBuf.Bytes())
pool.RecycleBytesBuffer(line.Content)
+ line.Recycle()
case <-time.After(time.Second):
select {
diff --git a/internal/server/handlers/healthhandler.go b/internal/server/handlers/healthhandler.go
index e7f7762..362fe24 100644
--- a/internal/server/handlers/healthhandler.go
+++ b/internal/server/handlers/healthhandler.go
@@ -23,7 +23,7 @@ func NewHealthHandler(user *user.User) *HealthHandler {
h := HealthHandler{
baseHandler: baseHandler{
done: internal.NewDone(),
- lines: make(chan line.Line, 100),
+ lines: make(chan *line.Line, 100),
serverMessages: make(chan string, 10),
maprMessages: make(chan string, 10),
ackCloseReceived: make(chan struct{}),
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index e425463..85f5b2d 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -165,7 +165,7 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext,
for {
if aggregate != nil {
- lines = make(chan line.Line, 100)
+ lines = make(chan *line.Line, 100)
aggregate.NextLinesCh <- lines
}
if err := reader.Start(ctx, ltx, lines, re); err != nil {
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 75a8acc..bc22c88 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -31,7 +31,7 @@ func NewServerHandler(user *user.User, catLimiter,
h := ServerHandler{
baseHandler: baseHandler{
done: internal.NewDone(),
- lines: make(chan line.Line, 100),
+ lines: make(chan *line.Line, 100),
serverMessages: make(chan string, 10),
maprMessages: make(chan string, 10),
ackCloseReceived: make(chan struct{}),