diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2021-10-21 21:28:49 +0300 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2021-10-21 21:28:49 +0300 |
| commit | f4207a55f71bfbcfdc532d5cdd3befaa3474a157 (patch) | |
| tree | ea5e4a2d2a67035f645bdee496ae55a52034178a /internal/clients/maprclient.go | |
| parent | d80d6070557e3a800e3a54967af9eced518f116b (diff) | |
| parent | 739205206d63bf42f4e843b39d04d4c8cd8207c3 (diff) | |
merge develop
Diffstat (limited to 'internal/clients/maprclient.go')
| -rw-r--r-- | internal/clients/maprclient.go | 89 |
1 files changed, 59 insertions, 30 deletions
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index feb7e47..246946f 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -9,7 +9,9 @@ import ( "time" "github.com/mimecast/dtail/internal/clients/handlers" - "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/color" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/mapr" "github.com/mimecast/dtail/internal/omode" ) @@ -29,25 +31,25 @@ const ( // MaprClient is used for running mapreduce aggregations on remote files. type MaprClient struct { baseClient - // Query string for mapr aggregations - queryStr string // Global group set for merged mapr aggregation results globalGroup *mapr.GlobalGroupSet // The query object (constructed from queryStr) query *mapr.Query // Additative result or new result every interval run? cumulative bool + // The last result string received + lastResult string } // NewMaprClient returns a new mapreduce client. -func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (*MaprClient, error) { - if queryStr == "" { +func NewMaprClient(args config.Args, maprClientMode MaprClientMode) (*MaprClient, error) { + if args.QueryStr == "" { return nil, errors.New("No mapreduce query specified, use '-query' flag") } - query, err := mapr.NewQuery(queryStr) + query, err := mapr.NewQuery(args.QueryStr) if err != nil { - logger.FatalExit(queryStr, "Can't parse mapr query", err) + dlog.Client.FatalPanic(args.QueryStr, "Can't parse mapr query", err) } // Don't retry connection if in tail mode and no outfile specified. @@ -64,7 +66,7 @@ func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (* cumulative = args.Mode == omode.MapClient || query.HasOutfile() } - logger.Debug("Cumulative mapreduce mode?", cumulative) + dlog.Client.Debug("Cumulative mapreduce mode?", cumulative) c := MaprClient{ baseClient: baseClient{ @@ -73,7 +75,6 @@ func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (* retry: retry, }, query: query, - queryStr: queryStr, cumulative: cumulative, } @@ -99,46 +100,51 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i status = c.baseClient.Start(ctx, statsCh) if c.cumulative { - logger.Debug("Received final mapreduce result") + dlog.Client.Debug("Received final mapreduce result") c.reportResults() } return } +// NEXT: Make this a callback function rather trying to use polymorphism to call +// this. This applies to all clients. It will make the code easier to read. func (c MaprClient) makeHandler(server string) handlers.Handler { return handlers.NewMaprHandler(server, c.query, c.globalGroup) } -func (c MaprClient) makeCommands(options map[string]string) (commands []string) { +func (c MaprClient) makeCommands() (commands []string) { commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery)) - modeStr := "cat" if c.Mode == omode.TailClient { modeStr = "tail" } - optionsStr := c.commandOptionsToString(options) for _, file := range strings.Split(c.What, ",") { + regex, err := c.Regex.Serialize() + if err != nil { + dlog.Client.FatalPanic(err) + } if c.Timeout > 0 { - commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, modeStr, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, + modeStr, file, regex)) continue } - commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, optionsStr, file, c.Regex.Serialize())) + commands = append(commands, fmt.Sprintf("%s:%s %s %s", + modeStr, c.Args.SerializeOptions(), file, regex)) } - return } func (c *MaprClient) periodicReportResults(ctx context.Context) { rampUpSleep := c.query.Interval / 2 - logger.Debug("Ramp up sleeping before processing mapreduce results", rampUpSleep) + dlog.Client.Debug("Ramp up sleeping before processing mapreduce results", rampUpSleep) time.Sleep(rampUpSleep) for { select { case <-time.After(c.query.Interval): - logger.Debug("Gathering interim mapreduce result") + dlog.Client.Debug("Gathering interim mapreduce result") c.reportResults() case <-ctx.Done(): return @@ -151,42 +157,65 @@ func (c *MaprClient) reportResults() { c.writeResultsToOutfile() return } - c.printResults() } func (c *MaprClient) printResults() { var result string var err error - var numLines int + var numRows int + rowsLimit := -1 + + if c.query.Limit == -1 { + // Limit output to 10 rows when the result is printed to stdout. + // This can be overriden with the limit clause though. + rowsLimit = 10 + } if c.cumulative { - result, numLines, err = c.globalGroup.Result(c.query) + result, numRows, err = c.globalGroup.Result(c.query, rowsLimit) } else { - result, numLines, err = c.globalGroup.SwapOut().Result(c.query) + result, numRows, err = c.globalGroup.SwapOut().Result(c.query, rowsLimit) } if err != nil { - logger.FatalExit(err) + dlog.Client.FatalPanic(err) } - if numLines == 0 { - logger.Warn("Empty result set this time...") + if result == c.lastResult { + dlog.Client.Debug("Result hasn't changed compared to last time...") return } + c.lastResult = result - logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery)) - logger.Raw(result) + if numRows == 0 { + dlog.Client.Debug("Empty result set this time...") + return + } + + rawQuery := c.query.RawQuery + if config.Client.TermColorsEnable { + rawQuery = color.PaintStrWithAttr(rawQuery, + config.Client.TermColors.MaprTable.RawQueryFg, + config.Client.TermColors.MaprTable.RawQueryBg, + config.Client.TermColors.MaprTable.RawQueryAttr) + } + dlog.Client.Raw(rawQuery) + + if rowsLimit > 0 && numRows > rowsLimit { + dlog.Client.Warn(fmt.Sprintf("Got %d results but limited terminal output "+ + "to %d rows! Use 'limit' clause to override!", numRows, rowsLimit)) + } + dlog.Client.Raw(result) } func (c *MaprClient) writeResultsToOutfile() { if c.cumulative { if err := c.globalGroup.WriteResult(c.query); err != nil { - logger.FatalExit(err) + dlog.Client.FatalPanic(err) } return } - if err := c.globalGroup.SwapOut().WriteResult(c.query); err != nil { - logger.FatalExit(err) + dlog.Client.FatalPanic(err) } } |
