diff options
Diffstat (limited to 'internal/fs/readfile.go')
| -rw-r--r-- | internal/fs/readfile.go | 318 |
1 files changed, 0 insertions, 318 deletions
diff --git a/internal/fs/readfile.go b/internal/fs/readfile.go deleted file mode 100644 index 312447a..0000000 --- a/internal/fs/readfile.go +++ /dev/null @@ -1,318 +0,0 @@ -package fs - -import ( - "bufio" - "compress/gzip" - "github.com/mimecast/dtail/internal/logger" - "errors" - "io" - "os" - "regexp" - "strings" - "sync" - "time" - - "github.com/DataDog/zstd" -) - -// Used to tail and filter a local log file. -type readFile struct { - // Various statistics (e.g. regex hit percentage, transfer percentage). - stats - // Path of log file to tail. - filePath string - // Only consider all log lines matching this regular expression. - re *regexp.Regexp - // The glob identifier of the file. - globID string - // Channel to send a server message to the dtail client - serverMessages chan<- string - // Signals to stop tailing the log file. - stop chan struct{} - // Periodically retry reading file. - retry bool - // Can I skip messages when there are too many? - canSkipLines bool - // Seek to the EOF before processing file? - seekEOF bool - // Mutex to control the stopping of the file - mutex *sync.Mutex - limiter chan struct{} -} - -// FilePath returns the full file path. -func (f readFile) FilePath() string { - return f.filePath -} - -// Retry reading the file on error? -func (f readFile) Retry() bool { - return f.retry -} - -// Start tailing a log file. -func (f readFile) Start(lines chan<- LineRead, regex string) error { - defer func() { - select { - case <-f.limiter: - default: - } - }() - - select { - case f.limiter <- struct{}{}: - default: - select { - case f.serverMessages <- logger.Warn(f.filePath, f.globID, "Server limit reached. Queuing file..."): - case <-f.stop: - return nil - } - f.limiter <- struct{}{} - } - - fd, err := os.Open(f.filePath) - if err != nil { - return err - } - defer fd.Close() - - if f.seekEOF { - fd.Seek(0, io.SeekEnd) - } - - rawLines := make(chan []byte, 100) - truncate := make(chan struct{}) - - var wg sync.WaitGroup - wg.Add(1) - - go f.periodicTruncateCheck(truncate) - go f.filter(&wg, rawLines, lines, regex) - - err = f.read(fd, rawLines, truncate) - close(rawLines) - wg.Wait() - - return err -} - -func (f readFile) periodicTruncateCheck(truncate chan struct{}) { - for { - select { - case <-time.After(time.Second * 3): - select { - case truncate <- struct{}{}: - case <-f.stop: - } - case <-f.stop: - return - } - } -} - -// Stop reading file. -func (f readFile) Stop() { - f.mutex.Lock() - defer f.mutex.Unlock() - - select { - case <-f.stop: - return - default: - } - - close(f.stop) -} - -func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { - switch { - case strings.HasSuffix(f.FilePath(), ".gz"): - fallthrough - case strings.HasSuffix(f.FilePath(), ".gzip"): - logger.Info(f.FilePath(), "Detected gzip compression format") - var gzipReader *gzip.Reader - gzipReader, err = gzip.NewReader(fd) - if err != nil { - return - } - reader = bufio.NewReader(gzipReader) - case strings.HasSuffix(f.FilePath(), ".zst"): - logger.Info(f.FilePath(), "Detected zstd compression format") - reader = bufio.NewReader(zstd.NewReader(fd)) - default: - reader = bufio.NewReader(fd) - } - - return -} - -func (f readFile) read(fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error { - reader, err := f.makeReader(fd) - if err != nil { - return err - } - rawLine := make([]byte, 0, 512) - var offset uint64 - - lineLengthThreshold := 1024 * 1024 // 1mb - longLineWarning := false - - for { - select { - case <-truncate: - if isTruncated, err := f.truncated(fd); isTruncated { - return err - } - logger.Info(f.filePath, "Current offset", offset) - - case <-f.stop: - return nil - default: - } - - // Read some bytes (max 4k at once as of go 1.12). isPrefix will - // be set if line does not fit into 4k buffer. - bytes, isPrefix, err := reader.ReadLine() - - if err != nil { - // If EOF, sleep a couple of ms and return with nil error. - // If other error, return with non-nil error. - if err != io.EOF { - return err - } - if !f.seekEOF { - logger.Debug(f.FilePath(), "End of file reached") - return nil - } - time.Sleep(time.Millisecond * 100) - continue - } - - rawLine = append(rawLine, bytes...) - offset += uint64(len(bytes)) - - if !isPrefix { - // last LineRead call returned contend until end of line. - rawLine = append(rawLine, '\n') - select { - case rawLines <- rawLine: - case <-f.stop: - return nil - } - rawLine = make([]byte, 0, 512) - if longLineWarning { - longLineWarning = false - } - continue - } - - // Last LineRead call could not read content until end of line, buffer - // was too small. Determine whether we exceed the max line length we - // want dtail to send to the client at once. Possibly split up log line - // into multiple log lines. - if len(rawLine) >= lineLengthThreshold { - if !longLineWarning { - f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines") - // Only print out one warning per long log line. - longLineWarning = true - } - rawLine = append(rawLine, '\n') - select { - case rawLines <- rawLine: - case <-f.stop: - return nil - } - rawLine = make([]byte, 0, 512) - } - } -} - -// Filter log lines matching a given regular expression. -func (f readFile) filter(wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- LineRead, regex string) { - defer wg.Done() - - if regex == "" { - regex = "." - } - - re, err := regexp.Compile(regex) - if err != nil { - logger.Error(regex, "Can't compile regex, using '.' instead", err) - re = regexp.MustCompile(".") - } - f.re = re - - for { - select { - case line, ok := <-rawLines: - f.updatePosition() - if !ok { - return - } - if filteredLine, ok := f.transmittable(line, len(lines), cap(lines)); ok { - select { - case lines <- filteredLine: - case <-f.stop: - return - } - } - } - } -} - -func (f readFile) transmittable(line []byte, length, capacity int) (LineRead, bool) { - var read LineRead - - if !f.re.Match(line) { - f.updateLineNotMatched() - f.updateLineNotTransmitted() - return read, false - } - f.updateLineMatched() - - // Can we actually send more messages, channel capacity reached? - if f.canSkipLines && length >= capacity { - f.updateLineNotTransmitted() - return read, false - } - f.updateLineTransmitted() - - read = LineRead{ - Content: line, - GlobID: &f.globID, - Count: f.totalLineCount(), - TransmittedPerc: f.transmittedPerc(), - } - - return read, true -} - -// Check wether log file is truncated. Returns nil if not. -func (f readFile) truncated(fd *os.File) (bool, error) { - logger.Debug(f.filePath, "File truncation check") - - // Can not seek currently open FD. - curPos, err := fd.Seek(0, os.SEEK_CUR) - if err != nil { - return true, err - } - - // Can not open file at original path. - pathFd, err := os.Open(f.filePath) - if err != nil { - return true, err - } - defer pathFd.Close() - - // Can not seek file at original path. - pathPos, err := pathFd.Seek(0, io.SeekEnd) - if err != nil { - return true, err - } - - if curPos > pathPos { - return true, errors.New("File got truncated") - } - - return false, nil -} |
