diff options
| author | Paul Bütow <pbuetow@mimecast.com> | 2020-02-22 12:06:09 +0000 |
|---|---|---|
| committer | Paul Bütow <pbuetow@mimecast.com> | 2020-02-22 12:06:09 +0000 |
| commit | 6e176034306026b922c1df4231a1807f36cbe460 (patch) | |
| tree | 96405450466b12240302d306938c7af9b19f21ba | |
| parent | 4d2ab8e6dd645d345fa26d8a067ad6dc14fc1fce (diff) | |
can start commands in background and also cancel those via drun command
| -rw-r--r-- | cmd/drun/main.go | 6 | ||||
| -rw-r--r-- | internal/clients/runclient.go | 21 | ||||
| -rw-r--r-- | internal/server/background/background.go | 52 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 36 | ||||
| -rw-r--r-- | internal/server/server.go | 6 |
5 files changed, 76 insertions, 45 deletions
diff --git a/cmd/drun/main.go b/cmd/drun/main.go index e11900e..1f0e0fe 100644 --- a/cmd/drun/main.go +++ b/cmd/drun/main.go @@ -23,6 +23,8 @@ func main() { var discovery string var displayVersion bool var noColor bool + var background bool + var cancel bool var serversStr string var silentEnable bool var sshPort int @@ -36,6 +38,8 @@ func main() { flag.BoolVar(&noColor, "noColor", false, "Disable ANSII terminal colors") flag.BoolVar(&silentEnable, "silent", false, "Reduce output") flag.BoolVar(&trustAllHosts, "trustAllHosts", false, "Auto trust all unknown host keys") + flag.BoolVar(&background, "background", false, "Command starts in background on the server") + flag.BoolVar(&cancel, "cancel", false, "Command will be cancelled if it runs in background") flag.IntVar(&connectionsPerCPU, "cpc", 10, "How many connections established per CPU core concurrently") flag.IntVar(&sshPort, "port", 2222, "SSH server port") flag.IntVar(&timeout, "timeout", 0, "Command execution timeout") @@ -69,7 +73,7 @@ func main() { Timeout: timeout, } - client, err := clients.NewRunClient(args) + client, err := clients.NewRunClient(args, background, cancel) if err != nil { panic(err) } diff --git a/internal/clients/runclient.go b/internal/clients/runclient.go index e3be616..c2f6f62 100644 --- a/internal/clients/runclient.go +++ b/internal/clients/runclient.go @@ -11,10 +11,12 @@ import ( // RunClient is a client to run various commands on the server. type RunClient struct { baseClient + background bool + cancel bool } // NewRunClient returns a new cat client. -func NewRunClient(args Args) (*RunClient, error) { +func NewRunClient(args Args, background, cancel bool) (*RunClient, error) { args.Mode = omode.RunClient c := RunClient{ @@ -23,6 +25,8 @@ func NewRunClient(args Args) (*RunClient, error) { throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()), retry: false, }, + background: background, + cancel: cancel, } c.init(c) @@ -34,12 +38,21 @@ func (c RunClient) makeHandler(server string) handlers.Handler { } func (c RunClient) makeCommands() (commands []string) { - // Send "run COMMAND" to server! if c.Timeout > 0 { - commands = append(commands, fmt.Sprintf("timeout %d %s %s", c.Timeout, c.Mode.String(), c.What)) + commands = append(commands, fmt.Sprintf("timeout %d run%s %s", c.Timeout, c.flags(), c.What)) return } - commands = append(commands, fmt.Sprintf("%s %s", c.Mode.String(), c.What)) + commands = append(commands, fmt.Sprintf("run%s %s", c.flags(), c.What)) return } + +func (c RunClient) flags() string { + if c.background { + return ":background.start" + } + if c.cancel { + return ":background.cancel" + } + return "" +} diff --git a/internal/server/background/background.go b/internal/server/background/background.go index 537ccbb..05a502f 100644 --- a/internal/server/background/background.go +++ b/internal/server/background/background.go @@ -2,10 +2,8 @@ package background import ( "context" - "fmt" + "errors" "sync" - - "github.com/mimecast/dtail/internal/io/logger" ) type job struct { @@ -13,27 +11,44 @@ type job struct { done <-chan struct{} } +// Background specifies a job or command run in background on server side. +// This does not require an active DTail client SSH connection/session. type Background struct { - mutex sync.Mutex + mutex *sync.Mutex jobs map[string]job } -func NewBackground() *Background { - return &Background{ - jobs: make(map[string]job), +// New returns a new background manager. +func New() Background { + return Background{ + jobs: make(map[string]job), + mutex: &sync.Mutex{}, } } +// Add a background job. func (b Background) Add(name string, cancel context.CancelFunc, done <-chan struct{}) error { b.mutex.Lock() defer b.mutex.Unlock() if _, ok := b.jobs[name]; ok { - return fmt.Errorf("job '%s' already exists", name) + return errors.New("job already exists") } - logger.Debug("background", name, "adding job") b.jobs[name] = job{cancel, done} + return nil +} + +// Cancel a background job. +func (b Background) Cancel(name string) error { + job, ok := b.get(name) + if !ok { + return errors.New("no job to cancel") + } + + job.cancel() + <-job.done + b.delete(name) return nil } @@ -52,22 +67,3 @@ func (b Background) delete(name string) { delete(b.jobs, name) } - -func (b Background) Stop(name string) { - logger.Debug("background", name, "stopping job") - job, ok := b.get(name) - - if !ok { - logger.Debug("background", name, "no such job") - return - } - - logger.Debug("background", name, "canceling job") - job.cancel() - - logger.Debug("background", name, "waiting for job to complete") - <-job.done - - logger.Debug("background", name, "deleting job") - b.delete(name) -} diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index bcd3f85..01e4054 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "encoding/base64" + "encoding/hex" "errors" "fmt" "io" @@ -49,11 +50,11 @@ type ServerHandler struct { handlerCtx context.Context done chan struct{} activeCommands int - background *background.Background + background background.Background } // NewServerHandler returns the server handler. -func NewServerHandler(handlerCtx, serverCtx context.Context, user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}) (*ServerHandler, <-chan struct{}) { +func NewServerHandler(handlerCtx, serverCtx context.Context, user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}, background background.Background) (*ServerHandler, <-chan struct{}) { h := ServerHandler{ serverCtx: serverCtx, handlerCtx: handlerCtx, @@ -68,7 +69,7 @@ func NewServerHandler(handlerCtx, serverCtx context.Context, user *user.User, ca globalServerWaitFor: globalServerWaitFor, regex: ".", user: user, - background: background.NewBackground(), + background: background, } fqdn, err := os.Hostname() @@ -286,12 +287,14 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] case "run": // TODO: Refactor this "run" case, move code to runcommand.go command := newRunCommand(h) + jobName := fmt.Sprintf("%s%%%s", h.user.Name, hash(strings.Join(args[1:], " "))) - checksum := sha256.Sum256([]byte(strings.Join(args, " "))) - name := fmt.Sprintf("%s.%s", h.user.Name, checksum) - - if contains(flags, "background.stop") { - h.background.Stop(name) + if contains(flags, "background.cancel") { + if err := h.background.Cancel(jobName); err != nil { + h.sendServerMessage(logger.Error(h.user, err, jobName, args)) + } else { + h.sendServerMessage(logger.Info(h.user, "job cancelled", jobName)) + } finished() return } @@ -299,17 +302,22 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] done := make(chan struct{}) if contains(flags, "background.start") { - commandCtx, cancel := context.WithTimeout(h.serverCtx, time.Hour) - if err := h.background.Add(name, cancel, done); err != nil { - h.sendServerMessage(logger.Error(h.user, err, args)) + commandCtx, cancel := context.WithCancel(h.serverCtx) + // TODO: For background jobs dont attempt to send data to dtail client as there might be no SSH connection + if err := h.background.Add(jobName, cancel, done); err != nil { + h.sendServerMessage(logger.Error(h.user, err, jobName, args)) finished() return } + go func() { h.globalServerWaitFor <- struct{}{} }() go func() { command.Start(commandCtx, argc, args) close(done) + <-h.globalServerWaitFor }() + + h.sendServerMessage(logger.Info(h.user, jobName, "job started in background")) finished() return } @@ -427,3 +435,9 @@ func contains(haystack []string, needle string) bool { } return false } + +func hash(str string) string { + h := sha256.New() + h.Write([]byte(str)) + return hex.EncodeToString(h.Sum(nil)) +} diff --git a/internal/server/server.go b/internal/server/server.go index 1421540..5ec46e7 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -10,6 +10,7 @@ import ( "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" + "github.com/mimecast/dtail/internal/server/background" "github.com/mimecast/dtail/internal/server/handlers" "github.com/mimecast/dtail/internal/ssh/server" user "github.com/mimecast/dtail/internal/user/server" @@ -32,6 +33,8 @@ type Server struct { sched *scheduler // Wait counter, e.g. there might be still subprocesses (forked by drun) to be killed. shutdownWaitFor chan struct{} + // Background jobs + background background.Background } // New returns a new server. @@ -44,6 +47,7 @@ func New() *Server { tailLimiter: make(chan struct{}, config.Server.MaxConcurrentTails), shutdownWaitFor: make(chan struct{}, 1000), sched: newScheduler(), + background: background.New(), } s.sshServerConfig.PasswordCallback = s.backgroundUserCallback @@ -178,7 +182,7 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch case config.ControlUser: handler, done = handlers.NewControlHandler(handlerCtx, user) default: - handler, done = handlers.NewServerHandler(handlerCtx, ctx, user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor) + handler, done = handlers.NewServerHandler(handlerCtx, ctx, user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor, s.background) } go func() { |
