summaryrefslogtreecommitdiff
path: root/internal/server/handlers/readcommand.go
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-09-12 19:04:42 +0300
committerPaul Buetow <paul@buetow.org>2021-10-02 12:26:29 +0300
commit2ebe7e9d63ba62c6f19749c39fe0a577d86ca775 (patch)
tree2ae6d11a3cbc82152085a9d7755adef436b3ce46 /internal/server/handlers/readcommand.go
parent842fd5800000bb68d6306a9ecad80a98ed762a2f (diff)
bugfix: dmap skipped the last couple of mapreduce lines
Diffstat (limited to 'internal/server/handlers/readcommand.go')
-rw-r--r--internal/server/handlers/readcommand.go15
1 files changed, 10 insertions, 5 deletions
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 5bab26f..69dd4a5 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -8,6 +8,7 @@ import (
"time"
"github.com/mimecast/dtail/internal/io/fs"
+ "github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/regex"
@@ -113,16 +114,20 @@ func (r *readCommand) readFile(ctx context.Context, path, globID string, re rege
}
lines := r.server.lines
-
- // Plug in mappreduce engine
- if r.server.aggregate != nil {
- lines = r.server.aggregate.Lines
- }
+ aggregate := r.server.aggregate
for {
+ if aggregate != nil {
+ lines = make(chan line.Line, 100)
+ aggregate.NextLinesCh <- lines
+ }
if err := reader.Start(ctx, lines, re); err != nil {
logger.Error(r.server.user, path, globID, err)
}
+ if aggregate != nil {
+ // Also makes aggregate to Flush
+ close(lines)
+ }
select {
case <-ctx.Done():