summaryrefslogtreecommitdiff
path: root/internal/server
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2020-07-03 12:15:59 +0100
committerPaul Buetow <pbuetow@mimecast.com>2020-07-03 12:15:59 +0100
commiteb9c8d4ae7f8fb7e65f912ff4838c7737b5487d0 (patch)
treef5e6c0be15200b18f306878037c3896e1084cf53 /internal/server
parent912f7dce7222d345dc6cc6cc593a45ee7e2e15f8 (diff)
refactor mapr client
Diffstat (limited to 'internal/server')
-rw-r--r--internal/server/continuous.go20
-rw-r--r--internal/server/scheduler.go2
-rw-r--r--internal/server/server.go6
3 files changed, 5 insertions, 23 deletions
diff --git a/internal/server/continuous.go b/internal/server/continuous.go
index bdb4079..cf89cdd 100644
--- a/internal/server/continuous.go
+++ b/internal/server/continuous.go
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"os"
- "strconv"
"strings"
"time"
@@ -45,26 +44,9 @@ func (s *continuous) runJobs(ctx context.Context) {
continue
}
- hour, err := strconv.Atoi(time.Now().Format("15"))
- if err != nil {
- logger.Error(job.Name, "Unable to create job job", err)
- continue
- }
-
- if hour < job.TimeRange[0] || hour >= job.TimeRange[1] {
- logger.Debug(job.Name, "Not running job out of time range")
- continue
- }
-
files := fillDates(job.Files)
outfile := fillDates(job.Outfile)
- _, err = os.Stat(outfile)
- if !os.IsNotExist(err) {
- logger.Debug(job.Name, "Not running job as outfile already exists", outfile)
- continue
- }
-
servers := strings.Join(job.Servers, ",")
if servers == "" {
servers = config.Server.SSHBindAddress
@@ -84,7 +66,7 @@ func (s *continuous) runJobs(ctx context.Context) {
tmpOutfile := fmt.Sprintf("%s.tmp", outfile)
query := fmt.Sprintf("%s outfile %s", job.Query, tmpOutfile)
- client, err := clients.NewMaprClient(args, query)
+ client, err := clients.NewMaprClient(args, query, clients.NonCumulativeMode)
if err != nil {
logger.Error(fmt.Sprintf("Unable to create job job %s", job.Name), err)
continue
diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go
index 1fdbeea..e75077e 100644
--- a/internal/server/scheduler.go
+++ b/internal/server/scheduler.go
@@ -84,7 +84,7 @@ func (s *scheduler) runJobs(ctx context.Context) {
tmpOutfile := fmt.Sprintf("%s.tmp", outfile)
query := fmt.Sprintf("%s outfile %s", job.Query, tmpOutfile)
- client, err := clients.NewMaprClient(args, query)
+ client, err := clients.NewMaprClient(args, query, clients.CumulativeMode)
if err != nil {
logger.Error(fmt.Sprintf("Unable to create job job %s", job.Name), err)
continue
diff --git a/internal/server/server.go b/internal/server/server.go
index 0377598..486b8c6 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -33,7 +33,7 @@ type Server struct {
// To run scheduled tasks (if configured)
sched *scheduler
// Mointor log files for pattern (if configured)
- mon *monitoring
+ cont *continuous
// Wait counter, e.g. there might be still subprocesses (forked by drun) to be killed.
shutdownWaitFor chan struct{}
// Background jobs
@@ -50,7 +50,7 @@ func New() *Server {
tailLimiter: make(chan struct{}, config.Server.MaxConcurrentTails),
shutdownWaitFor: make(chan struct{}, 1000),
sched: newScheduler(),
- mon: newMonitoring(),
+ cont: newContinuous(),
background: background.New(),
}
@@ -80,7 +80,7 @@ func (s *Server) Start(ctx context.Context) int {
go s.stats.start(ctx)
go s.sched.start(ctx)
- go s.mon.start(ctx)
+ go s.cont.start(ctx)
go s.listenerLoop(ctx, listener)
select {