summaryrefslogtreecommitdiff
path: root/internal/mapr/server
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-09-19 13:22:59 +0300
committerPaul Buetow <paul@buetow.org>2021-10-02 12:26:29 +0300
commitfe3e68afd99d8ea246be52893730f987e138ec24 (patch)
tree726e0914730912e0a3b223f7b37facc05ba31140 /internal/mapr/server
parentabeac87aec44249bf67f1b0eca471a31086265ca (diff)
move args to config package
logger package rewrite as dlog
Diffstat (limited to 'internal/mapr/server')
-rw-r--r--internal/mapr/server/aggregate.go22
1 files changed, 11 insertions, 11 deletions
diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go
index d11ed7d..767aada 100644
--- a/internal/mapr/server/aggregate.go
+++ b/internal/mapr/server/aggregate.go
@@ -9,7 +9,7 @@ import (
"github.com/mimecast/dtail/internal"
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/line"
- "github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/pool"
"github.com/mimecast/dtail/internal/mapr"
"github.com/mimecast/dtail/internal/mapr/logformat"
@@ -40,7 +40,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
fqdn, err := os.Hostname()
if err != nil {
- logger.Error(err)
+ dlog.Common.Error(err)
}
s := strings.Split(fqdn, ".")
@@ -55,12 +55,12 @@ func NewAggregate(queryStr string) (*Aggregate, error) {
parserName = query.LogFormat
}
- logger.Info("Creating log format parser", parserName)
+ dlog.Common.Info("Creating log format parser", parserName)
logParser, err := logformat.NewParser(parserName, query)
if err != nil {
- logger.Error("Could not create log format parser. Falling back to 'generic'", err)
+ dlog.Common.Error("Could not create log format parser. Falling back to 'generic'", err)
if logParser, err = logformat.NewParser("generic", query); err != nil {
- logger.FatalExit("Could not create log format parser", err)
+ dlog.Common.FatalPanic("Could not create log format parser", err)
}
}
@@ -153,7 +153,7 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin
if err != nil {
// Should fields be ignored anyway?
if err != logformat.IgnoreFieldsErr {
- logger.Error(fields, err)
+ dlog.Common.Error(fields, err)
}
continue
}
@@ -187,7 +187,7 @@ func (a *Aggregate) setAdditionalFields(ctx context.Context, fieldsCh <-chan map
return
}
if err := a.query.SetClause(fields); err != nil {
- logger.Error(err)
+ dlog.Common.Error(err)
}
select {
@@ -204,7 +204,7 @@ func (a *Aggregate) aggregateAndSerialize(ctx context.Context, fieldsCh <-chan m
group := mapr.NewGroupSet()
serialize := func() {
- logger.Info("Serializing mapreduce result")
+ dlog.Common.Info("Serializing mapreduce result")
group.Serialize(ctx, maprMessages)
group = mapr.NewGroupSet()
}
@@ -243,7 +243,7 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
for _, sc := range a.query.Select {
if val, ok := fields[sc.Field]; ok {
if err := set.Aggregate(sc.FieldStorage, sc.Operation, val, false); err != nil {
- logger.Error(err)
+ dlog.Common.Error(err)
continue
}
addedSample = true
@@ -255,7 +255,7 @@ func (a *Aggregate) aggregate(group *mapr.GroupSet, fields map[string]string) {
return
}
- logger.Trace("Aggregated data locally without adding new samples")
+ dlog.Common.Trace("Aggregated data locally without adding new samples")
}
// Serialize all the aggregated data.
@@ -263,7 +263,7 @@ func (a *Aggregate) Serialize(ctx context.Context) {
select {
case a.serialize <- struct{}{}:
case <-time.After(time.Minute):
- logger.Warn("Starting to serialize mapredice data takes over a minute")
+ dlog.Common.Warn("Starting to serialize mapredice data takes over a minute")
case <-ctx.Done():
}
}