summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2021-12-04 11:06:30 +0000
committerPaul Buetow <pbuetow@mimecast.com>2021-12-04 11:06:30 +0000
commit1c7a6472b36df037fa31eb72fe0b5aa78d79b7fa (patch)
treec1112270290e18d92ba5515591b197154ac6e9f8
parentbdc4741742567ed95964978f84b94566dcacf505 (diff)
Refactor read method to reduce its Cognitive Complexity.
-rw-r--r--internal/io/fs/readfile.go155
1 files changed, 95 insertions, 60 deletions
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index 806cd32..9fbbad5 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -23,6 +23,13 @@ import (
"github.com/DataDog/zstd"
)
+type readStatus int
+
+const (
+ abortReading readStatus = iota
+ continueReading readStatus = iota
+)
+
// Used to tail and filter a local log file.
type readFile struct {
// Various statistics (e.g. regex hit percentage, transfer percentage).
@@ -39,6 +46,8 @@ type readFile struct {
canSkipLines bool
// Seek to the EOF before processing file?
seekEOF bool
+ // Warned already about a long line.
+ warnedAboutLongLine bool
}
// String returns the string representation of the readFile
@@ -99,14 +108,14 @@ func (f readFile) Start(ctx context.Context, ltx lcontext.LContext,
return err
}
-func (f readFile) makeReader() (*bufio.Reader, *os.File, error) {
+func (f *readFile) makeReader() (*bufio.Reader, *os.File, error) {
if f.filePath == "" && f.globID == "-" {
return f.makePipeReader()
}
return f.makeFileReader()
}
-func (f readFile) makeFileReader() (*bufio.Reader, *os.File, error) {
+func (f *readFile) makeFileReader() (*bufio.Reader, *os.File, error) {
var reader *bufio.Reader
fd, err := os.Open(f.filePath)
if err != nil {
@@ -125,11 +134,11 @@ func (f readFile) makeFileReader() (*bufio.Reader, *os.File, error) {
return reader, fd, nil
}
-func (f readFile) makePipeReader() (*bufio.Reader, *os.File, error) {
+func (f *readFile) makePipeReader() (*bufio.Reader, *os.File, error) {
return bufio.NewReader(os.Stdin), nil, nil
}
-func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) {
+func (f *readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) {
for {
select {
case <-time.After(time.Second * 3):
@@ -143,7 +152,7 @@ func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struc
}
}
-func (f readFile) makeCompressedFileReader(fd *os.File) (reader *bufio.Reader, err error) {
+func (f *readFile) makeCompressedFileReader(fd *os.File) (reader *bufio.Reader, err error) {
switch {
case strings.HasSuffix(f.FilePath(), ".gz"):
fallthrough
@@ -164,39 +173,19 @@ func (f readFile) makeCompressedFileReader(fd *os.File) (reader *bufio.Reader, e
return
}
-func (f readFile) read(ctx context.Context, fd *os.File, reader *bufio.Reader,
+func (f *readFile) read(ctx context.Context, fd *os.File, reader *bufio.Reader,
rawLines chan *bytes.Buffer, truncate <-chan struct{}) error {
var offset uint64
- warnedAboutLongLine := false
message := pool.BytesBuffer.Get().(*bytes.Buffer)
for {
b, err := reader.ReadByte()
-
if err != nil {
- if err != io.EOF {
+ status, err := f.handleReadError(ctx, err, fd, rawLines, truncate, message)
+ if abortReading == status {
return err
}
- select {
- case <-truncate:
- if isTruncated, err := f.truncated(fd); isTruncated {
- return err
- }
- case <-ctx.Done():
- return nil
- default:
- }
- if !f.seekEOF {
- dlog.Common.Info(f.FilePath(), "End of file reached")
- if len(message.Bytes()) > 0 {
- select {
- case rawLines <- message:
- case <-ctx.Done():
- }
- }
- return nil
- }
time.Sleep(time.Millisecond * 100)
continue
}
@@ -204,36 +193,16 @@ func (f readFile) read(ctx context.Context, fd *os.File, reader *bufio.Reader,
offset++
message.WriteByte(b)
- switch b {
- case '\n':
- select {
- case rawLines <- message:
- message = pool.BytesBuffer.Get().(*bytes.Buffer)
- warnedAboutLongLine = false
- case <-ctx.Done():
- return nil
- }
- default:
- if message.Len() >= config.Server.MaxLineLength {
- if !warnedAboutLongLine {
- f.serverMessages <- dlog.Common.Warn(f.filePath,
- "Long log line, splitting into multiple lines")
- warnedAboutLongLine = true
- }
- message.WriteByte('\n')
- select {
- case rawLines <- message:
- message = pool.BytesBuffer.Get().(*bytes.Buffer)
- case <-ctx.Done():
- return nil
- }
- }
+ status, newMessage := f.handleReadByte(ctx, b, rawLines, message)
+ if status == abortReading {
+ return nil
}
+ message = newMessage
}
}
// Filter log lines matching a given regular expression.
-func (f readFile) filter(ctx context.Context, ltx lcontext.LContext,
+func (f *readFile) filter(ctx context.Context, ltx lcontext.LContext,
rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) {
// Do we have any kind of local context settings? If so then run the more complex
@@ -249,7 +218,7 @@ func (f readFile) filter(ctx context.Context, ltx lcontext.LContext,
f.filterWithoutLContext(ctx, rawLines, lines, re)
}
-func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *bytes.Buffer,
+func (f *readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *bytes.Buffer,
lines chan<- line.Line, re regex.Regex) {
for {
@@ -271,7 +240,7 @@ func (f readFile) filterWithoutLContext(ctx context.Context, rawLines <-chan *by
}
// Filter log lines matching a given regular expression, however with local grep context.
-func (f readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext,
+func (f *readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext,
rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) {
// Scenario 1: Finish once maxCount hits found
@@ -385,7 +354,7 @@ func (f readFile) filterWithLContext(ctx context.Context, ltx lcontext.LContext,
}
}
-func (f readFile) transmittable(lineBytesBuffer *bytes.Buffer, length, capacity int,
+func (f *readFile) transmittable(lineBytesBuffer *bytes.Buffer, length, capacity int,
re regex.Regex) (line.Line, bool) {
var read line.Line
@@ -413,7 +382,7 @@ func (f readFile) transmittable(lineBytesBuffer *bytes.Buffer, length, capacity
}
// Check wether log file is truncated. Returns nil if not.
-func (f readFile) truncated(fd *os.File) (bool, error) {
+func (f *readFile) truncated(fd *os.File) (bool, error) {
if fd == nil {
return false, nil
}
@@ -421,7 +390,7 @@ func (f readFile) truncated(fd *os.File) (bool, error) {
dlog.Common.Debug(f.filePath, "File truncation check")
// Can not seek currently open FD.
- curPos, err := fd.Seek(0, os.SEEK_CUR)
+ currentPosition, err := fd.Seek(0, os.SEEK_CUR)
if err != nil {
return true, err
}
@@ -433,12 +402,78 @@ func (f readFile) truncated(fd *os.File) (bool, error) {
defer pathFd.Close()
// Can not seek file at original path.
- pathPos, err := pathFd.Seek(0, io.SeekEnd)
+ pathPosition, err := pathFd.Seek(0, io.SeekEnd)
if err != nil {
return true, err
}
- if curPos > pathPos {
+ if currentPosition > pathPosition {
return true, errors.New("File got truncated")
}
return false, nil
}
+
+// Deal with the scenario that nothing could be read from the fd.
+func (f *readFile) handleReadError(ctx context.Context, err error, fd *os.File,
+ rawLines chan *bytes.Buffer, truncate <-chan struct{},
+ message *bytes.Buffer) (readStatus, error) {
+
+ if err != io.EOF {
+ return abortReading, err
+ }
+
+ select {
+ case <-truncate:
+ if isTruncated, err := f.truncated(fd); isTruncated {
+ return abortReading, err
+ }
+ case <-ctx.Done():
+ return abortReading, nil
+ default:
+ }
+
+ if !f.seekEOF {
+ dlog.Common.Info(f.FilePath(), "End of file reached")
+ if len(message.Bytes()) > 0 {
+ select {
+ case rawLines <- message:
+ case <-ctx.Done():
+ }
+ }
+ return abortReading, nil
+ }
+
+ return continueReading, nil
+}
+
+// Now process the byte we just read from the fd.
+func (f *readFile) handleReadByte(ctx context.Context, b byte,
+ rawLines chan *bytes.Buffer, message *bytes.Buffer) (readStatus, *bytes.Buffer) {
+
+ switch b {
+ case '\n':
+ select {
+ case rawLines <- message:
+ message = pool.BytesBuffer.Get().(*bytes.Buffer)
+ f.warnedAboutLongLine = false
+ case <-ctx.Done():
+ return abortReading, message
+ }
+ default:
+ if message.Len() >= config.Server.MaxLineLength {
+ if !f.warnedAboutLongLine {
+ f.serverMessages <- dlog.Common.Warn(f.filePath,
+ "Long log line, splitting into multiple lines")
+ f.warnedAboutLongLine = true
+ }
+ message.WriteByte('\n')
+ select {
+ case rawLines <- message:
+ message = pool.BytesBuffer.Get().(*bytes.Buffer)
+ case <-ctx.Done():
+ return abortReading, message
+ }
+ }
+ }
+
+ return continueReading, message
+}