summaryrefslogtreecommitdiff
path: root/internal/clients/maprclient.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/clients/maprclient.go')
-rw-r--r--internal/clients/maprclient.go89
1 files changed, 59 insertions, 30 deletions
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index feb7e47..246946f 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -9,7 +9,9 @@ import (
"time"
"github.com/mimecast/dtail/internal/clients/handlers"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/color"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/omode"
)
@@ -29,25 +31,25 @@ const (
// MaprClient is used for running mapreduce aggregations on remote files.
type MaprClient struct {
baseClient
- // Query string for mapr aggregations
- queryStr string
// Global group set for merged mapr aggregation results
globalGroup *mapr.GlobalGroupSet
// The query object (constructed from queryStr)
query *mapr.Query
// Additative result or new result every interval run?
cumulative bool
+ // The last result string received
+ lastResult string
}
// NewMaprClient returns a new mapreduce client.
-func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (*MaprClient, error) {
- if queryStr == "" {
+func NewMaprClient(args config.Args, maprClientMode MaprClientMode) (*MaprClient, error) {
+ if args.QueryStr == "" {
return nil, errors.New("No mapreduce query specified, use '-query' flag")
}
- query, err := mapr.NewQuery(queryStr)
+ query, err := mapr.NewQuery(args.QueryStr)
if err != nil {
- logger.FatalExit(queryStr, "Can't parse mapr query", err)
+ dlog.Client.FatalPanic(args.QueryStr, "Can't parse mapr query", err)
}
// Don't retry connection if in tail mode and no outfile specified.
@@ -64,7 +66,7 @@ func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (*
cumulative = args.Mode == omode.MapClient || query.HasOutfile()
}
- logger.Debug("Cumulative mapreduce mode?", cumulative)
+ dlog.Client.Debug("Cumulative mapreduce mode?", cumulative)
c := MaprClient{
baseClient: baseClient{
@@ -73,7 +75,6 @@ func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (*
retry: retry,
},
query: query,
- queryStr: queryStr,
cumulative: cumulative,
}
@@ -99,46 +100,51 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i
status = c.baseClient.Start(ctx, statsCh)
if c.cumulative {
- logger.Debug("Received final mapreduce result")
+ dlog.Client.Debug("Received final mapreduce result")
c.reportResults()
}
return
}
+// NEXT: Make this a callback function rather trying to use polymorphism to call
+// this. This applies to all clients. It will make the code easier to read.
func (c MaprClient) makeHandler(server string) handlers.Handler {
return handlers.NewMaprHandler(server, c.query, c.globalGroup)
}
-func (c MaprClient) makeCommands(options map[string]string) (commands []string) {
+func (c MaprClient) makeCommands() (commands []string) {
commands = append(commands, fmt.Sprintf("map %s", c.query.RawQuery))
-
modeStr := "cat"
if c.Mode == omode.TailClient {
modeStr = "tail"
}
- optionsStr := c.commandOptionsToString(options)
for _, file := range strings.Split(c.What, ",") {
+ regex, err := c.Regex.Serialize()
+ if err != nil {
+ dlog.Client.FatalPanic(err)
+ }
if c.Timeout > 0 {
- commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout, modeStr, file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("timeout %d %s %s %s", c.Timeout,
+ modeStr, file, regex))
continue
}
- commands = append(commands, fmt.Sprintf("%s:%s %s %s", modeStr, optionsStr, file, c.Regex.Serialize()))
+ commands = append(commands, fmt.Sprintf("%s:%s %s %s",
+ modeStr, c.Args.SerializeOptions(), file, regex))
}
-
return
}
func (c *MaprClient) periodicReportResults(ctx context.Context) {
rampUpSleep := c.query.Interval / 2
- logger.Debug("Ramp up sleeping before processing mapreduce results", rampUpSleep)
+ dlog.Client.Debug("Ramp up sleeping before processing mapreduce results", rampUpSleep)
time.Sleep(rampUpSleep)
for {
select {
case <-time.After(c.query.Interval):
- logger.Debug("Gathering interim mapreduce result")
+ dlog.Client.Debug("Gathering interim mapreduce result")
c.reportResults()
case <-ctx.Done():
return
@@ -151,42 +157,65 @@ func (c *MaprClient) reportResults() {
c.writeResultsToOutfile()
return
}
-
c.printResults()
}
func (c *MaprClient) printResults() {
var result string
var err error
- var numLines int
+ var numRows int
+ rowsLimit := -1
+
+ if c.query.Limit == -1 {
+ // Limit output to 10 rows when the result is printed to stdout.
+ // This can be overriden with the limit clause though.
+ rowsLimit = 10
+ }
if c.cumulative {
- result, numLines, err = c.globalGroup.Result(c.query)
+ result, numRows, err = c.globalGroup.Result(c.query, rowsLimit)
} else {
- result, numLines, err = c.globalGroup.SwapOut().Result(c.query)
+ result, numRows, err = c.globalGroup.SwapOut().Result(c.query, rowsLimit)
}
if err != nil {
- logger.FatalExit(err)
+ dlog.Client.FatalPanic(err)
}
- if numLines == 0 {
- logger.Warn("Empty result set this time...")
+ if result == c.lastResult {
+ dlog.Client.Debug("Result hasn't changed compared to last time...")
return
}
+ c.lastResult = result
- logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery))
- logger.Raw(result)
+ if numRows == 0 {
+ dlog.Client.Debug("Empty result set this time...")
+ return
+ }
+
+ rawQuery := c.query.RawQuery
+ if config.Client.TermColorsEnable {
+ rawQuery = color.PaintStrWithAttr(rawQuery,
+ config.Client.TermColors.MaprTable.RawQueryFg,
+ config.Client.TermColors.MaprTable.RawQueryBg,
+ config.Client.TermColors.MaprTable.RawQueryAttr)
+ }
+ dlog.Client.Raw(rawQuery)
+
+ if rowsLimit > 0 && numRows > rowsLimit {
+ dlog.Client.Warn(fmt.Sprintf("Got %d results but limited terminal output "+
+ "to %d rows! Use 'limit' clause to override!", numRows, rowsLimit))
+ }
+ dlog.Client.Raw(result)
}
func (c *MaprClient) writeResultsToOutfile() {
if c.cumulative {
if err := c.globalGroup.WriteResult(c.query); err != nil {
- logger.FatalExit(err)
+ dlog.Client.FatalPanic(err)
}
return
}
-
if err := c.globalGroup.SwapOut().WriteResult(c.query); err != nil {
- logger.FatalExit(err)
+ dlog.Client.FatalPanic(err)
}
}