From c5a0ba7d29da7effa0ae18bffa10fc0be359b8e7 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Fri, 3 Jul 2020 14:13:13 +0100 Subject: bump up version to 3.0.0. can run continuous background mapreduce queries, useful for log file monitorig for example. breaking protocol change which allows to mapreduce aggreate messages containing the default field separator |. add of more unit tests. add logformat mapreduce query keyword. add set mapreduce clause support and support to evaluate built-in functions such as md5sum() and maskdigits(). --- internal/server/continuous.go | 121 +++++++++++++++++++++++++----------------- 1 file changed, 73 insertions(+), 48 deletions(-) (limited to 'internal/server/continuous.go') diff --git a/internal/server/continuous.go b/internal/server/continuous.go index cf89cdd..f3993a1 100644 --- a/internal/server/continuous.go +++ b/internal/server/continuous.go @@ -3,7 +3,6 @@ package server import ( "context" "fmt" - "os" "strings" "time" @@ -22,71 +21,97 @@ func newContinuous() *continuous { return &continuous{} } -func (s *continuous) start(ctx context.Context) { - // First run after just 10s! +func (c *continuous) start(ctx context.Context) { + logger.Info("Starting continuous job runner after 10s") time.Sleep(time.Second * 10) - s.runJobs(ctx) - for { - select { - case <-time.After(time.Minute): - s.runJobs(ctx) - case <-ctx.Done(): - return - } - } + c.runJobs(ctx) } -func (s *continuous) runJobs(ctx context.Context) { - for _, job := range config.Server.Schedule { +func (c *continuous) runJobs(ctx context.Context) { + for _, job := range config.Server.Continuous { if !job.Enable { logger.Debug(job.Name, "Not running job as not enabled") continue } - files := fillDates(job.Files) - outfile := fillDates(job.Outfile) + go func(job config.Continuous) { + c.runJob(ctx, job) + for { + select { + // Retry after a minute + case <-time.After(time.Minute): + c.runJob(ctx, job) + case <-ctx.Done(): + return + } + } + }(job) + } +} - servers := strings.Join(job.Servers, ",") - if servers == "" { - servers = config.Server.SSHBindAddress - } +func (c *continuous) runJob(ctx context.Context, job config.Continuous) { + logger.Debug(job.Name, "Processing job") - args := clients.Args{ - ConnectionsPerCPU: 10, - Discovery: job.Discovery, - ServersStr: servers, - What: files, - Mode: omode.MapClient, - UserName: config.BackgroundUser, - } + files := fillDates(job.Files) + outfile := fillDates(job.Outfile) - args.SSHAuthMethods = append(args.SSHAuthMethods, gossh.Password(job.Name)) + servers := strings.Join(job.Servers, ",") + if servers == "" { + servers = config.Server.SSHBindAddress + } - tmpOutfile := fmt.Sprintf("%s.tmp", outfile) - query := fmt.Sprintf("%s outfile %s", job.Query, tmpOutfile) + args := clients.Args{ + ConnectionsPerCPU: 10, + Discovery: job.Discovery, + ServersStr: servers, + What: files, + Mode: omode.TailClient, + UserName: config.ContinuousUser, + } - client, err := clients.NewMaprClient(args, query, clients.NonCumulativeMode) - if err != nil { - logger.Error(fmt.Sprintf("Unable to create job job %s", job.Name), err) - continue - } + args.SSHAuthMethods = append(args.SSHAuthMethods, gossh.Password(job.Name)) - jobCtx, cancel := context.WithCancel(ctx) - defer cancel() + query := fmt.Sprintf("%s outfile %s", job.Query, outfile) + client, err := clients.NewMaprClient(args, query, clients.NonCumulativeMode) + if err != nil { + logger.Error(fmt.Sprintf("Unable to create job %s", job.Name), err) + return + } - logger.Info(fmt.Sprintf("Starting job job %s", job.Name)) - status := client.Start(jobCtx) - logMessage := fmt.Sprintf("Job exited with status %d", status) + jobCtx, cancel := context.WithCancel(ctx) + defer cancel() - if err := os.Rename(tmpOutfile, outfile); err == nil { - logger.Info(job.Name, fmt.Sprintf("Renamed %s to %s", tmpOutfile, outfile)) - } + if job.RestartOnDayChange { + go func() { + if c.waitForDayChange(ctx) { + logger.Info(fmt.Sprintf("Canceling job %s due to day change", job.Name)) + cancel() + } + }() + } - if status != 0 { - logger.Warn(logMessage) - continue + logger.Info(fmt.Sprintf("Starting job %s", job.Name)) + status := client.Start(jobCtx) + logMessage := fmt.Sprintf("Job exited with status %d", status) + + if status != 0 { + logger.Warn(logMessage) + return + } + logger.Info(logMessage) +} + +func (c *continuous) waitForDayChange(ctx context.Context) bool { + startTime := time.Now() + for { + select { + case <-time.After(time.Second): + if time.Now().Day() != startTime.Day() { + return true + } + case <-ctx.Done(): + return false } - logger.Info(logMessage) } } -- cgit v1.2.3