summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Bütow <pbuetow@mimecast.com>2020-02-22 12:06:09 +0000
committerPaul Bütow <pbuetow@mimecast.com>2020-02-22 12:06:09 +0000
commit6e176034306026b922c1df4231a1807f36cbe460 (patch)
tree96405450466b12240302d306938c7af9b19f21ba
parent4d2ab8e6dd645d345fa26d8a067ad6dc14fc1fce (diff)
can start commands in background and also cancel those via drun command
-rw-r--r--cmd/drun/main.go6
-rw-r--r--internal/clients/runclient.go21
-rw-r--r--internal/server/background/background.go52
-rw-r--r--internal/server/handlers/serverhandler.go36
-rw-r--r--internal/server/server.go6
5 files changed, 76 insertions, 45 deletions
diff --git a/cmd/drun/main.go b/cmd/drun/main.go
index e11900e..1f0e0fe 100644
--- a/cmd/drun/main.go
+++ b/cmd/drun/main.go
@@ -23,6 +23,8 @@ func main() {
var discovery string
var displayVersion bool
var noColor bool
+ var background bool
+ var cancel bool
var serversStr string
var silentEnable bool
var sshPort int
@@ -36,6 +38,8 @@ func main() {
flag.BoolVar(&noColor, "noColor", false, "Disable ANSII terminal colors")
flag.BoolVar(&silentEnable, "silent", false, "Reduce output")
flag.BoolVar(&trustAllHosts, "trustAllHosts", false, "Auto trust all unknown host keys")
+ flag.BoolVar(&background, "background", false, "Command starts in background on the server")
+ flag.BoolVar(&cancel, "cancel", false, "Command will be cancelled if it runs in background")
flag.IntVar(&connectionsPerCPU, "cpc", 10, "How many connections established per CPU core concurrently")
flag.IntVar(&sshPort, "port", 2222, "SSH server port")
flag.IntVar(&timeout, "timeout", 0, "Command execution timeout")
@@ -69,7 +73,7 @@ func main() {
Timeout: timeout,
}
- client, err := clients.NewRunClient(args)
+ client, err := clients.NewRunClient(args, background, cancel)
if err != nil {
panic(err)
}
diff --git a/internal/clients/runclient.go b/internal/clients/runclient.go
index e3be616..c2f6f62 100644
--- a/internal/clients/runclient.go
+++ b/internal/clients/runclient.go
@@ -11,10 +11,12 @@ import (
// RunClient is a client to run various commands on the server.
type RunClient struct {
baseClient
+ background bool
+ cancel bool
}
// NewRunClient returns a new cat client.
-func NewRunClient(args Args) (*RunClient, error) {
+func NewRunClient(args Args, background, cancel bool) (*RunClient, error) {
args.Mode = omode.RunClient
c := RunClient{
@@ -23,6 +25,8 @@ func NewRunClient(args Args) (*RunClient, error) {
throttleCh: make(chan struct{}, args.ConnectionsPerCPU*runtime.NumCPU()),
retry: false,
},
+ background: background,
+ cancel: cancel,
}
c.init(c)
@@ -34,12 +38,21 @@ func (c RunClient) makeHandler(server string) handlers.Handler {
}
func (c RunClient) makeCommands() (commands []string) {
- // Send "run COMMAND" to server!
if c.Timeout > 0 {
- commands = append(commands, fmt.Sprintf("timeout %d %s %s", c.Timeout, c.Mode.String(), c.What))
+ commands = append(commands, fmt.Sprintf("timeout %d run%s %s", c.Timeout, c.flags(), c.What))
return
}
- commands = append(commands, fmt.Sprintf("%s %s", c.Mode.String(), c.What))
+ commands = append(commands, fmt.Sprintf("run%s %s", c.flags(), c.What))
return
}
+
+func (c RunClient) flags() string {
+ if c.background {
+ return ":background.start"
+ }
+ if c.cancel {
+ return ":background.cancel"
+ }
+ return ""
+}
diff --git a/internal/server/background/background.go b/internal/server/background/background.go
index 537ccbb..05a502f 100644
--- a/internal/server/background/background.go
+++ b/internal/server/background/background.go
@@ -2,10 +2,8 @@ package background
import (
"context"
- "fmt"
+ "errors"
"sync"
-
- "github.com/mimecast/dtail/internal/io/logger"
)
type job struct {
@@ -13,27 +11,44 @@ type job struct {
done <-chan struct{}
}
+// Background specifies a job or command run in background on server side.
+// This does not require an active DTail client SSH connection/session.
type Background struct {
- mutex sync.Mutex
+ mutex *sync.Mutex
jobs map[string]job
}
-func NewBackground() *Background {
- return &Background{
- jobs: make(map[string]job),
+// New returns a new background manager.
+func New() Background {
+ return Background{
+ jobs: make(map[string]job),
+ mutex: &sync.Mutex{},
}
}
+// Add a background job.
func (b Background) Add(name string, cancel context.CancelFunc, done <-chan struct{}) error {
b.mutex.Lock()
defer b.mutex.Unlock()
if _, ok := b.jobs[name]; ok {
- return fmt.Errorf("job '%s' already exists", name)
+ return errors.New("job already exists")
}
- logger.Debug("background", name, "adding job")
b.jobs[name] = job{cancel, done}
+ return nil
+}
+
+// Cancel a background job.
+func (b Background) Cancel(name string) error {
+ job, ok := b.get(name)
+ if !ok {
+ return errors.New("no job to cancel")
+ }
+
+ job.cancel()
+ <-job.done
+ b.delete(name)
return nil
}
@@ -52,22 +67,3 @@ func (b Background) delete(name string) {
delete(b.jobs, name)
}
-
-func (b Background) Stop(name string) {
- logger.Debug("background", name, "stopping job")
- job, ok := b.get(name)
-
- if !ok {
- logger.Debug("background", name, "no such job")
- return
- }
-
- logger.Debug("background", name, "canceling job")
- job.cancel()
-
- logger.Debug("background", name, "waiting for job to complete")
- <-job.done
-
- logger.Debug("background", name, "deleting job")
- b.delete(name)
-}
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index bcd3f85..01e4054 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -4,6 +4,7 @@ import (
"context"
"crypto/sha256"
"encoding/base64"
+ "encoding/hex"
"errors"
"fmt"
"io"
@@ -49,11 +50,11 @@ type ServerHandler struct {
handlerCtx context.Context
done chan struct{}
activeCommands int
- background *background.Background
+ background background.Background
}
// NewServerHandler returns the server handler.
-func NewServerHandler(handlerCtx, serverCtx context.Context, user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}) (*ServerHandler, <-chan struct{}) {
+func NewServerHandler(handlerCtx, serverCtx context.Context, user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}, background background.Background) (*ServerHandler, <-chan struct{}) {
h := ServerHandler{
serverCtx: serverCtx,
handlerCtx: handlerCtx,
@@ -68,7 +69,7 @@ func NewServerHandler(handlerCtx, serverCtx context.Context, user *user.User, ca
globalServerWaitFor: globalServerWaitFor,
regex: ".",
user: user,
- background: background.NewBackground(),
+ background: background,
}
fqdn, err := os.Hostname()
@@ -286,12 +287,14 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
case "run":
// TODO: Refactor this "run" case, move code to runcommand.go
command := newRunCommand(h)
+ jobName := fmt.Sprintf("%s%%%s", h.user.Name, hash(strings.Join(args[1:], " ")))
- checksum := sha256.Sum256([]byte(strings.Join(args, " ")))
- name := fmt.Sprintf("%s.%s", h.user.Name, checksum)
-
- if contains(flags, "background.stop") {
- h.background.Stop(name)
+ if contains(flags, "background.cancel") {
+ if err := h.background.Cancel(jobName); err != nil {
+ h.sendServerMessage(logger.Error(h.user, err, jobName, args))
+ } else {
+ h.sendServerMessage(logger.Info(h.user, "job cancelled", jobName))
+ }
finished()
return
}
@@ -299,17 +302,22 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
done := make(chan struct{})
if contains(flags, "background.start") {
- commandCtx, cancel := context.WithTimeout(h.serverCtx, time.Hour)
- if err := h.background.Add(name, cancel, done); err != nil {
- h.sendServerMessage(logger.Error(h.user, err, args))
+ commandCtx, cancel := context.WithCancel(h.serverCtx)
+ // TODO: For background jobs dont attempt to send data to dtail client as there might be no SSH connection
+ if err := h.background.Add(jobName, cancel, done); err != nil {
+ h.sendServerMessage(logger.Error(h.user, err, jobName, args))
finished()
return
}
+ go func() { h.globalServerWaitFor <- struct{}{} }()
go func() {
command.Start(commandCtx, argc, args)
close(done)
+ <-h.globalServerWaitFor
}()
+
+ h.sendServerMessage(logger.Info(h.user, jobName, "job started in background"))
finished()
return
}
@@ -427,3 +435,9 @@ func contains(haystack []string, needle string) bool {
}
return false
}
+
+func hash(str string) string {
+ h := sha256.New()
+ h.Write([]byte(str))
+ return hex.EncodeToString(h.Sum(nil))
+}
diff --git a/internal/server/server.go b/internal/server/server.go
index 1421540..5ec46e7 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -10,6 +10,7 @@ import (
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/logger"
+ "github.com/mimecast/dtail/internal/server/background"
"github.com/mimecast/dtail/internal/server/handlers"
"github.com/mimecast/dtail/internal/ssh/server"
user "github.com/mimecast/dtail/internal/user/server"
@@ -32,6 +33,8 @@ type Server struct {
sched *scheduler
// Wait counter, e.g. there might be still subprocesses (forked by drun) to be killed.
shutdownWaitFor chan struct{}
+ // Background jobs
+ background background.Background
}
// New returns a new server.
@@ -44,6 +47,7 @@ func New() *Server {
tailLimiter: make(chan struct{}, config.Server.MaxConcurrentTails),
shutdownWaitFor: make(chan struct{}, 1000),
sched: newScheduler(),
+ background: background.New(),
}
s.sshServerConfig.PasswordCallback = s.backgroundUserCallback
@@ -178,7 +182,7 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch
case config.ControlUser:
handler, done = handlers.NewControlHandler(handlerCtx, user)
default:
- handler, done = handlers.NewServerHandler(handlerCtx, ctx, user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor)
+ handler, done = handlers.NewServerHandler(handlerCtx, ctx, user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor, s.background)
}
go func() {