diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2020-07-03 12:15:59 +0100 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2020-07-03 12:15:59 +0100 |
| commit | eb9c8d4ae7f8fb7e65f912ff4838c7737b5487d0 (patch) | |
| tree | f5e6c0be15200b18f306878037c3896e1084cf53 /internal/server | |
| parent | 912f7dce7222d345dc6cc6cc593a45ee7e2e15f8 (diff) | |
refactor mapr client
Diffstat (limited to 'internal/server')
| -rw-r--r-- | internal/server/continuous.go | 20 | ||||
| -rw-r--r-- | internal/server/scheduler.go | 2 | ||||
| -rw-r--r-- | internal/server/server.go | 6 |
3 files changed, 5 insertions, 23 deletions
diff --git a/internal/server/continuous.go b/internal/server/continuous.go index bdb4079..cf89cdd 100644 --- a/internal/server/continuous.go +++ b/internal/server/continuous.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "os" - "strconv" "strings" "time" @@ -45,26 +44,9 @@ func (s *continuous) runJobs(ctx context.Context) { continue } - hour, err := strconv.Atoi(time.Now().Format("15")) - if err != nil { - logger.Error(job.Name, "Unable to create job job", err) - continue - } - - if hour < job.TimeRange[0] || hour >= job.TimeRange[1] { - logger.Debug(job.Name, "Not running job out of time range") - continue - } - files := fillDates(job.Files) outfile := fillDates(job.Outfile) - _, err = os.Stat(outfile) - if !os.IsNotExist(err) { - logger.Debug(job.Name, "Not running job as outfile already exists", outfile) - continue - } - servers := strings.Join(job.Servers, ",") if servers == "" { servers = config.Server.SSHBindAddress @@ -84,7 +66,7 @@ func (s *continuous) runJobs(ctx context.Context) { tmpOutfile := fmt.Sprintf("%s.tmp", outfile) query := fmt.Sprintf("%s outfile %s", job.Query, tmpOutfile) - client, err := clients.NewMaprClient(args, query) + client, err := clients.NewMaprClient(args, query, clients.NonCumulativeMode) if err != nil { logger.Error(fmt.Sprintf("Unable to create job job %s", job.Name), err) continue diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go index 1fdbeea..e75077e 100644 --- a/internal/server/scheduler.go +++ b/internal/server/scheduler.go @@ -84,7 +84,7 @@ func (s *scheduler) runJobs(ctx context.Context) { tmpOutfile := fmt.Sprintf("%s.tmp", outfile) query := fmt.Sprintf("%s outfile %s", job.Query, tmpOutfile) - client, err := clients.NewMaprClient(args, query) + client, err := clients.NewMaprClient(args, query, clients.CumulativeMode) if err != nil { logger.Error(fmt.Sprintf("Unable to create job job %s", job.Name), err) continue diff --git a/internal/server/server.go b/internal/server/server.go index 0377598..486b8c6 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -33,7 +33,7 @@ type Server struct { // To run scheduled tasks (if configured) sched *scheduler // Mointor log files for pattern (if configured) - mon *monitoring + cont *continuous // Wait counter, e.g. there might be still subprocesses (forked by drun) to be killed. shutdownWaitFor chan struct{} // Background jobs @@ -50,7 +50,7 @@ func New() *Server { tailLimiter: make(chan struct{}, config.Server.MaxConcurrentTails), shutdownWaitFor: make(chan struct{}, 1000), sched: newScheduler(), - mon: newMonitoring(), + cont: newContinuous(), background: background.New(), } @@ -80,7 +80,7 @@ func (s *Server) Start(ctx context.Context) int { go s.stats.start(ctx) go s.sched.start(ctx) - go s.mon.start(ctx) + go s.cont.start(ctx) go s.listenerLoop(ctx, listener) select { |
