diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2021-10-21 21:28:49 +0300 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2021-10-21 21:28:49 +0300 |
| commit | f4207a55f71bfbcfdc532d5cdd3befaa3474a157 (patch) | |
| tree | ea5e4a2d2a67035f645bdee496ae55a52034178a /internal/io/fs | |
| parent | d80d6070557e3a800e3a54967af9eced518f116b (diff) | |
| parent | 739205206d63bf42f4e843b39d04d4c8cd8207c3 (diff) | |
merge develop
Diffstat (limited to 'internal/io/fs')
| -rw-r--r-- | internal/io/fs/catfile.go | 4 | ||||
| -rw-r--r-- | internal/io/fs/filereader.go | 6 | ||||
| -rw-r--r-- | internal/io/fs/filter.go | 167 | ||||
| -rw-r--r-- | internal/io/fs/permissions/permission.go | 4 | ||||
| -rw-r--r-- | internal/io/fs/permissions/permission_linuxacl.go | 2 | ||||
| -rw-r--r-- | internal/io/fs/readfile.go | 384 | ||||
| -rw-r--r-- | internal/io/fs/tailfile.go | 4 | ||||
| -rw-r--r-- | internal/io/fs/truncate.go | 61 |
8 files changed, 310 insertions, 322 deletions
diff --git a/internal/io/fs/catfile.go b/internal/io/fs/catfile.go index 7f387bc..01c15ba 100644 --- a/internal/io/fs/catfile.go +++ b/internal/io/fs/catfile.go @@ -6,7 +6,9 @@ type CatFile struct { } // NewCatFile returns a new file catter. -func NewCatFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) CatFile { +func NewCatFile(filePath string, globID string, serverMessages chan<- string, + limiter chan struct{}) CatFile { + return CatFile{ readFile: readFile{ filePath: filePath, diff --git a/internal/io/fs/filereader.go b/internal/io/fs/filereader.go index efd410e..b05fd39 100644 --- a/internal/io/fs/filereader.go +++ b/internal/io/fs/filereader.go @@ -8,9 +8,11 @@ import ( "github.com/mimecast/dtail/internal/regex" ) -// FileReader is the interface used on the dtail server to read/cat/grep/mapr... a file. +// FileReader is the interface used on the dtail server to read/cat/grep/mapr... +// a file. type FileReader interface { - Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error + Start(ctx context.Context, ltx lcontext.LContext, lines chan<- line.Line, + re regex.Regex) error FilePath() string Retry() bool } diff --git a/internal/io/fs/filter.go b/internal/io/fs/filter.go deleted file mode 100644 index c4f605e..0000000 --- a/internal/io/fs/filter.go +++ /dev/null @@ -1,167 +0,0 @@ -package fs - -import ( - "context" - - "github.com/mimecast/dtail/internal/io/line" - "github.com/mimecast/dtail/internal/lcontext" - "github.com/mimecast/dtail/internal/regex" -) - -func (f readFile) filter(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex, lContext lcontext.LContext) { - // Do we have any kind of local context settings? If so then run the more complex - // filterWithLContext method. - if lContext.Has() { - // We can not skip transmitting any lines to the client with a local - // grep context specified. - f.canSkipLines = false - f.filterWithLContext(ctx, rawLines, lines, re, lContext) - return - } - - f.filterWithoutLContext(ctx, rawLines, lines, re) -} - -// Filter log lines matching a given regular expression, however with local grep context. -func (f readFile) filterWithLContext(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex, lContext lcontext.LContext) { - // Scenario 1: Finish once maxCount hits found - maxCount := lContext.MaxCount - processMaxCount := maxCount > 0 - maxReached := false - - // Scenario 2: Print prev. N lines when current line matches. - before := lContext.BeforeContext - processBefore := before > 0 - var beforeBuf chan []byte - if processBefore { - beforeBuf = make(chan []byte, before) - } - - // Screnario 3: Print next N lines when current line matches. - after := 0 - processAfter := lContext.AfterContext > 0 - - for rawLine := range rawLines { - // logger.Debug("rawLine", string(rawLine)) - f.updatePosition() - - if !re.Match(rawLine) { - f.updateLineNotMatched() - - if processAfter && after > 0 { - after-- - myLine := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: 100} - select { - case lines <- myLine: - case <-ctx.Done(): - return - } - - } else if processBefore { - // Keep last num BeforeContext raw messages. - select { - case beforeBuf <- rawLine: - default: - <-beforeBuf - beforeBuf <- rawLine - } - } - continue - } - - f.updateLineMatched() - - if processAfter { - if maxReached { - return - } - after = lContext.AfterContext - } - - if processBefore { - i := uint64(len(beforeBuf)) - for { - select { - case myRawLine := <-beforeBuf: - myLine := line.Line{Content: myRawLine, SourceID: f.globID, Count: f.totalLineCount() - i, TransmittedPerc: 100} - i-- - select { - case lines <- myLine: - case <-ctx.Done(): - return - } - default: - // beforeBuf is now empty. - } - if len(beforeBuf) == 0 { - break - } - } - } - - line := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: 100} - - select { - case lines <- line: - if processMaxCount { - maxCount-- - if maxCount == 0 { - if !processAfter || after == 0 { - return - } - // Unfortunatley we have to continue filter, as there might be more lines to print - maxReached = true - } - } - case <-ctx.Done(): - return - } - } -} - -// Filter log lines matching a given regular expression, there is no local grep context specified. -func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) { - for { - select { - case rawLine, ok := <-rawLines: - f.updatePosition() - if !ok { - return - } - - if f.lineUntransmittable(rawLine, len(lines), cap(lines), re) { - continue - } - - line := line.Line{Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: f.transmittedPerc()} - - select { - case lines <- line: - continue - case <-ctx.Done(): - return - } - } - } -} - -func (f readFile) lineUntransmittable(rawLine []byte, length, capacity int, re regex.Regex) bool { - if !re.Match(rawLine) { - f.updateLineNotMatched() - f.updateLineNotTransmitted() - // Regex dosn't match, so not interested in it. - return true - } - f.updateLineMatched() - - // Can we actually send more messages, channel capacity reached? - if f.canSkipLines && length >= capacity { - f.updateLineNotTransmitted() - // Matching, not transmittable - return true - } - f.updateLineTransmitted() - - // Matching, transmittable - return false -} diff --git a/internal/io/fs/permissions/permission.go b/internal/io/fs/permissions/permission.go index cc5dd9b..d621c09 100644 --- a/internal/io/fs/permissions/permission.go +++ b/internal/io/fs/permissions/permission.go @@ -3,12 +3,12 @@ package permissions import ( - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/dlog" ) // ToRead is to check whether user has read permissions to a given file. func ToRead(user, filePath string) (bool, error) { // Only implemented for Linux, always expect true - logger.Warn(user, filePath, "Not performing ACL check, not supported on this platform") + dlog.Common.Debug(user, filePath, "Not performing ACL check as not compiled in") return true, nil } diff --git a/internal/io/fs/permissions/permission_linuxacl.go b/internal/io/fs/permissions/permission_linuxacl.go index 7d2d7ca..904b90f 100644 --- a/internal/io/fs/permissions/permission_linuxacl.go +++ b/internal/io/fs/permissions/permission_linuxacl.go @@ -13,7 +13,7 @@ import ( "unsafe" ) -// ToRead checks whether user has Linux file system permissions to read a given file. +// ToRead checks whether user has Linux file system permissions to read a file. func ToRead(user, filePath string) (bool, error) { cUser := C.CString(user) cFilePath := C.CString(filePath) diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 161e3f0..28cbe58 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -2,16 +2,20 @@ package fs import ( "bufio" + "bytes" "compress/gzip" "context" + "errors" "fmt" "io" "os" "strings" + "sync" "time" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/io/pool" "github.com/mimecast/dtail/internal/lcontext" "github.com/mimecast/dtail/internal/regex" @@ -37,31 +41,10 @@ type readFile struct { limiter chan struct{} } -func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { - switch { - case strings.HasSuffix(f.FilePath(), ".gz"): - fallthrough - case strings.HasSuffix(f.FilePath(), ".gzip"): - logger.Info(f.FilePath(), "Detected gzip compression format") - var gzipReader *gzip.Reader - gzipReader, err = gzip.NewReader(fd) - if err != nil { - return - } - reader = bufio.NewReader(gzipReader) - case strings.HasSuffix(f.FilePath(), ".zst"): - logger.Info(f.FilePath(), "Detected zstd compression format") - reader = bufio.NewReader(zstd.NewReader(fd)) - default: - reader = bufio.NewReader(fd) - } - - return -} - // String returns the string representation of the readFile func (f readFile) String() string { - return fmt.Sprintf("readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)", + return fmt.Sprintf( + "readFile(filePath:%s,globID:%s,retry:%v,canSkipLines:%v,seekEOF:%v)", f.filePath, f.globID, f.retry, @@ -80,8 +63,10 @@ func (f readFile) Retry() bool { } // Start tailing a log file. -func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines chan<- line.Line, re regex.Regex) error { - logger.Debug("readFile", f) +func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, + lines chan<- line.Line, re regex.Regex) error { + + dlog.Common.Debug("readFile", f) defer func() { select { case <-f.limiter: @@ -93,7 +78,8 @@ func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines c case f.limiter <- struct{}{}: default: select { - case f.serverMessages <- logger.Warn(f.filePath, f.globID, "Server limit reached. Queuing file..."): + case f.serverMessages <- dlog.Common.Warn(f.filePath, f.globID, + "Server limit reached. Queuing file..."): case <-ctx.Done(): return nil } @@ -110,111 +96,335 @@ func (f readFile) Start(ctx context.Context, lContext lcontext.LContext, lines c fd.Seek(0, io.SeekEnd) } - rawLines := make(chan []byte, 100) + rawLines := make(chan *bytes.Buffer, 100) + truncate := make(chan struct{}) + readCtx, readCancel := context.WithCancel(ctx) + var filterWg sync.WaitGroup + filterWg.Add(1) - filterDone := make(chan struct{}) + go f.periodicTruncateCheck(ctx, truncate) go func() { - f.filter(ctx, rawLines, lines, re, lContext) - close(filterDone) + f.filter(ctx, ltx, rawLines, lines, re) + filterWg.Done() // If the filter stopped, make the reader stop too, no need to read // more data if there is nothing more the filter wants to filter for! // E.g. it could be that we only want to filter N matches but not more. readCancel() }() - err = f.read(readCtx, fd, rawLines) + err = f.read(readCtx, fd, rawLines, truncate) close(rawLines) - - // Filter may flushes some data still. So wait until it is done here. - <-filterDone + // Filter may sends some data still. So wait until it is done here. + filterWg.Wait() return err } -func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte) error { - var offset uint64 +func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) { + for { + select { + case <-time.After(time.Second * 3): + select { + case truncate <- struct{}{}: + case <-ctx.Done(): + } + case <-ctx.Done(): + return + } + } +} +func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { + switch { + case strings.HasSuffix(f.FilePath(), ".gz"): + fallthrough + case strings.HasSuffix(f.FilePath(), ".gzip"): + dlog.Common.Info(f.FilePath(), "Detected gzip compression format") + var gzipReader *gzip.Reader + gzipReader, err = gzip.NewReader(fd) + if err != nil { + return + } + reader = bufio.NewReader(gzipReader) + case strings.HasSuffix(f.FilePath(), ".zst"): + dlog.Common.Info(f.FilePath(), "Detected zstd compression format") + reader = bufio.NewReader(zstd.NewReader(fd)) + default: + reader = bufio.NewReader(fd) + } + return +} + +func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Buffer, truncate <-chan struct{}) error { + var offset uint64 reader, err := f.makeReader(fd) if err != nil { return err } - rawLine := make([]byte, 0, 512) lineLengthThreshold := 1024 * 1024 // 1mb - longLineWarning := false - - checkTruncate := f.truncateTimer(ctx) + warnedAboutLongLine := false + message := pool.BytesBuffer.Get().(*bytes.Buffer) for { - select { - case <-ctx.Done(): - return nil - default: - } - - select { - case <-checkTruncate: - if isTruncated, err := f.truncated(fd); isTruncated { - return err - } - logger.Info(f.filePath, "Current offset", offset) - default: - } - - // Read some bytes (max 4k at once as of go 1.12). isPrefix will - // be set if line does not fit into 4k buffer. - bytes, isPrefix, err := reader.ReadLine() + b, err := reader.ReadByte() if err != nil { - // If EOF, sleep a couple of ms and return with nil error. - // If other error, return with non-nil error. if err != io.EOF { return err } + select { + case <-truncate: + if isTruncated, err := f.truncated(fd); isTruncated { + return err + } + case <-ctx.Done(): + return nil + default: + } if !f.seekEOF { - logger.Debug(f.FilePath(), "End of file reached") + dlog.Common.Info(f.FilePath(), "End of file reached") return nil } time.Sleep(time.Millisecond * 100) continue } + offset++ - rawLine = append(rawLine, bytes...) - offset += uint64(len(bytes)) - - if !isPrefix { - // last LineRead call returned contend until end of line. - rawLine = append(rawLine, '\n') + switch b { + case '\n': select { - case rawLines <- rawLine: + case rawLines <- message: + message = pool.BytesBuffer.Get().(*bytes.Buffer) + //fmt.Printf("%d %d %p\n", message.Len(), message.Cap(), message) + warnedAboutLongLine = false case <-ctx.Done(): return nil } - rawLine = make([]byte, 0, 512) - if longLineWarning { - longLineWarning = false + default: + if message.Len() >= lineLengthThreshold { + if !warnedAboutLongLine { + f.serverMessages <- dlog.Common.Warn(f.filePath, + "Long log line, splitting into multiple lines") + warnedAboutLongLine = true + } + message.WriteString("\n") + select { + case rawLines <- message: + message = pool.BytesBuffer.Get().(*bytes.Buffer) + case <-ctx.Done(): + return nil + } + } + message.WriteByte(b) + } + } +} + +// Filter log lines matching a given regular expression. +func (f readFile) filter(ctx context.Context, ltx lcontext.LContext, + rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { + + // Do we have any kind of local context settings? If so then run the more complex + // filterWithLContext method. + if ltx.Has() { + // We can not skip transmitting any lines to the client with a local + // grep context specified. + f.canSkipLines = false + f.filterWithLContext(ctx, ltx, rawLines, lines, re) + return + } + + f.filterWithoutLContext(ctx, rawLines, lines, re) +} + +func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *bytes.Buffer, + lines chan<- line.Line, re regex.Regex) { + + for { + select { + case line, ok := <-rawLines: + f.updatePosition() + if !ok { + return + } + if filteredLine, ok := f.transmittable(line, len(lines), cap(lines), re); ok { + select { + case lines <- filteredLine: + case <-ctx.Done(): + return + } + } + } + } +} + +// Filter log lines matching a given regular expression, however with local grep context. +func (f readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext, + rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { + + // Scenario 1: Finish once maxCount hits found + maxCount := ltx.MaxCount + processMaxCount := maxCount > 0 + maxReached := false + + // Scenario 2: Print prev. N lines when current line matches. + before := ltx.BeforeContext + processBefore := before > 0 + var beforeBuf chan *bytes.Buffer + if processBefore { + beforeBuf = make(chan *bytes.Buffer, before) + } + + // Screnario 3: Print next N lines when current line matches. + after := 0 + processAfter := ltx.AfterContext > 0 + + for lineBytesBuffer := range rawLines { + f.updatePosition() + + if !re.Match(lineBytesBuffer.Bytes()) { + f.updateLineNotMatched() + + if processAfter && after > 0 { + after-- + myLine := line.Line{ + Content: lineBytesBuffer, + SourceID: f.globID, + Count: f.totalLineCount(), + TransmittedPerc: 100, + } + + select { + case lines <- myLine: + case <-ctx.Done(): + return + } + + } else if processBefore { + // Keep last num BeforeContext raw messages. + select { + case beforeBuf <- lineBytesBuffer: + default: + pool.RecycleBytesBuffer(<-beforeBuf) + beforeBuf <- lineBytesBuffer + } } continue } - // Last LineRead call could not read content until end of line, buffer - // was too small. Determine whether we exceed the max line length we - // want dtail to send to the client at once. Possibly split up log line - // into multiple log lines. - if len(rawLine) >= lineLengthThreshold { - if !longLineWarning { - f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines") - // Only print out one warning per long log line. - longLineWarning = true + f.updateLineMatched() + + if processAfter { + if maxReached { + return } - rawLine = append(rawLine, '\n') - select { - case rawLines <- rawLine: - case <-ctx.Done(): - return nil + after = ltx.AfterContext + } + + if processBefore { + i := uint64(len(beforeBuf)) + for { + select { + case lineBytesBuffer := <-beforeBuf: + myLine := line.Line{ + Content: lineBytesBuffer, + SourceID: f.globID, + Count: f.totalLineCount() - i, + TransmittedPerc: 100, + } + i-- + + select { + case lines <- myLine: + case <-ctx.Done(): + return + } + default: + // beforeBuf is now empty. + } + if len(beforeBuf) == 0 { + break + } } - rawLine = make([]byte, 0, 512) } + + line := line.Line{ + Content: lineBytesBuffer, + SourceID: f.globID, + Count: f.totalLineCount(), + TransmittedPerc: 100, + } + + select { + case lines <- line: + if processMaxCount { + maxCount-- + if maxCount == 0 { + if !processAfter || after == 0 { + return + } + // Unfortunatley we have to continue filter, as there might be more lines to print + maxReached = true + } + } + case <-ctx.Done(): + return + } + } +} + +func (f readFile) transmittable(lineBytesBuffer *bytes.Buffer, length, capacity int, + re regex.Regex) (line.Line, bool) { + + var read line.Line + if !re.Match(lineBytesBuffer.Bytes()) { + f.updateLineNotMatched() + f.updateLineNotTransmitted() + return read, false + } + f.updateLineMatched() + + // Can we actually send more messages, channel capacity reached? + if f.canSkipLines && length >= capacity { + f.updateLineNotTransmitted() + return read, false + } + f.updateLineTransmitted() + + read = line.Line{ + Content: lineBytesBuffer, + SourceID: f.globID, + Count: f.totalLineCount(), + TransmittedPerc: f.transmittedPerc(), + } + return read, true +} + +// Check wether log file is truncated. Returns nil if not. +func (f readFile) truncated(fd *os.File) (bool, error) { + dlog.Common.Debug(f.filePath, "File truncation check") + + // Can not seek currently open FD. + curPos, err := fd.Seek(0, os.SEEK_CUR) + if err != nil { + return true, err + } + // Can not open file at original path. + pathFd, err := os.Open(f.filePath) + if err != nil { + return true, err + } + defer pathFd.Close() + + // Can not seek file at original path. + pathPos, err := pathFd.Seek(0, io.SeekEnd) + if err != nil { + return true, err + } + if curPos > pathPos { + return true, errors.New("File got truncated") } + return false, nil } diff --git a/internal/io/fs/tailfile.go b/internal/io/fs/tailfile.go index 14994e5..b03b45d 100644 --- a/internal/io/fs/tailfile.go +++ b/internal/io/fs/tailfile.go @@ -6,7 +6,9 @@ type TailFile struct { } // NewTailFile returns a new file tailer. -func NewTailFile(filePath string, globID string, serverMessages chan<- string, limiter chan struct{}) TailFile { +func NewTailFile(filePath string, globID string, serverMessages chan<- string, + limiter chan struct{}) TailFile { + return TailFile{ readFile: readFile{ filePath: filePath, diff --git a/internal/io/fs/truncate.go b/internal/io/fs/truncate.go deleted file mode 100644 index a8d59ac..0000000 --- a/internal/io/fs/truncate.go +++ /dev/null @@ -1,61 +0,0 @@ -package fs - -import ( - "context" - "errors" - "io" - "os" - "time" - - "github.com/mimecast/dtail/internal/io/logger" -) - -func (f readFile) truncateTimer(ctx context.Context) (checkTruncate chan struct{}) { - checkTruncate = make(chan struct{}) - - go func() { - for { - select { - case <-time.After(time.Second * 3): - select { - case checkTruncate <- struct{}{}: - case <-ctx.Done(): - } - case <-ctx.Done(): - return - } - } - }() - - return -} - -// Check wether log file is truncated. Returns nil if not. -func (f readFile) truncated(fd *os.File) (bool, error) { - logger.Debug(f.filePath, "File truncation check") - - // Can not seek currently open FD. - curPos, err := fd.Seek(0, os.SEEK_CUR) - if err != nil { - return true, err - } - - // Can not open file at original path. - pathFd, err := os.Open(f.filePath) - if err != nil { - return true, err - } - defer pathFd.Close() - - // Can not seek file at original path. - pathPos, err := pathFd.Seek(0, io.SeekEnd) - if err != nil { - return true, err - } - - if curPos > pathPos { - return true, errors.New("File got truncated") - } - - return false, nil -} |
