From 06ece112c0dd20c0c211c538216fe64ebe4045c9 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Thu, 14 Oct 2021 20:10:55 +0300 Subject: add dgrep context integration tests --- internal/io/fs/readfile.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) (limited to 'internal/io/fs') diff --git a/internal/io/fs/readfile.go b/internal/io/fs/readfile.go index 88d467e..28cbe58 100644 --- a/internal/io/fs/readfile.go +++ b/internal/io/fs/readfile.go @@ -99,15 +99,24 @@ func (f readFile) Start(ctx context.Context, ltx lcontext.LContext, rawLines := make(chan *bytes.Buffer, 100) truncate := make(chan struct{}) - var wg sync.WaitGroup - wg.Add(1) + readCtx, readCancel := context.WithCancel(ctx) + var filterWg sync.WaitGroup + filterWg.Add(1) go f.periodicTruncateCheck(ctx, truncate) - go f.filter(ctx, ltx, &wg, rawLines, lines, re) + go func() { + f.filter(ctx, ltx, rawLines, lines, re) + filterWg.Done() + // If the filter stopped, make the reader stop too, no need to read + // more data if there is nothing more the filter wants to filter for! + // E.g. it could be that we only want to filter N matches but not more. + readCancel() + }() - err = f.read(ctx, fd, rawLines, truncate) + err = f.read(readCtx, fd, rawLines, truncate) close(rawLines) - wg.Wait() + // Filter may sends some data still. So wait until it is done here. + filterWg.Wait() return err } @@ -215,10 +224,8 @@ func (f readFile) read(ctx context.Context, fd *os.File, rawLines chan *bytes.Bu // Filter log lines matching a given regular expression. func (f readFile) filter(ctx context.Context, ltx lcontext.LContext, - wg *sync.WaitGroup, rawLines <-chan *bytes.Buffer, lines chan<- line.Line, - re regex.Regex) { + rawLines <-chan *bytes.Buffer, lines chan<- line.Line, re regex.Regex) { - defer wg.Done() // Do we have any kind of local context settings? If so then run the more complex // filterWithLContext method. if ltx.Has() { -- cgit v1.2.3