From b253eafc1a2c57a0d5b7c35d031d8b8d34cfbabe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20B=C3=BCtow?= Date: Tue, 11 Feb 2020 16:25:55 +0000 Subject: can run scheduled job at time range based on result file already exists or not --- internal/config/server.go | 14 ++++++---- internal/server/scheduler.go | 66 ++++++++++++++++++++++++++++++++++++++------ 2 files changed, 66 insertions(+), 14 deletions(-) diff --git a/internal/config/server.go b/internal/config/server.go index be6d45e..aa1a02e 100644 --- a/internal/config/server.go +++ b/internal/config/server.go @@ -15,12 +15,14 @@ type Permissions struct { // Scheduled allows to configure scheduled mapreduce jobs. type Scheduled struct { - Name string - ConnectionsPerCPU int - Discovery string - Files string - Query string - Servers string + Name string + Enable bool + Files string + Query string + Outfile string + Discovery string `json:",omitempty"` + Servers string + TimeRange [2]int } // ServerConfig represents the server configuration. diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go index 3cf55ce..68b2338 100644 --- a/internal/server/scheduler.go +++ b/internal/server/scheduler.go @@ -4,6 +4,9 @@ import ( "context" "fmt" "math/rand" + "os" + "strconv" + "strings" "time" "github.com/mimecast/dtail/internal/clients" @@ -15,7 +18,7 @@ import ( ) const authLength = 64 -const authCharset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@£$%^&*()_+[]" +const authCharset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@$%^&*()_+[]" type scheduler struct { authPayload string @@ -39,9 +42,6 @@ func (s *scheduler) start(ctx context.Context) { select { case <-time.After(time.Second * 10): s.runJobs(ctx) - return - case <-time.After(time.Minute): - s.runJobs(ctx) case <-ctx.Done(): return } @@ -50,17 +50,45 @@ func (s *scheduler) start(ctx context.Context) { func (s *scheduler) runJobs(ctx context.Context) { for _, scheduled := range config.Server.Schedule { + if !scheduled.Enable { + logger.Debug(scheduled.Name, "Not running job as not enabled") + continue + } + + hour, err := strconv.Atoi(time.Now().Format("15")) + if err != nil { + logger.Error(scheduled.Name, "Unable to create scheduled job", err) + continue + } + + if hour < scheduled.TimeRange[0] || hour >= scheduled.TimeRange[1] { + logger.Debug(scheduled.Name, "Not running job out of time range") + continue + } + + files := fillDates(scheduled.Files) + outfile := fillDates(scheduled.Outfile) + + _, err = os.Stat(outfile) + if !os.IsNotExist(err) { + logger.Debug(scheduled.Name, "Not running job as outfile already exists", outfile) + continue + } + args := clients.Args{ - ConnectionsPerCPU: scheduled.ConnectionsPerCPU, + ConnectionsPerCPU: 10, Discovery: scheduled.Discovery, ServersStr: scheduled.Servers, - What: scheduled.Files, + What: files, Mode: omode.MapClient, UserName: config.ScheduledUser, } args.SSHAuthMethods = append(args.SSHAuthMethods, gossh.Password(s.authPayload)) - client, err := clients.NewMaprClient(args, scheduled.Query) + tmpOutfile := fmt.Sprintf("%s.tmp", outfile) + query := fmt.Sprintf("%s outfile %s", scheduled.Query, tmpOutfile) + + client, err := clients.NewMaprClient(args, query) if err != nil { logger.Error(fmt.Sprintf("Unable to create scheduled job %s", scheduled.Name), err) continue @@ -68,7 +96,12 @@ func (s *scheduler) runJobs(ctx context.Context) { logger.Info(fmt.Sprintf("Starting scheduled job %s", scheduled.Name)) status := client.Start(ctx) - logMessage := fmt.Sprintf("Scheduled job %s exited with status %d", scheduled.Name, status) + logMessage := fmt.Sprintf("Job exited with status %d", status) + + if err := os.Rename(tmpOutfile, outfile); err == nil { + logger.Info(scheduled.Name, fmt.Sprintf("Renamed %s to %s", tmpOutfile, outfile)) + } + if status != 0 { logger.Warn(logMessage) continue @@ -76,3 +109,20 @@ func (s *scheduler) runJobs(ctx context.Context) { logger.Info(logMessage) } } + +func fillDates(str string) string { + yyyesterday := time.Now().Add(3 * -24 * time.Hour).Format("20060102") + str = strings.ReplaceAll(str, "$yyyesterday", yyyesterday) + + yyesterday := time.Now().Add(2 * -24 * time.Hour).Format("20060102") + str = strings.ReplaceAll(str, "$yyesterday", yyesterday) + + yesterday := time.Now().Add(1 * -24 * time.Hour).Format("20060102") + str = strings.ReplaceAll(str, "$yesterday", yesterday) + + today := time.Now().Format("20060102") + str = strings.ReplaceAll(str, "$today", today) + + tomorrow := time.Now().Add(1 * 24 * time.Hour).Format("20060102") + return strings.ReplaceAll(str, "$tomorrow", tomorrow) +} -- cgit v1.2.3