diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2021-12-05 10:37:20 +0000 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2021-12-05 10:37:20 +0000 |
| commit | 7ec5c5b144866c392e3676778041a2ae6aa9d360 (patch) | |
| tree | 55538e624912e24b94d6faa0ae5f0060b824d1bf /internal | |
| parent | 6c12fc4b33049111ad6ddc3f62bf979f843fad73 (diff) | |
buffer line.Line for performance
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/io/fs/filereader.go | 2 | ||||
| -rw-r--r-- | internal/io/fs/readfile.go | 24 | ||||
| -rw-r--r-- | internal/io/fs/readfilelcontext.go | 54 | ||||
| -rw-r--r-- | internal/io/line/line.go | 41 | ||||
| -rw-r--r-- | internal/mapr/server/aggregate.go | 18 | ||||
| -rw-r--r-- | internal/server/handlers/basehandler.go | 3 | ||||
| -rw-r--r-- | internal/server/handlers/healthhandler.go | 2 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 2 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 2 |
9 files changed, 87 insertions, 61 deletions
diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go index b05fd39..e27d2a7 100644 --- a/internal/io/fs/filereader.go +++ b/internal/io/fs/filereader.go @@ -11,7 +11,7 @@ import ( // FileReader is the interface used on the dtail server to read/cat/grep/mapr... // a file. type FileReader interface { - Start(ctx context.Context, ltx lcontext.LContext, lines chan<- line.Line, + Start(ctx context.Context, ltx lcontext.LContext, lines chan<- *line.Line, re regex.Regex) error FilePath() string Retry() bool diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 008111d..f37b07d 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -73,7 +73,7 @@ func (f readFile) Retry() bool { // Start tailing a log file. func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, - lines chan<- line.Line, re regex.Regex) error { + lines chan<- *line.Line, re regex.Regex) error { reader, fd, err := f.makeReader() if fd != nil { @@ -203,7 +203,7 @@ func (f *readFile) read(ctx context.Context, fd *os.File, reader *bufio.Reader, // Filter log lines matching a given regular expression. func (f *readFile) filter(ctx context.Context, ltx lcontext.LContext, - rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { + rawLines <-chan *bytes.Buffer, lines chan<- *line.Line, re regex.Regex) { // Do we have any kind of local context settings? If so then run the more complex // filterWithLContext method. @@ -218,31 +218,25 @@ func (f *readFile) filter(ctx context.Context, ltx lcontext.LContext, f.filterWithoutLContext(ctx, rawLines, lines, re) } -func (f *readFile) transmittable(lineBytesBuffer *bytes.Buffer, length, capacity int, - re regex.Regex) (line.Line, bool) { +func (f *readFile) transmittable(rawLine *bytes.Buffer, length, capacity int, + re regex.Regex) (*line.Line, bool) { - var read line.Line - if !re.Match(lineBytesBuffer.Bytes()) { + newLine := line.Null() + if !re.Match(rawLine.Bytes()) { f.updateLineNotMatched() f.updateLineNotTransmitted() - return read, false + return newLine, false } f.updateLineMatched() // Can we actually send more messages, channel capacity reached? if f.canSkipLines && length >= capacity { f.updateLineNotTransmitted() - return read, false + return newLine, false } f.updateLineTransmitted() - read = line.Line{ - Content: lineBytesBuffer, - SourceID: f.globID, - Count: f.totalLineCount(), - TransmittedPerc: f.transmittedPerc(), - } - return read, true + return line.New(rawLine, f.totalLineCount(), f.transmittedPerc(), f.globID), true } // Check wether log file is truncated. Returns nil if not. diff --git a/internal/io/fs/readfilelcontext.go b/internal/io/fs/readfilelcontext.go index d6212f8..0d41a07 100644 --- a/internal/io/fs/readfilelcontext.go +++ b/internal/io/fs/readfilelcontext.go @@ -29,18 +29,18 @@ type ltxState struct { // We don't have any local grep context, which makes life much simpler and more efficient. func (f *readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *bytes.Buffer, - lines chan<- line.Line, re regex.Regex) { + lines chan<- *line.Line, re regex.Regex) { for { select { - case line, ok := <-rawLines: + case rawLine, ok := <-rawLines: f.updatePosition() if !ok { return } - if filteredLine, ok := f.transmittable(line, len(lines), cap(lines), re); ok { + if newLine, ok := f.transmittable(rawLine, len(lines), cap(lines), re); ok { select { - case lines <- filteredLine: + case lines <- newLine: case <-ctx.Done(): return } @@ -51,7 +51,7 @@ func (f *readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *b // Filter log lines matching a given regular expression, however with local grep context. func (f *readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext, - rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { + rawLines <-chan *bytes.Buffer, lines chan<- *line.Line, re regex.Regex) { var ls ltxState @@ -73,8 +73,8 @@ func (f *readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext ls.after = 0 ls.processAfter = ltx.AfterContext > 0 - for lineBytesBuffer := range rawLines { - status := f.filterLineWithLContext(ctx, <x, &ls, rawLines, lines, &re, lineBytesBuffer) + for rawLine := range rawLines { + status := f.filterLineWithLContext(ctx, <x, &ls, rawLines, lines, &re, rawLine) if status == abortReading { return } @@ -83,21 +83,16 @@ func (f *readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext // Filter log lines matching a given regular expression, however with local grep context. func (f *readFile) filterLineWithLContext(ctx context.Context, ltx *lcontext.LContext, - ls *ltxState, rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re *regex.Regex, - lineBytesBuffer *bytes.Buffer) readStatus { + ls *ltxState, rawLines <-chan *bytes.Buffer, lines chan<- *line.Line, re *regex.Regex, + rawLine *bytes.Buffer) readStatus { f.updatePosition() - if !re.Match(lineBytesBuffer.Bytes()) { + if !re.Match(rawLine.Bytes()) { f.updateLineNotMatched() if ls.processAfter && ls.after > 0 { ls.after-- - myLine := line.Line{ - Content: lineBytesBuffer, - SourceID: f.globID, - Count: f.totalLineCount(), - TransmittedPerc: 100, - } + myLine := line.New(rawLine, f.totalLineCount(), 100, f.globID) select { case lines <- myLine: @@ -108,10 +103,10 @@ func (f *readFile) filterLineWithLContext(ctx context.Context, ltx *lcontext.LCo } else if ls.processBefore { // Keep last num BeforeContext raw messages. select { - case ls.beforeBuf <- lineBytesBuffer: + case ls.beforeBuf <- rawLine: default: pool.RecycleBytesBuffer(<-ls.beforeBuf) - ls.beforeBuf <- lineBytesBuffer + ls.beforeBuf <- rawLine } } return continueReading @@ -130,13 +125,8 @@ func (f *readFile) filterLineWithLContext(ctx context.Context, ltx *lcontext.LCo i := uint64(len(ls.beforeBuf)) for { select { - case lineBytesBuffer := <-ls.beforeBuf: - myLine := line.Line{ - Content: lineBytesBuffer, - SourceID: f.globID, - Count: f.totalLineCount() - i, - TransmittedPerc: 100, - } + case rawLine := <-ls.beforeBuf: + myLine := line.New(rawLine, f.totalLineCount()-i, 100, f.globID) i-- select { @@ -153,12 +143,7 @@ func (f *readFile) filterLineWithLContext(ctx context.Context, ltx *lcontext.LCo } } - line := line.Line{ - Content: lineBytesBuffer, - SourceID: f.globID, - Count: f.totalLineCount(), - TransmittedPerc: 100, - } + line := line.New(rawLine, f.totalLineCount(), 100, f.globID) select { case lines <- line: @@ -178,3 +163,10 @@ func (f *readFile) filterLineWithLContext(ctx context.Context, ltx *lcontext.LCo return continueReading } + +/* +func (f *readFile) filterLineWithLContextNoMatch(ctx context.Context, ltx *lcontext.LContext, + ls *ltxState, rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re *regex.Regex, + rawLine *bytes.Buffer) readStatus { +} +*/ diff --git a/internal/io/line/line.go b/internal/io/line/line.go index d306c88..c168274 100644 --- a/internal/io/line/line.go +++ b/internal/io/line/line.go @@ -3,8 +3,17 @@ package line import ( "bytes" "fmt" + "sync" ) +// lineBuffer is there to optimize memory allocations. DTail otherwise allocates +// a lot of memory while reading logs. +var lineBuffer = sync.Pool{ + New: func() interface{} { + return &Line{} + }, +} + // Line represents a read log line. type Line struct { // The content of the log line. @@ -23,6 +32,22 @@ type Line struct { SourceID string } +func New(content *bytes.Buffer, count uint64, transmittedPerc int, sourceID string) *Line { + l := lineBuffer.Get().(*Line) + l.Content = content + l.Count = count + l.TransmittedPerc = transmittedPerc + l.SourceID = sourceID + return l +} + +// Null returns a new line with all members initialized to their null value. +func Null() *Line { + l := lineBuffer.Get().(*Line) + l.NullValues() + return l +} + // Return a human readable representation of the followed line. func (l Line) String() string { return fmt.Sprintf("Line(Content:%s,TransmittedPerc:%v,Count:%v,SourceID:%s)", @@ -31,3 +56,19 @@ func (l Line) String() string { l.Count, l.SourceID) } + +// Recycle the line. Once done, don't reuse this instance!!! +func (l *Line) Recycle() { + // No explicit reset required, as NewLine overrides all elements + // already takes care of it. + //l.Reset() + lineBuffer.Put(l) +} + +// NullValues nulls all line struct members to their default state. +func (l *Line) NullValues() { + l.Content = nil + l.Count = 0 + l.TransmittedPerc = 0 + l.SourceID = "" +} diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index cb0da2b..9e3f68e 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -18,8 +18,8 @@ import ( type Aggregate struct { done *internal.Done // NextLinesCh can be used to use a new line ch. - NextLinesCh chan chan line.Line - linesCh chan line.Line + NextLinesCh chan chan *line.Line + linesCh chan *line.Line // Hostname of the current server (used to populate $hostname field). hostname string // Signals to serialize data. @@ -65,7 +65,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) { return &Aggregate{ done: internal.NewDone(), - NextLinesCh: make(chan chan line.Line, 10), + NextLinesCh: make(chan chan *line.Line, 100), serialize: make(chan struct{}), hostname: s[0], query: query, @@ -113,9 +113,9 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) { } } -func (a *Aggregate) nextLine() (line line.Line, ok bool, noMoreChannels bool) { +func (a *Aggregate) nextLine() (line *line.Line, ok bool, noMoreChannels bool) { + dlog.Server.Trace("nextLine.enter", line, ok, noMoreChannels) - dlog.Server.Trace("nextLine", "entry", line, ok, noMoreChannels) select { case line, ok = <-a.linesCh: if !ok { @@ -137,8 +137,7 @@ func (a *Aggregate) nextLine() (line line.Line, ok bool, noMoreChannels bool) { // No new lines channel found. } } - dlog.Server.Trace("nextLine", "exit", line, ok, noMoreChannels) - + dlog.Server.Trace("nextLine.exit", line, ok, noMoreChannels) return } @@ -169,13 +168,12 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin break } time.Sleep(time.Millisecond * 100) + continue } maprLine := strings.TrimSpace(line.Content.String()) + line.Recycle() // after this, don't use line object anymore!!! fields, err := a.parser.MakeFields(maprLine) - // Can't recycle it here yet, as field slices are still - // MAYBETODO: Add capability to recycle this bytes buffer. - //pool.RecycleBytesBuffer(line.Content) if err != nil { // Should fields be ignored anyway? diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index f068944..4fd718e 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -29,7 +29,7 @@ type handleCommandCb func(context.Context, lcontext.LContext, int, []string, str type baseHandler struct { done *internal.Done handleCommandCb handleCommandCb - lines chan line.Line + lines chan *line.Line aggregate *server.Aggregate maprMessages chan string serverMessages chan string @@ -112,6 +112,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { h.readBuf.WriteByte(protocol.MessageDelimiter) n = copy(p, h.readBuf.Bytes()) pool.RecycleBytesBuffer(line.Content) + line.Recycle() case <-time.After(time.Second): select { diff --git a/internal/server/handlers/healthhandler.go b/internal/server/handlers/healthhandler.go index e7f7762..362fe24 100644 --- a/internal/server/handlers/healthhandler.go +++ b/internal/server/handlers/healthhandler.go @@ -23,7 +23,7 @@ func NewHealthHandler(user *user.User) *HealthHandler { h := HealthHandler{ baseHandler: baseHandler{ done: internal.NewDone(), - lines: make(chan line.Line, 100), + lines: make(chan *line.Line, 100), serverMessages: make(chan string, 10), maprMessages: make(chan string, 10), ackCloseReceived: make(chan struct{}), diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index e425463..85f5b2d 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -165,7 +165,7 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, for { if aggregate != nil { - lines = make(chan line.Line, 100) + lines = make(chan *line.Line, 100) aggregate.NextLinesCh <- lines } if err := reader.Start(ctx, ltx, lines, re); err != nil { diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 75a8acc..bc22c88 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -31,7 +31,7 @@ func NewServerHandler(user *user.User, catLimiter, h := ServerHandler{ baseHandler: baseHandler{ done: internal.NewDone(), - lines: make(chan line.Line, 100), + lines: make(chan *line.Line, 100), serverMessages: make(chan string, 10), maprMessages: make(chan string, 10), ackCloseReceived: make(chan struct{}), |
