diff options
| author | Paul Bütow <pbuetow@mimecast.com> | 2020-02-22 12:06:09 +0000 |
|---|---|---|
| committer | Paul Bütow <pbuetow@mimecast.com> | 2020-02-22 12:06:09 +0000 |
| commit | 6e176034306026b922c1df4231a1807f36cbe460 (patch) | |
| tree | 96405450466b12240302d306938c7af9b19f21ba /internal/server | |
| parent | 4d2ab8e6dd645d345fa26d8a067ad6dc14fc1fce (diff) | |
can start commands in background and also cancel those via drun command
Diffstat (limited to 'internal/server')
| -rw-r--r-- | internal/server/background/background.go | 52 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 36 | ||||
| -rw-r--r-- | internal/server/server.go | 6 |
3 files changed, 54 insertions, 40 deletions
diff --git a/internal/server/background/background.go b/internal/server/background/background.go index 537ccbb..05a502f 100644 --- a/internal/server/background/background.go +++ b/internal/server/background/background.go @@ -2,10 +2,8 @@ package background import ( "context" - "fmt" + "errors" "sync" - - "github.com/mimecast/dtail/internal/io/logger" ) type job struct { @@ -13,27 +11,44 @@ type job struct { done <-chan struct{} } +// Background specifies a job or command run in background on server side. +// This does not require an active DTail client SSH connection/session. type Background struct { - mutex sync.Mutex + mutex *sync.Mutex jobs map[string]job } -func NewBackground() *Background { - return &Background{ - jobs: make(map[string]job), +// New returns a new background manager. +func New() Background { + return Background{ + jobs: make(map[string]job), + mutex: &sync.Mutex{}, } } +// Add a background job. func (b Background) Add(name string, cancel context.CancelFunc, done <-chan struct{}) error { b.mutex.Lock() defer b.mutex.Unlock() if _, ok := b.jobs[name]; ok { - return fmt.Errorf("job '%s' already exists", name) + return errors.New("job already exists") } - logger.Debug("background", name, "adding job") b.jobs[name] = job{cancel, done} + return nil +} + +// Cancel a background job. +func (b Background) Cancel(name string) error { + job, ok := b.get(name) + if !ok { + return errors.New("no job to cancel") + } + + job.cancel() + <-job.done + b.delete(name) return nil } @@ -52,22 +67,3 @@ func (b Background) delete(name string) { delete(b.jobs, name) } - -func (b Background) Stop(name string) { - logger.Debug("background", name, "stopping job") - job, ok := b.get(name) - - if !ok { - logger.Debug("background", name, "no such job") - return - } - - logger.Debug("background", name, "canceling job") - job.cancel() - - logger.Debug("background", name, "waiting for job to complete") - <-job.done - - logger.Debug("background", name, "deleting job") - b.delete(name) -} diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index bcd3f85..01e4054 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "encoding/base64" + "encoding/hex" "errors" "fmt" "io" @@ -49,11 +50,11 @@ type ServerHandler struct { handlerCtx context.Context done chan struct{} activeCommands int - background *background.Background + background background.Background } // NewServerHandler returns the server handler. -func NewServerHandler(handlerCtx, serverCtx context.Context, user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}) (*ServerHandler, <-chan struct{}) { +func NewServerHandler(handlerCtx, serverCtx context.Context, user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}, background background.Background) (*ServerHandler, <-chan struct{}) { h := ServerHandler{ serverCtx: serverCtx, handlerCtx: handlerCtx, @@ -68,7 +69,7 @@ func NewServerHandler(handlerCtx, serverCtx context.Context, user *user.User, ca globalServerWaitFor: globalServerWaitFor, regex: ".", user: user, - background: background.NewBackground(), + background: background, } fqdn, err := os.Hostname() @@ -286,12 +287,14 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] case "run": // TODO: Refactor this "run" case, move code to runcommand.go command := newRunCommand(h) + jobName := fmt.Sprintf("%s%%%s", h.user.Name, hash(strings.Join(args[1:], " "))) - checksum := sha256.Sum256([]byte(strings.Join(args, " "))) - name := fmt.Sprintf("%s.%s", h.user.Name, checksum) - - if contains(flags, "background.stop") { - h.background.Stop(name) + if contains(flags, "background.cancel") { + if err := h.background.Cancel(jobName); err != nil { + h.sendServerMessage(logger.Error(h.user, err, jobName, args)) + } else { + h.sendServerMessage(logger.Info(h.user, "job cancelled", jobName)) + } finished() return } @@ -299,17 +302,22 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] done := make(chan struct{}) if contains(flags, "background.start") { - commandCtx, cancel := context.WithTimeout(h.serverCtx, time.Hour) - if err := h.background.Add(name, cancel, done); err != nil { - h.sendServerMessage(logger.Error(h.user, err, args)) + commandCtx, cancel := context.WithCancel(h.serverCtx) + // TODO: For background jobs dont attempt to send data to dtail client as there might be no SSH connection + if err := h.background.Add(jobName, cancel, done); err != nil { + h.sendServerMessage(logger.Error(h.user, err, jobName, args)) finished() return } + go func() { h.globalServerWaitFor <- struct{}{} }() go func() { command.Start(commandCtx, argc, args) close(done) + <-h.globalServerWaitFor }() + + h.sendServerMessage(logger.Info(h.user, jobName, "job started in background")) finished() return } @@ -427,3 +435,9 @@ func contains(haystack []string, needle string) bool { } return false } + +func hash(str string) string { + h := sha256.New() + h.Write([]byte(str)) + return hex.EncodeToString(h.Sum(nil)) +} diff --git a/internal/server/server.go b/internal/server/server.go index 1421540..5ec46e7 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -10,6 +10,7 @@ import ( "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/server/background" "github.com/mimecast/dtail/internal/server/handlers" "github.com/mimecast/dtail/internal/ssh/server" user "github.com/mimecast/dtail/internal/user/server" @@ -32,6 +33,8 @@ type Server struct { sched *scheduler // Wait counter, e.g. there might be still subprocesses (forked by drun) to be killed. shutdownWaitFor chan struct{} + // Background jobs + background background.Background } // New returns a new server. @@ -44,6 +47,7 @@ func New() *Server { tailLimiter: make(chan struct{}, config.Server.MaxConcurrentTails), shutdownWaitFor: make(chan struct{}, 1000), sched: newScheduler(), + background: background.New(), } s.sshServerConfig.PasswordCallback = s.backgroundUserCallback @@ -178,7 +182,7 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch case config.ControlUser: handler, done = handlers.NewControlHandler(handlerCtx, user) default: - handler, done = handlers.NewServerHandler(handlerCtx, ctx, user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor) + handler, done = handlers.NewServerHandler(handlerCtx, ctx, user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor, s.background) } go func() { |
