summaryrefslogtreecommitdiff
path: root/internal/server
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-10-06 09:50:41 +0300
committerPaul Buetow <paul@buetow.org>2021-10-06 09:50:41 +0300
commitfab5dc3e70434ea0abc7a0976487a1973b662331 (patch)
tree61a06e166f225b69f09966e81ae725f960fd80be /internal/server
parent9f395a03f25941d8ed98ec43035688daa1e8877f (diff)
enable faster shutdown - useful for dgrep/dmap and dcat commands
Diffstat (limited to 'internal/server')
-rw-r--r--internal/server/handlers/basehandler.go20
-rw-r--r--internal/server/handlers/controlhandler.go98
-rw-r--r--internal/server/handlers/healthhandler.go58
-rw-r--r--internal/server/handlers/serverhandler.go16
-rw-r--r--internal/server/server.go10
5 files changed, 82 insertions, 120 deletions
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index b683578..4fa8f00 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -13,6 +13,7 @@ import (
"time"
"github.com/mimecast/dtail/internal"
+ "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/pool"
@@ -21,7 +22,7 @@ import (
user "github.com/mimecast/dtail/internal/user/server"
)
-type handleCommandCb func(context.Context, int, []string)
+type handleCommandCb func(context.Context, int, []string, string, map[string]string)
type baseHandler struct {
done *internal.Done
@@ -157,7 +158,16 @@ func (h *baseHandler) handleCommand(commandStr string) {
cancel()
}()
- h.handleCommandCb(ctx, argc, args)
+ splitted := strings.Split(args[0], ":")
+ commandName := splitted[0]
+
+ options, err := config.DeserializeOptions(splitted[1:])
+ if err != nil {
+ h.send(h.serverMessages, dlog.Server.Error(h.user, err))
+ return
+ }
+
+ h.handleCommandCb(ctx, argc, args, commandName, options)
}
func (h *baseHandler) handleProtocolVersion(args []string) ([]string, int, string, error) {
@@ -234,19 +244,19 @@ func (h *baseHandler) send(ch chan<- string, message string) {
}
func (h *baseHandler) flush() {
- dlog.Server.Debug(h.user, "flush()")
+ dlog.Server.Trace(h.user, "flush()")
numUnsentMessages := func() int {
return len(h.lines) + len(h.serverMessages) + len(h.maprMessages)
}
- for i := 0; i < 3; i++ {
+ for i := 0; i < 10; i++ {
if numUnsentMessages() == 0 {
dlog.Server.Debug(h.user, "All lines sent", fmt.Sprintf("%p", h))
return
}
dlog.Server.Debug(h.user, "Still lines to be sent")
- time.Sleep(time.Second)
+ time.Sleep(time.Millisecond * 10)
}
dlog.Server.Warn(h.user, "Some lines remain unsent", numUnsentMessages())
diff --git a/internal/server/handlers/controlhandler.go b/internal/server/handlers/controlhandler.go
deleted file mode 100644
index ae70675..0000000
--- a/internal/server/handlers/controlhandler.go
+++ /dev/null
@@ -1,98 +0,0 @@
-package handlers
-
-import (
- "fmt"
- "io"
- "os"
- "strings"
-
- "github.com/mimecast/dtail/internal"
- "github.com/mimecast/dtail/internal/io/dlog"
- user "github.com/mimecast/dtail/internal/user/server"
-)
-
-// ControlHandler is used for control functions and health monitoring.
-type ControlHandler struct {
- done *internal.Done
- hostname string
- payload []byte
- serverMessages chan string
- user *user.User
-}
-
-// NewControlHandler returns a new control handler.
-func NewControlHandler(user *user.User) *ControlHandler {
- dlog.Server.Debug(user, "Creating control handler")
-
- h := ControlHandler{
- done: internal.NewDone(),
- serverMessages: make(chan string, 10),
- user: user,
- }
-
- fqdn, err := os.Hostname()
- if err != nil {
- dlog.Server.FatalPanic(err)
- }
-
- s := strings.Split(fqdn, ".")
- h.hostname = s[0]
-
- return &h
-}
-
-// Shutdown the handler.
-func (h *ControlHandler) Shutdown() {
- h.done.Shutdown()
-}
-
-// Done channel of the handler.
-func (h *ControlHandler) Done() <-chan struct{} {
- return h.done.Done()
-}
-
-// Read is to send data to the client via the Reader interface.
-func (h *ControlHandler) Read(p []byte) (n int, err error) {
- for {
- select {
- case message := <-h.serverMessages:
- wholePayload := []byte(fmt.Sprintf("SERVER|%s|%s\n", h.hostname, message))
- n = copy(p, wholePayload)
- return
- case <-h.done.Done():
- return 0, io.EOF
- }
- }
-}
-
-// Write is to read data to the client via the Writer interface.
-func (h *ControlHandler) Write(p []byte) (n int, err error) {
- for _, c := range p {
- switch c {
- case ';':
- wholePayload := strings.TrimSpace(string(h.payload))
- h.handleCommand(wholePayload)
- h.payload = nil
-
- default:
- h.payload = append(h.payload, c)
- }
- }
-
- n = len(p)
- return
-}
-
-func (h *ControlHandler) handleCommand(command string) {
- dlog.Server.Info(h.user, command)
- s := strings.Split(command, " ")
- dlog.Server.Debug(h.user, "Receiving command", command, s)
-
- switch s[0] {
- case "health":
- h.serverMessages <- "OK: DTail SSH Server seems fine"
- h.serverMessages <- "done;"
- default:
- h.serverMessages <- dlog.Server.Error(h.user, "Received unknown control command", command, s)
- }
-}
diff --git a/internal/server/handlers/healthhandler.go b/internal/server/handlers/healthhandler.go
new file mode 100644
index 0000000..3f3b932
--- /dev/null
+++ b/internal/server/handlers/healthhandler.go
@@ -0,0 +1,58 @@
+package handlers
+
+import (
+ "context"
+ "os"
+ "strings"
+
+ "github.com/mimecast/dtail/internal"
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/io/line"
+ user "github.com/mimecast/dtail/internal/user/server"
+)
+
+// HealthHandler is for the remote health check.
+type HealthHandler struct {
+ baseHandler
+}
+
+// NewHealthHandler returns the server handler.
+func NewHealthHandler(user *user.User) *HealthHandler {
+ dlog.Server.Debug(user, "Creating new server health handler")
+ h := HealthHandler{
+ baseHandler: baseHandler{
+ done: internal.NewDone(),
+ lines: make(chan line.Line, 100),
+ serverMessages: make(chan string, 10),
+ maprMessages: make(chan string, 10),
+ ackCloseReceived: make(chan struct{}),
+ user: user,
+ },
+ }
+ h.handleCommandCb = h.handleHealthCommand
+
+ fqdn, err := os.Hostname()
+ if err != nil {
+ dlog.Server.FatalPanic(err)
+ }
+
+ s := strings.Split(fqdn, ".")
+ h.hostname = s[0]
+
+ return &h
+}
+
+func (h *HealthHandler) handleHealthCommand(ctx context.Context, argc int, args []string,
+ commandName string, options map[string]string) {
+ dlog.Server.Debug(h.user, "Handling health command", argc, args)
+
+ switch commandName {
+ case "health":
+ h.send(h.serverMessages, "OK: DTail SSH Server seems fine")
+ case "ack", ".ack":
+ h.handleAckCommand(argc, args)
+ default:
+ h.send(h.serverMessages, dlog.Server.Error(h.user, "Received unknown health command", commandName, argc, args))
+ }
+ h.shutdown()
+}
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 25cb8ba..aaffe14 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -6,7 +6,6 @@ import (
"strings"
"github.com/mimecast/dtail/internal"
- "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/omode"
@@ -25,6 +24,7 @@ type ServerHandler struct {
// NewServerHandler returns the server handler.
func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *ServerHandler {
+ dlog.Server.Debug(user, "Creating new server handler")
h := ServerHandler{
baseHandler: baseHandler{
done: internal.NewDone(),
@@ -51,7 +51,9 @@ func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *S
return &h
}
-func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string) {
+func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string,
+ commandName string, options map[string]string) {
+
dlog.Server.Debug(h.user, "Handling user command", argc, args)
h.incrementActiveCommands()
@@ -61,16 +63,6 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
}
}
- splitted := strings.Split(args[0], ":")
- commandName := splitted[0]
-
- options, err := config.DeserializeOptions(splitted[1:])
- if err != nil {
- h.send(h.serverMessages, dlog.Server.Error(h.user, err))
- commandFinished()
- return
- }
-
if quiet, _ := options["quiet"]; quiet == "true" {
dlog.Server.Debug(h.user, "Enabling quiet mode")
h.quiet = true
diff --git a/internal/server/server.go b/internal/server/server.go
index d1cd57d..b3d4bff 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -162,8 +162,8 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch
case "shell":
var handler handlers.Handler
switch user.Name {
- case config.ControlUser:
- handler = handlers.NewControlHandler(user)
+ case config.HealthUser:
+ handler = handlers.NewHealthHandler(user)
default:
handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter)
}
@@ -234,9 +234,9 @@ func (s *Server) Callback(c gossh.ConnMetadata, authPayload []byte) (*gossh.Perm
remoteIP := splitted[0]
switch user.Name {
- case config.ControlUser:
- if authInfo == config.ControlUser {
- dlog.Server.Debug(user, "Granting permissions to control user")
+ case config.HealthUser:
+ if authInfo == config.HealthUser {
+ dlog.Server.Debug(user, "Granting permissions to health user")
return nil, nil
}
case config.ScheduleUser: