summaryrefslogtreecommitdiff
path: root/internal/io/fs
diff options
context:
space:
mode:
Diffstat (limited to 'internal/io/fs')
-rw-r--r--internal/io/fs/catfile.go4
-rw-r--r--internal/io/fs/filereader.go6
-rw-r--r--internal/io/fs/filter.go167
-rw-r--r--internal/io/fs/permissions/permission.go4
-rw-r--r--internal/io/fs/permissions/permission_linuxacl.go2
-rw-r--r--internal/io/fs/readfile.go384
-rw-r--r--internal/io/fs/tailfile.go4
-rw-r--r--internal/io/fs/truncate.go61
8 files changed, 310 insertions, 322 deletions
diff --git a/internal/io/fs/catfile.go b/internal/io/fs/catfile.go
index 7f387bc..01c15ba 100644
--- a/internal/io/fs/catfile.go
+++ b/internal/io/fs/catfile.go
@@ -6,7 +6,9 @@ type CatFile struct {
}
// NewCatFile returns a new file catter.
-func NewCatFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) CatFile {
+func NewCatFile(filePath string, globID string, serverMessages chan<- string,
+ limiter chan struct{}) CatFile {
+
return CatFile{
readFile: readFile{
filePath: filePath,
diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go
index efd410e..b05fd39 100644
--- a/internal/io/fs/filereader.go
+++ b/internal/io/fs/filereader.go
@@ -8,9 +8,11 @@ import (
"github.com/mimecast/dtail/internal/regex"
)
-// FileReader is the interface used on the dtail server to read/cat/grep/mapr... a file.
+// FileReader is the interface used on the dtail server to read/cat/grep/mapr...
+// a file.
type FileReader interface {
- Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error
+ Start(ctx context.Context, ltx lcontext.LContext, lines chan<- line.Line,
+ re regex.Regex) error
FilePath() string
Retry() bool
}
diff --git a/internal/io/fs/filter.go b/internal/io/fs/filter.go
deleted file mode 100644
index c4f605e..0000000
--- a/internal/io/fs/filter.go
+++ /dev/null
@@ -1,167 +0,0 @@
-package fs
-
-import (
- "context"
-
- "github.com/mimecast/dtail/internal/io/line"
- "github.com/mimecast/dtail/internal/lcontext"
- "github.com/mimecast/dtail/internal/regex"
-)
-
-func (f readFile) filter(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex, lContext lcontext.LContext) {
- // Do we have any kind of local context settings? If so then run the more complex
- // filterWithLContext method.
- if lContext.Has() {
- // We can not skip transmitting any lines to the client with a local
- // grep context specified.
- f.canSkipLines = false
- f.filterWithLContext(ctx, rawLines, lines, re, lContext)
- return
- }
-
- f.filterWithoutLContext(ctx, rawLines, lines, re)
-}
-
-// Filter log lines matching a given regular expression, however with local grep context.
-func (f readFile) filterWithLContext(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex, lContext lcontext.LContext) {
- // Scenario 1: Finish once maxCount hits found
- maxCount := lContext.MaxCount
- processMaxCount := maxCount > 0
- maxReached := false
-
- // Scenario 2: Print prev. N lines when current line matches.
- before := lContext.BeforeContext
- processBefore := before > 0
- var beforeBuf chan []byte
- if processBefore {
- beforeBuf = make(chan []byte, before)
- }
-
- // Screnario 3: Print next N lines when current line matches.
- after := 0
- processAfter := lContext.AfterContext > 0
-
- for rawLine := range rawLines {
- // logger.Debug("rawLine", string(rawLine))
- f.updatePosition()
-
- if !re.Match(rawLine) {
- f.updateLineNotMatched()
-
- if processAfter && after > 0 {
- after--
- myLine := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: 100}
- select {
- case lines <- myLine:
- case <-ctx.Done():
- return
- }
-
- } else if processBefore {
- // Keep last num BeforeContext raw messages.
- select {
- case beforeBuf <- rawLine:
- default:
- <-beforeBuf
- beforeBuf <- rawLine
- }
- }
- continue
- }
-
- f.updateLineMatched()
-
- if processAfter {
- if maxReached {
- return
- }
- after = lContext.AfterContext
- }
-
- if processBefore {
- i := uint64(len(beforeBuf))
- for {
- select {
- case myRawLine := <-beforeBuf:
- myLine := line.Line{Content: myRawLine, SourceID: f.globID, Count: f.totalLineCount() - i, TransmittedPerc: 100}
- i--
- select {
- case lines <- myLine:
- case <-ctx.Done():
- return
- }
- default:
- // beforeBuf is now empty.
- }
- if len(beforeBuf) == 0 {
- break
- }
- }
- }
-
- line := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: 100}
-
- select {
- case lines <- line:
- if processMaxCount {
- maxCount--
- if maxCount == 0 {
- if !processAfter || after == 0 {
- return
- }
- // Unfortunatley we have to continue filter, as there might be more lines to print
- maxReached = true
- }
- }
- case <-ctx.Done():
- return
- }
- }
-}
-
-// Filter log lines matching a given regular expression, there is no local grep context specified.
-func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) {
- for {
- select {
- case rawLine, ok := <-rawLines:
- f.updatePosition()
- if !ok {
- return
- }
-
- if f.lineUntransmittable(rawLine, len(lines), cap(lines), re) {
- continue
- }
-
- line := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: f.transmittedPerc()}
-
- select {
- case lines <- line:
- continue
- case <-ctx.Done():
- return
- }
- }
- }
-}
-
-func (f readFile) lineUntransmittable(rawLine []byte, length, capacity int, re regex.Regex) bool {
- if !re.Match(rawLine) {
- f.updateLineNotMatched()
- f.updateLineNotTransmitted()
- // Regex dosn't match, so not interested in it.
- return true
- }
- f.updateLineMatched()
-
- // Can we actually send more messages, channel capacity reached?
- if f.canSkipLines && length >= capacity {
- f.updateLineNotTransmitted()
- // Matching, not transmittable
- return true
- }
- f.updateLineTransmitted()
-
- // Matching, transmittable
- return false
-}
diff --git a/internal/io/fs/permissions/permission.go b/internal/io/fs/permissions/permission.go
index cc5dd9b..d621c09 100644
--- a/internal/io/fs/permissions/permission.go
+++ b/internal/io/fs/permissions/permission.go
@@ -3,12 +3,12 @@
package permissions
import (
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
)
// ToRead is to check whether user has read permissions to a given file.
func ToRead(user, filePath string) (bool, error) {
// Only implemented for Linux, always expect true
- logger.Warn(user, filePath, "Not performing ACL check, not supported on this platform")
+ dlog.Common.Debug(user, filePath, "Not performing ACL check as not compiled in")
return true, nil
}
diff --git a/internal/io/fs/permissions/permission_linuxacl.go b/internal/io/fs/permissions/permission_linuxacl.go
index 7d2d7ca..904b90f 100644
--- a/internal/io/fs/permissions/permission_linuxacl.go
+++ b/internal/io/fs/permissions/permission_linuxacl.go
@@ -13,7 +13,7 @@ import (
"unsafe"
)
-// ToRead checks whether user has Linux file system permissions to read a given file.
+// ToRead checks whether user has Linux file system permissions to read a file.
func ToRead(user, filePath string) (bool, error) {
cUser := C.CString(user)
cFilePath := C.CString(filePath)
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index 161e3f0..28cbe58 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -2,16 +2,20 @@ package fs
import (
"bufio"
+ "bytes"
"compress/gzip"
"context"
+ "errors"
"fmt"
"io"
"os"
"strings"
+ "sync"
"time"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/lcontext"
"github.com/mimecast/dtail/internal/regex"
@@ -37,31 +41,10 @@ type readFile struct {
limiter chan struct{}
}
-func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
- switch {
- case strings.HasSuffix(f.FilePath(), ".gz"):
- fallthrough
- case strings.HasSuffix(f.FilePath(), ".gzip"):
- logger.Info(f.FilePath(), "Detected gzip compression format")
- var gzipReader *gzip.Reader
- gzipReader, err = gzip.NewReader(fd)
- if err != nil {
- return
- }
- reader = bufio.NewReader(gzipReader)
- case strings.HasSuffix(f.FilePath(), ".zst"):
- logger.Info(f.FilePath(), "Detected zstd compression format")
- reader = bufio.NewReader(zstd.NewReader(fd))
- default:
- reader = bufio.NewReader(fd)
- }
-
- return
-}
-
// String returns the string representation of the readFile
func (f readFile) String() string {
- return fmt.Sprintf("readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)",
+ return fmt.Sprintf(
+ "readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)",
f.filePath,
f.globID,
f.retry,
@@ -80,8 +63,10 @@ func (f readFile) Retry() bool {
}
// Start tailing a log file.
-func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error {
- logger.Debug("readFile", f)
+func (f readFile) Start(ctx context.Context, ltx lcontext.LContext,
+ lines chan<- line.Line, re regex.Regex) error {
+
+ dlog.Common.Debug("readFile", f)
defer func() {
select {
case <-f.limiter:
@@ -93,7 +78,8 @@ func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines c
case f.limiter <- struct{}{}:
default:
select {
- case f.serverMessages <- logger.Warn(f.filePath, f.globID, "Server limit reached. Queuing file..."):
+ case f.serverMessages <- dlog.Common.Warn(f.filePath, f.globID,
+ "Server limit reached. Queuing file..."):
case <-ctx.Done():
return nil
}
@@ -110,111 +96,335 @@ func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines c
fd.Seek(0, io.SeekEnd)
}
- rawLines := make(chan []byte, 100)
+ rawLines := make(chan *bytes.Buffer, 100)
+ truncate := make(chan struct{})
+
readCtx, readCancel := context.WithCancel(ctx)
+ var filterWg sync.WaitGroup
+ filterWg.Add(1)
- filterDone := make(chan struct{})
+ go f.periodicTruncateCheck(ctx, truncate)
go func() {
- f.filter(ctx, rawLines, lines, re, lContext)
- close(filterDone)
+ f.filter(ctx, ltx, rawLines, lines, re)
+ filterWg.Done()
// If the filter stopped, make the reader stop too, no need to read
// more data if there is nothing more the filter wants to filter for!
// E.g. it could be that we only want to filter N matches but not more.
readCancel()
}()
- err = f.read(readCtx, fd, rawLines)
+ err = f.read(readCtx, fd, rawLines, truncate)
close(rawLines)
-
- // Filter may flushes some data still. So wait until it is done here.
- <-filterDone
+ // Filter may sends some data still. So wait until it is done here.
+ filterWg.Wait()
return err
}
-func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte) error {
- var offset uint64
+func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) {
+ for {
+ select {
+ case <-time.After(time.Second * 3):
+ select {
+ case truncate <- struct{}{}:
+ case <-ctx.Done():
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
+ switch {
+ case strings.HasSuffix(f.FilePath(), ".gz"):
+ fallthrough
+ case strings.HasSuffix(f.FilePath(), ".gzip"):
+ dlog.Common.Info(f.FilePath(), "Detected gzip compression format")
+ var gzipReader *gzip.Reader
+ gzipReader, err = gzip.NewReader(fd)
+ if err != nil {
+ return
+ }
+ reader = bufio.NewReader(gzipReader)
+ case strings.HasSuffix(f.FilePath(), ".zst"):
+ dlog.Common.Info(f.FilePath(), "Detected zstd compression format")
+ reader = bufio.NewReader(zstd.NewReader(fd))
+ default:
+ reader = bufio.NewReader(fd)
+ }
+ return
+}
+
+func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Buffer, truncate <-chan struct{}) error {
+ var offset uint64
reader, err := f.makeReader(fd)
if err != nil {
return err
}
- rawLine := make([]byte, 0, 512)
lineLengthThreshold := 1024 * 1024 // 1mb
- longLineWarning := false
-
- checkTruncate := f.truncateTimer(ctx)
+ warnedAboutLongLine := false
+ message := pool.BytesBuffer.Get().(*bytes.Buffer)
for {
- select {
- case <-ctx.Done():
- return nil
- default:
- }
-
- select {
- case <-checkTruncate:
- if isTruncated, err := f.truncated(fd); isTruncated {
- return err
- }
- logger.Info(f.filePath, "Current offset", offset)
- default:
- }
-
- // Read some bytes (max 4k at once as of go 1.12). isPrefix will
- // be set if line does not fit into 4k buffer.
- bytes, isPrefix, err := reader.ReadLine()
+ b, err := reader.ReadByte()
if err != nil {
- // If EOF, sleep a couple of ms and return with nil error.
- // If other error, return with non-nil error.
if err != io.EOF {
return err
}
+ select {
+ case <-truncate:
+ if isTruncated, err := f.truncated(fd); isTruncated {
+ return err
+ }
+ case <-ctx.Done():
+ return nil
+ default:
+ }
if !f.seekEOF {
- logger.Debug(f.FilePath(), "End of file reached")
+ dlog.Common.Info(f.FilePath(), "End of file reached")
return nil
}
time.Sleep(time.Millisecond * 100)
continue
}
+ offset++
- rawLine = append(rawLine, bytes...)
- offset += uint64(len(bytes))
-
- if !isPrefix {
- // last LineRead call returned contend until end of line.
- rawLine = append(rawLine, '\n')
+ switch b {
+ case '\n':
select {
- case rawLines <- rawLine:
+ case rawLines <- message:
+ message = pool.BytesBuffer.Get().(*bytes.Buffer)
+ //fmt.Printf("%d %d %p\n", message.Len(), message.Cap(), message)
+ warnedAboutLongLine = false
case <-ctx.Done():
return nil
}
- rawLine = make([]byte, 0, 512)
- if longLineWarning {
- longLineWarning = false
+ default:
+ if message.Len() >= lineLengthThreshold {
+ if !warnedAboutLongLine {
+ f.serverMessages <- dlog.Common.Warn(f.filePath,
+ "Long log line, splitting into multiple lines")
+ warnedAboutLongLine = true
+ }
+ message.WriteString("\n")
+ select {
+ case rawLines <- message:
+ message = pool.BytesBuffer.Get().(*bytes.Buffer)
+ case <-ctx.Done():
+ return nil
+ }
+ }
+ message.WriteByte(b)
+ }
+ }
+}
+
+// Filter log lines matching a given regular expression.
+func (f readFile) filter(ctx context.Context, ltx lcontext.LContext,
+ rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) {
+
+ // Do we have any kind of local context settings? If so then run the more complex
+ // filterWithLContext method.
+ if ltx.Has() {
+ // We can not skip transmitting any lines to the client with a local
+ // grep context specified.
+ f.canSkipLines = false
+ f.filterWithLContext(ctx, ltx, rawLines, lines, re)
+ return
+ }
+
+ f.filterWithoutLContext(ctx, rawLines, lines, re)
+}
+
+func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *bytes.Buffer,
+ lines chan<- line.Line, re regex.Regex) {
+
+ for {
+ select {
+ case line, ok := <-rawLines:
+ f.updatePosition()
+ if !ok {
+ return
+ }
+ if filteredLine, ok := f.transmittable(line, len(lines), cap(lines), re); ok {
+ select {
+ case lines <- filteredLine:
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
+ }
+}
+
+// Filter log lines matching a given regular expression, however with local grep context.
+func (f readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext,
+ rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) {
+
+ // Scenario 1: Finish once maxCount hits found
+ maxCount := ltx.MaxCount
+ processMaxCount := maxCount > 0
+ maxReached := false
+
+ // Scenario 2: Print prev. N lines when current line matches.
+ before := ltx.BeforeContext
+ processBefore := before > 0
+ var beforeBuf chan *bytes.Buffer
+ if processBefore {
+ beforeBuf = make(chan *bytes.Buffer, before)
+ }
+
+ // Screnario 3: Print next N lines when current line matches.
+ after := 0
+ processAfter := ltx.AfterContext > 0
+
+ for lineBytesBuffer := range rawLines {
+ f.updatePosition()
+
+ if !re.Match(lineBytesBuffer.Bytes()) {
+ f.updateLineNotMatched()
+
+ if processAfter && after > 0 {
+ after--
+ myLine := line.Line{
+ Content: lineBytesBuffer,
+ SourceID: f.globID,
+ Count: f.totalLineCount(),
+ TransmittedPerc: 100,
+ }
+
+ select {
+ case lines <- myLine:
+ case <-ctx.Done():
+ return
+ }
+
+ } else if processBefore {
+ // Keep last num BeforeContext raw messages.
+ select {
+ case beforeBuf <- lineBytesBuffer:
+ default:
+ pool.RecycleBytesBuffer(<-beforeBuf)
+ beforeBuf <- lineBytesBuffer
+ }
}
continue
}
- // Last LineRead call could not read content until end of line, buffer
- // was too small. Determine whether we exceed the max line length we
- // want dtail to send to the client at once. Possibly split up log line
- // into multiple log lines.
- if len(rawLine) >= lineLengthThreshold {
- if !longLineWarning {
- f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines")
- // Only print out one warning per long log line.
- longLineWarning = true
+ f.updateLineMatched()
+
+ if processAfter {
+ if maxReached {
+ return
}
- rawLine = append(rawLine, '\n')
- select {
- case rawLines <- rawLine:
- case <-ctx.Done():
- return nil
+ after = ltx.AfterContext
+ }
+
+ if processBefore {
+ i := uint64(len(beforeBuf))
+ for {
+ select {
+ case lineBytesBuffer := <-beforeBuf:
+ myLine := line.Line{
+ Content: lineBytesBuffer,
+ SourceID: f.globID,
+ Count: f.totalLineCount() - i,
+ TransmittedPerc: 100,
+ }
+ i--
+
+ select {
+ case lines <- myLine:
+ case <-ctx.Done():
+ return
+ }
+ default:
+ // beforeBuf is now empty.
+ }
+ if len(beforeBuf) == 0 {
+ break
+ }
}
- rawLine = make([]byte, 0, 512)
}
+
+ line := line.Line{
+ Content: lineBytesBuffer,
+ SourceID: f.globID,
+ Count: f.totalLineCount(),
+ TransmittedPerc: 100,
+ }
+
+ select {
+ case lines <- line:
+ if processMaxCount {
+ maxCount--
+ if maxCount == 0 {
+ if !processAfter || after == 0 {
+ return
+ }
+ // Unfortunatley we have to continue filter, as there might be more lines to print
+ maxReached = true
+ }
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+func (f readFile) transmittable(lineBytesBuffer *bytes.Buffer, length, capacity int,
+ re regex.Regex) (line.Line, bool) {
+
+ var read line.Line
+ if !re.Match(lineBytesBuffer.Bytes()) {
+ f.updateLineNotMatched()
+ f.updateLineNotTransmitted()
+ return read, false
+ }
+ f.updateLineMatched()
+
+ // Can we actually send more messages, channel capacity reached?
+ if f.canSkipLines && length >= capacity {
+ f.updateLineNotTransmitted()
+ return read, false
+ }
+ f.updateLineTransmitted()
+
+ read = line.Line{
+ Content: lineBytesBuffer,
+ SourceID: f.globID,
+ Count: f.totalLineCount(),
+ TransmittedPerc: f.transmittedPerc(),
+ }
+ return read, true
+}
+
+// Check wether log file is truncated. Returns nil if not.
+func (f readFile) truncated(fd *os.File) (bool, error) {
+ dlog.Common.Debug(f.filePath, "File truncation check")
+
+ // Can not seek currently open FD.
+ curPos, err := fd.Seek(0, os.SEEK_CUR)
+ if err != nil {
+ return true, err
+ }
+ // Can not open file at original path.
+ pathFd, err := os.Open(f.filePath)
+ if err != nil {
+ return true, err
+ }
+ defer pathFd.Close()
+
+ // Can not seek file at original path.
+ pathPos, err := pathFd.Seek(0, io.SeekEnd)
+ if err != nil {
+ return true, err
+ }
+ if curPos > pathPos {
+ return true, errors.New("File got truncated")
}
+ return false, nil
}
diff --git a/internal/io/fs/tailfile.go b/internal/io/fs/tailfile.go
index 14994e5..b03b45d 100644
--- a/internal/io/fs/tailfile.go
+++ b/internal/io/fs/tailfile.go
@@ -6,7 +6,9 @@ type TailFile struct {
}
// NewTailFile returns a new file tailer.
-func NewTailFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) TailFile {
+func NewTailFile(filePath string, globID string, serverMessages chan<- string,
+ limiter chan struct{}) TailFile {
+
return TailFile{
readFile: readFile{
filePath: filePath,
diff --git a/internal/io/fs/truncate.go b/internal/io/fs/truncate.go
deleted file mode 100644
index a8d59ac..0000000
--- a/internal/io/fs/truncate.go
+++ /dev/null
@@ -1,61 +0,0 @@
-package fs
-
-import (
- "context"
- "errors"
- "io"
- "os"
- "time"
-
- "github.com/mimecast/dtail/internal/io/logger"
-)
-
-func (f readFile) truncateTimer(ctx context.Context) (checkTruncate chan struct{}) {
- checkTruncate = make(chan struct{})
-
- go func() {
- for {
- select {
- case <-time.After(time.Second * 3):
- select {
- case checkTruncate <- struct{}{}:
- case <-ctx.Done():
- }
- case <-ctx.Done():
- return
- }
- }
- }()
-
- return
-}
-
-// Check wether log file is truncated. Returns nil if not.
-func (f readFile) truncated(fd *os.File) (bool, error) {
- logger.Debug(f.filePath, "File truncation check")
-
- // Can not seek currently open FD.
- curPos, err := fd.Seek(0, os.SEEK_CUR)
- if err != nil {
- return true, err
- }
-
- // Can not open file at original path.
- pathFd, err := os.Open(f.filePath)
- if err != nil {
- return true, err
- }
- defer pathFd.Close()
-
- // Can not seek file at original path.
- pathPos, err := pathFd.Seek(0, io.SeekEnd)
- if err != nil {
- return true, err
- }
-
- if curPos > pathPos {
- return true, errors.New("File got truncated")
- }
-
- return false, nil
-}