summaryrefslogtreecommitdiff
path: root/internal/io
diff options
context:
space:
mode:
authorPaul Buetow <git@mx.buetow.org>2021-02-07 10:43:10 +0000
committerPaul Buetow <git@mx.buetow.org>2021-02-07 10:43:10 +0000
commit07b9fd5044a4eb470a74048bf2878bc9d75afa1d (patch)
treeeca342e9c7ea67925e242aaab70597af8d4df480 /internal/io
parent742e6c444f7236ca3c9953050b0704bc88283ed3 (diff)
add rbuffer data structure
Diffstat (limited to 'internal/io')
-rw-r--r--internal/io/fs/readfile.go68
1 files changed, 55 insertions, 13 deletions
diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go
index 4b2af7c..4ac82d8 100644
--- a/internal/io/fs/readfile.go
+++ b/internal/io/fs/readfile.go
@@ -231,54 +231,96 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan []byte, t
func (f readFile) filter(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) {
defer wg.Done()
- /*
- beforeContext := make([]string, lContext.BeforeContext)
- afterContext := make([]string, lContext.AfterContext)
- */
+ for {
+ select {
+ case rawLine, ok := <-rawLines:
+ f.updatePosition()
+ if !ok {
+ return
+ }
+
+ line, _, transmittable := f.lineTransmittable(rawLine, len(lines), cap(lines), re)
+ if transmittable {
+ select {
+ case lines <- line:
+ continue
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
+ }
+}
+
+// Filter log lines matching a given regular expression.
+func (f readFile) filterWithLContext(ctx context.Context, lContext lcontext.LContext, wg *sync.WaitGroup, rawLines <-chan []byte, lines chan<- line.Line, re regex.Regex) {
+ defer wg.Done()
+
+ var bPos, bCount int
+ before := make([]*[]byte, lContext.BeforeContext)
for {
select {
- case line, ok := <-rawLines:
+ case rawLine, ok := <-rawLines:
f.updatePosition()
if !ok {
return
}
- if filteredLine, ok := f.transmittable(line, len(lines), cap(lines), re); ok {
+
+ if lContext.BeforeContext > 0 {
+ before[bPos] = &rawLine
+ bPos = (bPos + 1) % lContext.BeforeContext
+ if bCount < lContext.BeforeContext {
+ bCount++
+ }
+ }
+
+ line, _, transmittable := f.lineTransmittable(rawLine, len(lines), cap(lines), re)
+ if transmittable {
+ if lContext.BeforeContext > 0 {
+ for bCount > 0 {
+ bCount--
+ }
+ }
select {
- case lines <- filteredLine:
+ case lines <- line:
+ continue
case <-ctx.Done():
return
}
}
+ // before[bPos] = line
+ // bPos = (bPos+1) % lContext.BeforeContext
+ // bCount = (bCount+1) % lContext.BeforeContext
}
}
}
-func (f readFile) transmittable(lineBytes []byte, length, capacity int, re regex.Regex) (line.Line, bool) {
+func (f readFile) lineTransmittable(rawLine []byte, length, capacity int, re regex.Regex) (line.Line, bool, bool) {
var read line.Line
- if !re.Match(lineBytes) {
+ if !re.Match(rawLine) {
f.updateLineNotMatched()
f.updateLineNotTransmitted()
- return read, false
+ return read, false, false
}
f.updateLineMatched()
// Can we actually send more messages, channel capacity reached?
if f.canSkipLines && length >= capacity {
f.updateLineNotTransmitted()
- return read, false
+ return read, true, false
}
f.updateLineTransmitted()
read = line.Line{
- Content: lineBytes,
+ Content: rawLine,
SourceID: f.globID,
Count: f.totalLineCount(),
TransmittedPerc: f.transmittedPerc(),
}
- return read, true
+ return read, true, true
}
// Check wether log file is truncated. Returns nil if not.