diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2020-07-03 14:13:13 +0100 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2020-08-13 11:37:24 +0100 |
| commit | c5a0ba7d29da7effa0ae18bffa10fc0be359b8e7 (patch) | |
| tree | de4874740a5ddeb6eb29c887f6e121c61a1f8f3c /internal/server | |
| parent | 8f9f9766cecec4a42ffb4d14ba9b7efc2ed204ad (diff) | |
bump up version to 3.0.0. can run continuous background mapreduce queries, useful for log file monitorig for example. breaking protocol change which allows to mapreduce aggreate messages containing the default field separator |. add of more unit tests. add logformat mapreduce query keyword. add set mapreduce clause support and support to evaluate built-in functions such as md5sum() and maskdigits().v3.0.0
Diffstat (limited to 'internal/server')
| -rw-r--r-- | internal/server/continuous.go | 121 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 4 | ||||
| -rw-r--r-- | internal/server/scheduler.go | 17 | ||||
| -rw-r--r-- | internal/server/server.go | 67 |
4 files changed, 122 insertions, 87 deletions
diff --git a/internal/server/continuous.go b/internal/server/continuous.go index cf89cdd..f3993a1 100644 --- a/internal/server/continuous.go +++ b/internal/server/continuous.go @@ -3,7 +3,6 @@ package server import ( "context" "fmt" - "os" "strings" "time" @@ -22,71 +21,97 @@ func newContinuous() *continuous { return &continuous{} } -func (s *continuous) start(ctx context.Context) { - // First run after just 10s! +func (c *continuous) start(ctx context.Context) { + logger.Info("Starting continuous job runner after 10s") time.Sleep(time.Second * 10) - s.runJobs(ctx) - for { - select { - case <-time.After(time.Minute): - s.runJobs(ctx) - case <-ctx.Done(): - return - } - } + c.runJobs(ctx) } -func (s *continuous) runJobs(ctx context.Context) { - for _, job := range config.Server.Schedule { +func (c *continuous) runJobs(ctx context.Context) { + for _, job := range config.Server.Continuous { if !job.Enable { logger.Debug(job.Name, "Not running job as not enabled") continue } - files := fillDates(job.Files) - outfile := fillDates(job.Outfile) + go func(job config.Continuous) { + c.runJob(ctx, job) + for { + select { + // Retry after a minute + case <-time.After(time.Minute): + c.runJob(ctx, job) + case <-ctx.Done(): + return + } + } + }(job) + } +} - servers := strings.Join(job.Servers, ",") - if servers == "" { - servers = config.Server.SSHBindAddress - } +func (c *continuous) runJob(ctx context.Context, job config.Continuous) { + logger.Debug(job.Name, "Processing job") - args := clients.Args{ - ConnectionsPerCPU: 10, - Discovery: job.Discovery, - ServersStr: servers, - What: files, - Mode: omode.MapClient, - UserName: config.BackgroundUser, - } + files := fillDates(job.Files) + outfile := fillDates(job.Outfile) - args.SSHAuthMethods = append(args.SSHAuthMethods, gossh.Password(job.Name)) + servers := strings.Join(job.Servers, ",") + if servers == "" { + servers = config.Server.SSHBindAddress + } - tmpOutfile := fmt.Sprintf("%s.tmp", outfile) - query := fmt.Sprintf("%s outfile %s", job.Query, tmpOutfile) + args := clients.Args{ + ConnectionsPerCPU: 10, + Discovery: job.Discovery, + ServersStr: servers, + What: files, + Mode: omode.TailClient, + UserName: config.ContinuousUser, + } - client, err := clients.NewMaprClient(args, query, clients.NonCumulativeMode) - if err != nil { - logger.Error(fmt.Sprintf("Unable to create job job %s", job.Name), err) - continue - } + args.SSHAuthMethods = append(args.SSHAuthMethods, gossh.Password(job.Name)) - jobCtx, cancel := context.WithCancel(ctx) - defer cancel() + query := fmt.Sprintf("%s outfile %s", job.Query, outfile) + client, err := clients.NewMaprClient(args, query, clients.NonCumulativeMode) + if err != nil { + logger.Error(fmt.Sprintf("Unable to create job %s", job.Name), err) + return + } - logger.Info(fmt.Sprintf("Starting job job %s", job.Name)) - status := client.Start(jobCtx) - logMessage := fmt.Sprintf("Job exited with status %d", status) + jobCtx, cancel := context.WithCancel(ctx) + defer cancel() - if err := os.Rename(tmpOutfile, outfile); err == nil { - logger.Info(job.Name, fmt.Sprintf("Renamed %s to %s", tmpOutfile, outfile)) - } + if job.RestartOnDayChange { + go func() { + if c.waitForDayChange(ctx) { + logger.Info(fmt.Sprintf("Canceling job %s due to day change", job.Name)) + cancel() + } + }() + } - if status != 0 { - logger.Warn(logMessage) - continue + logger.Info(fmt.Sprintf("Starting job %s", job.Name)) + status := client.Start(jobCtx) + logMessage := fmt.Sprintf("Job exited with status %d", status) + + if status != 0 { + logger.Warn(logMessage) + return + } + logger.Info(logMessage) +} + +func (c *continuous) waitForDayChange(ctx context.Context) bool { + startTime := time.Now() + for { + select { + case <-time.After(time.Second): + if time.Now().Day() != startTime.Day() { + return true + } + case <-ctx.Done(): + return false } - logger.Info(logMessage) } } diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 939388c..9b52c85 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -101,7 +101,7 @@ func (h *ServerHandler) Read(p []byte) (n int, err error) { case message := <-h.aggregatedMessages: // Send mapreduce-aggregated data as a message. - data := fmt.Sprintf("AGGREGATE|%s|%s\n", h.hostname, message) + data := fmt.Sprintf("AGGREGATE➔%s➔%s\n", h.hostname, message) wholePayload := []byte(data) n = copy(p, wholePayload) return @@ -192,7 +192,7 @@ func (h *ServerHandler) handleProtocolVersion(args []string) ([]string, int, err } if args[1] != version.ProtocolCompat { - err := fmt.Errorf("server with protool version '%s' but client with '%s', please update DTail", version.ProtocolCompat, args[1]) + err := fmt.Errorf("server with protocol version '%s' but client with '%s', please update DTail", version.ProtocolCompat, args[1]) return args, argc, err } diff --git a/internal/server/scheduler.go b/internal/server/scheduler.go index e75077e..3345d69 100644 --- a/internal/server/scheduler.go +++ b/internal/server/scheduler.go @@ -24,6 +24,7 @@ func newScheduler() *scheduler { } func (s *scheduler) start(ctx context.Context) { + logger.Info("Starting scheduled job runner after 10s") // First run after just 10s! time.Sleep(time.Second * 10) s.runJobs(ctx) @@ -47,7 +48,7 @@ func (s *scheduler) runJobs(ctx context.Context) { hour, err := strconv.Atoi(time.Now().Format("15")) if err != nil { - logger.Error(job.Name, "Unable to create job job", err) + logger.Error(job.Name, "Unable to create job", err) continue } @@ -76,31 +77,25 @@ func (s *scheduler) runJobs(ctx context.Context) { ServersStr: servers, What: files, Mode: omode.MapClient, - UserName: config.BackgroundUser, + UserName: config.ScheduleUser, } args.SSHAuthMethods = append(args.SSHAuthMethods, gossh.Password(job.Name)) - tmpOutfile := fmt.Sprintf("%s.tmp", outfile) - query := fmt.Sprintf("%s outfile %s", job.Query, tmpOutfile) - + query := fmt.Sprintf("%s outfile %s", job.Query, outfile) client, err := clients.NewMaprClient(args, query, clients.CumulativeMode) if err != nil { - logger.Error(fmt.Sprintf("Unable to create job job %s", job.Name), err) + logger.Error(fmt.Sprintf("Unable to create job %s", job.Name), err) continue } jobCtx, cancel := context.WithCancel(ctx) defer cancel() - logger.Info(fmt.Sprintf("Starting job job %s", job.Name)) + logger.Info(fmt.Sprintf("Starting job %s", job.Name)) status := client.Start(jobCtx) logMessage := fmt.Sprintf("Job exited with status %d", status) - if err := os.Rename(tmpOutfile, outfile); err == nil { - logger.Info(job.Name, fmt.Sprintf("Renamed %s to %s", tmpOutfile, outfile)) - } - if status != 0 { logger.Warn(logMessage) continue diff --git a/internal/server/server.go b/internal/server/server.go index 486b8c6..693c48d 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -54,7 +54,7 @@ func New() *Server { background: background.New(), } - s.sshServerConfig.PasswordCallback = s.backgroundUserCallback + s.sshServerConfig.PasswordCallback = s.Callback s.sshServerConfig.PublicKeyCallback = server.PublicKeyCallback private, err := gossh.ParsePrivateKey(server.PrivateHostKey()) @@ -241,44 +241,59 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch return nil } -func (s *Server) backgroundUserCallback(c gossh.ConnMetadata, authPayload []byte) (*gossh.Permissions, error) { +// Callback for SSH authentication. +func (s *Server) Callback(c gossh.ConnMetadata, authPayload []byte) (*gossh.Permissions, error) { user := user.New(c.User(), c.RemoteAddr().String()) authInfo := string(authPayload) - if user.Name == config.ControlUser && authInfo == config.ControlUser { - logger.Debug(user, "Granting permissions to control user") - return nil, nil - } + splitted := strings.Split(c.RemoteAddr().String(), ":") + remoteIP := splitted[0] - if user.Name == config.BackgroundUser && s.backgroundJobUserCanHaveSSHSession(c.RemoteAddr().String(), user, authInfo) { - logger.Debug(user, "Granting SSH connection to background user") - return nil, nil + switch user.Name { + case config.ControlUser: + if authInfo == config.ControlUser { + logger.Debug(user, "Granting permissions to control user") + return nil, nil + } + case config.ScheduleUser: + for _, job := range config.Server.Schedule { + if s.backgroundCanSSH(user, authInfo, remoteIP, job.Name, job.AllowFrom) { + logger.Debug(user, "Granting SSH connection") + return nil, nil + } + } + case config.ContinuousUser: + for _, job := range config.Server.Continuous { + if s.backgroundCanSSH(user, authInfo, remoteIP, job.Name, job.AllowFrom) { + logger.Debug(user, "Granting SSH connection") + return nil, nil + } + } + default: } return nil, fmt.Errorf("user %s not authorized", user) } -func (s *Server) backgroundJobUserCanHaveSSHSession(addr string, user *user.User, jobName string) bool { - logger.Debug("backgroundJobUserCanHaveSSHSession", user, jobName) - splitted := strings.Split(addr, ":") - ip := splitted[0] +func (s *Server) backgroundCanSSH(user *user.User, jobName, remoteIP, allowedJobName string, allowFrom []string) bool { + logger.Debug("backgroundCanSSH", user, jobName, remoteIP, allowedJobName, allowFrom) - for _, job := range config.Server.Schedule { - if job.Name != jobName { + if jobName != allowedJobName { + logger.Debug(user, jobName, "backgroundCanSSH", "Job name does not match, skipping to next one...", allowedJobName) + return false + } + + for _, myAddr := range allowFrom { + ips, err := net.LookupIP(myAddr) + if err != nil { + logger.Debug(user, jobName, "backgroundCanSSH", "Unable to lookup IP address for allowed hosts lookup, skipping to next one...", myAddr, err) continue } - for _, myAddr := range job.AllowFrom { - myIPs, err := net.LookupIP(myAddr) - if err != nil { - logger.Error(user, myAddr, err) - continue - } - for _, myIP := range myIPs { - logger.Debug("backgroundJobUserCanHaveSSHSession", "Comparing IP addresses", ip, myIP.String()) - if ip == myIP.String() { - return true - } + for _, ip := range ips { + logger.Debug(user, jobName, "backgroundCanSSH", "Comparing IP addresses", remoteIP, ip.String()) + if remoteIP == ip.String() { + return true } } } |
