summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Paul Buetow <pbuetow@mimecast.com> 2020-07-03 12:15:59 +0100
committer Paul Buetow <pbuetow@mimecast.com> 2020-07-03 12:15:59 +0100
commit eb9c8d4ae7f8fb7e65f912ff4838c7737b5487d0 (patch)
tree f5e6c0be15200b18f306878037c3896e1084cf53
parent 912f7dce7222d345dc6cc6cc593a45ee7e2e15f8 (diff)
refactor mapr client
-rw-r--r-- cmd/dmap/main.go 2
-rw-r--r-- cmd/dtail/main.go 2
-rw-r--r-- internal/clients/maprclient.go 89
-rw-r--r-- internal/config/server.go 30
-rw-r--r-- internal/mapr/groupset.go 19
-rw-r--r-- internal/mapr/query.go 4
-rw-r--r-- internal/server/continuous.go 20
-rw-r--r-- internal/server/scheduler.go 2
-rw-r--r-- internal/server/server.go 6
9 files changed, 98 insertions, 76 deletions
diff --git a/cmd/dmap/main.go b/cmd/dmap/main.go
index d7b5fae..ae16e97 100644
--- a/cmd/dmap/main.go
+++ b/cmd/dmap/main.go
@@ -73,7 +73,7 @@ func main() {
PrivateKeyPathFile: privateKeyPathFile,
}
- client, err := clients.NewMaprClient(args, queryStr)
+ client, err := clients.NewMaprClient(args, queryStr, clients.DefaultMode)
if err != nil {
panic(err)
}
diff --git a/cmd/dtail/main.go b/cmd/dtail/main.go
index 699784b..3ec1dd4 100644
--- a/cmd/dtail/main.go
+++ b/cmd/dtail/main.go
@@ -106,7 +106,7 @@ func main() {
panic(err)
}
default:
- if client, err = clients.NewMaprClient(args, queryStr); err != nil {
+ if client, err = clients.NewMaprClient(args, queryStr, clients.DefaultMode); err != nil {
panic(err)
}
}
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index c4e445b..e28dadb 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -14,6 +14,14 @@ import (
"github.com/mimecast/dtail/internal/omode"
)
+type MaprClientMode int
+
+const (
+ DefaultMode MaprClientMode = iota
+ CumulativeMode MaprClientMode = iota
+ NonCumulativeMode MaprClientMode = iota
+)
+
// MaprClient is used for running mapreduce aggregations on remote files.
type MaprClient struct {
baseClient
@@ -23,12 +31,12 @@ type MaprClient struct {
globalGroup *mapr.GlobalGroupSet
// The query object (constructed from queryStr)
query *mapr.Query
- // Additative result or new result every run?
+ // Additive result or new result every interval run?
cumulative bool
}
// NewMaprClient returns a new mapreduce client.
-func NewMaprClient(args Args, queryStr string) (*MaprClient, error) {
+func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (*MaprClient, error) {
if queryStr == "" {
return nil, errors.New("No mapreduce query specified, use '-query' flag")
}
@@ -39,10 +47,18 @@ func NewMaprClient(args Args, queryStr string) (*MaprClient, error) {
}
// Don't retry connection if in tail mode and no outfile specified.
- retry := args.Mode == omode.TailClient && query.Outfile == ""
-
- // Result is comulative if we are in MapClient mode or with outfile
- cumulative := args.Mode == omode.MapClient || query.Outfile != ""
+ retry := args.Mode == omode.TailClient && !query.HasOutfile()
+
+ var cumulative bool
+ switch maprClientMode {
+ case CumulativeMode:
+ cumulative = true
+ case NonCumulativeMode:
+ cumulative = false
+ default:
+ // Result is cumulative if we are in MapClient mode or with outfile
+ cumulative = args.Mode == omode.MapClient || query.HasOutfile()
+ }
c := MaprClient{
baseClient: baseClient{
@@ -72,14 +88,12 @@ func NewMaprClient(args Args, queryStr string) (*MaprClient, error) {
// Start starts the mapreduce client.
func (c *MaprClient) Start(ctx context.Context) (status int) {
- if c.query.Outfile == "" {
- // Only print out periodic results if we don't write an outfile
- go c.periodicPrintResults(ctx)
- }
+ go c.periodicReportResults(ctx)
status = c.baseClient.Start(ctx)
if c.cumulative {
- c.recievedFinalResult()
+ logger.Info("Received final mapreduce result")
+ c.reportResults()
}
return
@@ -108,35 +122,27 @@ func (c MaprClient) makeCommands() (commands []string) {
return
}
-func (c *MaprClient) recievedFinalResult() {
- logger.Info("Received final mapreduce result")
-
- if c.query.Outfile == "" {
- c.printResults()
- return
- }
-
- logger.Info(fmt.Sprintf("Writing final mapreduce result to '%s'", c.query.Outfile))
- err := c.globalGroup.WriteResult(c.query)
- if err != nil {
- logger.FatalExit(err)
- return
- }
- logger.Info(fmt.Sprintf("Wrote final mapreduce result to '%s'", c.query.Outfile))
-}
-
-func (c *MaprClient) periodicPrintResults(ctx context.Context) {
+func (c *MaprClient) periodicReportResults(ctx context.Context) {
for {
select {
case <-time.After(c.query.Interval):
logger.Info("Gathering interim mapreduce result")
- c.printResults()
+ c.reportResults()
case <-ctx.Done():
return
}
}
}
+func (c *MaprClient) reportResults() {
+ if c.query.HasOutfile() {
+ c.writeResultsToOutfile()
+ return
+ }
+
+ c.printResults()
+}
+
func (c *MaprClient) printResults() {
var result string
var err error
@@ -150,8 +156,25 @@ func (c *MaprClient) printResults() {
if err != nil {
logger.FatalExit(err)
}
- if numLines > 0 {
- logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery))
- logger.Raw(result)
+
+ if numLines == 0 {
+ logger.Info("Empty result set this time...")
+ return
+ }
+
+ logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery))
+ logger.Raw(result)
+}
+
+func (c *MaprClient) writeResultsToOutfile() {
+ if c.cumulative {
+ if err := c.globalGroup.WriteResult(c.query); err != nil {
+ logger.FatalExit(err)
+ }
+ return
+ }
+
+ if err := c.globalGroup.SwapOut().WriteResult(c.query); err != nil {
+ logger.FatalExit(err)
}
}
diff --git a/internal/config/server.go b/internal/config/server.go
index 43b4c34..4166fd3 100644
--- a/internal/config/server.go
+++ b/internal/config/server.go
@@ -13,8 +13,8 @@ type Permissions struct {
Users map[string][]string
}
-// Scheduled allows to configure scheduled mapreduce jobs.
-type Scheduled struct {
+// JobCommons summarises common job fields
+type JobCommons struct {
Name string
Enable bool
Files string
@@ -22,21 +22,19 @@ type Scheduled struct {
Outfile string `json:",omitempty"`
Discovery string `json:",omitempty"`
Servers []string `json:",omitempty"`
- TimeRange [2]int
AllowFrom []string `json:",omitempty"`
}
-// Monitoring on log files.
-type Monitoring struct {
- Name string
- Enable bool
- Files string
- Query string
- ExcludeRegexes []string `json:",omitempty"`
- Outfile string `json:",omitempty"`
- Discovery string `json:",omitempty"`
- Servers []string `json:",omitempty"`
- AllowFrom []string `json:",omitempty"`
+// Scheduled allows to configure scheduled mapreduce jobs.
+type Scheduled struct {
+ JobCommons
+ TimeRange [2]int
+}
+
+// Continuous allows to configure continuous running mapreduce jobs.
+type Continuous struct {
+ JobCommons
+ RestartOnDayChange bool `json:",omitempty"`
}
// ServerConfig represents the server configuration.
@@ -59,8 +57,8 @@ type ServerConfig struct {
HostKeyBits int
// Scheduled mapreduce jobs.
Schedule []Scheduled `json:",omitempty"`
- // Monitoring on log files.
- Monitoring []Monitoring `json:",omitempty"`
+ // Continuous mapreduce jobs
+ Continuous []Continuous `json:",omitempty"`
}
// Create a new default server configuration.
diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go
index e9e0d37..6ee2811 100644
--- a/internal/mapr/groupset.go
+++ b/internal/mapr/groupset.go
@@ -5,9 +5,12 @@ import (
"errors"
"fmt"
"io/ioutil"
+ "os"
"sort"
"strconv"
"strings"
+
+ "github.com/mimecast/dtail/internal/io/logger"
)
// GroupSet represents a map of aggregate sets. The group sets
@@ -60,7 +63,7 @@ func (g *GroupSet) Result(query *Query) (string, int, error) {
// WriteResult writes the result to an outfile.
func (g *GroupSet) WriteResult(query *Query) error {
- if query.Outfile == "" {
+ if !query.HasOutfile() {
return errors.New("No outfile specified")
}
@@ -70,7 +73,19 @@ func (g *GroupSet) WriteResult(query *Query) error {
return err
}
- return ioutil.WriteFile(query.Outfile, []byte(result), 0644)
+ logger.Info("Writing outfile", query.Outfile)
+ tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile)
+
+ if err := ioutil.WriteFile(tmpOutfile, []byte(result), 0644); err != nil {
+ return err
+ }
+
+ if err := os.Rename(tmpOutfile, query.Outfile); err != nil {
+ os.Remove(tmpOutfile)
+ return err
+ }
+
+ return nil
}
// Return a nicely formatted result of the query from the group set.
diff --git a/internal/mapr/query.go b/internal/mapr/query.go
index be766d1..6dff792 100644
--- a/internal/mapr/query.go
+++ b/internal/mapr/query.go
@@ -68,6 +68,10 @@ func NewQuery(queryStr string) (*Query, error) {
return &q, err
}
+func (q *Query) HasOutfile() bool {
+ return q.Outfile != ""
+}
+
func (q *Query) parse(tokens []token) error {
var found []token
var err error
diff --git a/internal/server/continuous.go b/internal/server/continuous.go
index bdb4079..cf89cdd 100644
--- a/internal/server/continuous.go
+++ b/internal/server/continuous.go
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"os"
- "strconv"
"strings"
"time"
@@ -45,26 +44,9 @@ func (s *continuous) runJobs(ctx context.Context) {
continue
}
- hour, err := strconv.Atoi(time.Now().Format("15"))
- if err != nil {
- logger.Error(job.Name, "Unable to create job job", err)
- continue
- }
-
- if hour < job.TimeRange[0] || hour >= job.TimeRange[1] {
- logger.Debug(job.Name, "Not running job out of time range")
- continue
- }
-
files := fillDates(job.Files)
outfile := fillDates(job.Outfile)
- _, err = os.Stat(outfile)
- if !os.IsNotExist(err) {
- logger.Debug(job.Name, "Not running job as outfile already exists", outfile)
- continue
- }
-
servers := strings.Join(job.Servers, ",")
if servers == "" {
servers = config.Server.SSHBindAddress
@@ -84,7 +66,7 @@ func (s *continuous) runJobs(ctx context.Context) {
tmpOutfile := fmt.Sprintf("%s.tmp", outfile)
query := fmt.Sprintf("%s outfile %s", job.Query, tmpOutfile)
- client, err := clients.NewMaprClient(args, query)
+ client, err := clients.NewMaprClient(args, query, clients.NonCumulativeMode)
if err != nil {
logger.Error(fmt.Sprintf("Unable to create job job %s", job.Name), err)
continue
diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go
index 1fdbeea..e75077e 100644
--- a/internal/server/scheduler.go
+++ b/internal/server/scheduler.go
@@ -84,7 +84,7 @@ func (s *scheduler) runJobs(ctx context.Context) {
tmpOutfile := fmt.Sprintf("%s.tmp", outfile)
query := fmt.Sprintf("%s outfile %s", job.Query, tmpOutfile)
- client, err := clients.NewMaprClient(args, query)
+ client, err := clients.NewMaprClient(args, query, clients.CumulativeMode)
if err != nil {
logger.Error(fmt.Sprintf("Unable to create job job %s", job.Name), err)
continue
diff --git a/internal/server/server.go b/internal/server/server.go
index 0377598..486b8c6 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -33,7 +33,7 @@ type Server struct {
// To run scheduled tasks (if configured)
sched *scheduler
// Monitor log files for pattern (if configured)
- mon *monitoring
+ cont *continuous
// Wait counter, e.g. there might be still subprocesses (forked by drun) to be killed.
shutdownWaitFor chan struct{}
// Background jobs
@@ -50,7 +50,7 @@ func New() *Server {
tailLimiter: make(chan struct{}, config.Server.MaxConcurrentTails),
shutdownWaitFor: make(chan struct{}, 1000),
sched: newScheduler(),
- mon: newMonitoring(),
+ cont: newContinuous(),
background: background.New(),
}
@@ -80,7 +80,7 @@ func (s *Server) Start(ctx context.Context) int {
go s.stats.start(ctx)
go s.sched.start(ctx)
- go s.mon.start(ctx)
+ go s.cont.start(ctx)
go s.listenerLoop(ctx, listener)
select {