diff options
| author | Paul Buetow <git@mx.buetow.org> | 2021-02-07 10:43:10 +0000 |
|---|---|---|
| committer | Paul Buetow <git@mx.buetow.org> | 2021-02-07 10:43:10 +0000 |
| commit | 07b9fd5044a4eb470a74048bf2878bc9d75afa1d (patch) | |
| tree | eca342e9c7ea67925e242aaab70597af8d4df480 /internal/io | |
| parent | 742e6c444f7236ca3c9953050b0704bc88283ed3 (diff) | |
add rbuffer data structure
Diffstat (limited to 'internal/io')
| -rw-r--r-- | internal/io/fs/readfile.go | 68 |
1 files changed, 55 insertions, 13 deletions
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 4b2af7c..4ac82d8 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -231,54 +231,96 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t func (f readFile) filter(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) { defer wg.Done() - /* - beforeContext := make([]string, lContext.BeforeContext) - afterContext := make([]string, lContext.AfterContext) - */ + for { + select { + case rawLine, ok := <-rawLines: + f.updatePosition() + if !ok { + return + } + + line, _, transmittable := f.lineTransmittable(rawLine, len(lines), cap(lines), re) + if transmittable { + select { + case lines <- line: + continue + case <-ctx.Done(): + return + } + } + } + } +} + +// Filter log lines matching a given regular expression. +func (f readFile) filterWithLContext(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) { + defer wg.Done() + + var bPos, bCount int + before := make([]*[]byte, lContext.BeforeContext) for { select { - case line, ok := <-rawLines: + case rawLine, ok := <-rawLines: f.updatePosition() if !ok { return } - if filteredLine, ok := f.transmittable(line, len(lines), cap(lines), re); ok { + + if lContext.BeforeContext > 0 { + before[bPos] = &rawLine + bPos = (bPos + 1) % lContext.BeforeContext + if bCount < lContext.BeforeContext { + bCount++ + } + } + + line, _, transmittable := f.lineTransmittable(rawLine, len(lines), cap(lines), re) + if transmittable { + if lContext.BeforeContext > 0 { + for bCount > 0 { + bCount-- + } + } select { - case lines <- filteredLine: + case lines <- line: + continue case <-ctx.Done(): return } } + // before[bPos] = line + // bPos = (bPos+1) % lContext.BeforeContext + // bCount = (bCount+1) % lContext.BeforeContext } } } -func (f readFile) transmittable(lineBytes []byte, length, capacity int, re regex.Regex) (line.Line, bool) { +func (f readFile) lineTransmittable(rawLine []byte, length, capacity int, re regex.Regex) (line.Line, bool, bool) { var read line.Line - if !re.Match(lineBytes) { + if !re.Match(rawLine) { f.updateLineNotMatched() f.updateLineNotTransmitted() - return read, false + return read, false, false } f.updateLineMatched() // Can we actually send more messages, channel capacity reached? if f.canSkipLines && length >= capacity { f.updateLineNotTransmitted() - return read, false + return read, true, false } f.updateLineTransmitted() read = line.Line{ - Content: lineBytes, + Content: rawLine, SourceID: f.globID, Count: f.totalLineCount(), TransmittedPerc: f.transmittedPerc(), } - return read, true + return read, true, true } // Check wether log file is truncated. Returns nil if not. |
