summary refs log tree commit diff
diff options
context:
space:
mode:
author Paul Buetow <paul@buetow.org> 2021-08-28 19:36:46 +0100
committer Paul Buetow <paul@buetow.org> 2021-08-28 19:36:46 +0100
commit 6d727b9bdbc387c8a5c34406a2c4de9140face38 (patch)
tree b6638220853374536db3d32e862961e4dbaa820a
parent 9883a190109623b64e6d311dc2b462a6eae68003 (diff)
use a bytes.Buffer in the file reader
-rw-r--r-- internal/io/fs/readfile.go 32
-rw-r--r-- internal/io/line/line.go 5
-rw-r--r-- internal/io/pool/bytesbuffer.go 19
-rw-r--r-- internal/mapr/server/aggregate.go 4
-rw-r--r-- internal/server/handlers/serverhandler.go 9
5 files changed, 49 insertions, 20 deletions
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index 8a365a1..e44f30e 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -2,6 +2,7 @@ package fs
import (
"bufio"
+ "bytes"
"compress/gzip"
"context"
"errors"
@@ -14,6 +15,7 @@ import (
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/protocol"
"github.com/mimecast/dtail/internal/regex"
@@ -90,7 +92,7 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Re
fd.Seek(0, io.SeekEnd)
}
- rawLines := make(chan []byte, 100)
+ rawLines := make(chan *bytes.Buffer, 100)
truncate := make(chan struct{})
var wg sync.WaitGroup
@@ -142,7 +144,7 @@ func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
return
}
-func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error {
+func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Buffer, truncate <-chan struct{}) error {
var offset uint64
reader, err := f.makeReader(fd)
@@ -152,7 +154,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
lineLengthThreshold := 1024 * 1024 // 1mb
warnedAboutLongLine := false
- message := make([]byte, 0, 512)
+ message := pool.BytesBuffer.Get().(*bytes.Buffer)
for {
select {
@@ -182,37 +184,41 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
switch b {
case '\n':
- if len(message) == 0 {
+ if message.Len() == 0 {
time.Sleep(time.Millisecond * 100)
continue
}
+ message.WriteByte(protocol.MessageDelimiter)
select {
- case rawLines <- append(message, protocol.MessageDelimiter):
- message = make([]byte, 0, 512)
+ case rawLines <- message:
+ message = pool.BytesBuffer.Get().(*bytes.Buffer)
+ //fmt.Printf("%d %d %p\n", message.Len(), message.Cap(), message)
warnedAboutLongLine = false
case <-ctx.Done():
return nil
}
default:
- if len(message) >= lineLengthThreshold {
+ if message.Len() >= lineLengthThreshold {
if !warnedAboutLongLine {
f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines")
warnedAboutLongLine = true
}
+ message.WriteByte(protocol.MessageDelimiter)
select {
+ case rawLines <- message:
+ message = pool.BytesBuffer.Get().(*bytes.Buffer)
+ //fmt.Printf("%d %d %p\n", message.Len(), message.Cap(), message)
case <-ctx.Done():
return nil
- case rawLines <- append(message, protocol.MessageDelimiter):
- message = make([]byte, 0, 512)
}
}
- message = append(message, b)
+ message.WriteByte(b)
}
}
}
// Filter log lines matching a given regular expression.
-func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) {
+func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) {
defer wg.Done()
for {
@@ -233,10 +239,10 @@ func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-cha
}
}
-func (f readFile) transmittable(lineBytes []byte, length, capacity int, re regex.Regex) (line.Line, bool) {
+func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int, re regex.Regex) (line.Line, bool) {
var read line.Line
- if !re.Match(lineBytes) {
+ if !re.Match(lineBytes.Bytes()) {
f.updateLineNotMatched()
f.updateLineNotTransmitted()
return read, false
diff --git a/internal/io/line/line.go b/internal/io/line/line.go
index 715be34..d306c88 100644
--- a/internal/io/line/line.go
+++ b/internal/io/line/line.go
@@ -1,13 +1,14 @@
package line
import (
+ "bytes"
"fmt"
)
// Line represents a read log line.
type Line struct {
// The content of the log line.
- Content []byte
+ Content *bytes.Buffer
// Until now, how many log lines were processed?
Count uint64
// Sometimes we produce too many log lines so that the client
@@ -25,7 +26,7 @@ type Line struct {
// Return a human readable representation of the followed line.
func (l Line) String() string {
return fmt.Sprintf("Line(Content:%s,TransmittedPerc:%v,Count:%v,SourceID:%s)",
- string(l.Content),
+ l.Content.String(),
l.TransmittedPerc,
l.Count,
l.SourceID)
diff --git a/internal/io/pool/bytesbuffer.go b/internal/io/pool/bytesbuffer.go
new file mode 100644
index 0000000..0a159f5
--- /dev/null
+++ b/internal/io/pool/bytesbuffer.go
@@ -0,0 +1,19 @@
+package pool
+
+import (
+ "bytes"
+ "sync"
+)
+
+var BytesBuffer = sync.Pool{
+ New: func() interface{} {
+ b := bytes.Buffer{}
+ b.Grow(128)
+ return &b
+ },
+}
+
+func RecycleBytesBuffer(b *bytes.Buffer) {
+ b.Reset()
+ BytesBuffer.Put(b)
+}
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index 28bb074..9106f52 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -10,6 +10,7 @@ import (
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/logformat"
)
@@ -136,7 +137,8 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string {
return
}
- maprLine := strings.TrimSpace(string(line.Content))
+ maprLine := strings.TrimSpace(line.Content.String())
+ pool.RecycleBytesBuffer(line.Content)
fields, err := a.parser.MakeFields(maprLine)
logger.Debug(fields, err)
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 9541a34..62f3c2b 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -15,6 +15,7 @@ import (
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/mapr/server"
"github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/protocol"
@@ -114,10 +115,10 @@ func (h *ServerHandler) Read(p []byte) (n int, err error) {
case line := <-h.lines:
//fmt.Printf("\t<<<%d,%s>>>\n", len(line.Content), line.Content)
// Send normal file content data as a message.
- serverInfo := []byte(fmt.Sprintf("REMOTE|%s|%3d|%v|%s|",
- h.hostname, line.TransmittedPerc, line.Count, line.SourceID))
- wholePayload := append(serverInfo, line.Content[:]...)
- n = copy(p, wholePayload)
+ payload := []byte(fmt.Sprintf("REMOTE|%s|%3d|%v|%s|%s",
+ h.hostname, line.TransmittedPerc, line.Count, line.SourceID, line.Content.String()))
+ n = copy(p, payload)
+ pool.RecycleBytesBuffer(line.Content)
return
case <-time.After(time.Second):