From 6d727b9bdbc387c8a5c34406a2c4de9140face38 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 28 Aug 2021 19:36:46 +0100 Subject: use a bytes.Buffer in the file reader --- internal/io/fs/readfile.go | 32 ++++++++++++++++++------------- internal/io/line/line.go | 5 +++-- internal/io/pool/bytesbuffer.go | 19 ++++++++++++++++++ internal/mapr/server/aggregate.go | 4 +++- internal/server/handlers/serverhandler.go | 9 +++++---- 5 files changed, 49 insertions(+), 20 deletions(-) create mode 100644 internal/io/pool/bytesbuffer.go diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 8a365a1..e44f30e 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -2,6 +2,7 @@ package fs import ( "bufio" + "bytes" "compress/gzip" "context" "errors" @@ -14,6 +15,7 @@ import ( "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/protocol" "github.com/mimecast/dtail/internal/regex" @@ -90,7 +92,7 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Re fd.Seek(0, io.SeekEnd) } - rawLines := make(chan []byte, 100) + rawLines := make(chan *bytes.Buffer, 100) truncate := make(chan struct{}) var wg sync.WaitGroup @@ -142,7 +144,7 @@ func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { return } -func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error { +func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Buffer, truncate <-chan struct{}) error { var offset uint64 reader, err := f.makeReader(fd) @@ -152,7 +154,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t lineLengthThreshold := 1024 * 1024 // 1mb warnedAboutLongLine := false - message := make([]byte, 0, 512) + message := pool.BytesBuffer.Get().(*bytes.Buffer) for { select { @@ -182,37 +184,41 @@ func (f
readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t switch b { case '\n': - if len(message) == 0 { + if message.Len() == 0 { time.Sleep(time.Millisecond * 100) continue } + message.WriteByte(protocol.MessageDelimiter) select { - case rawLines <- append(message, protocol.MessageDelimiter): - message = make([]byte, 0, 512) + case rawLines <- message: + message = pool.BytesBuffer.Get().(*bytes.Buffer) + //fmt.Printf("%d %d %p\n", message.Len(), message.Cap(), message) warnedAboutLongLine = false case <-ctx.Done(): return nil } default: - if len(message) >= lineLengthThreshold { + if message.Len() >= lineLengthThreshold { if !warnedAboutLongLine { f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines") warnedAboutLongLine = true } + message.WriteByte(protocol.MessageDelimiter) select { + case rawLines <- message: + message = pool.BytesBuffer.Get().(*bytes.Buffer) + //fmt.Printf("%d %d %p\n", message.Len(), message.Cap(), message) case <-ctx.Done(): return nil - case rawLines <- append(message, protocol.MessageDelimiter): - message = make([]byte, 0, 512) } } - message = append(message, b) + message.WriteByte(b) } } } // Filter log lines matching a given regular expression. 
-func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) { +func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { defer wg.Done() for { @@ -233,10 +239,10 @@ func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-cha } } -func (f readFile) transmittable(lineBytes []byte, length, capacity int, re regex.Regex) (line.Line, bool) { +func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int, re regex.Regex) (line.Line, bool) { var read line.Line - if !re.Match(lineBytes) { + if !re.Match(lineBytes.Bytes()) { f.updateLineNotMatched() f.updateLineNotTransmitted() return read, false diff --git a/internal/io/line/line.go b/internal/io/line/line.go index 715be34..d306c88 100644 --- a/internal/io/line/line.go +++ b/internal/io/line/line.go @@ -1,13 +1,14 @@ package line import ( + "bytes" "fmt" ) // Line represents a read log line. type Line struct { // The content of the log line. - Content []byte + Content *bytes.Buffer // Until now, how many log lines were processed? Count uint64 // Sometimes we produce too many log lines so that the client @@ -25,7 +26,7 @@ type Line struct { // Return a human readable representation of the followed line. 
func (l Line) String() string { return fmt.Sprintf("Line(Content:%s,TransmittedPerc:%v,Count:%v,SourceID:%s)", - string(l.Content), + l.Content.String(), l.TransmittedPerc, l.Count, l.SourceID) diff --git a/internal/io/pool/bytesbuffer.go b/internal/io/pool/bytesbuffer.go new file mode 100644 index 0000000..0a159f5 --- /dev/null +++ b/internal/io/pool/bytesbuffer.go @@ -0,0 +1,19 @@ +package pool + +import ( + "bytes" + "sync" +) + +var BytesBuffer = sync.Pool{ + New: func() interface{} { + b := bytes.Buffer{} + b.Grow(128) + return &b + }, +} + +func RecycleBytesBuffer(b *bytes.Buffer) { + b.Reset() + BytesBuffer.Put(b) +} diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 28bb074..9106f52 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -10,6 +10,7 @@ import ( "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/mapr/logformat" ) @@ -136,7 +137,8 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string { return } - maprLine := strings.TrimSpace(string(line.Content)) + maprLine := strings.TrimSpace(line.Content.String()) + pool.RecycleBytesBuffer(line.Content) fields, err := a.parser.MakeFields(maprLine) logger.Debug(fields, err) diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 9541a34..62f3c2b 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -15,6 +15,7 @@ import ( "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/mapr/server" "github.com/mimecast/dtail/internal/omode" 
"github.com/mimecast/dtail/internal/protocol" @@ -114,10 +115,10 @@ func (h *ServerHandler) Read(p []byte) (n int, err error) { case line := <-h.lines: //fmt.Printf("\t<<<%d,%s>>>\n", len(line.Content), line.Content) // Send normal file content data as a message. - serverInfo := []byte(fmt.Sprintf("REMOTE|%s|%3d|%v|%s|", - h.hostname, line.TransmittedPerc, line.Count, line.SourceID)) - wholePayload := append(serverInfo, line.Content[:]...) - n = copy(p, wholePayload) + payload := []byte(fmt.Sprintf("REMOTE|%s|%3d|%v|%s|%s", + h.hostname, line.TransmittedPerc, line.Count, line.SourceID, line.Content.String())) + n = copy(p, payload) + pool.RecycleBytesBuffer(line.Content) return case <-time.After(time.Second): -- cgit v1.2.3