summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-10-06 09:50:41 +0300
committerPaul Buetow <paul@buetow.org>2021-10-06 09:50:41 +0300
commitfab5dc3e70434ea0abc7a0976487a1973b662331 (patch)
tree61a06e166f225b69f09966e81ae725f960fd80be
parent9f395a03f25941d8ed98ec43035688daa1e8877f (diff)
enable faster shutdown - useful for dgrep/dmap and dcat commands
-rw-r--r--cmd/dtail/main.go2
-rw-r--r--internal/clients/baseclient.go14
-rw-r--r--internal/clients/connectors/serverless.go15
-rw-r--r--internal/clients/handlers/basehandler.go8
-rw-r--r--internal/clients/handlers/healthhandler.go14
-rw-r--r--internal/clients/handlers/maprhandler.go2
-rw-r--r--internal/clients/healthclient.go4
-rw-r--r--internal/config/config.go4
-rw-r--r--internal/io/dlog/dlog.go2
-rw-r--r--internal/io/fs/permissions/permission.go2
-rw-r--r--internal/server/handlers/basehandler.go20
-rw-r--r--internal/server/handlers/controlhandler.go98
-rw-r--r--internal/server/handlers/healthhandler.go58
-rw-r--r--internal/server/handlers/serverhandler.go16
-rw-r--r--internal/server/server.go10
15 files changed, 119 insertions, 150 deletions
diff --git a/cmd/dtail/main.go b/cmd/dtail/main.go
index 820323c..d333825 100644
--- a/cmd/dtail/main.go
+++ b/cmd/dtail/main.go
@@ -97,8 +97,8 @@ func main() {
dlog.Start(ctx, &wg, sourceProcess, config.Common.LogLevel)
if checkHealth {
+ defer cancel()
healthClient, _ := clients.NewHealthClient(args)
- cancel()
os.Exit(healthClient.Start(ctx, signal.InterruptCh(ctx)))
}
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index 9574e13..b474208 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -39,8 +39,7 @@ type baseClient struct {
}
func (c *baseClient) init() {
- dlog.Client.Debug("Initiating base client")
- dlog.Client.Debug(c.Args.String())
+ dlog.Client.Debug("Initiating base client", c.Args.String())
flag := regex.Default
if c.Args.RegexInvert {
@@ -48,15 +47,16 @@ func (c *baseClient) init() {
}
regex, err := regex.New(c.Args.RegexStr, flag)
if err != nil {
- dlog.Client.FatalPanic(c.Regex, "invalid regex!", err, regex)
+ dlog.Client.FatalPanic(c.Regex, "Invalid regex!", err, regex)
}
c.Regex = regex
- dlog.Client.Debug("Regex", c.Regex)
if c.Args.Serverless {
return
}
- c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods(c.Args.SSHAuthMethods, c.Args.SSHHostKeyCallback, c.Args.TrustAllHosts, c.throttleCh, c.Args.PrivateKeyPathFile)
+ c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods(
+ c.Args.SSHAuthMethods, c.Args.SSHHostKeyCallback, c.Args.TrustAllHosts,
+ c.throttleCh, c.Args.PrivateKeyPathFile)
}
func (c *baseClient) makeConnections(maker maker) {
@@ -71,6 +71,7 @@ func (c *baseClient) makeConnections(maker maker) {
}
func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status int) {
+ dlog.Client.Trace("Starting base client")
// Can be nil when serverless.
if c.hostKeyCallback != nil {
// Periodically check for unknown hosts, and ask the user whether to trust them or not.
@@ -81,13 +82,12 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i
var wg sync.WaitGroup
wg.Add(len(c.connections))
-
var mutex sync.Mutex
+
for i, conn := range c.connections {
go func(i int, conn connectors.Connector) {
defer wg.Done()
connStatus := c.startConnection(ctx, i, conn)
-
// Update global status.
mutex.Lock()
defer mutex.Unlock()
diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go
index 2a1cec4..768a5ce 100644
--- a/internal/clients/connectors/serverless.go
+++ b/internal/clients/connectors/serverless.go
@@ -37,6 +37,7 @@ func (s *Serverless) Handler() handlers.Handler {
}
func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) {
+ dlog.Client.Debug("Starting serverless connector")
go func() {
defer cancel()
@@ -44,7 +45,6 @@ func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throt
dlog.Client.Warn(err)
}
}()
-
<-ctx.Done()
}
@@ -58,9 +58,11 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
var serverHandler serverHandlers.Handler
switch s.userName {
- case config.ControlUser:
- serverHandler = serverHandlers.NewControlHandler(user)
+ case config.HealthUser:
+ dlog.Client.Debug("Creating serverless health handler")
+ serverHandler = serverHandlers.NewHealthHandler(user)
default:
+ dlog.Client.Debug("Creating serverless server handler")
serverHandler = serverHandlers.NewServerHandler(
user,
make(chan struct{}, config.Server.MaxConcurrentCats),
@@ -76,29 +78,34 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
go func() {
io.Copy(serverHandler, s.handler)
+ dlog.Client.Trace("io.Copy(serverHandler, s.handler) => done")
terminate()
}()
go func() {
io.Copy(s.handler, serverHandler)
+ dlog.Client.Trace("io.Copy(s.handler, serverHandler) => done")
terminate()
}()
go func() {
select {
case <-s.handler.Done():
+ dlog.Client.Trace("<-s.handler.Done()")
case <-ctx.Done():
+ dlog.Client.Trace("<-ctx.Done()")
}
terminate()
}()
// Send all commands to client.
for _, command := range s.commands {
- dlog.Client.Debug(command)
+ dlog.Client.Debug("Sending command to serverless server", command)
s.handler.SendMessage(command)
}
<-ctx.Done()
+ dlog.Client.Trace("s.handler.Shutdown()")
s.handler.Shutdown()
return nil
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index 04124e7..b520c25 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -69,7 +69,7 @@ func (h *baseHandler) Write(p []byte) (n int, err error) {
*/
case '\n', protocol.MessageDelimiter:
message := h.receiveBuf.String()
- h.handleMessageType(message)
+ h.handleMessage(message)
h.receiveBuf.Reset()
default:
h.receiveBuf.WriteByte(b)
@@ -90,9 +90,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
return
}
-// Handle various message types.
-func (h *baseHandler) handleMessageType(message string) {
- // Hidden server commands starti with a dot "."
+func (h *baseHandler) handleMessage(message string) {
if len(message) > 0 && message[0] == '.' {
h.handleHiddenMessage(message)
return
@@ -106,7 +104,7 @@ func (h *baseHandler) handleMessageType(message string) {
func (h *baseHandler) handleHiddenMessage(message string) {
switch {
case strings.HasPrefix(message, ".syn close connection"):
- h.SendMessage(".ack close connection")
+ go h.SendMessage(".ack close connection")
h.Shutdown()
}
}
diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go
index 4949985..4b16ce4 100644
--- a/internal/clients/handlers/healthhandler.go
+++ b/internal/clients/handlers/healthhandler.go
@@ -12,7 +12,6 @@ import (
// HealthHandler is the handler used on the client side for running mapreduce aggregations.
type HealthHandler struct {
baseHandler
- HealthStatusCh chan<- int
}
// NewHealthHandler returns a new health client handler.
@@ -26,7 +25,6 @@ func NewHealthHandler(server string) *HealthHandler {
status: -1,
done: internal.NewDone(),
},
- HealthStatusCh: make(chan int),
}
}
@@ -34,12 +32,10 @@ func NewHealthHandler(server string) *HealthHandler {
func (h *HealthHandler) Write(p []byte) (n int, err error) {
for _, b := range p {
switch b {
- case '\n':
- continue
- case protocol.MessageDelimiter:
+ case '\n', protocol.MessageDelimiter:
message := h.baseHandler.receiveBuf.String()
dlog.Client.Debug(message)
- h.handleHealthMessage(message)
+ h.handleMessage(message)
h.baseHandler.receiveBuf.Reset()
default:
h.baseHandler.receiveBuf.WriteByte(b)
@@ -49,7 +45,11 @@ func (h *HealthHandler) Write(p []byte) (n int, err error) {
return len(p), nil
}
-func (h *HealthHandler) handleHealthMessage(message string) {
+func (h *HealthHandler) handleMessage(message string) {
+ if len(message) > 0 && message[0] == '.' {
+ h.baseHandler.handleHiddenMessage(message)
+ return
+ }
s := strings.Split(message, protocol.FieldDelimiter)
message = s[len(s)-1]
status := strings.Split(message, ":")
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go
index 848e7f0..d1acfbd 100644
--- a/internal/clients/handlers/maprhandler.go
+++ b/internal/clients/handlers/maprhandler.go
@@ -44,7 +44,7 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) {
if message[0] == 'A' {
h.handleAggregateMessage(message)
} else {
- h.baseHandler.handleMessageType(message)
+ h.baseHandler.handleMessage(message)
}
h.baseHandler.receiveBuf.Reset()
default:
diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go
index df919ae..e2c8ccb 100644
--- a/internal/clients/healthclient.go
+++ b/internal/clients/healthclient.go
@@ -18,7 +18,7 @@ type HealthClient struct {
// NewHealthClient returns a new health client.
func NewHealthClient(args config.Args) (*HealthClient, error) {
args.Mode = omode.HealthClient
- args.UserName = config.ControlUser
+ args.UserName = config.HealthUser
c := HealthClient{
baseClient: baseClient{
Args: args,
@@ -28,7 +28,7 @@ func NewHealthClient(args config.Args) (*HealthClient, error) {
}
c.init()
- c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.ControlUser))
+ c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.HealthUser))
c.makeConnections(c)
return &c, nil
diff --git a/internal/config/config.go b/internal/config/config.go
index d58162f..f216688 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -3,8 +3,8 @@ package config
import "github.com/mimecast/dtail/internal/source"
const (
- // ControlUser is used for various DTail specific operations.
- ControlUser string = "DTAIL-CONTROL"
+ // HealthUser is used for the health check
+ HealthUser string = "DTAIL-HEALTH"
// ScheduleUser is used for non-interactive scheduled mapreduce queries.
ScheduleUser string = "DTAIL-SCHEDULE"
// ContinuousUser is used for non-interactive continuous mapreduce queries.
diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go
index db99307..a17d6e9 100644
--- a/internal/io/dlog/dlog.go
+++ b/internal/io/dlog/dlog.go
@@ -201,6 +201,8 @@ func (d *DLog) Debug(args ...interface{}) string {
}
func (d *DLog) Trace(args ...interface{}) string {
+ _, file, line, _ := runtime.Caller(1)
+ args = append(args, fmt.Sprintf("at %s:%d", file, line))
return d.log(TRACE, args)
}
diff --git a/internal/io/fs/permissions/permission.go b/internal/io/fs/permissions/permission.go
index bbcb74e..e80dbb2 100644
--- a/internal/io/fs/permissions/permission.go
+++ b/internal/io/fs/permissions/permission.go
@@ -9,6 +9,6 @@ import (
// ToRead is to check whether user has read permissions to a given file.
func ToRead(user, filePath string) (bool, error) {
// Only implemented for Linux, always expect true
- dlog.Common.Warn(user, filePath, "Not performing ACL check, not supported on this platform")
+ dlog.Common.Warn(user, filePath, "Not performing ACL check as not compiled in")
return true, nil
}
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index b683578..4fa8f00 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -13,6 +13,7 @@ import (
"time"
"github.com/mimecast/dtail/internal"
+ "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/io/pool"
@@ -21,7 +22,7 @@ import (
user "github.com/mimecast/dtail/internal/user/server"
)
-type handleCommandCb func(context.Context, int, []string)
+type handleCommandCb func(context.Context, int, []string, string, map[string]string)
type baseHandler struct {
done *internal.Done
@@ -157,7 +158,16 @@ func (h *baseHandler) handleCommand(commandStr string) {
cancel()
}()
- h.handleCommandCb(ctx, argc, args)
+ splitted := strings.Split(args[0], ":")
+ commandName := splitted[0]
+
+ options, err := config.DeserializeOptions(splitted[1:])
+ if err != nil {
+ h.send(h.serverMessages, dlog.Server.Error(h.user, err))
+ return
+ }
+
+ h.handleCommandCb(ctx, argc, args, commandName, options)
}
func (h *baseHandler) handleProtocolVersion(args []string) ([]string, int, string, error) {
@@ -234,19 +244,19 @@ func (h *baseHandler) send(ch chan<- string, message string) {
}
func (h *baseHandler) flush() {
- dlog.Server.Debug(h.user, "flush()")
+ dlog.Server.Trace(h.user, "flush()")
numUnsentMessages := func() int {
return len(h.lines) + len(h.serverMessages) + len(h.maprMessages)
}
- for i := 0; i < 3; i++ {
+ for i := 0; i < 10; i++ {
if numUnsentMessages() == 0 {
dlog.Server.Debug(h.user, "All lines sent", fmt.Sprintf("%p", h))
return
}
dlog.Server.Debug(h.user, "Still lines to be sent")
- time.Sleep(time.Second)
+ time.Sleep(time.Millisecond * 10)
}
dlog.Server.Warn(h.user, "Some lines remain unsent", numUnsentMessages())
diff --git a/internal/server/handlers/controlhandler.go b/internal/server/handlers/controlhandler.go
deleted file mode 100644
index ae70675..0000000
--- a/internal/server/handlers/controlhandler.go
+++ /dev/null
@@ -1,98 +0,0 @@
-package handlers
-
-import (
- "fmt"
- "io"
- "os"
- "strings"
-
- "github.com/mimecast/dtail/internal"
- "github.com/mimecast/dtail/internal/io/dlog"
- user "github.com/mimecast/dtail/internal/user/server"
-)
-
-// ControlHandler is used for control functions and health monitoring.
-type ControlHandler struct {
- done *internal.Done
- hostname string
- payload []byte
- serverMessages chan string
- user *user.User
-}
-
-// NewControlHandler returns a new control handler.
-func NewControlHandler(user *user.User) *ControlHandler {
- dlog.Server.Debug(user, "Creating control handler")
-
- h := ControlHandler{
- done: internal.NewDone(),
- serverMessages: make(chan string, 10),
- user: user,
- }
-
- fqdn, err := os.Hostname()
- if err != nil {
- dlog.Server.FatalPanic(err)
- }
-
- s := strings.Split(fqdn, ".")
- h.hostname = s[0]
-
- return &h
-}
-
-// Shutdown the handler.
-func (h *ControlHandler) Shutdown() {
- h.done.Shutdown()
-}
-
-// Done channel of the handler.
-func (h *ControlHandler) Done() <-chan struct{} {
- return h.done.Done()
-}
-
-// Read is to send data to the client via the Reader interface.
-func (h *ControlHandler) Read(p []byte) (n int, err error) {
- for {
- select {
- case message := <-h.serverMessages:
- wholePayload := []byte(fmt.Sprintf("SERVER|%s|%s\n", h.hostname, message))
- n = copy(p, wholePayload)
- return
- case <-h.done.Done():
- return 0, io.EOF
- }
- }
-}
-
-// Write is to read data to the client via the Writer interface.
-func (h *ControlHandler) Write(p []byte) (n int, err error) {
- for _, c := range p {
- switch c {
- case ';':
- wholePayload := strings.TrimSpace(string(h.payload))
- h.handleCommand(wholePayload)
- h.payload = nil
-
- default:
- h.payload = append(h.payload, c)
- }
- }
-
- n = len(p)
- return
-}
-
-func (h *ControlHandler) handleCommand(command string) {
- dlog.Server.Info(h.user, command)
- s := strings.Split(command, " ")
- dlog.Server.Debug(h.user, "Receiving command", command, s)
-
- switch s[0] {
- case "health":
- h.serverMessages <- "OK: DTail SSH Server seems fine"
- h.serverMessages <- "done;"
- default:
- h.serverMessages <- dlog.Server.Error(h.user, "Received unknown control command", command, s)
- }
-}
diff --git a/internal/server/handlers/healthhandler.go b/internal/server/handlers/healthhandler.go
new file mode 100644
index 0000000..3f3b932
--- /dev/null
+++ b/internal/server/handlers/healthhandler.go
@@ -0,0 +1,58 @@
+package handlers
+
+import (
+ "context"
+ "os"
+ "strings"
+
+ "github.com/mimecast/dtail/internal"
+ "github.com/mimecast/dtail/internal/io/dlog"
+ "github.com/mimecast/dtail/internal/io/line"
+ user "github.com/mimecast/dtail/internal/user/server"
+)
+
+// HealthHandler is for the remote health check.
+type HealthHandler struct {
+ baseHandler
+}
+
+// NewHealthHandler returns the server handler.
+func NewHealthHandler(user *user.User) *HealthHandler {
+ dlog.Server.Debug(user, "Creating new server health handler")
+ h := HealthHandler{
+ baseHandler: baseHandler{
+ done: internal.NewDone(),
+ lines: make(chan line.Line, 100),
+ serverMessages: make(chan string, 10),
+ maprMessages: make(chan string, 10),
+ ackCloseReceived: make(chan struct{}),
+ user: user,
+ },
+ }
+ h.handleCommandCb = h.handleHealthCommand
+
+ fqdn, err := os.Hostname()
+ if err != nil {
+ dlog.Server.FatalPanic(err)
+ }
+
+ s := strings.Split(fqdn, ".")
+ h.hostname = s[0]
+
+ return &h
+}
+
+func (h *HealthHandler) handleHealthCommand(ctx context.Context, argc int, args []string,
+ commandName string, options map[string]string) {
+ dlog.Server.Debug(h.user, "Handling health command", argc, args)
+
+ switch commandName {
+ case "health":
+ h.send(h.serverMessages, "OK: DTail SSH Server seems fine")
+ case "ack", ".ack":
+ h.handleAckCommand(argc, args)
+ default:
+ h.send(h.serverMessages, dlog.Server.Error(h.user, "Received unknown health command", commandName, argc, args))
+ }
+ h.shutdown()
+}
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 25cb8ba..aaffe14 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -6,7 +6,6 @@ import (
"strings"
"github.com/mimecast/dtail/internal"
- "github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
"github.com/mimecast/dtail/internal/io/line"
"github.com/mimecast/dtail/internal/omode"
@@ -25,6 +24,7 @@ type ServerHandler struct {
// NewServerHandler returns the server handler.
func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *ServerHandler {
+ dlog.Server.Debug(user, "Creating new server handler")
h := ServerHandler{
baseHandler: baseHandler{
done: internal.NewDone(),
@@ -51,7 +51,9 @@ func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *S
return &h
}
-func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string) {
+func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string,
+ commandName string, options map[string]string) {
+
dlog.Server.Debug(h.user, "Handling user command", argc, args)
h.incrementActiveCommands()
@@ -61,16 +63,6 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []
}
}
- splitted := strings.Split(args[0], ":")
- commandName := splitted[0]
-
- options, err := config.DeserializeOptions(splitted[1:])
- if err != nil {
- h.send(h.serverMessages, dlog.Server.Error(h.user, err))
- commandFinished()
- return
- }
-
if quiet, _ := options["quiet"]; quiet == "true" {
dlog.Server.Debug(h.user, "Enabling quiet mode")
h.quiet = true
diff --git a/internal/server/server.go b/internal/server/server.go
index d1cd57d..b3d4bff 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -162,8 +162,8 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch
case "shell":
var handler handlers.Handler
switch user.Name {
- case config.ControlUser:
- handler = handlers.NewControlHandler(user)
+ case config.HealthUser:
+ handler = handlers.NewHealthHandler(user)
default:
handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter)
}
@@ -234,9 +234,9 @@ func (s *Server) Callback(c gossh.ConnMetadata, authPayload []byte) (*gossh.Perm
remoteIP := splitted[0]
switch user.Name {
- case config.ControlUser:
- if authInfo == config.ControlUser {
- dlog.Server.Debug(user, "Granting permissions to control user")
+ case config.HealthUser:
+ if authInfo == config.HealthUser {
+ dlog.Server.Debug(user, "Granting permissions to health user")
return nil, nil
}
case config.ScheduleUser: