summaryrefslogtreecommitdiff
path: root/fs/readfile.go
diff options
context:
space:
mode:
authorPaul Bütow <pbuetow@mimecast.com>2020-01-20 18:41:05 +0000
committerPaul Bütow <pbuetow@mimecast.com>2020-01-21 14:35:23 +0000
commitc128865c4c7411c29a59fca9a3a2f95537686d7b (patch)
tree193bccc70d942c8b70cc93fae2670263701e43aa /fs/readfile.go
parent3755a9911ecb05886577095f2b8cc8b9e4066a3a (diff)
Move commands to cmd/ and move internal dependencies to internal/
Diffstat (limited to 'fs/readfile.go')
-rw-r--r--fs/readfile.go318
1 files changed, 0 insertions, 318 deletions
diff --git a/fs/readfile.go b/fs/readfile.go
deleted file mode 100644
index 375378b..0000000
--- a/fs/readfile.go
+++ /dev/null
@@ -1,318 +0,0 @@
-package fs
-
-import (
- "bufio"
- "compress/gzip"
- "dtail/logger"
- "errors"
- "io"
- "os"
- "regexp"
- "strings"
- "sync"
- "time"
-
- "github.com/DataDog/zstd"
-)
-
-// Used to tail and filter a local log file.
-type readFile struct {
- // Various statistics (e.g. regex hit percentage, transfer percentage).
- stats
- // Path of log file to tail.
- filePath string
- // Only consider all log lines matching this regular expression.
- re *regexp.Regexp
- // The glob identifier of the file.
- globID string
- // Channel to send a server message to the dtail client
- serverMessages chan<- string
- // Signals to stop tailing the log file.
- stop chan struct{}
- // Periodically retry reading file.
- retry bool
- // Can I skip messages when there are too many?
- canSkipLines bool
- // Seek to the EOF before processing file?
- seekEOF bool
- // Mutex to control the stopping of the file
- mutex *sync.Mutex
- limiter chan struct{}
-}
-
-// FilePath returns the full file path.
-func (f readFile) FilePath() string {
- return f.filePath
-}
-
-// Retry reading the file on error?
-func (f readFile) Retry() bool {
- return f.retry
-}
-
-// Start tailing a log file.
-func (f readFile) Start(lines chan<- LineRead, regex string) error {
- defer func() {
- select {
- case <-f.limiter:
- default:
- }
- }()
-
- select {
- case f.limiter <- struct{}{}:
- default:
- select {
- case f.serverMessages <- logger.Warn(f.filePath, f.globID, "Server limit reached. Queuing file..."):
- case <-f.stop:
- return nil
- }
- f.limiter <- struct{}{}
- }
-
- fd, err := os.Open(f.filePath)
- if err != nil {
- return err
- }
- defer fd.Close()
-
- if f.seekEOF {
- fd.Seek(0, io.SeekEnd)
- }
-
- rawLines := make(chan []byte, 100)
- truncate := make(chan struct{})
-
- var wg sync.WaitGroup
- wg.Add(1)
-
- go f.periodicTruncateCheck(truncate)
- go f.filter(&wg, rawLines, lines, regex)
-
- err = f.read(fd, rawLines, truncate)
- close(rawLines)
- wg.Wait()
-
- return err
-}
-
-func (f readFile) periodicTruncateCheck(truncate chan struct{}) {
- for {
- select {
- case <-time.After(time.Second * 3):
- select {
- case truncate <- struct{}{}:
- case <-f.stop:
- }
- case <-f.stop:
- return
- }
- }
-}
-
-// Stop reading file.
-func (f readFile) Stop() {
- f.mutex.Lock()
- defer f.mutex.Unlock()
-
- select {
- case <-f.stop:
- return
- default:
- }
-
- close(f.stop)
-}
-
-func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) {
- switch {
- case strings.HasSuffix(f.FilePath(), ".gz"):
- fallthrough
- case strings.HasSuffix(f.FilePath(), ".gzip"):
- logger.Info(f.FilePath(), "Detected gzip compression format")
- var gzipReader *gzip.Reader
- gzipReader, err = gzip.NewReader(fd)
- if err != nil {
- return
- }
- reader = bufio.NewReader(gzipReader)
- case strings.HasSuffix(f.FilePath(), ".zst"):
- logger.Info(f.FilePath(), "Detected zstd compression format")
- reader = bufio.NewReader(zstd.NewReader(fd))
- default:
- reader = bufio.NewReader(fd)
- }
-
- return
-}
-
-func (f readFile) read(fd *os.File, rawLines chan []byte, truncate <-chan struct{}) error {
- reader, err := f.makeReader(fd)
- if err != nil {
- return err
- }
- rawLine := make([]byte, 0, 512)
- var offset uint64
-
- lineLengthThreshold := 1024 * 1024 // 1mb
- longLineWarning := false
-
- for {
- select {
- case <-truncate:
- if isTruncated, err := f.truncated(fd); isTruncated {
- return err
- }
- logger.Info(f.filePath, "Current offset", offset)
-
- case <-f.stop:
- return nil
- default:
- }
-
- // Read some bytes (max 4k at once as of go 1.12). isPrefix will
- // be set if line does not fit into 4k buffer.
- bytes, isPrefix, err := reader.ReadLine()
-
- if err != nil {
- // If EOF, sleep a couple of ms and return with nil error.
- // If other error, return with non-nil error.
- if err != io.EOF {
- return err
- }
- if !f.seekEOF {
- logger.Debug(f.FilePath(), "End of file reached")
- return nil
- }
- time.Sleep(time.Millisecond * 100)
- continue
- }
-
- rawLine = append(rawLine, bytes...)
- offset += uint64(len(bytes))
-
- if !isPrefix {
- // last LineRead call returned contend until end of line.
- rawLine = append(rawLine, '\n')
- select {
- case rawLines <- rawLine:
- case <-f.stop:
- return nil
- }
- rawLine = make([]byte, 0, 512)
- if longLineWarning {
- longLineWarning = false
- }
- continue
- }
-
- // Last LineRead call could not read content until end of line, buffer
- // was too small. Determine whether we exceed the max line length we
- // want dtail to send to the client at once. Possibly split up log line
- // into multiple log lines.
- if len(rawLine) >= lineLengthThreshold {
- if !longLineWarning {
- f.serverMessages <- logger.Warn(f.filePath, "Long log line, splitting into multiple lines")
- // Only print out one warning per long log line.
- longLineWarning = true
- }
- rawLine = append(rawLine, '\n')
- select {
- case rawLines <- rawLine:
- case <-f.stop:
- return nil
- }
- rawLine = make([]byte, 0, 512)
- }
- }
-}
-
-// Filter log lines matching a given regular expression.
-func (f readFile) filter(wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- LineRead, regex string) {
- defer wg.Done()
-
- if regex == "" {
- regex = "."
- }
-
- re, err := regexp.Compile(regex)
- if err != nil {
- logger.Error(regex, "Can't compile regex, using '.' instead", err)
- re = regexp.MustCompile(".")
- }
- f.re = re
-
- for {
- select {
- case line, ok := <-rawLines:
- f.updatePosition()
- if !ok {
- return
- }
- if filteredLine, ok := f.transmittable(line, len(lines), cap(lines)); ok {
- select {
- case lines <- filteredLine:
- case <-f.stop:
- return
- }
- }
- }
- }
-}
-
-func (f readFile) transmittable(line []byte, length, capacity int) (LineRead, bool) {
- var read LineRead
-
- if !f.re.Match(line) {
- f.updateLineNotMatched()
- f.updateLineNotTransmitted()
- return read, false
- }
- f.updateLineMatched()
-
- // Can we actually send more messages, channel capacity reached?
- if f.canSkipLines && length >= capacity {
- f.updateLineNotTransmitted()
- return read, false
- }
- f.updateLineTransmitted()
-
- read = LineRead{
- Content: line,
- GlobID: &f.globID,
- Count: f.totalLineCount(),
- TransmittedPerc: f.transmittedPerc(),
- }
-
- return read, true
-}
-
-// Check wether log file is truncated. Returns nil if not.
-func (f readFile) truncated(fd *os.File) (bool, error) {
- logger.Debug(f.filePath, "File truncation check")
-
- // Can not seek currently open FD.
- curPos, err := fd.Seek(0, os.SEEK_CUR)
- if err != nil {
- return true, err
- }
-
- // Can not open file at original path.
- pathFd, err := os.Open(f.filePath)
- if err != nil {
- return true, err
- }
- defer pathFd.Close()
-
- // Can not seek file at original path.
- pathPos, err := pathFd.Seek(0, io.SeekEnd)
- if err != nil {
- return true, err
- }
-
- if curPos > pathPos {
- return true, errors.New("File got truncated")
- }
-
- return false, nil
-}