summaryrefslogtreecommitdiff
path: root/internal/server/scheduler.go
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2020-07-03 14:13:13 +0100
committerPaul Buetow <pbuetow@mimecast.com>2020-08-13 11:37:24 +0100
commitc5a0ba7d29da7effa0ae18bffa10fc0be359b8e7 (patch)
treede4874740a5ddeb6eb29c887f6e121c61a1f8f3c /internal/server/scheduler.go
parent8f9f9766cecec4a42ffb4d14ba9b7efc2ed204ad (diff)
bump up version to 3.0.0. can run continuous background mapreduce queries, useful for log file monitorig for example. breaking protocol change which allows to mapreduce aggreate messages containing the default field separator |. add of more unit tests. add logformat mapreduce query keyword. add set mapreduce clause support and support to evaluate built-in functions such as md5sum() and maskdigits().v3.0.0
Diffstat (limited to 'internal/server/scheduler.go')
-rw-r--r--internal/server/scheduler.go17
1 files changed, 6 insertions, 11 deletions
diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go
index e75077e..3345d69 100644
--- a/internal/server/scheduler.go
+++ b/internal/server/scheduler.go
@@ -24,6 +24,7 @@ func newScheduler() *scheduler {
}
func (s *scheduler) start(ctx context.Context) {
+ logger.Info("Starting scheduled job runner after 10s")
// First run after just 10s!
time.Sleep(time.Second * 10)
s.runJobs(ctx)
@@ -47,7 +48,7 @@ func (s *scheduler) runJobs(ctx context.Context) {
hour, err := strconv.Atoi(time.Now().Format("15"))
if err != nil {
- logger.Error(job.Name, "Unable to create job job", err)
+ logger.Error(job.Name, "Unable to create job", err)
continue
}
@@ -76,31 +77,25 @@ func (s *scheduler) runJobs(ctx context.Context) {
ServersStr: servers,
What: files,
Mode: omode.MapClient,
- UserName: config.BackgroundUser,
+ UserName: config.ScheduleUser,
}
args.SSHAuthMethods = append(args.SSHAuthMethods, gossh.Password(job.Name))
- tmpOutfile := fmt.Sprintf("%s.tmp", outfile)
- query := fmt.Sprintf("%s outfile %s", job.Query, tmpOutfile)
-
+ query := fmt.Sprintf("%s outfile %s", job.Query, outfile)
client, err := clients.NewMaprClient(args, query, clients.CumulativeMode)
if err != nil {
- logger.Error(fmt.Sprintf("Unable to create job job %s", job.Name), err)
+ logger.Error(fmt.Sprintf("Unable to create job %s", job.Name), err)
continue
}
jobCtx, cancel := context.WithCancel(ctx)
defer cancel()
- logger.Info(fmt.Sprintf("Starting job job %s", job.Name))
+ logger.Info(fmt.Sprintf("Starting job %s", job.Name))
status := client.Start(jobCtx)
logMessage := fmt.Sprintf("Job exited with status %d", status)
- if err := os.Rename(tmpOutfile, outfile); err == nil {
- logger.Info(job.Name, fmt.Sprintf("Renamed %s to %s", tmpOutfile, outfile))
- }
-
if status != 0 {
logger.Warn(logMessage)
continue