summaryrefslogtreecommitdiff
path: root/internal/clients
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-10-06 09:50:41 +0300
committerPaul Buetow <paul@buetow.org>2021-10-06 09:50:41 +0300
commitfab5dc3e70434ea0abc7a0976487a1973b662331 (patch)
tree61a06e166f225b69f09966e81ae725f960fd80be /internal/clients
parent9f395a03f25941d8ed98ec43035688daa1e8877f (diff)
enable faster shutdown - useful for dgrep/dmap and dcat commands
Diffstat (limited to 'internal/clients')
-rw-r--r--internal/clients/baseclient.go14
-rw-r--r--internal/clients/connectors/serverless.go15
-rw-r--r--internal/clients/handlers/basehandler.go8
-rw-r--r--internal/clients/handlers/healthhandler.go14
-rw-r--r--internal/clients/handlers/maprhandler.go2
-rw-r--r--internal/clients/healthclient.go4
6 files changed, 31 insertions, 26 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index 9574e13..b474208 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -39,8 +39,7 @@ type baseClient struct {
}
func (c *baseClient) init() {
- dlog.Client.Debug("Initiating base client")
- dlog.Client.Debug(c.Args.String())
+ dlog.Client.Debug("Initiating base client", c.Args.String())
flag := regex.Default
if c.Args.RegexInvert {
@@ -48,15 +47,16 @@ func (c *baseClient) init() {
}
regex, err := regex.New(c.Args.RegexStr, flag)
if err != nil {
- dlog.Client.FatalPanic(c.Regex, "invalid regex!", err, regex)
+ dlog.Client.FatalPanic(c.Regex, "Invalid regex!", err, regex)
}
c.Regex = regex
- dlog.Client.Debug("Regex", c.Regex)
if c.Args.Serverless {
return
}
- c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods(c.Args.SSHAuthMethods, c.Args.SSHHostKeyCallback, c.Args.TrustAllHosts, c.throttleCh, c.Args.PrivateKeyPathFile)
+ c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods(
+ c.Args.SSHAuthMethods, c.Args.SSHHostKeyCallback, c.Args.TrustAllHosts,
+ c.throttleCh, c.Args.PrivateKeyPathFile)
}
func (c *baseClient) makeConnections(maker maker) {
@@ -71,6 +71,7 @@ func (c *baseClient) makeConnections(maker maker) {
}
func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status int) {
+ dlog.Client.Trace("Starting base client")
// Can be nil when serverless.
if c.hostKeyCallback != nil {
// Periodically check for unknown hosts, and ask the user whether to trust them or not.
@@ -81,13 +82,12 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i
var wg sync.WaitGroup
wg.Add(len(c.connections))
-
var mutex sync.Mutex
+
for i, conn := range c.connections {
go func(i int, conn connectors.Connector) {
defer wg.Done()
connStatus := c.startConnection(ctx, i, conn)
-
// Update global status.
mutex.Lock()
defer mutex.Unlock()
diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go
index 2a1cec4..768a5ce 100644
--- a/internal/clients/connectors/serverless.go
+++ b/internal/clients/connectors/serverless.go
@@ -37,6 +37,7 @@ func (s *Serverless) Handler() handlers.Handler {
}
func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) {
+ dlog.Client.Debug("Starting serverless connector")
go func() {
defer cancel()
@@ -44,7 +45,6 @@ func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throt
dlog.Client.Warn(err)
}
}()
-
<-ctx.Done()
}
@@ -58,9 +58,11 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
var serverHandler serverHandlers.Handler
switch s.userName {
- case config.ControlUser:
- serverHandler = serverHandlers.NewControlHandler(user)
+ case config.HealthUser:
+ dlog.Client.Debug("Creating serverless health handler")
+ serverHandler = serverHandlers.NewHealthHandler(user)
default:
+ dlog.Client.Debug("Creating serverless server handler")
serverHandler = serverHandlers.NewServerHandler(
user,
make(chan struct{}, config.Server.MaxConcurrentCats),
@@ -76,29 +78,34 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
go func() {
io.Copy(serverHandler, s.handler)
+ dlog.Client.Trace("io.Copy(serverHandler, s.handler) => done")
terminate()
}()
go func() {
io.Copy(s.handler, serverHandler)
+ dlog.Client.Trace("io.Copy(s.handler, serverHandler) => done")
terminate()
}()
go func() {
select {
case <-s.handler.Done():
+ dlog.Client.Trace("<-s.handler.Done()")
case <-ctx.Done():
+ dlog.Client.Trace("<-ctx.Done()")
}
terminate()
}()
// Send all commands to client.
for _, command := range s.commands {
- dlog.Client.Debug(command)
+ dlog.Client.Debug("Sending command to serverless server", command)
s.handler.SendMessage(command)
}
<-ctx.Done()
+ dlog.Client.Trace("s.handler.Shutdown()")
s.handler.Shutdown()
return nil
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index 04124e7..b520c25 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -69,7 +69,7 @@ func (h *baseHandler) Write(p []byte) (n int, err error) {
*/
case '\n', protocol.MessageDelimiter:
message := h.receiveBuf.String()
- h.handleMessageType(message)
+ h.handleMessage(message)
h.receiveBuf.Reset()
default:
h.receiveBuf.WriteByte(b)
@@ -90,9 +90,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) {
return
}
-// Handle various message types.
-func (h *baseHandler) handleMessageType(message string) {
- // Hidden server commands starti with a dot "."
+func (h *baseHandler) handleMessage(message string) {
if len(message) > 0 && message[0] == '.' {
h.handleHiddenMessage(message)
return
@@ -106,7 +104,7 @@ func (h *baseHandler) handleMessageType(message string) {
func (h *baseHandler) handleHiddenMessage(message string) {
switch {
case strings.HasPrefix(message, ".syn close connection"):
- h.SendMessage(".ack close connection")
+ go h.SendMessage(".ack close connection")
h.Shutdown()
}
}
diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go
index 4949985..4b16ce4 100644
--- a/internal/clients/handlers/healthhandler.go
+++ b/internal/clients/handlers/healthhandler.go
@@ -12,7 +12,6 @@ import (
// HealthHandler is the handler used on the client side for running mapreduce aggregations.
type HealthHandler struct {
baseHandler
- HealthStatusCh chan<- int
}
// NewHealthHandler returns a new health client handler.
@@ -26,7 +25,6 @@ func NewHealthHandler(server string) *HealthHandler {
status: -1,
done: internal.NewDone(),
},
- HealthStatusCh: make(chan int),
}
}
@@ -34,12 +32,10 @@ func NewHealthHandler(server string) *HealthHandler {
func (h *HealthHandler) Write(p []byte) (n int, err error) {
for _, b := range p {
switch b {
- case '\n':
- continue
- case protocol.MessageDelimiter:
+ case '\n', protocol.MessageDelimiter:
message := h.baseHandler.receiveBuf.String()
dlog.Client.Debug(message)
- h.handleHealthMessage(message)
+ h.handleMessage(message)
h.baseHandler.receiveBuf.Reset()
default:
h.baseHandler.receiveBuf.WriteByte(b)
@@ -49,7 +45,11 @@ func (h *HealthHandler) Write(p []byte) (n int, err error) {
return len(p), nil
}
-func (h *HealthHandler) handleHealthMessage(message string) {
+func (h *HealthHandler) handleMessage(message string) {
+ if len(message) > 0 && message[0] == '.' {
+ h.baseHandler.handleHiddenMessage(message)
+ return
+ }
s := strings.Split(message, protocol.FieldDelimiter)
message = s[len(s)-1]
status := strings.Split(message, ":")
diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go
index 848e7f0..d1acfbd 100644
--- a/internal/clients/handlers/maprhandler.go
+++ b/internal/clients/handlers/maprhandler.go
@@ -44,7 +44,7 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) {
if message[0] == 'A' {
h.handleAggregateMessage(message)
} else {
- h.baseHandler.handleMessageType(message)
+ h.baseHandler.handleMessage(message)
}
h.baseHandler.receiveBuf.Reset()
default:
diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go
index df919ae..e2c8ccb 100644
--- a/internal/clients/healthclient.go
+++ b/internal/clients/healthclient.go
@@ -18,7 +18,7 @@ type HealthClient struct {
// NewHealthClient returns a new health client.
func NewHealthClient(args config.Args) (*HealthClient, error) {
args.Mode = omode.HealthClient
- args.UserName = config.ControlUser
+ args.UserName = config.HealthUser
c := HealthClient{
baseClient: baseClient{
Args: args,
@@ -28,7 +28,7 @@ func NewHealthClient(args config.Args) (*HealthClient, error) {
}
c.init()
- c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.ControlUser))
+ c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.HealthUser))
c.makeConnections(c)
return &c, nil