diff options
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/clients/maprclient.go | 3 | ||||
| -rw-r--r-- | internal/mapr/server/aggregate.go | 102 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 15 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 86 |
4 files changed, 84 insertions, 122 deletions
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 2cad15d..68352ea 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -158,7 +158,8 @@ func (c *MaprClient) reportResults() { func (c *MaprClient) printResults() { var result string var err error - var numRows, rowsLimit int + var numRows int + rowsLimit := -1 if c.query.Limit == -1 { // Limit output to 10 rows when the result is printed to stdout. diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index a6d6bb1..d11ed7d 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -19,16 +19,12 @@ import ( // Aggregate is for aggregating mapreduce data on the DTail server side. type Aggregate struct { done *internal.Done - // Log lines to process (parsing MAPREDUCE lines). - Lines chan line.Line + // NextLinesCh can be used to use a new line ch. + NextLinesCh chan chan line.Line // Hostname of the current server (used to populate $hostname field). hostname string // Signals to serialize data. serialize chan struct{} - // Signals to flush data. - flush chan struct{} - // Signals that data has been flushed - flushed chan struct{} // The mapr query query *mapr.Query // The mapr log format parser @@ -69,14 +65,12 @@ func NewAggregate(queryStr string) (*Aggregate, error) { } a := Aggregate{ - done: internal.NewDone(), - Lines: make(chan line.Line, 100), - serialize: make(chan struct{}), - flush: make(chan struct{}), - flushed: make(chan struct{}), - hostname: s[0], - query: query, - parser: logParser, + done: internal.NewDone(), + NextLinesCh: make(chan chan line.Line, 10), + serialize: make(chan struct{}), + hostname: s[0], + query: query, + parser: logParser, } return &a, nil @@ -84,12 +78,11 @@ func NewAggregate(queryStr string) (*Aggregate, error) { // Shutdown the aggregation engine. func (a *Aggregate) Shutdown() { - a.Flush() a.done.Shutdown() } // Start an aggregation. -func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) { +func (a *Aggregate) Start(ctx context.Context, maprMessages chan<- string) { myCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -102,16 +95,16 @@ func (a *Aggregate) Start(ctx context.Context, maprLines chan<- string) { } }() - fieldsCh := a.makeFields(myCtx) + fieldsCh := a.fieldsFromLines(myCtx) // Add fields (e.g. via 'set' clause) if len(a.query.Set) > 0 { - fieldsCh = a.addFields(myCtx, fieldsCh) + fieldsCh = a.setAdditionalFields(myCtx, fieldsCh) } // Periodically pre-aggregate data every a.query.Interval seconds. go a.aggregateTimer(myCtx) - a.makeMaprLines(myCtx, fieldsCh, maprLines) + a.aggregateAndSerialize(myCtx, fieldsCh, maprMessages) } func (a *Aggregate) aggregateTimer(ctx context.Context) { @@ -125,23 +118,38 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) { } } -func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string { - ch := make(chan map[string]string) +func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]string { + fieldsCh := make(chan map[string]string) go func() { - defer close(ch) + defer close(fieldsCh) + var lines chan line.Line + + // Gather first lines channel (first input file) + select { + case lines = <-a.NextLinesCh: + case <-ctx.Done(): + return + } for { select { - case line, ok := <-a.Lines: + case line, ok := <-lines: if !ok { - return + select { + case lines = <-a.NextLinesCh: + // Have a new lines channel (e.g. new input file) + case <-ctx.Done(): + default: + // No new lines channel found. + return + } } maprLine := strings.TrimSpace(line.Content.String()) + fields, err := a.parser.MakeFields(maprLine) pool.RecycleBytesBuffer(line.Content) - fields, err := a.parser.MakeFields(maprLine) if err != nil { // Should fields be ignored anyway? if err != logformat.IgnoreFieldsErr { @@ -155,7 +163,7 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string { } select { - case ch <- fields: + case fieldsCh <- fields: case <-ctx.Done(): } case <-ctx.Done(): @@ -164,17 +172,16 @@ func (a *Aggregate) makeFields(ctx context.Context) <-chan map[string]string { } }() - return ch + return fieldsCh } -func (a *Aggregate) addFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string { - ch := make(chan map[string]string) +func (a *Aggregate) setAdditionalFields(ctx context.Context, fieldsCh <-chan map[string]string) <-chan map[string]string { + newFieldsCh := make(chan map[string]string) go func() { - defer close(ch) + defer close(newFieldsCh) for { - // fieldsCh will be closed via 'makeFields' when ctx is done fields, ok := <-fieldsCh if !ok { return @@ -184,23 +191,22 @@ func (a *Aggregate) addFields(ctx context.Context, fieldsCh <-chan map[string]st } select { - case ch <- fields: + case newFieldsCh <- fields: case <-ctx.Done(): } } }() - return ch + return newFieldsCh } -func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[string]string, maprLines chan<- string) { +func (a *Aggregate) aggregateAndSerialize(ctx context.Context, fieldsCh <-chan map[string]string, maprMessages chan<- string) { group := mapr.NewGroupSet() serialize := func() { logger.Info("Serializing mapreduce result") - group.Serialize(ctx, maprLines) + group.Serialize(ctx, maprMessages) group = mapr.NewGroupSet() - logger.Info("Done serializing mapreduce result") } for { @@ -213,9 +219,6 @@ func (a *Aggregate) makeMaprLines(ctx context.Context, fieldsCh <-chan map[strin a.aggregate(group, fields) case <-a.serialize: serialize() - case <-a.flush: - serialize() - a.flushed <- struct{}{} case <-ctx.Done(): return } @@ -264,24 +267,3 @@ func (a *Aggregate) Serialize(ctx context.Context) { case <-ctx.Done(): } } - -// Flush all data. -func (a *Aggregate) Flush() { - select { - case a.flush <- struct{}{}: - logger.Info("Flushing mapreduce data") - case <-time.After(time.Minute): - logger.Warn("Starting to flush mapreduce data takes over a minute") - return - case <-a.done.Done(): - return - } - - select { - case <-a.flushed: - logger.Info("Done flushing") - case <-time.After(time.Minute): - logger.Warn("Waiting for data to be flushed takes over a minute") - case <-a.done.Done(): - } -} diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 5bab26f..69dd4a5 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -8,6 +8,7 @@ import ( "time" "github.com/mimecast/dtail/internal/io/fs" + "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/regex" @@ -113,16 +114,20 @@ func (r *readCommand) readFile(ctx context.Context, path, globID string, re rege } lines := r.server.lines - - // Plug in mappreduce engine - if r.server.aggregate != nil { - lines = r.server.aggregate.Lines - } + aggregate := r.server.aggregate for { + if aggregate != nil { + lines = make(chan line.Line, 100) + aggregate.NextLinesCh <- lines + } if err := reader.Start(ctx, lines, re); err != nil { logger.Error(r.server.user, path, globID, err) } + if aggregate != nil { + // Also makes aggregate to Flush + close(lines) + } select { case <-ctx.Done(): diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index e74e686..ed19412 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -32,36 +32,35 @@ const ( // the Bi-directional communication between SSH client and server. // This handler implements the handler of the SSH server. type ServerHandler struct { - done *internal.Done - lines chan line.Line - regex string - aggregate *server.Aggregate - aggregatedMessages chan string - serverMessages chan string - hostname string - user *user.User - catLimiter chan struct{} - tailLimiter chan struct{} - ackCloseReceived chan struct{} - activeCommands int32 - activeReaders int32 - quiet bool - readBuf bytes.Buffer - writeBuf bytes.Buffer + done *internal.Done + lines chan line.Line + regex string + aggregate *server.Aggregate + maprMessages chan string + serverMessages chan string + hostname string + user *user.User + catLimiter chan struct{} + tailLimiter chan struct{} + ackCloseReceived chan struct{} + activeCommands int32 + quiet bool + readBuf bytes.Buffer + writeBuf bytes.Buffer } // NewServerHandler returns the server handler. func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *ServerHandler { h := ServerHandler{ - done: internal.NewDone(), - lines: make(chan line.Line, 100), - serverMessages: make(chan string, 10), - aggregatedMessages: make(chan string, 10), - ackCloseReceived: make(chan struct{}), - catLimiter: catLimiter, - tailLimiter: tailLimiter, - regex: ".", - user: user, + done: internal.NewDone(), + lines: make(chan line.Line, 100), + serverMessages: make(chan string, 10), + maprMessages: make(chan string, 10), + ackCloseReceived: make(chan struct{}), + catLimiter: catLimiter, + tailLimiter: tailLimiter, + regex: ".", + user: user, } fqdn, err := os.Hostname() @@ -108,7 +107,7 @@ func (h *ServerHandler) Read(p []byte) (n int, err error) { h.readBuf.WriteByte(protocol.MessageDelimiter) n = copy(p, h.readBuf.Bytes()) - case message := <-h.aggregatedMessages: + case message := <-h.maprMessages: // Send mapreduce-aggregated data as a message. h.readBuf.WriteString("AGGREGATE") h.readBuf.WriteString(protocol.FieldDelimiter) @@ -260,14 +259,6 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] h.shutdown() } } - readerFinished := func() { - if h.decrementActiveReaders() == 0 { - if h.aggregate == nil { - return - } - h.aggregate.Shutdown() - } - } splitted := strings.Split(args[0], ":") commandName := splitted[0] @@ -289,18 +280,14 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] case "grep", "cat": command := newReadCommand(h, omode.CatClient) go func() { - h.incrementActiveReaders() command.Start(ctx, argc, args, 1) - readerFinished() commandFinished() }() case "tail": command := newReadCommand(h, omode.TailClient) go func() { - h.incrementActiveReaders() command.Start(ctx, argc, args, 10) - readerFinished() commandFinished() }() @@ -315,7 +302,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] h.aggregate = aggregate go func() { - command.Start(ctx, h.aggregatedMessages) + command.Start(ctx, h.maprMessages) commandFinished() }() @@ -361,15 +348,11 @@ func (h *ServerHandler) serverMessageC() chan<- string { return h.serverMessages } -func (h *ServerHandler) flush() { - logger.Debug(h.user, "flush()") - - if h.aggregate != nil { - h.aggregate.Flush() - } +func (h *ServerHandler) flushMessages() { + logger.Debug(h.user, "flushMessages()") unsentMessages := func() int { - return len(h.lines) + len(h.serverMessages) + len(h.aggregatedMessages) + return len(h.lines) + len(h.serverMessages) + len(h.maprMessages) } for i := 0; i < 3; i++ { if unsentMessages() == 0 { @@ -385,7 +368,7 @@ func (h *ServerHandler) flush() { func (h *ServerHandler) shutdown() { logger.Debug(h.user, "shutdown()") - h.flush() + h.flushMessages() go func() { select { @@ -413,15 +396,6 @@ func (h *ServerHandler) decrementActiveCommands() int32 { return atomic.LoadInt32(&h.activeCommands) } -func (h *ServerHandler) incrementActiveReaders() { - atomic.AddInt32(&h.activeReaders, 1) -} - -func (h *ServerHandler) decrementActiveReaders() int32 { - atomic.AddInt32(&h.activeReaders, -1) - return atomic.LoadInt32(&h.activeReaders) -} - func readOptions(opts []string) (map[string]string, error) { options := make(map[string]string, len(opts)) |
