summaryrefslogtreecommitdiff
path: root/internal/io/fs
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-08-28 19:36:46 +0100
committerPaul Buetow <paul@buetow.org>2021-08-28 19:36:46 +0100
commit6d727b9bdbc387c8a5c34406a2c4de9140face38 (patch)
treeb6638220853374536db3d32e862961e4dbaa820a /internal/io/fs
parent9883a190109623b64e6d311dc2b462a6eae68003 (diff)
use a byte.Buffer in the file reader
Diffstat (limited to 'internal/io/fs')
-rw-r--r--internal/io/fs/readfile.go32
1 files changed, 19 insertions, 13 deletions
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index 8a365a1..e44f30e 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -2,6 +2,7 @@ package fs
import (
"bufio"
+ "bytes"
"compress/gzip"
"context"
"errors"
@@ -14,6 +15,7 @@ import (
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/protocol"
"github.com/mimecast/dtail/internal/regex"
@@ -90,7 +92,7 @@ func (f readFile) Start(ctx context.Context, lines chan<- line.Line, re regex.Re
fd.Seek(0, io.SeekEnd)
}
- rawLines := make(chan []byte, 100)
+ rawLines := make(chan *bytes.Buffer, 100)
truncate := make(chan struct{})
var wg sync.WaitGroup
@@ -142,7 +144,7 @@ func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
return
}
-func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error {
+func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Buffer, truncate <-chan struct{}) error {
var offset uint64
reader, err := f.makeReader(fd)
@@ -152,7 +154,7 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
lineLengthThreshold := 1024 * 1024 // 1mb
warnedAboutLongLine := false
- message := make([]byte, 0, 512)
+ message := pool.BytesBuffer.Get().(*bytes.Buffer)
for {
select {
@@ -182,37 +184,41 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
switch b {
case '\n':
- if len(message) == 0 {
+ if message.Len() == 0 {
time.Sleep(time.Millisecond * 100)
continue
}
+ message.WriteByte(protocol.MessageDelimiter)
select {
- case rawLines <- append(message, protocol.MessageDelimiter):
- message = make([]byte, 0, 512)
+ case rawLines <- message:
+ message = pool.BytesBuffer.Get().(*bytes.Buffer)
+ //fmt.Printf("%d %d %p\n", message.Len(), message.Cap(), message)
warnedAboutLongLine = false
case <-ctx.Done():
return nil
}
default:
- if len(message) >= lineLengthThreshold {
+ if message.Len() >= lineLengthThreshold {
if !warnedAboutLongLine {
f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines")
warnedAboutLongLine = true
}
+ message.WriteByte(protocol.MessageDelimiter)
select {
+ case rawLines <- message:
+ message = pool.BytesBuffer.Get().(*bytes.Buffer)
+ //fmt.Printf("%d %d %p\n", message.Len(), message.Cap(), message)
case <-ctx.Done():
return nil
- case rawLines <- append(message, protocol.MessageDelimiter):
- message = make([]byte, 0, 512)
}
}
- message = append(message, b)
+ message.WriteByte(b)
}
}
}
// Filter log lines matching a given regular expression.
-func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) {
+func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) {
defer wg.Done()
for {
@@ -233,10 +239,10 @@ func (f readFile) filter(ctx context.Context, wg *sync.WaitGroup, rawLines <-cha
}
}
-func (f readFile) transmittable(lineBytes []byte, length, capacity int, re regex.Regex) (line.Line, bool) {
+func (f readFile) transmittable(lineBytes *bytes.Buffer, length, capacity int, re regex.Regex) (line.Line, bool) {
var read line.Line
- if !re.Match(lineBytes) {
+ if !re.Match(lineBytes.Bytes()) {
f.updateLineNotMatched()
f.updateLineNotTransmitted()
return read, false