diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2020-07-03 12:15:59 +0100 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2020-07-03 12:15:59 +0100 |
| commit | eb9c8d4ae7f8fb7e65f912ff4838c7737b5487d0 (patch) | |
| tree | f5e6c0be15200b18f306878037c3896e1084cf53 /internal | |
| parent | 912f7dce7222d345dc6cc6cc593a45ee7e2e15f8 (diff) | |
refactor mapr client
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/clients/maprclient.go | 89 | ||||
| -rw-r--r-- | internal/config/server.go | 30 | ||||
| -rw-r--r-- | internal/mapr/groupset.go | 19 | ||||
| -rw-r--r-- | internal/mapr/query.go | 4 | ||||
| -rw-r--r-- | internal/server/continuous.go | 20 | ||||
| -rw-r--r-- | internal/server/scheduler.go | 2 | ||||
| -rw-r--r-- | internal/server/server.go | 6 |
7 files changed, 96 insertions, 74 deletions
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index c4e445b..e28dadb 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -14,6 +14,14 @@ import ( "github.com/mimecast/dtail/internal/omode" ) +type MaprClientMode int + +const ( + DefaultMode MaprClientMode = iota + CumulativeMode MaprClientMode = iota + NonCumulativeMode MaprClientMode = iota +) + // MaprClient is used for running mapreduce aggregations on remote files. type MaprClient struct { baseClient @@ -23,12 +31,12 @@ type MaprClient struct { globalGroup *mapr.GlobalGroupSet // The query object (constructed from queryStr) query *mapr.Query - // Additative result or new result every run? + // Additative result or new result every interval run? cumulative bool } // NewMaprClient returns a new mapreduce client. -func NewMaprClient(args Args, queryStr string) (*MaprClient, error) { +func NewMaprClient(args Args, queryStr string, maprClientMode MaprClientMode) (*MaprClient, error) { if queryStr == "" { return nil, errors.New("No mapreduce query specified, use '-query' flag") } @@ -39,10 +47,18 @@ func NewMaprClient(args Args, queryStr string) (*MaprClient, error) { } // Don't retry connection if in tail mode and no outfile specified. - retry := args.Mode == omode.TailClient && query.Outfile == "" - - // Result is comulative if we are in MapClient mode or with outfile - cumulative := args.Mode == omode.MapClient || query.Outfile != "" + retry := args.Mode == omode.TailClient && !query.HasOutfile() + + var cumulative bool + switch maprClientMode { + case CumulativeMode: + cumulative = true + case NonCumulativeMode: + cumulative = false + default: + // Result is comulative if we are in MapClient mode or with outfile + cumulative = args.Mode == omode.MapClient || query.HasOutfile() + } c := MaprClient{ baseClient: baseClient{ @@ -72,14 +88,12 @@ func NewMaprClient(args Args, queryStr string) (*MaprClient, error) { // Start starts the mapreduce client. func (c *MaprClient) Start(ctx context.Context) (status int) { - if c.query.Outfile == "" { - // Only print out periodic results if we don't write an outfile - go c.periodicPrintResults(ctx) - } + go c.periodicReportResults(ctx) status = c.baseClient.Start(ctx) if c.cumulative { - c.recievedFinalResult() + logger.Info("Received final mapreduce result") + c.reportResults() } return @@ -108,35 +122,27 @@ func (c MaprClient) makeCommands() (commands []string) { return } -func (c *MaprClient) recievedFinalResult() { - logger.Info("Received final mapreduce result") - - if c.query.Outfile == "" { - c.printResults() - return - } - - logger.Info(fmt.Sprintf("Writing final mapreduce result to '%s'", c.query.Outfile)) - err := c.globalGroup.WriteResult(c.query) - if err != nil { - logger.FatalExit(err) - return - } - logger.Info(fmt.Sprintf("Wrote final mapreduce result to '%s'", c.query.Outfile)) -} - -func (c *MaprClient) periodicPrintResults(ctx context.Context) { +func (c *MaprClient) periodicReportResults(ctx context.Context) { for { select { case <-time.After(c.query.Interval): logger.Info("Gathering interim mapreduce result") - c.printResults() + c.reportResults() case <-ctx.Done(): return } } } +func (c *MaprClient) reportResults() { + if c.query.HasOutfile() { + c.writeResultsToOutfile() + return + } + + c.printResults() +} + func (c *MaprClient) printResults() { var result string var err error @@ -150,8 +156,25 @@ func (c *MaprClient) printResults() { if err != nil { logger.FatalExit(err) } - if numLines > 0 { - logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery)) - logger.Raw(result) + + if numLines == 0 { + logger.Info("Empty result set this time...") + return + } + + logger.Raw(fmt.Sprintf("%s\n", c.query.RawQuery)) + logger.Raw(result) +} + +func (c *MaprClient) writeResultsToOutfile() { + if c.cumulative { + if err := c.globalGroup.WriteResult(c.query); err != nil { + logger.FatalExit(err) + } + return + } + + if err := c.globalGroup.SwapOut().WriteResult(c.query); err != nil { + logger.FatalExit(err) } } diff --git a/internal/config/server.go b/internal/config/server.go index 43b4c34..4166fd3 100644 --- a/internal/config/server.go +++ b/internal/config/server.go @@ -13,8 +13,8 @@ type Permissions struct { Users map[string][]string } -// Scheduled allows to configure scheduled mapreduce jobs. -type Scheduled struct { +// JobCommons summarises common job fields +type JobCommons struct { Name string Enable bool Files string @@ -22,21 +22,19 @@ type Scheduled struct { Outfile string `json:",omitempty"` Discovery string `json:",omitempty"` Servers []string `json:",omitempty"` - TimeRange [2]int AllowFrom []string `json:",omitempty"` } -// Monitoring on log files. -type Monitoring struct { - Name string - Enable bool - Files string - Query string - ExcludeRegexes []string `json:",omitempty"` - Outfile string `json:",omitempty"` - Discovery string `json:",omitempty"` - Servers []string `json:",omitempty"` - AllowFrom []string `json:",omitempty"` +// Scheduled allows to configure scheduled mapreduce jobs. +type Scheduled struct { + JobCommons + TimeRange [2]int +} + +// Continuous allows to configure continuous running mapreduce jobs. +type Continuous struct { + JobCommons + RestartOnDayChange bool `json:",omitempty"` } // ServerConfig represents the server configuration. @@ -59,8 +57,8 @@ type ServerConfig struct { HostKeyBits int // Scheduled mapreduce jobs. Schedule []Scheduled `json:",omitempty"` - // Monitoring on log files. - Monitoring []Monitoring `json:",omitempty"` + // Continuous mapreduce jobs + Continuous []Continuous `json:",omitempty"` } // Create a new default server configuration. diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go index e9e0d37..6ee2811 100644 --- a/internal/mapr/groupset.go +++ b/internal/mapr/groupset.go @@ -5,9 +5,12 @@ import ( "errors" "fmt" "io/ioutil" + "os" "sort" "strconv" "strings" + + "github.com/mimecast/dtail/internal/io/logger" ) // GroupSet represents a map of aggregate sets. The group sets @@ -60,7 +63,7 @@ func (g *GroupSet) Result(query *Query) (string, int, error) { // WriteResult writes the result to an outfile. func (g *GroupSet) WriteResult(query *Query) error { - if query.Outfile == "" { + if !query.HasOutfile() { return errors.New("No outfile specified") } @@ -70,7 +73,19 @@ func (g *GroupSet) WriteResult(query *Query) error { return err } - return ioutil.WriteFile(query.Outfile, []byte(result), 0644) + logger.Info("Writing outfile", query.Outfile) + tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile) + + if err := ioutil.WriteFile(tmpOutfile, []byte(result), 0644); err != nil { + return err + } + + if err := os.Rename(tmpOutfile, query.Outfile); err != nil { + os.Remove(tmpOutfile) + return err + } + + return nil } // Return a nicely formated result of the query from the group set. diff --git a/internal/mapr/query.go b/internal/mapr/query.go index be766d1..6dff792 100644 --- a/internal/mapr/query.go +++ b/internal/mapr/query.go @@ -68,6 +68,10 @@ func NewQuery(queryStr string) (*Query, error) { return &q, err } +func (q *Query) HasOutfile() bool { + return q.Outfile != "" +} + func (q *Query) parse(tokens []token) error { var found []token var err error diff --git a/internal/server/continuous.go b/internal/server/continuous.go index bdb4079..cf89cdd 100644 --- a/internal/server/continuous.go +++ b/internal/server/continuous.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "os" - "strconv" "strings" "time" @@ -45,26 +44,9 @@ func (s *continuous) runJobs(ctx context.Context) { continue } - hour, err := strconv.Atoi(time.Now().Format("15")) - if err != nil { - logger.Error(job.Name, "Unable to create job job", err) - continue - } - - if hour < job.TimeRange[0] || hour >= job.TimeRange[1] { - logger.Debug(job.Name, "Not running job out of time range") - continue - } - files := fillDates(job.Files) outfile := fillDates(job.Outfile) - _, err = os.Stat(outfile) - if !os.IsNotExist(err) { - logger.Debug(job.Name, "Not running job as outfile already exists", outfile) - continue - } - servers := strings.Join(job.Servers, ",") if servers == "" { servers = config.Server.SSHBindAddress @@ -84,7 +66,7 @@ func (s *continuous) runJobs(ctx context.Context) { tmpOutfile := fmt.Sprintf("%s.tmp", outfile) query := fmt.Sprintf("%s outfile %s", job.Query, tmpOutfile) - client, err := clients.NewMaprClient(args, query) + client, err := clients.NewMaprClient(args, query, clients.NonCumulativeMode) if err != nil { logger.Error(fmt.Sprintf("Unable to create job job %s", job.Name), err) continue diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go index 1fdbeea..e75077e 100644 --- a/internal/server/scheduler.go +++ b/internal/server/scheduler.go @@ -84,7 +84,7 @@ func (s *scheduler) runJobs(ctx context.Context) { tmpOutfile := fmt.Sprintf("%s.tmp", outfile) query := fmt.Sprintf("%s outfile %s", job.Query, tmpOutfile) - client, err := clients.NewMaprClient(args, query) + client, err := clients.NewMaprClient(args, query, clients.CumulativeMode) if err != nil { logger.Error(fmt.Sprintf("Unable to create job job %s", job.Name), err) continue diff --git a/internal/server/server.go b/internal/server/server.go index 0377598..486b8c6 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -33,7 +33,7 @@ type Server struct { // To run scheduled tasks (if configured) sched *scheduler // Mointor log files for pattern (if configured) - mon *monitoring + cont *continuous // Wait counter, e.g. there might be still subprocesses (forked by drun) to be killed. shutdownWaitFor chan struct{} // Background jobs @@ -50,7 +50,7 @@ func New() *Server { tailLimiter: make(chan struct{}, config.Server.MaxConcurrentTails), shutdownWaitFor: make(chan struct{}, 1000), sched: newScheduler(), - mon: newMonitoring(), + cont: newContinuous(), background: background.New(), } @@ -80,7 +80,7 @@ func (s *Server) Start(ctx context.Context) int { go s.stats.start(ctx) go s.sched.start(ctx) - go s.mon.start(ctx) + go s.cont.start(ctx) go s.listenerLoop(ctx, listener) select { |
