diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2021-12-04 11:06:30 +0000 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2021-12-04 11:06:30 +0000 |
| commit | 1c7a6472b36df037fa31eb72fe0b5aa78d79b7fa (patch) | |
| tree | c1112270290e18d92ba5515591b197154ac6e9f8 | |
| parent | bdc4741742567ed95964978f84b94566dcacf505 (diff) | |
Refactor read method to reduce its Cognitive Complexity.
| -rw-r--r-- | internal/io/fs/readfile.go | 155 |
1 files changed, 95 insertions, 60 deletions
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 806cd32..9fbbad5 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -23,6 +23,13 @@ import ( "github.com/DataDog/zstd" ) +type readStatus int + +const ( + abortReading readStatus = iota + continueReading readStatus = iota +) + // Used to tail and filter a local log file. type readFile struct { // Various statistics (e.g. regex hit percentage, transfer percentage). @@ -39,6 +46,8 @@ type readFile struct { canSkipLines bool // Seek to the EOF before processing file? seekEOF bool + // Warned already about a long line. + warnedAboutLongLine bool } // String returns the string representation of the readFile @@ -99,14 +108,14 @@ func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, return err } -func (f readFile) makeReader() (*bufio.Reader, *os.File, error) { +func (f *readFile) makeReader() (*bufio.Reader, *os.File, error) { if f.filePath == "" && f.globID == "-" { return f.makePipeReader() } return f.makeFileReader() } -func (f readFile) makeFileReader() (*bufio.Reader, *os.File, error) { +func (f *readFile) makeFileReader() (*bufio.Reader, *os.File, error) { var reader *bufio.Reader fd, err := os.Open(f.filePath) if err != nil { @@ -125,11 +134,11 @@ func (f readFile) makeFileReader() (*bufio.Reader, *os.File, error) { return reader, fd, nil } -func (f readFile) makePipeReader() (*bufio.Reader, *os.File, error) { +func (f *readFile) makePipeReader() (*bufio.Reader, *os.File, error) { return bufio.NewReader(os.Stdin), nil, nil } -func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) { +func (f *readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) { for { select { case <-time.After(time.Second * 3): @@ -143,7 +152,7 @@ func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struc } } -func (f readFile) makeCompressedFileReader(fd *os.File) (reader *bufio.Reader, err error) { +func (f *readFile) makeCompressedFileReader(fd *os.File) (reader *bufio.Reader, err error) { switch { case strings.HasSuffix(f.FilePath(), ".gz"): fallthrough @@ -164,39 +173,19 @@ func (f readFile) makeCompressedFileReader(fd *os.File) (reader *bufio.Reader, e return } -func (f readFile) read(ctx context.Context, fd *os.File, reader *bufio.Reader, +func (f *readFile) read(ctx context.Context, fd *os.File, reader *bufio.Reader, rawLines chan *bytes.Buffer, truncate <-chan struct{}) error { var offset uint64 - warnedAboutLongLine := false message := pool.BytesBuffer.Get().(*bytes.Buffer) for { b, err := reader.ReadByte() - if err != nil { - if err != io.EOF { + status, err := f.handleReadError(ctx, err, fd, rawLines, truncate, message) + if abortReading == status { return err } - select { - case <-truncate: - if isTruncated, err := f.truncated(fd); isTruncated { - return err - } - case <-ctx.Done(): - return nil - default: - } - if !f.seekEOF { - dlog.Common.Info(f.FilePath(), "End of file reached") - if len(message.Bytes()) > 0 { - select { - case rawLines <- message: - case <-ctx.Done(): - } - } - return nil - } time.Sleep(time.Millisecond * 100) continue } @@ -204,36 +193,16 @@ func (f readFile) read(ctx context.Context, fd *os.File, reader *bufio.Reader, offset++ message.WriteByte(b) - switch b { - case '\n': - select { - case rawLines <- message: - message = pool.BytesBuffer.Get().(*bytes.Buffer) - warnedAboutLongLine = false - case <-ctx.Done(): - return nil - } - default: - if message.Len() >= config.Server.MaxLineLength { - if !warnedAboutLongLine { - f.serverMessages <- dlog.Common.Warn(f.filePath, - "Long log line, splitting into multiple lines") - warnedAboutLongLine = true - } - message.WriteByte('\n') - select { - case rawLines <- message: - message = pool.BytesBuffer.Get().(*bytes.Buffer) - case <-ctx.Done(): - return nil - } - } + status, newMessage := f.handleReadByte(ctx, b, rawLines, message) + if status == abortReading { + return nil } + message = newMessage } } // Filter log lines matching a given regular expression. -func (f readFile) filter(ctx context.Context, ltx lcontext.LContext, +func (f *readFile) filter(ctx context.Context, ltx lcontext.LContext, rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { // Do we have any kind of local context settings? If so then run the more complex @@ -249,7 +218,7 @@ func (f readFile) filter(ctx context.Context, ltx lcontext.LContext, f.filterWithoutLContext(ctx, rawLines, lines, re) } -func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *bytes.Buffer, +func (f *readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { for { @@ -271,7 +240,7 @@ func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *by } // Filter log lines matching a given regular expression, however with local grep context. -func (f readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext, +func (f *readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext, rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { // Scenario 1: Finish once maxCount hits found @@ -385,7 +354,7 @@ func (f readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext, } } -func (f readFile) transmittable(lineBytesBuffer *bytes.Buffer, length, capacity int, +func (f *readFile) transmittable(lineBytesBuffer *bytes.Buffer, length, capacity int, re regex.Regex) (line.Line, bool) { var read line.Line @@ -413,7 +382,7 @@ func (f readFile) transmittable(lineBytesBuffer *bytes.Buffer, length, capacity } // Check wether log file is truncated. Returns nil if not. -func (f readFile) truncated(fd *os.File) (bool, error) { +func (f *readFile) truncated(fd *os.File) (bool, error) { if fd == nil { return false, nil } @@ -421,7 +390,7 @@ func (f readFile) truncated(fd *os.File) (bool, error) { dlog.Common.Debug(f.filePath, "File truncation check") // Can not seek currently open FD. - curPos, err := fd.Seek(0, os.SEEK_CUR) + currentPosition, err := fd.Seek(0, os.SEEK_CUR) if err != nil { return true, err } @@ -433,12 +402,78 @@ func (f readFile) truncated(fd *os.File) (bool, error) { defer pathFd.Close() // Can not seek file at original path. - pathPos, err := pathFd.Seek(0, io.SeekEnd) + pathPosition, err := pathFd.Seek(0, io.SeekEnd) if err != nil { return true, err } - if curPos > pathPos { + if currentPosition > pathPosition { return true, errors.New("File got truncated") } return false, nil } + +// Deal with the scenario that nothing could be read from the fd. +func (f *readFile) handleReadError(ctx context.Context, err error, fd *os.File, + rawLines chan *bytes.Buffer, truncate <-chan struct{}, + message *bytes.Buffer) (readStatus, error) { + + if err != io.EOF { + return abortReading, err + } + + select { + case <-truncate: + if isTruncated, err := f.truncated(fd); isTruncated { + return abortReading, err + } + case <-ctx.Done(): + return abortReading, nil + default: + } + + if !f.seekEOF { + dlog.Common.Info(f.FilePath(), "End of file reached") + if len(message.Bytes()) > 0 { + select { + case rawLines <- message: + case <-ctx.Done(): + } + } + return abortReading, nil + } + + return continueReading, nil +} + +// Now process the byte we just read from the fd. +func (f *readFile) handleReadByte(ctx context.Context, b byte, + rawLines chan *bytes.Buffer, message *bytes.Buffer) (readStatus, *bytes.Buffer) { + + switch b { + case '\n': + select { + case rawLines <- message: + message = pool.BytesBuffer.Get().(*bytes.Buffer) + f.warnedAboutLongLine = false + case <-ctx.Done(): + return abortReading, message + } + default: + if message.Len() >= config.Server.MaxLineLength { + if !f.warnedAboutLongLine { + f.serverMessages <- dlog.Common.Warn(f.filePath, + "Long log line, splitting into multiple lines") + f.warnedAboutLongLine = true + } + message.WriteByte('\n') + select { + case rawLines <- message: + message = pool.BytesBuffer.Get().(*bytes.Buffer) + case <-ctx.Done(): + return abortReading, message + } + } + } + + return continueReading, message +} |
