summary refs log tree commit diff
path: root/internal
diff options
context:
space:
mode:
author Paul Buetow <paul@buetow.org> 2021-09-12 19:04:42 +0300
committer Paul Buetow <paul@buetow.org> 2021-10-02 12:26:29 +0300
commit 2ebe7e9d63ba62c6f19749c39fe0a577d86ca775 (patch)
tree 2ae6d11a3cbc82152085a9d7755adef436b3ce46 /internal
parent 842fd5800000bb68d6306a9ecad80a98ed762a2f (diff)
bugfix: dmap skipped the last couple of mapreduce lines
Diffstat (limited to 'internal')
-rw-r--r-- internal/clients/maprclient.go 3
-rw-r--r-- internal/mapr/server/aggregate.go 102
-rw-r--r-- internal/server/handlers/readcommand.go 15
-rw-r--r-- internal/server/handlers/serverhandler.go 86
4 files changed, 84 insertions, 122 deletions
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index 2cad15d..68352ea 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -158,7 +158,8 @@ func (c *MaprClient) reportResults() {
func (c *MaprClient) printResults() {
var result string
var err error
- var numRows, rowsLimit int
+ var numRows int
+ rowsLimit := -1
if c.query.Limit == -1 {
// Limit output to 10 rows when the result is printed to stdout.
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index a6d6bb1..d11ed7d 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -19,16 +19,12 @@ import (
// Aggregate is for aggregating mapreduce data on the DTail server side.
type Aggregate struct {
done *internal.Done
- // Log lines to process (parsing MAPREDUCE lines).
- Lines chan line.Line
+ // NextLinesCh can be used to use a new line ch.
+ NextLinesCh chan chan line.Line
// Hostname of the current server (used to populate $hostname field).
hostname string
// Signals to serialize data.
serialize chan struct{}
- // Signals to flush data.
- flush chan struct{}
- // Signals that data has been flushed
- flushed chan struct{}
// The mapr query
query *mapr.Query
// The mapr log format parser
@@ -69,14 +65,12 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
}
a := Aggregate{
- done: internal.NewDone(),
- Lines: make(chan line.Line, 100),
- serialize: make(chan struct{}),
- flush: make(chan struct{}),
- flushed: make(chan struct{}),
- hostname: s[0],
- query: query,
- parser: logParser,
+ done: internal.NewDone(),
+ NextLinesCh: make(chan chan line.Line, 10),
+ serialize: make(chan struct{}),
+ hostname: s[0],
+ query: query,
+ parser: logParser,
}
return &a, nil
@@ -84,12 +78,11 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
// Shutdown the aggregation engine.
func (a *Aggregate) Shutdown() {
- a.Flush()
a.done.Shutdown()
}
// Start an aggregation.
-func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) {
+func (a *Aggregate) Start(ctx context.Context, maprMessages chan<- string) {
myCtx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -102,16 +95,16 @@ func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) {
}
}()
- fieldsCh := a.makeFields(myCtx)
+ fieldsCh := a.fieldsFromLines(myCtx)
// Add fields (e.g. via 'set' clause)
if len(a.query.Set) > 0 {
- fieldsCh = a.addFields(myCtx, fieldsCh)
+ fieldsCh = a.setAdditionalFields(myCtx, fieldsCh)
}
// Periodically pre-aggregate data every a.query.Interval seconds.
go a.aggregateTimer(myCtx)
- a.makeMaprLines(myCtx, fieldsCh, maprLines)
+ a.aggregateAndSerialize(myCtx, fieldsCh, maprMessages)
}
func (a *Aggregate) aggregateTimer(ctx context.Context) {
@@ -125,23 +118,38 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) {
}
}
-func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string {
- ch := make(chan map[string]string)
+func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]string {
+ fieldsCh := make(chan map[string]string)
go func() {
- defer close(ch)
+ defer close(fieldsCh)
+ var lines chan line.Line
+
+ // Gather first lines channel (first input file)
+ select {
+ case lines = <-a.NextLinesCh:
+ case <-ctx.Done():
+ return
+ }
for {
select {
- case line, ok := <-a.Lines:
+ case line, ok := <-lines:
if !ok {
- return
+ select {
+ case lines = <-a.NextLinesCh:
+ // Have a new lines channel (e.g. new input file)
+ case <-ctx.Done():
+ default:
+ // No new lines channel found.
+ return
+ }
}
maprLine := strings.TrimSpace(line.Content.String())
+ fields, err := a.parser.MakeFields(maprLine)
pool.RecycleBytesBuffer(line.Content)
- fields, err := a.parser.MakeFields(maprLine)
if err != nil {
// Should fields be ignored anyway?
if err != logformat.IgnoreFieldsErr {
@@ -155,7 +163,7 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string {
}
select {
- case ch <- fields:
+ case fieldsCh <- fields:
case <-ctx.Done():
}
case <-ctx.Done():
@@ -164,17 +172,16 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string {
}
}()
- return ch
+ return fieldsCh
}
-func (a *Aggregate) addFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string {
- ch := make(chan map[string]string)
+func (a *Aggregate) setAdditionalFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string {
+ newFieldsCh := make(chan map[string]string)
go func() {
- defer close(ch)
+ defer close(newFieldsCh)
for {
- // fieldsCh will be closed via 'makeFields' when ctx is done
fields, ok := <-fieldsCh
if !ok {
return
@@ -184,23 +191,22 @@ func (a *Aggregate) addFields(ctx context.Context, fieldsCh <-chan map[string]st
}
select {
- case ch <- fields:
+ case newFieldsCh <- fields:
case <-ctx.Done():
}
}
}()
- return ch
+ return newFieldsCh
}
-func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) {
+func (a *Aggregate) aggregateAndSerialize(ctx context.Context, fieldsCh <-chan map[string]string, maprMessages chan<- string) {
group := mapr.NewGroupSet()
serialize := func() {
logger.Info("Serializing mapreduce result")
- group.Serialize(ctx, maprLines)
+ group.Serialize(ctx, maprMessages)
group = mapr.NewGroupSet()
- logger.Info("Done serializing mapreduce result")
}
for {
@@ -213,9 +219,6 @@ func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[strin
a.aggregate(group, fields)
case <-a.serialize:
serialize()
- case <-a.flush:
- serialize()
- a.flushed <- struct{}{}
case <-ctx.Done():
return
}
@@ -264,24 +267,3 @@ func (a *Aggregate) Serialize(ctx context.Context) {
case <-ctx.Done():
}
}
-
-// Flush all data.
-func (a *Aggregate) Flush() {
- select {
- case a.flush <- struct{}{}:
- logger.Info("Flushing mapreduce data")
- case <-time.After(time.Minute):
- logger.Warn("Starting to flush mapreduce data takes over a minute")
- return
- case <-a.done.Done():
- return
- }
-
- select {
- case <-a.flushed:
- logger.Info("Done flushing")
- case <-time.After(time.Minute):
- logger.Warn("Waiting for data to be flushed takes over a minute")
- case <-a.done.Done():
- }
-}
diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go
index 5bab26f..69dd4a5 100644
--- a/internal/server/handlers/readcommand.go
+++ b/internal/server/handlers/readcommand.go
@@ -8,6 +8,7 @@ import (
"time"
"github.com/mimecast/dtail/internal/io/fs"
+ "github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/regex"
@@ -113,16 +114,20 @@ func (r *readCommand) readFile(ctx context.Context, path, globID string, re rege
}
lines := r.server.lines
-
- // Plug in mappreduce engine
- if r.server.aggregate != nil {
- lines = r.server.aggregate.Lines
- }
+ aggregate := r.server.aggregate
for {
+ if aggregate != nil {
+ lines = make(chan line.Line, 100)
+ aggregate.NextLinesCh <- lines
+ }
if err := reader.Start(ctx, lines, re); err != nil {
logger.Error(r.server.user, path, globID, err)
}
+ if aggregate != nil {
+ // Also makes aggregate to Flush
+ close(lines)
+ }
select {
case <-ctx.Done():
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index e74e686..ed19412 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -32,36 +32,35 @@ const (
// the Bi-directional communication between SSH client and server.
// This handler implements the handler of the SSH server.
type ServerHandler struct {
- done *internal.Done
- lines chan line.Line
- regex string
- aggregate *server.Aggregate
- aggregatedMessages chan string
- serverMessages chan string
- hostname string
- user *user.User
- catLimiter chan struct{}
- tailLimiter chan struct{}
- ackCloseReceived chan struct{}
- activeCommands int32
- activeReaders int32
- quiet bool
- readBuf bytes.Buffer
- writeBuf bytes.Buffer
+ done *internal.Done
+ lines chan line.Line
+ regex string
+ aggregate *server.Aggregate
+ maprMessages chan string
+ serverMessages chan string
+ hostname string
+ user *user.User
+ catLimiter chan struct{}
+ tailLimiter chan struct{}
+ ackCloseReceived chan struct{}
+ activeCommands int32
+ quiet bool
+ readBuf bytes.Buffer
+ writeBuf bytes.Buffer
}
// NewServerHandler returns the server handler.
func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *ServerHandler {
h := ServerHandler{
- done: internal.NewDone(),
- lines: make(chan line.Line, 100),
- serverMessages: make(chan string, 10),
- aggregatedMessages: make(chan string, 10),
- ackCloseReceived: make(chan struct{}),
- catLimiter: catLimiter,
- tailLimiter: tailLimiter,
- regex: ".",
- user: user,
+ done: internal.NewDone(),
+ lines: make(chan line.Line, 100),
+ serverMessages: make(chan string, 10),
+ maprMessages: make(chan string, 10),
+ ackCloseReceived: make(chan struct{}),
+ catLimiter: catLimiter,
+ tailLimiter: tailLimiter,
+ regex: ".",
+ user: user,
}
fqdn, err := os.Hostname()
@@ -108,7 +107,7 @@ func (h *ServerHandler) Read(p []byte) (n int, err error) {
h.readBuf.WriteByte(protocol.MessageDelimiter)
n = copy(p, h.readBuf.Bytes())
- case message := <-h.aggregatedMessages:
+ case message := <-h.maprMessages:
// Send mapreduce-aggregated data as a message.
h.readBuf.WriteString("AGGREGATE")
h.readBuf.WriteString(protocol.FieldDelimiter)
@@ -260,14 +259,6 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
h.shutdown()
}
}
- readerFinished := func() {
- if h.decrementActiveReaders() == 0 {
- if h.aggregate == nil {
- return
- }
- h.aggregate.Shutdown()
- }
- }
splitted := strings.Split(args[0], ":")
commandName := splitted[0]
@@ -289,18 +280,14 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
case "grep", "cat":
command := newReadCommand(h, omode.CatClient)
go func() {
- h.incrementActiveReaders()
command.Start(ctx, argc, args, 1)
- readerFinished()
commandFinished()
}()
case "tail":
command := newReadCommand(h, omode.TailClient)
go func() {
- h.incrementActiveReaders()
command.Start(ctx, argc, args, 10)
- readerFinished()
commandFinished()
}()
@@ -315,7 +302,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
h.aggregate = aggregate
go func() {
- command.Start(ctx, h.aggregatedMessages)
+ command.Start(ctx, h.maprMessages)
commandFinished()
}()
@@ -361,15 +348,11 @@ func (h *ServerHandler) serverMessageC() chan<- string {
return h.serverMessages
}
-func (h *ServerHandler) flush() {
- logger.Debug(h.user, "flush()")
-
- if h.aggregate != nil {
- h.aggregate.Flush()
- }
+func (h *ServerHandler) flushMessages() {
+ logger.Debug(h.user, "flushMessages()")
unsentMessages := func() int {
- return len(h.lines) + len(h.serverMessages) + len(h.aggregatedMessages)
+ return len(h.lines) + len(h.serverMessages) + len(h.maprMessages)
}
for i := 0; i < 3; i++ {
if unsentMessages() == 0 {
@@ -385,7 +368,7 @@ func (h *ServerHandler) flush() {
func (h *ServerHandler) shutdown() {
logger.Debug(h.user, "shutdown()")
- h.flush()
+ h.flushMessages()
go func() {
select {
@@ -413,15 +396,6 @@ func (h *ServerHandler) decrementActiveCommands() int32 {
return atomic.LoadInt32(&h.activeCommands)
}
-func (h *ServerHandler) incrementActiveReaders() {
- atomic.AddInt32(&h.activeReaders, 1)
-}
-
-func (h *ServerHandler) decrementActiveReaders() int32 {
- atomic.AddInt32(&h.activeReaders, -1)
- return atomic.LoadInt32(&h.activeReaders)
-}
-
func readOptions(opts []string) (map[string]string, error) {
options := make(map[string]string, len(opts))