summaryrefslogtreecommitdiff
path: root/internal/server/handlers/serverhandler.go
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2020-11-20 16:55:01 +0000
committerPaul Buetow <pbuetow@mimecast.com>2020-11-20 16:55:01 +0000
commit6cd32a40fed4b82ffcbfbf22edf94baf674badf7 (patch)
treec52e2747d2bad163f8ac2fcde26a75fb13d9e7c7 /internal/server/handlers/serverhandler.go
parent46746e4630f3d7cd968c74645a28cc64c6930767 (diff)
parent585ee676cbba0f36d60fc8db2aa2fd82ba378d13 (diff)
Merge branch 'develop' of https://github.com/snonux/dtail into develop
Diffstat (limited to 'internal/server/handlers/serverhandler.go')
-rw-r--r--internal/server/handlers/serverhandler.go119
1 files changed, 3 insertions, 116 deletions
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 164a280..843eabc 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -7,9 +7,7 @@ import (
"fmt"
"io"
"os"
- "strconv"
"strings"
- "sync"
"sync/atomic"
"time"
@@ -19,7 +17,6 @@ import (
"github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/mapr/server"
"github.com/mimecast/dtail/internal/omode"
- "github.com/mimecast/dtail/internal/server/background"
user "github.com/mimecast/dtail/internal/user/server"
"github.com/mimecast/dtail/internal/version"
)
@@ -47,11 +44,10 @@ type ServerHandler struct {
ackCloseReceived chan struct{}
activeCommands int32
activeReaders int32
- background background.Background
}
// NewServerHandler returns the server handler.
-func NewServerHandler(user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}, background background.Background) *ServerHandler {
+func NewServerHandler(user *user.User, catLimiter, tailLimiter, globalServerWaitFor chan struct{}) *ServerHandler {
h := ServerHandler{
done: internal.NewDone(),
lines: make(chan line.Line, 100),
@@ -63,7 +59,6 @@ func NewServerHandler(user *user.User, catLimiter, tailLimiter, globalServerWait
globalServerWaitFor: globalServerWaitFor,
regex: ".",
user: user,
- background: background,
}
fqdn, err := os.Hostname()
@@ -151,7 +146,6 @@ func (h *ServerHandler) Write(p []byte) (n int, err error) {
func (h *ServerHandler) handleCommand(commandStr string) {
logger.Debug(h.user, commandStr)
- var timeout time.Duration
ctx := context.Background()
args, argc, err := h.handleProtocolVersion(strings.Split(commandStr, " "))
@@ -166,12 +160,6 @@ func (h *ServerHandler) handleCommand(commandStr string) {
return
}
- args, argc, timeout, err = h.handleTimeout(args, argc)
- if err != nil {
- h.send(h.serverMessages, logger.Error(h.user, err))
- return
- }
-
if h.user.Name == config.ControlUser {
h.handleControlCommand(argc, args)
return
@@ -183,19 +171,7 @@ func (h *ServerHandler) handleCommand(commandStr string) {
cancel()
}()
- if timeout > 0 {
- logger.Info(h.user, "Command with timeout context", argc, args, timeout)
- ctx, cancel := context.WithTimeout(ctx, timeout)
- go func() {
- <-ctx.Done()
- logger.Info(h.user, "Command timed out, canceling it", args, args, timeout)
- cancel()
- }()
- h.handleUserCommand(ctx, argc, args, timeout)
- return
- }
-
- h.handleUserCommand(ctx, argc, args, timeout)
+ h.handleUserCommand(ctx, argc, args)
}
func (h *ServerHandler) handleProtocolVersion(args []string) ([]string, int, error) {
@@ -233,16 +209,6 @@ func (h *ServerHandler) handleBase64(args []string, argc int) ([]string, int, er
return args, argc, nil
}
-func (h *ServerHandler) handleTimeout(args []string, argc int) ([]string, int, time.Duration, error) {
- if argc <= 2 || args[0] != "timeout" {
- // No timeout specified
- return args, argc, time.Duration(0) * time.Second, nil
- }
-
- timeout, err := strconv.Atoi(args[1])
- return args[2:], argc - 2, time.Duration(timeout) * time.Second, err
-}
-
func (h *ServerHandler) handleControlCommand(argc int, args []string) {
switch args[0] {
case "debug":
@@ -252,7 +218,7 @@ func (h *ServerHandler) handleControlCommand(argc int, args []string) {
}
}
-func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string, timeout time.Duration) {
+func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string) {
logger.Debug(h.user, "handleUserCommand", argc, args)
h.incrementActiveCommands()
@@ -314,85 +280,6 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
commandFinished()
}()
- case "run":
- // TODO: Refactor this "run" case, move code to runcommand.go
- command := newRunCommand(h)
-
- jobName, _ := options["jobName"]
- logger.Debug(h.user, "run", options)
-
- if val, ok := options["background"]; ok && (val == "cancel" || val == "stop") {
- if err := h.background.Cancel(h.user.Name, jobName); err != nil {
- h.sendServerMessage(logger.Error(h.user, err, jobName, args))
- } else {
- h.sendServerMessage(logger.Info(h.user, "job cancelled", jobName))
- }
- commandFinished()
- return
- }
-
- if val, ok := options["background"]; ok && val == "list" {
- h.sendServerMessage("Listing jobs")
- count := 0
- for jobName := range h.background.ListJobsC(h.user.Name) {
- h.sendServerMessage(jobName)
- count++
- }
- h.sendServerMessage(fmt.Sprintf("Found %d jobs", count))
- commandFinished()
- return
- }
-
- str, _ := options["outerArgs"]
- outerArgs := strings.Split(str, " ")
-
- var background bool
- if val, ok := options["background"]; ok && val == "start" {
- background = true
- }
-
- var wg sync.WaitGroup
- wg.Add(1)
-
- if background {
- if timeout == 0 {
- // Set default background timeout.
- timeout = time.Hour * 1
- }
-
- commandCtx, cancel := context.WithTimeout(ctx, timeout)
-
- if err := h.background.Add(h.user.Name, jobName, cancel, &wg); err != nil {
- h.sendServerMessage(logger.Error(h.user, err, jobName, args))
- commandFinished()
- return
- }
- ctx = commandCtx
- }
-
- if err := command.StartBackground(ctx, &wg, argc, args, outerArgs); err != nil {
- h.sendServerMessage(logger.Error(h.user, "Unable to execute command", argc, args, err))
- commandFinished()
- return
- }
-
- // Make sure that server waits for all sub-processes to finish on shutdown
- go func() { h.globalServerWaitFor <- struct{}{} }()
- go func() {
- wg.Wait()
- <-h.globalServerWaitFor
- }()
-
- if background {
- h.sendServerMessage(logger.Info(h.user, jobName, "job started in background"))
- commandFinished()
- return
- }
-
- // Command run in foreground, wait for it to complete before finishing the connection.
- wg.Wait()
- commandFinished()
-
case "ack", ".ack":
h.handleAckCommand(argc, args)
commandFinished()