summaryrefslogtreecommitdiff
path: root/internal/mapr/groupset.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/mapr/groupset.go')
-rw-r--r--internal/mapr/groupset.go291
1 files changed, 209 insertions, 82 deletions
diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go
index b5c8a48..6ffc8b9 100644
--- a/internal/mapr/groupset.go
+++ b/internal/mapr/groupset.go
@@ -4,13 +4,16 @@ import (
"context"
"errors"
"fmt"
- "io/ioutil"
"os"
"sort"
"strconv"
"strings"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/color"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/io/pool"
+ "github.com/mimecast/dtail/internal/protocol"
)
// GroupSet represents a map of aggregate sets. The group sets
@@ -22,6 +25,14 @@ type GroupSet struct {
sets map[string]*AggregateSet
}
+// Internal helper type
+type result struct {
+ groupKey string
+ values []string
+ widths []int
+ orderBy float64
+}
+
// NewGroupSet returns a new empty group set.
func NewGroupSet() *GroupSet {
g := GroupSet{}
@@ -57,28 +68,181 @@ func (g *GroupSet) Serialize(ctx context.Context, ch chan<- string) {
}
// Result returns a nicely formated result of the query from the group set.
-func (g *GroupSet) Result(query *Query) (string, int, error) {
- return g.limitedResult(query, query.Limit, "\t", " ", false)
+func (g *GroupSet) Result(query *Query, rowsLimit int) (string, int, error) {
+ rows, widths, err := g.result(query, true)
+ if err != nil {
+ return "", 0, err
+ }
+ if query.Limit != -1 {
+ rowsLimit = query.Limit
+ }
+
+ sb := pool.BuilderBuffer.Get().(*strings.Builder)
+ defer pool.RecycleBuilderBuffer(sb)
+
+ // Generate header now
+ lastIndex := len(query.Select) - 1
+ for i, sc := range query.Select {
+ format := fmt.Sprintf(" %%%ds ", widths[i])
+ str := fmt.Sprintf(format, sc.FieldStorage)
+ if config.Client.TermColorsEnable {
+ attrs := []color.Attribute{config.Client.TermColors.MaprTable.HeaderAttr}
+ if sc.FieldStorage == query.OrderBy {
+ attrs = append(attrs, config.Client.TermColors.MaprTable.HeaderSortKeyAttr)
+ }
+
+ for _, groupBy := range query.GroupBy {
+ if sc.FieldStorage == groupBy {
+ attrs = append(attrs, config.Client.TermColors.MaprTable.HeaderGroupKeyAttr)
+ break
+ }
+ }
+
+ color.PaintWithAttrs(sb, str,
+ config.Client.TermColors.MaprTable.HeaderFg,
+ config.Client.TermColors.MaprTable.HeaderBg,
+ attrs)
+ } else {
+ sb.WriteString(str)
+ }
+
+ if i == lastIndex {
+ continue
+ }
+ if config.Client.TermColorsEnable {
+ color.PaintWithAttr(sb, protocol.FieldDelimiter,
+ config.Client.TermColors.MaprTable.HeaderDelimiterFg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterBg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterAttr)
+ } else {
+ sb.WriteString(protocol.FieldDelimiter)
+ }
+ }
+ sb.WriteString("\n")
+
+ for i := 0; i < len(query.Select); i++ {
+ str := fmt.Sprintf("-%s-", strings.Repeat("-", widths[i]))
+ if config.Client.TermColorsEnable {
+ color.PaintWithAttr(sb, str,
+ config.Client.TermColors.MaprTable.HeaderDelimiterFg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterBg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterAttr)
+ } else {
+ sb.WriteString(str)
+ }
+ if i == lastIndex {
+ continue
+ }
+ if config.Client.TermColorsEnable {
+ color.PaintWithAttr(sb, protocol.FieldDelimiter,
+ config.Client.TermColors.MaprTable.HeaderDelimiterFg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterBg,
+ config.Client.TermColors.MaprTable.HeaderDelimiterAttr)
+ } else {
+ sb.WriteString(protocol.FieldDelimiter)
+ }
+ }
+ sb.WriteString("\n")
+
+ // And now write the data
+ for i, r := range rows {
+ if i == rowsLimit {
+ break
+ }
+ for j, value := range r.values {
+ format := fmt.Sprintf(" %%%ds ", widths[j])
+ str := fmt.Sprintf(format, value)
+ if config.Client.TermColorsEnable {
+ color.PaintWithAttr(sb, str,
+ config.Client.TermColors.MaprTable.DataFg,
+ config.Client.TermColors.MaprTable.DataBg,
+ config.Client.TermColors.MaprTable.DataAttr)
+ } else {
+ sb.WriteString(str)
+ }
+
+ if j == lastIndex {
+ continue
+ }
+ if config.Client.TermColorsEnable {
+ color.PaintWithAttr(sb, protocol.FieldDelimiter,
+ config.Client.TermColors.MaprTable.DelimiterFg,
+ config.Client.TermColors.MaprTable.DelimiterBg,
+ config.Client.TermColors.MaprTable.DelimiterAttr)
+ } else {
+ sb.WriteString(protocol.FieldDelimiter)
+ }
+ }
+ sb.WriteString("\n")
+ }
+
+ return sb.String(), len(rows), nil
}
-// WriteResult writes the result to an outfile.
+func (*GroupSet) writeQueryFile(query *Query) error {
+ queryFile := fmt.Sprintf("%s.query", query.Outfile)
+ tmpQueryFile := fmt.Sprintf("%s.tmp", queryFile)
+ dlog.Common.Debug("Writing query file", queryFile)
+
+ fd, err := os.Create(tmpQueryFile)
+ if err != nil {
+ return err
+ }
+ defer fd.Close()
+
+ fd.WriteString(query.RawQuery)
+ os.Rename(tmpQueryFile, queryFile)
+ return nil
+}
+
+// WriteResult writes the result to an CSV outfile.
func (g *GroupSet) WriteResult(query *Query) error {
if !query.HasOutfile() {
return errors.New("No outfile specified")
}
+ if err := g.writeQueryFile(query); err != nil {
+ return err
+ }
- // -1: Don't limit the result, include all data sets
- result, _, err := g.limitedResult(query, query.Limit, "", ",", true)
+ rows, _, err := g.result(query, false)
if err != nil {
return err
}
- logger.Info("Writing outfile", query.Outfile)
+ dlog.Common.Info("Writing outfile", query.Outfile)
tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile)
- if err := ioutil.WriteFile(tmpOutfile, []byte(result), 0644); err != nil {
+ fd, err := os.Create(tmpOutfile)
+ if err != nil {
return err
}
+ defer fd.Close()
+
+ // Generate header now
+ lastIndex := len(query.Select) - 1
+ for i, sc := range query.Select {
+ fd.WriteString(sc.FieldStorage)
+ if i == lastIndex {
+ continue
+ }
+ fd.WriteString(protocol.CSVDelimiter)
+ }
+ fd.WriteString("\n")
+
+ // And now write the data
+ for i, r := range rows {
+ if i == query.Limit {
+ break
+ }
+ for j, value := range r.values {
+ fd.WriteString(value)
+ if j == lastIndex {
+ continue
+ }
+ fd.WriteString(protocol.CSVDelimiter)
+ }
+ fd.WriteString("\n")
+ }
if err := os.Rename(tmpOutfile, query.Outfile); err != nil {
os.Remove(tmpOutfile)
@@ -88,32 +252,21 @@ func (g *GroupSet) WriteResult(query *Query) error {
return nil
}
-// Return a nicely formated result of the query from the group set.
-func (g *GroupSet) limitedResult(query *Query, limit int, lineStarter, fieldSeparator string, addHeader bool) (string, int, error) {
- type result struct {
- groupKey string
- resultStr string
- orderBy float64
- }
-
- var resultSlice []result
+// Return a sorted result slice of the query from the group set.
+func (g *GroupSet) result(query *Query, gatherWidths bool) ([]result, []int, error) {
+ var rows []result
+ widths := make([]int, len(query.Select))
+ var valueStr string
+ var value float64
for groupKey, set := range g.sets {
- var sb strings.Builder
r := result{groupKey: groupKey}
- lastIndex := len(query.Select) - 1
for i, sc := range query.Select {
- storage := sc.FieldStorage
- orderByThis := storage == query.OrderBy
-
switch sc.Operation {
case Count:
- value := set.FValues[storage]
- sb.WriteString(fmt.Sprintf("%d", int(value)))
- if orderByThis {
- r.orderBy = value
- }
+ value = set.FValues[sc.FieldStorage]
+ valueStr = fmt.Sprintf("%d", int(value))
case Len:
fallthrough
case Sum:
@@ -121,74 +274,48 @@ func (g *GroupSet) limitedResult(query *Query, limit int, lineStarter, fieldSepa
case Min:
fallthrough
case Max:
- value := set.FValues[storage]
- sb.WriteString(fmt.Sprintf("%f", value))
- if orderByThis {
- r.orderBy = value
- }
+ value = set.FValues[sc.FieldStorage]
+ valueStr = fmt.Sprintf("%f", value)
case Last:
- value := set.SValues[storage]
- if orderByThis {
- f, err := strconv.ParseFloat(value, 64)
- if err == nil {
- r.orderBy = f
- }
- }
- sb.WriteString(value)
+ valueStr = set.SValues[sc.FieldStorage]
+ value, _ = strconv.ParseFloat(valueStr, 64)
case Avg:
- value := set.FValues[storage] / float64(set.Samples)
- sb.WriteString(fmt.Sprintf("%f", value))
- if orderByThis {
- r.orderBy = value
- }
+ value = set.FValues[sc.FieldStorage] / float64(set.Samples)
+ valueStr = fmt.Sprintf("%f", value)
default:
- return "", 0, fmt.Errorf("Unknown aggregation method '%v'", sc.Operation)
+ return rows, widths, fmt.Errorf("Unknown aggregation method '%v'",
+ sc.Operation)
}
- if i != lastIndex {
- sb.WriteString(fieldSeparator)
+
+ if sc.FieldStorage == query.OrderBy {
+ r.orderBy = value
}
- }
+ r.values = append(r.values, valueStr)
- r.resultStr = sb.String()
- resultSlice = append(resultSlice, r)
+ if !gatherWidths {
+ continue
+ }
+ if widths[i] < len(sc.FieldStorage) {
+ widths[i] = len(sc.FieldStorage)
+ }
+ if widths[i] < len(valueStr) {
+ widths[i] = len(valueStr)
+ }
+ }
+ rows = append(rows, r)
}
if query.OrderBy != "" {
if query.ReverseOrder {
- sort.SliceStable(resultSlice, func(i, j int) bool {
- return resultSlice[i].orderBy < resultSlice[j].orderBy
+ sort.SliceStable(rows, func(i, j int) bool {
+ return rows[i].orderBy < rows[j].orderBy
})
} else {
- sort.SliceStable(resultSlice, func(i, j int) bool {
- return resultSlice[i].orderBy > resultSlice[j].orderBy
+ sort.SliceStable(rows, func(i, j int) bool {
+ return rows[i].orderBy > rows[j].orderBy
})
}
}
- var sb strings.Builder
-
- // Write header first
- if addHeader {
- lastIndex := len(query.Select) - 1
- sb.WriteString(lineStarter)
- for i, sc := range query.Select {
- sb.WriteString(sc.FieldStorage)
- if i != lastIndex {
- sb.WriteString(fieldSeparator)
- }
- }
- sb.WriteString("\n")
- }
-
- // And now write the data
- for i, r := range resultSlice {
- if i == limit {
- break
- }
- sb.WriteString(lineStarter)
- sb.WriteString(r.resultStr)
- sb.WriteString("\n")
- }
-
- return sb.String(), len(resultSlice), nil
+ return rows, widths, nil
}