diff options
| author | Paul Buetow <paul@buetow.org> | 2021-10-06 09:50:41 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2021-10-06 09:50:41 +0300 |
| commit | fab5dc3e70434ea0abc7a0976487a1973b662331 (patch) | |
| tree | 61a06e166f225b69f09966e81ae725f960fd80be | |
| parent | 9f395a03f25941d8ed98ec43035688daa1e8877f (diff) | |
enable faster shutdown - useful for dgrep/dmap and dcat commands
| -rw-r--r-- | cmd/dtail/main.go | 2 | ||||
| -rw-r--r-- | internal/clients/baseclient.go | 14 | ||||
| -rw-r--r-- | internal/clients/connectors/serverless.go | 15 | ||||
| -rw-r--r-- | internal/clients/handlers/basehandler.go | 8 | ||||
| -rw-r--r-- | internal/clients/handlers/healthhandler.go | 14 | ||||
| -rw-r--r-- | internal/clients/handlers/maprhandler.go | 2 | ||||
| -rw-r--r-- | internal/clients/healthclient.go | 4 | ||||
| -rw-r--r-- | internal/config/config.go | 4 | ||||
| -rw-r--r-- | internal/io/dlog/dlog.go | 2 | ||||
| -rw-r--r-- | internal/io/fs/permissions/permission.go | 2 | ||||
| -rw-r--r-- | internal/server/handlers/basehandler.go | 20 | ||||
| -rw-r--r-- | internal/server/handlers/controlhandler.go | 98 | ||||
| -rw-r--r-- | internal/server/handlers/healthhandler.go | 58 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 16 | ||||
| -rw-r--r-- | internal/server/server.go | 10 |
15 files changed, 119 insertions, 150 deletions
diff --git a/cmd/dtail/main.go b/cmd/dtail/main.go index 820323c..d333825 100644 --- a/cmd/dtail/main.go +++ b/cmd/dtail/main.go @@ -97,8 +97,8 @@ func main() { dlog.Start(ctx, &wg, sourceProcess, config.Common.LogLevel) if checkHealth { + defer cancel() healthClient, _ := clients.NewHealthClient(args) - cancel() os.Exit(healthClient.Start(ctx, signal.InterruptCh(ctx))) } diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index 9574e13..b474208 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -39,8 +39,7 @@ type baseClient struct { } func (c *baseClient) init() { - dlog.Client.Debug("Initiating base client") - dlog.Client.Debug(c.Args.String()) + dlog.Client.Debug("Initiating base client", c.Args.String()) flag := regex.Default if c.Args.RegexInvert { @@ -48,15 +47,16 @@ func (c *baseClient) init() { } regex, err := regex.New(c.Args.RegexStr, flag) if err != nil { - dlog.Client.FatalPanic(c.Regex, "invalid regex!", err, regex) + dlog.Client.FatalPanic(c.Regex, "Invalid regex!", err, regex) } c.Regex = regex - dlog.Client.Debug("Regex", c.Regex) if c.Args.Serverless { return } - c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods(c.Args.SSHAuthMethods, c.Args.SSHHostKeyCallback, c.Args.TrustAllHosts, c.throttleCh, c.Args.PrivateKeyPathFile) + c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods( + c.Args.SSHAuthMethods, c.Args.SSHHostKeyCallback, c.Args.TrustAllHosts, + c.throttleCh, c.Args.PrivateKeyPathFile) } func (c *baseClient) makeConnections(maker maker) { @@ -71,6 +71,7 @@ func (c *baseClient) makeConnections(maker maker) { } func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status int) { + dlog.Client.Trace("Starting base client") // Can be nil when serverless. if c.hostKeyCallback != nil { // Periodically check for unknown hosts, and ask the user whether to trust them or not. @@ -81,13 +82,12 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i var wg sync.WaitGroup wg.Add(len(c.connections)) - var mutex sync.Mutex + for i, conn := range c.connections { go func(i int, conn connectors.Connector) { defer wg.Done() connStatus := c.startConnection(ctx, i, conn) - // Update global status. mutex.Lock() defer mutex.Unlock() diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go index 2a1cec4..768a5ce 100644 --- a/internal/clients/connectors/serverless.go +++ b/internal/clients/connectors/serverless.go @@ -37,6 +37,7 @@ func (s *Serverless) Handler() handlers.Handler { } func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) { + dlog.Client.Debug("Starting serverless connector") go func() { defer cancel() @@ -44,7 +45,6 @@ func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throt dlog.Client.Warn(err) } }() - <-ctx.Done() } @@ -58,9 +58,11 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro var serverHandler serverHandlers.Handler switch s.userName { - case config.ControlUser: - serverHandler = serverHandlers.NewControlHandler(user) + case config.HealthUser: + dlog.Client.Debug("Creating serverless health handler") + serverHandler = serverHandlers.NewHealthHandler(user) default: + dlog.Client.Debug("Creating serverless server handler") serverHandler = serverHandlers.NewServerHandler( user, make(chan struct{}, config.Server.MaxConcurrentCats), @@ -76,29 +78,34 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro go func() { io.Copy(serverHandler, s.handler) + dlog.Client.Trace("io.Copy(serverHandler, s.handler) => done") terminate() }() go func() { io.Copy(s.handler, serverHandler) + dlog.Client.Trace("io.Copy(s.handler, serverHandler) => done") terminate() }() go func() { select { case <-s.handler.Done(): + dlog.Client.Trace("<-s.handler.Done()") case <-ctx.Done(): + dlog.Client.Trace("<-ctx.Done()") } terminate() }() // Send all commands to client. for _, command := range s.commands { - dlog.Client.Debug(command) + dlog.Client.Debug("Sending command to serverless server", command) s.handler.SendMessage(command) } <-ctx.Done() + dlog.Client.Trace("s.handler.Shutdown()") s.handler.Shutdown() return nil diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 04124e7..b520c25 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -69,7 +69,7 @@ func (h *baseHandler) Write(p []byte) (n int, err error) { */ case '\n', protocol.MessageDelimiter: message := h.receiveBuf.String() - h.handleMessageType(message) + h.handleMessage(message) h.receiveBuf.Reset() default: h.receiveBuf.WriteByte(b) @@ -90,9 +90,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { return } -// Handle various message types. -func (h *baseHandler) handleMessageType(message string) { - // Hidden server commands starti with a dot "." +func (h *baseHandler) handleMessage(message string) { if len(message) > 0 && message[0] == '.' { h.handleHiddenMessage(message) return @@ -106,7 +104,7 @@ func (h *baseHandler) handleMessageType(message string) { func (h *baseHandler) handleHiddenMessage(message string) { switch { case strings.HasPrefix(message, ".syn close connection"): - h.SendMessage(".ack close connection") + go h.SendMessage(".ack close connection") h.Shutdown() } } diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 4949985..4b16ce4 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -12,7 +12,6 @@ import ( // HealthHandler is the handler used on the client side for running mapreduce aggregations. type HealthHandler struct { baseHandler - HealthStatusCh chan<- int } // NewHealthHandler returns a new health client handler. @@ -26,7 +25,6 @@ func NewHealthHandler(server string) *HealthHandler { status: -1, done: internal.NewDone(), }, - HealthStatusCh: make(chan int), } } @@ -34,12 +32,10 @@ func NewHealthHandler(server string) *HealthHandler { func (h *HealthHandler) Write(p []byte) (n int, err error) { for _, b := range p { switch b { - case '\n': - continue - case protocol.MessageDelimiter: + case '\n', protocol.MessageDelimiter: message := h.baseHandler.receiveBuf.String() dlog.Client.Debug(message) - h.handleHealthMessage(message) + h.handleMessage(message) h.baseHandler.receiveBuf.Reset() default: h.baseHandler.receiveBuf.WriteByte(b) @@ -49,7 +45,11 @@ func (h *HealthHandler) Write(p []byte) (n int, err error) { return len(p), nil } -func (h *HealthHandler) handleHealthMessage(message string) { +func (h *HealthHandler) handleMessage(message string) { + if len(message) > 0 && message[0] == '.' { + h.baseHandler.handleHiddenMessage(message) + return + } s := strings.Split(message, protocol.FieldDelimiter) message = s[len(s)-1] status := strings.Split(message, ":") diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index 848e7f0..d1acfbd 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -44,7 +44,7 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { if message[0] == 'A' { h.handleAggregateMessage(message) } else { - h.baseHandler.handleMessageType(message) + h.baseHandler.handleMessage(message) } h.baseHandler.receiveBuf.Reset() default: diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go index df919ae..e2c8ccb 100644 --- a/internal/clients/healthclient.go +++ b/internal/clients/healthclient.go @@ -18,7 +18,7 @@ type HealthClient struct { // NewHealthClient returns a new health client. func NewHealthClient(args config.Args) (*HealthClient, error) { args.Mode = omode.HealthClient - args.UserName = config.ControlUser + args.UserName = config.HealthUser c := HealthClient{ baseClient: baseClient{ Args: args, @@ -28,7 +28,7 @@ func NewHealthClient(args config.Args) (*HealthClient, error) { } c.init() - c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.ControlUser)) + c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.HealthUser)) c.makeConnections(c) return &c, nil diff --git a/internal/config/config.go b/internal/config/config.go index d58162f..f216688 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,8 +3,8 @@ package config import "github.com/mimecast/dtail/internal/source" const ( - // ControlUser is used for various DTail specific operations. - ControlUser string = "DTAIL-CONTROL" + // HealthUser is used for the health check + HealthUser string = "DTAIL-HEALTH" // ScheduleUser is used for non-interactive scheduled mapreduce queries. ScheduleUser string = "DTAIL-SCHEDULE" // ContinuousUser is used for non-interactive continuous mapreduce queries. diff --git a/internal/io/dlog/dlog.go b/internal/io/dlog/dlog.go index db99307..a17d6e9 100644 --- a/internal/io/dlog/dlog.go +++ b/internal/io/dlog/dlog.go @@ -201,6 +201,8 @@ func (d *DLog) Debug(args ...interface{}) string { } func (d *DLog) Trace(args ...interface{}) string { + _, file, line, _ := runtime.Caller(1) + args = append(args, fmt.Sprintf("at %s:%d", file, line)) return d.log(TRACE, args) } diff --git a/internal/io/fs/permissions/permission.go b/internal/io/fs/permissions/permission.go index bbcb74e..e80dbb2 100644 --- a/internal/io/fs/permissions/permission.go +++ b/internal/io/fs/permissions/permission.go @@ -9,6 +9,6 @@ import ( // ToRead is to check whether user has read permissions to a given file. func ToRead(user, filePath string) (bool, error) { // Only implemented for Linux, always expect true - dlog.Common.Warn(user, filePath, "Not performing ACL check, not supported on this platform") + dlog.Common.Warn(user, filePath, "Not performing ACL check as not compiled in") return true, nil } diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index b683578..4fa8f00 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -13,6 +13,7 @@ import ( "time" "github.com/mimecast/dtail/internal" + "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/io/pool" @@ -21,7 +22,7 @@ import ( user "github.com/mimecast/dtail/internal/user/server" ) -type handleCommandCb func(context.Context, int, []string) +type handleCommandCb func(context.Context, int, []string, string, map[string]string) type baseHandler struct { done *internal.Done @@ -157,7 +158,16 @@ func (h *baseHandler) handleCommand(commandStr string) { cancel() }() - h.handleCommandCb(ctx, argc, args) + splitted := strings.Split(args[0], ":") + commandName := splitted[0] + + options, err := config.DeserializeOptions(splitted[1:]) + if err != nil { + h.send(h.serverMessages, dlog.Server.Error(h.user, err)) + return + } + + h.handleCommandCb(ctx, argc, args, commandName, options) } func (h *baseHandler) handleProtocolVersion(args []string) ([]string, int, string, error) { @@ -234,19 +244,19 @@ func (h *baseHandler) send(ch chan<- string, message string) { } func (h *baseHandler) flush() { - dlog.Server.Debug(h.user, "flush()") + dlog.Server.Trace(h.user, "flush()") numUnsentMessages := func() int { return len(h.lines) + len(h.serverMessages) + len(h.maprMessages) } - for i := 0; i < 3; i++ { + for i := 0; i < 10; i++ { if numUnsentMessages() == 0 { dlog.Server.Debug(h.user, "All lines sent", fmt.Sprintf("%p", h)) return } dlog.Server.Debug(h.user, "Still lines to be sent") - time.Sleep(time.Second) + time.Sleep(time.Millisecond * 10) } dlog.Server.Warn(h.user, "Some lines remain unsent", numUnsentMessages()) diff --git a/internal/server/handlers/controlhandler.go b/internal/server/handlers/controlhandler.go deleted file mode 100644 index ae70675..0000000 --- a/internal/server/handlers/controlhandler.go +++ /dev/null @@ -1,98 +0,0 @@ -package handlers - -import ( - "fmt" - "io" - "os" - "strings" - - "github.com/mimecast/dtail/internal" - "github.com/mimecast/dtail/internal/io/dlog" - user "github.com/mimecast/dtail/internal/user/server" -) - -// ControlHandler is used for control functions and health monitoring. -type ControlHandler struct { - done *internal.Done - hostname string - payload []byte - serverMessages chan string - user *user.User -} - -// NewControlHandler returns a new control handler. -func NewControlHandler(user *user.User) *ControlHandler { - dlog.Server.Debug(user, "Creating control handler") - - h := ControlHandler{ - done: internal.NewDone(), - serverMessages: make(chan string, 10), - user: user, - } - - fqdn, err := os.Hostname() - if err != nil { - dlog.Server.FatalPanic(err) - } - - s := strings.Split(fqdn, ".") - h.hostname = s[0] - - return &h -} - -// Shutdown the handler. -func (h *ControlHandler) Shutdown() { - h.done.Shutdown() -} - -// Done channel of the handler. -func (h *ControlHandler) Done() <-chan struct{} { - return h.done.Done() -} - -// Read is to send data to the client via the Reader interface. -func (h *ControlHandler) Read(p []byte) (n int, err error) { - for { - select { - case message := <-h.serverMessages: - wholePayload := []byte(fmt.Sprintf("SERVER|%s|%s\n", h.hostname, message)) - n = copy(p, wholePayload) - return - case <-h.done.Done(): - return 0, io.EOF - } - } -} - -// Write is to read data to the client via the Writer interface. -func (h *ControlHandler) Write(p []byte) (n int, err error) { - for _, c := range p { - switch c { - case ';': - wholePayload := strings.TrimSpace(string(h.payload)) - h.handleCommand(wholePayload) - h.payload = nil - - default: - h.payload = append(h.payload, c) - } - } - - n = len(p) - return -} - -func (h *ControlHandler) handleCommand(command string) { - dlog.Server.Info(h.user, command) - s := strings.Split(command, " ") - dlog.Server.Debug(h.user, "Receiving command", command, s) - - switch s[0] { - case "health": - h.serverMessages <- "OK: DTail SSH Server seems fine" - h.serverMessages <- "done;" - default: - h.serverMessages <- dlog.Server.Error(h.user, "Received unknown control command", command, s) - } -} diff --git a/internal/server/handlers/healthhandler.go b/internal/server/handlers/healthhandler.go new file mode 100644 index 0000000..3f3b932 --- /dev/null +++ b/internal/server/handlers/healthhandler.go @@ -0,0 +1,58 @@ +package handlers + +import ( + "context" + "os" + "strings" + + "github.com/mimecast/dtail/internal" + "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/io/line" + user "github.com/mimecast/dtail/internal/user/server" +) + +// HealthHandler is for the remote health check. +type HealthHandler struct { + baseHandler +} + +// NewHealthHandler returns the server handler. +func NewHealthHandler(user *user.User) *HealthHandler { + dlog.Server.Debug(user, "Creating new server health handler") + h := HealthHandler{ + baseHandler: baseHandler{ + done: internal.NewDone(), + lines: make(chan line.Line, 100), + serverMessages: make(chan string, 10), + maprMessages: make(chan string, 10), + ackCloseReceived: make(chan struct{}), + user: user, + }, + } + h.handleCommandCb = h.handleHealthCommand + + fqdn, err := os.Hostname() + if err != nil { + dlog.Server.FatalPanic(err) + } + + s := strings.Split(fqdn, ".") + h.hostname = s[0] + + return &h +} + +func (h *HealthHandler) handleHealthCommand(ctx context.Context, argc int, args []string, + commandName string, options map[string]string) { + dlog.Server.Debug(h.user, "Handling health command", argc, args) + + switch commandName { + case "health": + h.send(h.serverMessages, "OK: DTail SSH Server seems fine") + case "ack", ".ack": + h.handleAckCommand(argc, args) + default: + h.send(h.serverMessages, dlog.Server.Error(h.user, "Received unknown health command", commandName, argc, args)) + } + h.shutdown() +} diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 25cb8ba..aaffe14 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -6,7 +6,6 @@ import ( "strings" "github.com/mimecast/dtail/internal" - "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/dlog" "github.com/mimecast/dtail/internal/io/line" "github.com/mimecast/dtail/internal/omode" @@ -25,6 +24,7 @@ type ServerHandler struct { // NewServerHandler returns the server handler. func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *ServerHandler { + dlog.Server.Debug(user, "Creating new server handler") h := ServerHandler{ baseHandler: baseHandler{ done: internal.NewDone(), @@ -51,7 +51,9 @@ func NewServerHandler(user *user.User, catLimiter, tailLimiter chan struct{}) *S return &h } -func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string) { +func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args []string, + commandName string, options map[string]string) { + dlog.Server.Debug(h.user, "Handling user command", argc, args) h.incrementActiveCommands() @@ -61,16 +63,6 @@ func (h *ServerHandler) handleUserCommand(ctx context.Context, argc int, args [] } } - splitted := strings.Split(args[0], ":") - commandName := splitted[0] - - options, err := config.DeserializeOptions(splitted[1:]) - if err != nil { - h.send(h.serverMessages, dlog.Server.Error(h.user, err)) - commandFinished() - return - } - if quiet, _ := options["quiet"]; quiet == "true" { dlog.Server.Debug(h.user, "Enabling quiet mode") h.quiet = true diff --git a/internal/server/server.go b/internal/server/server.go index d1cd57d..b3d4bff 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -162,8 +162,8 @@ func (s *Server) handleRequests(ctx context.Context, sshConn gossh.Conn, in <-ch case "shell": var handler handlers.Handler switch user.Name { - case config.ControlUser: - handler = handlers.NewControlHandler(user) + case config.HealthUser: + handler = handlers.NewHealthHandler(user) default: handler = handlers.NewServerHandler(user, s.catLimiter, s.tailLimiter) } @@ -234,9 +234,9 @@ func (s *Server) Callback(c gossh.ConnMetadata, authPayload []byte) (*gossh.Perm remoteIP := splitted[0] switch user.Name { - case config.ControlUser: - if authInfo == config.ControlUser { - dlog.Server.Debug(user, "Granting permissions to control user") + case config.HealthUser: + if authInfo == config.HealthUser { + dlog.Server.Debug(user, "Granting permissions to health user") return nil, nil } case config.ScheduleUser: |
