diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2021-12-05 10:37:20 +0000 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2021-12-05 10:37:20 +0000 |
| commit | 7ec5c5b144866c392e3676778041a2ae6aa9d360 (patch) | |
| tree | 55538e624912e24b94d6faa0ae5f0060b824d1bf /internal/io | |
| parent | 6c12fc4b33049111ad6ddc3f62bf979f843fad73 (diff) | |
buffer line.Line for performance
Diffstat (limited to 'internal/io')
| -rw-r--r-- | internal/io/fs/filereader.go | 2 | ||||
| -rw-r--r-- | internal/io/fs/readfile.go | 24 | ||||
| -rw-r--r-- | internal/io/fs/readfilelcontext.go | 54 | ||||
| -rw-r--r-- | internal/io/line/line.go | 41 |
4 files changed, 74 insertions, 47 deletions
diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go index b05fd39..e27d2a7 100644 --- a/internal/io/fs/filereader.go +++ b/internal/io/fs/filereader.go @@ -11,7 +11,7 @@ import ( // FileReader is the interface used on the dtail server to read/cat/grep/mapr... // a file. type FileReader interface { - Start(ctx context.Context, ltx lcontext.LContext, lines chan<- line.Line, + Start(ctx context.Context, ltx lcontext.LContext, lines chan<- *line.Line, re regex.Regex) error FilePath() string Retry() bool diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 008111d..f37b07d 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -73,7 +73,7 @@ func (f readFile) Retry() bool { // Start tailing a log file. func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, - lines chan<- line.Line, re regex.Regex) error { + lines chan<- *line.Line, re regex.Regex) error { reader, fd, err := f.makeReader() if fd != nil { @@ -203,7 +203,7 @@ func (f *readFile) read(ctx context.Context, fd *os.File, reader *bufio.Reader, // Filter log lines matching a given regular expression. func (f *readFile) filter(ctx context.Context, ltx lcontext.LContext, - rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { + rawLines <-chan *bytes.Buffer, lines chan<- *line.Line, re regex.Regex) { // Do we have any kind of local context settings? If so then run the more complex // filterWithLContext method. 
@@ -218,31 +218,25 @@ func (f *readFile) filter(ctx context.Context, ltx lcontext.LContext, f.filterWithoutLContext(ctx, rawLines, lines, re) } -func (f *readFile) transmittable(lineBytesBuffer *bytes.Buffer, length, capacity int, - re regex.Regex) (line.Line, bool) { +func (f *readFile) transmittable(rawLine *bytes.Buffer, length, capacity int, + re regex.Regex) (*line.Line, bool) { - var read line.Line - if !re.Match(lineBytesBuffer.Bytes()) { + newLine := line.Null() + if !re.Match(rawLine.Bytes()) { f.updateLineNotMatched() f.updateLineNotTransmitted() - return read, false + return newLine, false } f.updateLineMatched() // Can we actually send more messages, channel capacity reached? if f.canSkipLines && length >= capacity { f.updateLineNotTransmitted() - return read, false + return newLine, false } f.updateLineTransmitted() - read = line.Line{ - Content: lineBytesBuffer, - SourceID: f.globID, - Count: f.totalLineCount(), - TransmittedPerc: f.transmittedPerc(), - } - return read, true + return line.New(rawLine, f.totalLineCount(), f.transmittedPerc(), f.globID), true } // Check wether log file is truncated. Returns nil if not. diff --git a/internal/io/fs/readfilelcontext.go b/internal/io/fs/readfilelcontext.go index d6212f8..0d41a07 100644 --- a/internal/io/fs/readfilelcontext.go +++ b/internal/io/fs/readfilelcontext.go @@ -29,18 +29,18 @@ type ltxState struct { // We don't have any local grep context, which makes life much simpler and more efficient. 
func (f *readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *bytes.Buffer, - lines chan<- line.Line, re regex.Regex) { + lines chan<- *line.Line, re regex.Regex) { for { select { - case line, ok := <-rawLines: + case rawLine, ok := <-rawLines: f.updatePosition() if !ok { return } - if filteredLine, ok := f.transmittable(line, len(lines), cap(lines), re); ok { + if newLine, ok := f.transmittable(rawLine, len(lines), cap(lines), re); ok { select { - case lines <- filteredLine: + case lines <- newLine: case <-ctx.Done(): return } @@ -51,7 +51,7 @@ func (f *readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *b // Filter log lines matching a given regular expression, however with local grep context. func (f *readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext, - rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { + rawLines <-chan *bytes.Buffer, lines chan<- *line.Line, re regex.Regex) { var ls ltxState @@ -73,8 +73,8 @@ func (f *readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext ls.after = 0 ls.processAfter = ltx.AfterContext > 0 - for lineBytesBuffer := range rawLines { - status := f.filterLineWithLContext(ctx, &ltx, &ls, rawLines, lines, &re, lineBytesBuffer) + for rawLine := range rawLines { + status := f.filterLineWithLContext(ctx, &ltx, &ls, rawLines, lines, &re, rawLine) if status == abortReading { return } @@ -83,21 +83,16 @@ func (f *readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext // Filter log lines matching a given regular expression, however with local grep context. 
func (f *readFile) filterLineWithLContext(ctx context.Context, ltx *lcontext.LContext, - ls *ltxState, rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re *regex.Regex, - lineBytesBuffer *bytes.Buffer) readStatus { + ls *ltxState, rawLines <-chan *bytes.Buffer, lines chan<- *line.Line, re *regex.Regex, + rawLine *bytes.Buffer) readStatus { f.updatePosition() - if !re.Match(lineBytesBuffer.Bytes()) { + if !re.Match(rawLine.Bytes()) { f.updateLineNotMatched() if ls.processAfter && ls.after > 0 { ls.after-- - myLine := line.Line{ - Content: lineBytesBuffer, - SourceID: f.globID, - Count: f.totalLineCount(), - TransmittedPerc: 100, - } + myLine := line.New(rawLine, f.totalLineCount(), 100, f.globID) select { case lines <- myLine: @@ -108,10 +103,10 @@ func (f *readFile) filterLineWithLContext(ctx context.Context, ltx *lcontext.LCo } else if ls.processBefore { // Keep last num BeforeContext raw messages. select { - case ls.beforeBuf <- lineBytesBuffer: + case ls.beforeBuf <- rawLine: default: pool.RecycleBytesBuffer(<-ls.beforeBuf) - ls.beforeBuf <- lineBytesBuffer + ls.beforeBuf <- rawLine } } return continueReading @@ -130,13 +125,8 @@ func (f *readFile) filterLineWithLContext(ctx context.Context, ltx *lcontext.LCo i := uint64(len(ls.beforeBuf)) for { select { - case lineBytesBuffer := <-ls.beforeBuf: - myLine := line.Line{ - Content: lineBytesBuffer, - SourceID: f.globID, - Count: f.totalLineCount() - i, - TransmittedPerc: 100, - } + case rawLine := <-ls.beforeBuf: + myLine := line.New(rawLine, f.totalLineCount()-i, 100, f.globID) i-- select { @@ -153,12 +143,7 @@ func (f *readFile) filterLineWithLContext(ctx context.Context, ltx *lcontext.LCo } } - line := line.Line{ - Content: lineBytesBuffer, - SourceID: f.globID, - Count: f.totalLineCount(), - TransmittedPerc: 100, - } + line := line.New(rawLine, f.totalLineCount(), 100, f.globID) select { case lines <- line: @@ -178,3 +163,10 @@ func (f *readFile) filterLineWithLContext(ctx context.Context, ltx 
*lcontext.LCo return continueReading } + +/* +func (f *readFile) filterLineWithLContextNoMatch(ctx context.Context, ltx *lcontext.LContext, + ls *ltxState, rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re *regex.Regex, + rawLine *bytes.Buffer) readStatus { +} +*/ diff --git a/internal/io/line/line.go b/internal/io/line/line.go index d306c88..c168274 100644 --- a/internal/io/line/line.go +++ b/internal/io/line/line.go @@ -3,8 +3,17 @@ package line import ( "bytes" "fmt" + "sync" ) +// lineBuffer is there to optimize memory allocations. DTail otherwise allocates +// a lot of memory while reading logs. +var lineBuffer = sync.Pool{ + New: func() interface{} { + return &Line{} + }, +} + // Line represents a read log line. type Line struct { // The content of the log line. @@ -23,6 +32,22 @@ type Line struct { SourceID string } +func New(content *bytes.Buffer, count uint64, transmittedPerc int, sourceID string) *Line { + l := lineBuffer.Get().(*Line) + l.Content = content + l.Count = count + l.TransmittedPerc = transmittedPerc + l.SourceID = sourceID + return l +} + +// Null returns a new line with all members initialized to their null value. +func Null() *Line { + l := lineBuffer.Get().(*Line) + l.NullValues() + return l +} + // Return a human readable representation of the followed line. func (l Line) String() string { return fmt.Sprintf("Line(Content:%s,TransmittedPerc:%v,Count:%v,SourceID:%s)", @@ -31,3 +56,19 @@ func (l Line) String() string { l.Count, l.SourceID) } + +// Recycle the line. Once done, don't reuse this instance!!! +func (l *Line) Recycle() { + // No explicit reset required, as NewLine overrides all elements + // already takes care of it. + //l.Reset() + lineBuffer.Put(l) +} + +// NullValues nulls all line struct members to their default state. +func (l *Line) NullValues() { + l.Content = nil + l.Count = 0 + l.TransmittedPerc = 0 + l.SourceID = "" +} |
