diff options
| author | Paul Buetow <paul@buetow.org> | 2021-10-30 18:14:46 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2021-10-30 18:14:46 +0300 |
| commit | 4bca83bdd8544448732d996919da4d7c61a971dc (patch) | |
| tree | 98762b3cd419ec3b4a7720d5129528b91e94695c /internal/io | |
| parent | 683aa83d8171bd4265f4c6464c1103ec9be8271f (diff) | |
add support to read from stdin pipe in serverless mode, e.g. grep foo.log | dmap "select from ...."
Diffstat (limited to 'internal/io')
| -rw-r--r-- | internal/io/fs/readfile.go | 58 |
1 files changed, 42 insertions, 16 deletions
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index a42fc53..9c2f53c 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -65,17 +65,13 @@ func (f readFile) Retry() bool { func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, lines chan<- line.Line, re regex.Regex) error { - dlog.Common.Trace("readFile", f) - - fd, err := os.Open(f.filePath) + reader, fd, err := f.makeReader() + if fd != nil { + defer fd.Close() + } if err != nil { return err } - defer fd.Close() - - if f.seekEOF { - fd.Seek(0, io.SeekEnd) - } rawLines := make(chan *bytes.Buffer, 100) truncate := make(chan struct{}) @@ -94,7 +90,7 @@ func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, readCancel() }() - err = f.read(readCtx, fd, rawLines, truncate) + err = f.read(readCtx, fd, reader, rawLines, truncate) close(rawLines) // Filter may sends some data still. So wait until it is done here. filterWg.Wait() @@ -102,6 +98,36 @@ func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, return err } +func (f readFile) makeReader() (*bufio.Reader, *os.File, error) { + if f.filePath == "PIPE" && f.globID == "PIPE" { + return f.makePipeReader() + } + return f.makeFileReader() +} + +func (f readFile) makeFileReader() (*bufio.Reader, *os.File, error) { + var reader *bufio.Reader + fd, err := os.Open(f.filePath) + if err != nil { + return reader, fd, err + } + + if f.seekEOF { + fd.Seek(0, io.SeekEnd) + } + + reader, err = f.makeCompressedFileReader(fd) + if err != nil { + return reader, fd, err + } + + return reader, fd, nil +} + +func (f readFile) makePipeReader() (*bufio.Reader, *os.File, error) { + return bufio.NewReader(os.Stdin), nil, nil +} + func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struct{}) { for { select { @@ -116,7 +142,7 @@ func (f readFile) periodicTruncateCheck(ctx context.Context, truncate chan struc } } -func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { +func (f readFile) makeCompressedFileReader(fd *os.File) (reader *bufio.Reader, err error) { switch { case strings.HasSuffix(f.FilePath(), ".gz"): fallthrough @@ -137,14 +163,10 @@ func (f readFile) makeReader(fd *os.File) (reader *bufio.Reader, err error) { return } -func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Buffer, - truncate <-chan struct{}) error { +func (f readFile) read(ctx context.Context, fd *os.File, reader *bufio.Reader, + rawLines chan *bytes.Buffer, truncate <-chan struct{}) error { var offset uint64 - reader, err := f.makeReader(fd) - if err != nil { - return err - } lineLengthThreshold := 1024 * 1024 // 1mb warnedAboutLongLine := false @@ -387,6 +409,10 @@ func (f readFile) transmittable(lineBytesBuffer *bytes.Buffer, length, capacity // Check wether log file is truncated. Returns nil if not. func (f readFile) truncated(fd *os.File) (bool, error) { + if fd == nil { + return false, nil + } + dlog.Common.Debug(f.filePath, "File truncation check") // Can not seek currently open FD. |
