summaryrefslogtreecommitdiff
path: root/internal/mapr
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-09-19 13:22:59 +0300
committerPaul Buetow <paul@buetow.org>2021-10-02 12:26:29 +0300
commitfe3e68afd99d8ea246be52893730f987e138ec24 (patch)
tree726e0914730912e0a3b223f7b37facc05ba31140 /internal/mapr
parentabeac87aec44249bf67f1b0eca471a31086265ca (diff)
move args to config package
logger package rewrite as dlog
Diffstat (limited to 'internal/mapr')
-rw-r--r--internal/mapr/aggregateset.go6
-rw-r--r--internal/mapr/client/aggregate.go4
-rw-r--r--internal/mapr/funcs/function.go4
-rw-r--r--internal/mapr/groupset.go4
-rw-r--r--internal/mapr/logformat/default.go1
-rw-r--r--internal/mapr/logformat/generickv.go2
-rw-r--r--internal/mapr/logformat/parser.go3
-rw-r--r--internal/mapr/query.go4
-rw-r--r--internal/mapr/server/aggregate.go22
-rw-r--r--internal/mapr/token.go10
-rw-r--r--internal/mapr/whereclause.go6
-rw-r--r--internal/mapr/wherecondition.go6
12 files changed, 33 insertions, 39 deletions
diff --git a/internal/mapr/aggregateset.go b/internal/mapr/aggregateset.go
index 47f4925..14e6943 100644
--- a/internal/mapr/aggregateset.go
+++ b/internal/mapr/aggregateset.go
@@ -6,7 +6,7 @@ import (
"strconv"
"strings"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/protocol"
)
@@ -37,7 +37,7 @@ func (s *AggregateSet) String() string {
// Merge one aggregate set into this one.
func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error {
s.Samples += set.Samples
- //logger.Trace("Merge", set)
+ //dlog.Common.Trace("Merge", set)
for _, sc := range query.Select {
storage := sc.FieldStorage
@@ -70,7 +70,7 @@ func (s *AggregateSet) Merge(query *Query, set *AggregateSet) error {
// Serialize the aggregate set so it can be sent over the wire.
func (s *AggregateSet) Serialize(ctx context.Context, groupKey string, ch chan<- string) {
- logger.Trace("Serialising mapr.AggregateSet", s)
+ dlog.Common.Trace("Serialising mapr.AggregateSet", s)
sb := pool.BuilderBuffer.Get().(*strings.Builder)
defer pool.RecycleBuilderBuffer(sb)
diff --git a/internal/mapr/client/aggregate.go b/internal/mapr/client/aggregate.go
index 5cc09a1..d0c1d70 100644
--- a/internal/mapr/client/aggregate.go
+++ b/internal/mapr/client/aggregate.go
@@ -5,7 +5,7 @@ import (
"strconv"
"strings"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/protocol"
)
@@ -52,7 +52,7 @@ func (a *Aggregate) Aggregate(message string) error {
for _, sc := range a.query.Select {
if val, ok := fields[sc.FieldStorage]; ok {
if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, true); err != nil {
- logger.Error(err)
+ dlog.Common.Error(err)
continue
}
addedSamples = true
diff --git a/internal/mapr/funcs/function.go b/internal/mapr/funcs/function.go
index 1a89c3a..0433b9a 100644
--- a/internal/mapr/funcs/function.go
+++ b/internal/mapr/funcs/function.go
@@ -58,9 +58,9 @@ func NewFunctionStack(in string) (FunctionStack, string, error) {
// Call the function stack.
func (fs FunctionStack) Call(str string) string {
for i := len(fs) - 1; i >= 0; i-- {
- //logger.Debug("Call", fs[i].Name, str)
+ //dlog.Common.Debug("Call", fs[i].Name, str)
str = fs[i].call(str)
- //logger.Debug("Call.result", fs[i].Name, str)
+ //dlog.Common.Debug("Call.result", fs[i].Name, str)
}
return str
diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go
index 9bff790..df8c603 100644
--- a/internal/mapr/groupset.go
+++ b/internal/mapr/groupset.go
@@ -11,7 +11,7 @@ import (
"github.com/mimecast/dtail/internal/color"
"github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/protocol"
)
@@ -189,7 +189,7 @@ func (g *GroupSet) WriteResult(query *Query) error {
return err
}
- logger.Info("Writing outfile", query.Outfile)
+ dlog.Common.Info("Writing outfile", query.Outfile)
tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile)
file, err := os.Create(tmpOutfile)
diff --git a/internal/mapr/logformat/default.go b/internal/mapr/logformat/default.go
index c67137e..e0bbc30 100644
--- a/internal/mapr/logformat/default.go
+++ b/internal/mapr/logformat/default.go
@@ -26,6 +26,7 @@ func (p *Parser) MakeFieldsDEFAULT(maprLine string) (map[string]string, error) {
fields["$timeoffset"] = p.timeZoneOffset
fields["$severity"] = splitted[0]
+ fields["$loglevel"] = splitted[0]
// TODO: Parse time like we do at Mimecast
fields["$time"] = splitted[1]
diff --git a/internal/mapr/logformat/generickv.go b/internal/mapr/logformat/generickv.go
index 23d75cb..3769c22 100644
--- a/internal/mapr/logformat/generickv.go
+++ b/internal/mapr/logformat/generickv.go
@@ -21,7 +21,7 @@ func (p *Parser) MakeFieldsGENERIGKV(maprLine string) (map[string]string, error)
for _, kv := range splitted[0:] {
keyAndValue := strings.SplitN(kv, "=", 2)
if len(keyAndValue) != 2 {
- //logger.Debug("Unable to parse key-value token, ignoring it", kv)
+ //dlog.Common.Debug("Unable to parse key-value token, ignoring it", kv)
continue
}
fields[strings.ToLower(keyAndValue[0])] = keyAndValue[1]
diff --git a/internal/mapr/logformat/parser.go b/internal/mapr/logformat/parser.go
index 6582b5f..a352580 100644
--- a/internal/mapr/logformat/parser.go
+++ b/internal/mapr/logformat/parser.go
@@ -8,7 +8,6 @@ import (
"strings"
"time"
- "github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/mapr"
)
@@ -76,12 +75,10 @@ func (p *Parser) MakeFields(maprLine string) (fields map[string]string, err erro
if errInterface == nil {
fields, err = returnValues[0].Interface().(map[string]string), nil
- logger.Trace("parser.MakeFields", fields, err)
return
}
fields, err = returnValues[0].Interface().(map[string]string), errInterface.(error)
- logger.Trace("parser.MakeFields", fields, err)
return
}
diff --git a/internal/mapr/query.go b/internal/mapr/query.go
index 01852da..6c1d849 100644
--- a/internal/mapr/query.go
+++ b/internal/mapr/query.go
@@ -6,8 +6,6 @@ import (
"strconv"
"strings"
"time"
-
- "github.com/mimecast/dtail/internal/io/logger"
)
const (
@@ -67,8 +65,6 @@ func NewQuery(queryStr string) (*Query, error) {
}
err := q.parse(tokens)
-
- logger.Debug(q)
return &q, err
}
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index d11ed7d..767aada 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -9,7 +9,7 @@ import (
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/line"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/logformat"
@@ -40,7 +40,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
fqdn, err := os.Hostname()
if err != nil {
- logger.Error(err)
+ dlog.Common.Error(err)
}
s := strings.Split(fqdn, ".")
@@ -55,12 +55,12 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
parserName = query.LogFormat
}
- logger.Info("Creating log format parser", parserName)
+ dlog.Common.Info("Creating log format parser", parserName)
logParser, err := logformat.NewParser(parserName, query)
if err != nil {
- logger.Error("Could not create log format parser. Falling back to 'generic'", err)
+ dlog.Common.Error("Could not create log format parser. Falling back to 'generic'", err)
if logParser, err = logformat.NewParser("generic", query); err != nil {
- logger.FatalExit("Could not create log format parser", err)
+ dlog.Common.FatalPanic("Could not create log format parser", err)
}
}
@@ -153,7 +153,7 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin
if err != nil {
// Should fields be ignored anyway?
if err != logformat.IgnoreFieldsErr {
- logger.Error(fields, err)
+ dlog.Common.Error(fields, err)
}
continue
}
@@ -187,7 +187,7 @@ func (a *Aggregate) setAdditionalFields(ctx context.Context, fieldsCh <-chan map
return
}
if err := a.query.SetClause(fields); err != nil {
- logger.Error(err)
+ dlog.Common.Error(err)
}
select {
@@ -204,7 +204,7 @@ func (a *Aggregate) aggregateAndSerialize(ctx context.Context, fieldsCh <-chan m
group := mapr.NewGroupSet()
serialize := func() {
- logger.Info("Serializing mapreduce result")
+ dlog.Common.Info("Serializing mapreduce result")
group.Serialize(ctx, maprMessages)
group = mapr.NewGroupSet()
}
@@ -243,7 +243,7 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
for _, sc := range a.query.Select {
if val, ok := fields[sc.Field]; ok {
if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil {
- logger.Error(err)
+ dlog.Common.Error(err)
continue
}
addedSample = true
@@ -255,7 +255,7 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
return
}
- logger.Trace("Aggregated data locally without adding new samples")
+ dlog.Common.Trace("Aggregated data locally without adding new samples")
}
// Serialize all the aggregated data.
@@ -263,7 +263,7 @@ func (a *Aggregate) Serialize(ctx context.Context) {
select {
case a.serialize <- struct{}{}:
case <-time.After(time.Minute):
- logger.Warn("Starting to serialize mapredice data takes over a minute")
+ dlog.Common.Warn("Starting to serialize mapredice data takes over a minute")
case <-ctx.Done():
}
}
diff --git a/internal/mapr/token.go b/internal/mapr/token.go
index 8972188..7c6578b 100644
--- a/internal/mapr/token.go
+++ b/internal/mapr/token.go
@@ -58,12 +58,12 @@ func tokenize(queryStr string) []token {
}
func tokensConsume(tokens []token) ([]token, []token) {
- //logger.Trace("=====================")
+ //dlog.Common.Trace("=====================")
var consumed []token
for i, t := range tokens {
if t.isKeyword() {
- //logger.Trace("keyword", t)
+ //dlog.Common.Trace("keyword", t)
return tokens[i:], consumed
}
// strip escapes, such as ` from `foo`, this allows to use keywords as field names
@@ -73,7 +73,7 @@ func tokensConsume(tokens []token) ([]token, []token) {
}
if t.str[0] == '`' && t.str[length-1] == '`' {
stripped := t.str[1 : length-1]
- //logger.Trace("stripped", stripped)
+ //dlog.Common.Trace("stripped", stripped)
t := token{
str: stripped,
isBareword: t.isBareword,
@@ -81,11 +81,11 @@ func tokensConsume(tokens []token) ([]token, []token) {
consumed = append(consumed, t)
continue
}
- //logger.Trace("bare", token)
+ //dlog.Common.Trace("bare", token)
consumed = append(consumed, t)
}
- //logger.Trace("result", consumed)
+ //dlog.Common.Trace("result", consumed)
return nil, consumed
}
diff --git a/internal/mapr/whereclause.go b/internal/mapr/whereclause.go
index cc1c164..6356d94 100644
--- a/internal/mapr/whereclause.go
+++ b/internal/mapr/whereclause.go
@@ -3,7 +3,7 @@ package mapr
import (
"strconv"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
)
// WhereClause interprets the where clause of the mapreduce query.
@@ -55,7 +55,7 @@ func whereClauseFloatValue(fields map[string]string, str string, float float64,
}
return f, true
default:
- logger.Error("Unexpected argument in 'where' clause", str, float, t)
+ dlog.Common.Error("Unexpected argument in 'where' clause", str, float, t)
return 0, false
}
}
@@ -71,7 +71,7 @@ func whereClauseStringValue(fields map[string]string, str string, t fieldType) (
case String:
return str, true
default:
- logger.Error("Unexpected argument in 'where' clause", str, t)
+ dlog.Common.Error("Unexpected argument in 'where' clause", str, t)
return str, false
}
}
diff --git a/internal/mapr/wherecondition.go b/internal/mapr/wherecondition.go
index 7a60dba..c60c0a5 100644
--- a/internal/mapr/wherecondition.go
+++ b/internal/mapr/wherecondition.go
@@ -6,7 +6,7 @@ import (
"strconv"
"strings"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
)
// QueryOperation determines the mapreduce operation.
@@ -168,7 +168,7 @@ func (wc *whereCondition) floatClause(lValue float64, rValue float64) bool {
case FloatGe:
return lValue >= rValue
default:
- logger.Error("Unknown float operation", lValue, wc.Operation, rValue)
+ dlog.Common.Error("Unknown float operation", lValue, wc.Operation, rValue)
}
return false
@@ -193,7 +193,7 @@ func (wc *whereCondition) stringClause(lValue string, rValue string) bool {
case StringNotHasSuffix:
return !strings.HasSuffix(lValue, rValue)
default:
- logger.Error("Unknown string operation", lValue, wc.Operation, rValue)
+ dlog.Common.Error("Unknown string operation", lValue, wc.Operation, rValue)
}
return false