From bad8e04bb4410b94b8e875ccde287f74ab94121a Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 19 Sep 2020 18:38:36 +0100 Subject: server handler context refactoring --- internal/server/handlers/controlhandler.go | 26 +++++++---- internal/server/handlers/handler.go | 2 + internal/server/handlers/serverhandler.go | 73 ++++++++++++++++-------------- internal/server/server.go | 39 +++++++--------- 4 files changed, 74 insertions(+), 66 deletions(-) (limited to 'internal/server') diff --git a/internal/server/handlers/controlhandler.go b/internal/server/handlers/controlhandler.go index daa9835..9a8eb75 100644 --- a/internal/server/handlers/controlhandler.go +++ b/internal/server/handlers/controlhandler.go @@ -1,20 +1,19 @@ package handlers import ( - "context" "fmt" "io" "os" "strings" + "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/io/logger" user "github.com/mimecast/dtail/internal/user/server" ) // ControlHandler is used for control functions and health monitoring. type ControlHandler struct { - ctx context.Context - done chan struct{} + done *internal.Done hostname string payload []byte serverMessages chan string @@ -22,12 +21,11 @@ type ControlHandler struct { } // NewControlHandler returns a new control handler. -func NewControlHandler(ctx context.Context, user *user.User) (*ControlHandler, <-chan struct{}) { +func NewControlHandler(user *user.User) *ControlHandler { logger.Debug(user, "Creating control handler") h := ControlHandler{ - ctx: ctx, - done: make(chan struct{}), + done: internal.NewDone(), serverMessages: make(chan string, 10), user: user, } @@ -40,7 +38,15 @@ func NewControlHandler(ctx context.Context, user *user.User) (*ControlHandler, < s := strings.Split(fqdn, ".") h.hostname = s[0] - return &h, h.done + return &h +} + +func (h *ControlHandler) Shutdown() { + h.done.Shutdown() +} + +func (h *ControlHandler) Done() <-chan struct{} { + return h.done.Done() } // Read is to send data to the client via the Reader interface. @@ -51,7 +57,7 @@ func (h *ControlHandler) Read(p []byte) (n int, err error) { wholePayload := []byte(fmt.Sprintf("SERVER|%s|%s\n", h.hostname, message)) n = copy(p, wholePayload) return - case <-h.ctx.Done(): + case <-h.done.Done(): return 0, io.EOF } } @@ -63,7 +69,7 @@ func (h *ControlHandler) Write(p []byte) (n int, err error) { switch c { case ';': wholePayload := strings.TrimSpace(string(h.payload)) - h.handleCommand(h.ctx, wholePayload) + h.handleCommand(wholePayload) h.payload = nil default: @@ -75,7 +81,7 @@ func (h *ControlHandler) Write(p []byte) (n int, err error) { return } -func (h *ControlHandler) handleCommand(ctx context.Context, command string) { +func (h *ControlHandler) handleCommand(command string) { logger.Info(h.user, command) s := strings.Split(command, " ") logger.Debug(h.user, "Receiving command", command, s) diff --git a/internal/server/handlers/handler.go b/internal/server/handlers/handler.go index c42ceb9..b04e854 100644 --- a/internal/server/handlers/handler.go +++ b/internal/server/handlers/handler.go @@ -5,4 +5,6 @@ import "io" // Handler interface for server side functionality. type Handler interface { io.ReadWriter + Shutdown() + Done() <-chan struct{} } diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 7017f3e..5af3ebb 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -13,6 +13,7 @@ import ( "sync/atomic" "time" + "github.com/mimecast/dtail/internal" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/logger" @@ -31,33 +32,28 @@ const ( // the Bi-directional communication between SSH client and server. // This handler implements the handler of the SSH server. type ServerHandler struct { - lines chan line.Line - regex string - aggregate *server.Aggregate - aggregatedMessages chan string - serverMessages chan string - payload []byte - hostname string - user *user.User - // TODO: Move all these channels into a separate struct for readability! + done *internal.Done + lines chan line.Line + regex string + aggregate *server.Aggregate + aggregatedMessages chan string + serverMessages chan string + payload []byte + hostname string + user *user.User catLimiter chan struct{} tailLimiter chan struct{} globalServerWaitFor chan struct{} ackCloseReceived chan struct{} - serverCtx context.Context - handlerCtx context.Context - done chan struct{} activeCommands int32 activeReaders int32 background background.Background } // NewServerHandler returns the server handler. -func NewServerHandler(handlerCtx, serverCtx context.Context, user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}, background background.Background) (*ServerHandler, <-chan struct{}) { +func NewServerHandler(user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}, background background.Background) *ServerHandler { h := ServerHandler{ - serverCtx: serverCtx, - handlerCtx: handlerCtx, - done: make(chan struct{}), + done: internal.NewDone(), lines: make(chan line.Line, 100), serverMessages: make(chan string, 10), aggregatedMessages: make(chan string, 10), @@ -78,7 +74,15 @@ func NewServerHandler(handlerCtx, serverCtx context.Context, user *user.User, ca s := strings.Split(fqdn, ".") h.hostname = s[0] - return &h, h.done + return &h +} + +func (h *ServerHandler) Shutdown() { + h.done.Shutdown() +} + +func (h *ServerHandler) Done() <-chan struct{} { + return h.done.Done() } // Read is to send data to the dtail client via Reader interface. @@ -120,7 +124,7 @@ func (h *ServerHandler) Read(p []byte) (n int, err error) { case <-time.After(time.Second): // Once in a while check whether we are done. select { - case <-h.handlerCtx.Done(): + case <-h.done.Done(): return 0, io.EOF default: } @@ -134,7 +138,7 @@ func (h *ServerHandler) Write(p []byte) (n int, err error) { switch c { case ';': commandStr := strings.TrimSpace(string(h.payload)) - h.handleCommand(h.handlerCtx, commandStr) + h.handleCommand(commandStr) h.payload = nil default: h.payload = append(h.payload, c) @@ -145,9 +149,10 @@ func (h *ServerHandler) Write(p []byte) (n int, err error) { return } -func (h *ServerHandler) handleCommand(ctx context.Context, commandStr string) { +func (h *ServerHandler) handleCommand(commandStr string) { logger.Debug(h.user, commandStr) var timeout time.Duration + ctx := context.Background() args, argc, err := h.handleProtocolVersion(strings.Split(commandStr, " ")) if err != nil { @@ -172,15 +177,21 @@ func (h *ServerHandler) handleCommand(ctx context.Context, commandStr string) { return } + ctx, cancel := context.WithCancel(ctx) + go func() { + <-h.done.Done() + cancel() + }() + if timeout > 0 { logger.Info(h.user, "Command with timeout context", argc, args, timeout) - commandCtx, cancel := context.WithTimeout(ctx, timeout) + ctx, cancel := context.WithTimeout(ctx, timeout) go func() { - <-commandCtx.Done() + <-ctx.Done() logger.Info(h.user, "Command timed out, canceling it", args, args, timeout) cancel() }() - h.handleUserCommand(commandCtx, argc, args, timeout) + h.handleUserCommand(ctx, argc, args, timeout) return } @@ -348,9 +359,8 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] // Set default background timeout. timeout = time.Hour * 1 } - // Use a new context based on the server context, so that background job does not get - // terminated when handler/SSH connection terminates. - commandCtx, cancel := context.WithTimeout(h.serverCtx, timeout) + + commandCtx, cancel := context.WithTimeout(ctx, timeout) if err := h.background.Add(h.user.Name, jobName, cancel, &wg); err != nil { h.sendServerMessage(logger.Error(h.user, err, jobName, args)) @@ -406,7 +416,7 @@ func (h *ServerHandler) handleAckCommand(argc int, args []string) { func (h *ServerHandler) send(ch chan<- string, message string) { select { case ch <- message: - case <-h.handlerCtx.Done(): + case <-h.done.Done(): } } @@ -447,7 +457,7 @@ func (h *ServerHandler) shutdown() { go func() { select { case h.serverMessageC() <- ".syn close connection": - case <-h.handlerCtx.Done(): + case <-h.done.Done(): } }() @@ -455,13 +465,10 @@ func (h *ServerHandler) shutdown() { case <-h.ackCloseReceived: case <-time.After(time.Second * 5): logger.Debug(h.user, "Shutdown timeout reached, enforcing shutdown") - case <-h.handlerCtx.Done(): + case <-h.done.Done(): } - select { - case h.done <- struct{}{}: - default: - } + h.done.Shutdown() } func (h *ServerHandler) incrementActiveCommands() { diff --git a/internal/server/server.go b/internal/server/server.go index a446738..5e2a521 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -178,53 +178,46 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch switch req.Type { case "shell": - handlerCtx, cancel := context.WithCancel(ctx) - var handler handlers.Handler - var done <-chan struct{} - switch user.Name { case config.ControlUser: - handler, done = handlers.NewControlHandler(handlerCtx, user) + handler = handlers.NewControlHandler(user) default: - handler, done = handlers.NewServerHandler(handlerCtx, ctx, user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor, s.background) + handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor, s.background) } - go func() { - // Handler finished work, cancel all remaining routines - defer cancel() - - <-done - }() + terminate := func() { + handler.Shutdown() + sshConn.Close() + } go func() { // Broken pipe, cancel - defer cancel() - io.Copy(channel, handler) + terminate() }() go func() { // Broken pipe, cancel - defer cancel() - io.Copy(handler, channel) + terminate() }() go func() { - defer cancel() + select { + case <-ctx.Done(): + case <-handler.Done(): + } + terminate() + }() + go func() { if err := sshConn.Wait(); err != nil && err != io.EOF { logger.Error(user, err) } s.stats.decrementConnections() logger.Info(user, "Good bye Mister!") - }() - - go func() { - <-handlerCtx.Done() - sshConn.Close() - logger.Info(user, "Closed SSH connection") + terminate() }() // Only serving shell type -- cgit v1.2.3 From df2ff83897cde61d04b12958c6f6d458c69502f4 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 19 Sep 2020 19:15:08 +0100 Subject: refactor to have no context.Context in server mapreduce aggregate structs --- internal/server/handlers/serverhandler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'internal/server') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 5af3ebb..164a280 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -266,7 +266,7 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] if h.aggregate == nil { return } - h.aggregate.Cancel() + h.aggregate.Shutdown() } } -- cgit v1.2.3 From 7df612f527bd5dc2e785bf766d7d61124c260b94 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Mon, 16 Nov 2020 22:11:44 +0000 Subject: remove drun command for simplicity. only focus on interactive commands dealing with log streams --- internal/server/background/background.go | 126 ------------------------------ internal/server/handlers/runcommand.go | 111 -------------------------- internal/server/handlers/serverhandler.go | 85 +------------------- internal/server/server.go | 6 +- 4 files changed, 2 insertions(+), 326 deletions(-) delete mode 100644 internal/server/background/background.go delete mode 100644 internal/server/handlers/runcommand.go (limited to 'internal/server') diff --git a/internal/server/background/background.go b/internal/server/background/background.go deleted file mode 100644 index 3907448..0000000 --- a/internal/server/background/background.go +++ /dev/null @@ -1,126 +0,0 @@ -package background - -import ( - "context" - "errors" - "fmt" - "strings" - "sync" - - "github.com/mimecast/dtail/internal/io/logger" -) - -type job struct { - cancel context.CancelFunc - wg *sync.WaitGroup -} - -// Background specifies a job or command run in background on server side. -// This does not require an active DTail client SSH connection/session. -type Background struct { - mutex *sync.Mutex - jobs map[string]job -} - -// New returns a new background manager. -func New() Background { - return Background{ - jobs: make(map[string]job), - mutex: &sync.Mutex{}, - } -} - -// Add a background job. -func (b Background) Add(userName, jobName string, cancel context.CancelFunc, wg *sync.WaitGroup) error { - key := b.key(userName, jobName) - logger.Debug("background", "Add", key) - - b.mutex.Lock() - defer b.mutex.Unlock() - - if _, ok := b.jobs[key]; ok { - return errors.New("job already exists") - } - - b.jobs[key] = job{cancel, wg} - - // Clean up background job database. - go func() { - wg.Wait() - b.cancel(key) - }() - - return nil -} - -// Cancel a background job. -func (b Background) Cancel(userName, jobName string) error { - key := b.key(userName, jobName) - logger.Debug("background", "Cancel", key) - - return b.cancel(key) -} - -func (b Background) cancel(key string) error { - job, ok := b.get(key) - logger.Debug("background", "cancel", key, job, ok) - - if !ok { - return errors.New("no job to cancel") - } - - logger.Debug("background", "cancel", "run job.cancel()") - job.cancel() - logger.Debug("background", "cancel", "run job.wg.Wait()") - job.wg.Wait() - logger.Debug("background", "cancel", "run b.delete(key)") - b.delete(key) - - return nil -} - -// ListJobsC returns a channel listing all jobs of the given user. -func (b Background) ListJobsC(userName string) <-chan string { - logger.Debug("background", "ListJobC", userName) - - ch := make(chan string) - - go func() { - defer close(ch) - - b.mutex.Lock() - defer b.mutex.Unlock() - - for k := range b.jobs { - logger.Debug("ListJobsC", k, userName) - if strings.HasPrefix(k, fmt.Sprintf("%s.", userName)) { - ch <- k - } - } - }() - - return ch -} - -func (b Background) get(key string) (job, bool) { - logger.Debug("background", "get", key) - - b.mutex.Lock() - defer b.mutex.Unlock() - - job, ok := b.jobs[key] - return job, ok -} - -func (b Background) delete(key string) { - logger.Debug("background", "delete", key) - - b.mutex.Lock() - defer b.mutex.Unlock() - - delete(b.jobs, key) -} - -func (Background) key(userName, jobName string) string { - return fmt.Sprintf("%s.%s", userName, jobName) -} diff --git a/internal/server/handlers/runcommand.go b/internal/server/handlers/runcommand.go deleted file mode 100644 index 8e5895b..0000000 --- a/internal/server/handlers/runcommand.go +++ /dev/null @@ -1,111 +0,0 @@ -package handlers - -import ( - "context" - "errors" - "fmt" - "io/ioutil" - "os" - "os/exec" - "strings" - "sync" - "time" - - "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/io/run" -) - -type runCommand struct { - server *ServerHandler - run run.Run -} - -func newRunCommand(server *ServerHandler) runCommand { - return runCommand{ - server: server, - } -} - -func (r runCommand) StartBackground(ctx context.Context, wg *sync.WaitGroup, argc int, args, outerArgs []string) error { - if argc < 2 { - return fmt.Errorf("%s: args:%v argc:%d", commandParseWarning, args, argc) - } - - ec := make(chan int, 1) - var pid int - var err error - - command := strings.Join(args[1:], " ") - if strings.Contains(command, ";") || strings.Contains(command, "\n") { - if pid, err = r.startScript(ctx, wg, ec, command, outerArgs); err != nil { - r.server.sendServerMessage(".run exitstatus 255") - return err - } - return nil - } - - if pid, err = r.start(ctx, wg, ec, strings.TrimSpace(command), outerArgs); err != nil { - r.server.sendServerMessage(".run exitstatus 255") - return err - } - - exitCode := <-ec - r.server.sendServerMessage(fmt.Sprintf(".run exitstatus %d", exitCode)) - r.server.sendServerMessage(logger.Info(fmt.Sprintf("Process %d exited with status %d", pid, exitCode))) - - return nil -} - -func (r runCommand) startScript(ctx context.Context, wg *sync.WaitGroup, ec chan<- int, script string, outerArgs []string) (int, error) { - if _, err := os.Stat(config.Common.TmpDir); os.IsNotExist(err) { - return -1, err - } - - timestamp := time.Now().UnixNano() - scriptPath := fmt.Sprintf("%s/%s_%v.sh", config.Common.TmpDir, r.server.user.Name, timestamp) - - // TODO: On dserver startup delete all previously written scripts (there might be left overs due to a crash or so) - logger.Debug(r.server.user, "Writing temp script", scriptPath) - - script = fmt.Sprintf("#!/bin/sh\n%s", script) - if err := ioutil.WriteFile(scriptPath, []byte(script), 0700); err != nil { - return -1, err - } - - pid, err := r.start(ctx, wg, ec, scriptPath, outerArgs) - go func() { - wg.Wait() - logger.Debug("Deleting script", scriptPath) - os.Remove(scriptPath) - }() - - return pid, err -} - -func (r runCommand) start(ctx context.Context, wg *sync.WaitGroup, ec chan<- int, command string, outerArgs []string) (int, error) { - if len(command) == 0 { - return -1, errors.New("Empty command provided") - } - - splitted := strings.Split(command, " ") - path := splitted[0] - args := splitted[1:] - args = append(args, outerArgs...) - - qualifiedPath, err := exec.LookPath(path) - if err != nil { - return -1, err - } - - if !r.server.user.HasFilePermission(qualifiedPath, "runcommands") { - return -1, fmt.Errorf("No permission to execute path: %s", qualifiedPath) - } - - r.run = run.New(qualifiedPath, args) - pid, err := r.run.StartBackground(ctx, wg, ec, r.server.lines) - if err != nil { - return pid, err - } - return pid, nil -} diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 164a280..7ad1224 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -9,7 +9,6 @@ import ( "os" "strconv" "strings" - "sync" "sync/atomic" "time" @@ -19,7 +18,6 @@ import ( "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/mapr/server" "github.com/mimecast/dtail/internal/omode" - "github.com/mimecast/dtail/internal/server/background" user "github.com/mimecast/dtail/internal/user/server" "github.com/mimecast/dtail/internal/version" ) @@ -47,11 +45,10 @@ type ServerHandler struct { ackCloseReceived chan struct{} activeCommands int32 activeReaders int32 - background background.Background } // NewServerHandler returns the server handler. -func NewServerHandler(user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}, background background.Background) *ServerHandler { +func NewServerHandler(user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}) *ServerHandler { h := ServerHandler{ done: internal.NewDone(), lines: make(chan line.Line, 100), @@ -63,7 +60,6 @@ func NewServerHandler(user *user.User, catLimiter, tailLimiter, globalServerWait globalServerWaitFor: globalServerWaitFor, regex: ".", user: user, - background: background, } fqdn, err := os.Hostname() @@ -314,85 +310,6 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] commandFinished() }() - case "run": - // TODO: Refactor this "run" case, move code to runcommand.go - command := newRunCommand(h) - - jobName, _ := options["jobName"] - logger.Debug(h.user, "run", options) - - if val, ok := options["background"]; ok && (val == "cancel" || val == "stop") { - if err := h.background.Cancel(h.user.Name, jobName); err != nil { - h.sendServerMessage(logger.Error(h.user, err, jobName, args)) - } else { - h.sendServerMessage(logger.Info(h.user, "job cancelled", jobName)) - } - commandFinished() - return - } - - if val, ok := options["background"]; ok && val == "list" { - h.sendServerMessage("Listing jobs") - count := 0 - for jobName := range h.background.ListJobsC(h.user.Name) { - h.sendServerMessage(jobName) - count++ - } - h.sendServerMessage(fmt.Sprintf("Found %d jobs", count)) - commandFinished() - return - } - - str, _ := options["outerArgs"] - outerArgs := strings.Split(str, " ") - - var background bool - if val, ok := options["background"]; ok && val == "start" { - background = true - } - - var wg sync.WaitGroup - wg.Add(1) - - if background { - if timeout == 0 { - // Set default background timeout. - timeout = time.Hour * 1 - } - - commandCtx, cancel := context.WithTimeout(ctx, timeout) - - if err := h.background.Add(h.user.Name, jobName, cancel, &wg); err != nil { - h.sendServerMessage(logger.Error(h.user, err, jobName, args)) - commandFinished() - return - } - ctx = commandCtx - } - - if err := command.StartBackground(ctx, &wg, argc, args, outerArgs); err != nil { - h.sendServerMessage(logger.Error(h.user, "Unable to execute command", argc, args, err)) - commandFinished() - return - } - - // Make sure that server waits for all sub-processes to finish on shutdown - go func() { h.globalServerWaitFor <- struct{}{} }() - go func() { - wg.Wait() - <-h.globalServerWaitFor - }() - - if background { - h.sendServerMessage(logger.Info(h.user, jobName, "job started in background")) - commandFinished() - return - } - - // Command run in foreground, wait for it to complete before finishing the connection. - wg.Wait() - commandFinished() - case "ack", ".ack": h.handleAckCommand(argc, args) commandFinished() diff --git a/internal/server/server.go b/internal/server/server.go index 5e2a521..d4255a3 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -11,7 +11,6 @@ import ( "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" - "github.com/mimecast/dtail/internal/server/background" "github.com/mimecast/dtail/internal/server/handlers" "github.com/mimecast/dtail/internal/ssh/server" user "github.com/mimecast/dtail/internal/user/server" @@ -36,8 +35,6 @@ type Server struct { cont *continuous // Wait counter, e.g. there might be still subprocesses (forked by drun) to be killed. shutdownWaitFor chan struct{} - // Background jobs - background background.Background } // New returns a new server. @@ -51,7 +48,6 @@ func New() *Server { shutdownWaitFor: make(chan struct{}, 1000), sched: newScheduler(), cont: newContinuous(), - background: background.New(), } s.sshServerConfig.PasswordCallback = s.Callback @@ -183,7 +179,7 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch case config.ControlUser: handler = handlers.NewControlHandler(user) default: - handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor, s.background) + handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor) } terminate := func() { -- cgit v1.2.3 From 1626b6237d87a6ae6238b0d297d7bfb5cf7ddd38 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Mon, 16 Nov 2020 22:16:10 +0000 Subject: strip out timeout which was used for background command scheduling --- internal/server/handlers/serverhandler.go | 34 ++----------------------------- 1 file changed, 2 insertions(+), 32 deletions(-) (limited to 'internal/server') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 7ad1224..843eabc 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -7,7 +7,6 @@ import ( "fmt" "io" "os" - "strconv" "strings" "sync/atomic" "time" @@ -147,7 +146,6 @@ func (h *ServerHandler) Write(p []byte) (n int, err error) { func (h *ServerHandler) handleCommand(commandStr string) { logger.Debug(h.user, commandStr) - var timeout time.Duration ctx := context.Background() args, argc, err := h.handleProtocolVersion(strings.Split(commandStr, " ")) @@ -162,12 +160,6 @@ func (h *ServerHandler) handleCommand(commandStr string) { return } - args, argc, timeout, err = h.handleTimeout(args, argc) - if err != nil { - h.send(h.serverMessages, logger.Error(h.user, err)) - return - } - if h.user.Name == config.ControlUser { h.handleControlCommand(argc, args) return @@ -179,19 +171,7 @@ func (h *ServerHandler) handleCommand(commandStr string) { cancel() }() - if timeout > 0 { - logger.Info(h.user, "Command with timeout context", argc, args, timeout) - ctx, cancel := context.WithTimeout(ctx, timeout) - go func() { - <-ctx.Done() - logger.Info(h.user, "Command timed out, canceling it", args, args, timeout) - cancel() - }() - h.handleUserCommand(ctx, argc, args, timeout) - return - } - - h.handleUserCommand(ctx, argc, args, timeout) + h.handleUserCommand(ctx, argc, args) } func (h *ServerHandler) handleProtocolVersion(args []string) ([]string, int, error) { @@ -229,16 +209,6 @@ func (h *ServerHandler) handleBase64(args []string, argc int) ([]string, int, er return args, argc, nil } -func (h *ServerHandler) handleTimeout(args []string, argc int) ([]string, int, time.Duration, error) { - if argc <= 2 || args[0] != "timeout" { - // No timeout specified - return args, argc, time.Duration(0) * time.Second, nil - } - - timeout, err := strconv.Atoi(args[1]) - return args[2:], argc - 2, time.Duration(timeout) * time.Second, err -} - func (h *ServerHandler) handleControlCommand(argc int, args []string) { switch args[0] { case "debug": @@ -248,7 +218,7 @@ func (h *ServerHandler) handleControlCommand(argc int, args []string) { } } -func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string, timeout time.Duration) { +func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string) { logger.Debug(h.user, "handleUserCommand", argc, args) h.incrementActiveCommands() -- cgit v1.2.3 From 1bc1725bac7cda1f9bc3fcbcf02bf4e6413cf058 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sun, 22 Nov 2020 09:21:44 +0000 Subject: add dockerfile for building a dtail server dev/test container --- internal/server/server.go | 1 + 1 file changed, 1 insertion(+) (limited to 'internal/server') diff --git a/internal/server/server.go b/internal/server/server.go index d4255a3..31fa85d 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -34,6 +34,7 @@ type Server struct { // Mointor log files for pattern (if configured) cont *continuous // Wait counter, e.g. there might be still subprocesses (forked by drun) to be killed. + // TODO: Remove this counter. shutdownWaitFor chan struct{} } -- cgit v1.2.3 From ae8ffc84331ca72f0d33fff69edd85d6e03d29ae Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Mon, 28 Dec 2020 09:40:38 +0000 Subject: refactor --- internal/server/handlers/serverhandler.go | 50 +++++++++++++++---------------- internal/server/server.go | 20 +------------ 2 files changed, 25 insertions(+), 45 deletions(-) (limited to 'internal/server') diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 843eabc..db917bd 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -29,36 +29,34 @@ const ( // the Bi-directional communication between SSH client and server. // This handler implements the handler of the SSH server. type ServerHandler struct { - done *internal.Done - lines chan line.Line - regex string - aggregate *server.Aggregate - aggregatedMessages chan string - serverMessages chan string - payload []byte - hostname string - user *user.User - catLimiter chan struct{} - tailLimiter chan struct{} - globalServerWaitFor chan struct{} - ackCloseReceived chan struct{} - activeCommands int32 - activeReaders int32 + done *internal.Done + lines chan line.Line + regex string + aggregate *server.Aggregate + aggregatedMessages chan string + serverMessages chan string + payload []byte + hostname string + user *user.User + catLimiter chan struct{} + tailLimiter chan struct{} + ackCloseReceived chan struct{} + activeCommands int32 + activeReaders int32 } // NewServerHandler returns the server handler. -func NewServerHandler(user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}) *ServerHandler { +func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *ServerHandler { h := ServerHandler{ - done: internal.NewDone(), - lines: make(chan line.Line, 100), - serverMessages: make(chan string, 10), - aggregatedMessages: make(chan string, 10), - ackCloseReceived: make(chan struct{}), - catLimiter: catLimiter, - tailLimiter: tailLimiter, - globalServerWaitFor: globalServerWaitFor, - regex: ".", - user: user, + done: internal.NewDone(), + lines: make(chan line.Line, 100), + serverMessages: make(chan string, 10), + aggregatedMessages: make(chan string, 10), + ackCloseReceived: make(chan struct{}), + catLimiter: catLimiter, + tailLimiter: tailLimiter, + regex: ".", + user: user, } fqdn, err := os.Hostname() diff --git a/internal/server/server.go b/internal/server/server.go index 31fa85d..d8d43c9 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -7,7 +7,6 @@ import ( "io" "net" "strings" - "time" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/logger" @@ -33,9 +32,6 @@ type Server struct { sched *scheduler // Mointor log files for pattern (if configured) cont *continuous - // Wait counter, e.g. there might be still subprocesses (forked by drun) to be killed. - // TODO: Remove this counter. - shutdownWaitFor chan struct{} } // New returns a new server. @@ -46,7 +42,6 @@ func New() *Server { sshServerConfig: &gossh.ServerConfig{}, catLimiter: make(chan struct{}, config.Server.MaxConcurrentCats), tailLimiter: make(chan struct{}, config.Server.MaxConcurrentTails), - shutdownWaitFor: make(chan struct{}, 1000), sched: newScheduler(), cont: newContinuous(), } @@ -82,25 +77,12 @@ func (s *Server) Start(ctx context.Context) int { select { case <-ctx.Done(): - // Wait until all commands/jobs/children are no more! - s.wait() } // For future use. return 0 } -func (s *Server) wait() { - for { - num := len(s.shutdownWaitFor) - logger.Debug("Waiting for stuff to finish", num) - if num <= 0 { - return - } - time.Sleep(time.Second) - } -} - func (s *Server) listenerLoop(ctx context.Context, listener net.Listener) { logger.Debug("Starting listener loop") @@ -180,7 +162,7 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch case config.ControlUser: handler = handlers.NewControlHandler(user) default: - handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter, s.shutdownWaitFor) + handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter) } terminate := func() { -- cgit v1.2.3