diff options
| author | Paul Buetow <paul@buetow.org> | 2021-10-06 09:50:41 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2021-10-06 09:50:41 +0300 |
| commit | fab5dc3e70434ea0abc7a0976487a1973b662331 (patch) | |
| tree | 61a06e166f225b69f09966e81ae725f960fd80be /internal/clients | |
| parent | 9f395a03f25941d8ed98ec43035688daa1e8877f (diff) | |
enable faster shutdown - useful for dgrep/dmap and dcat commands
Diffstat (limited to 'internal/clients')
| -rw-r--r-- | internal/clients/baseclient.go | 14 | ||||
| -rw-r--r-- | internal/clients/connectors/serverless.go | 15 | ||||
| -rw-r--r-- | internal/clients/handlers/basehandler.go | 8 | ||||
| -rw-r--r-- | internal/clients/handlers/healthhandler.go | 14 | ||||
| -rw-r--r-- | internal/clients/handlers/maprhandler.go | 2 | ||||
| -rw-r--r-- | internal/clients/healthclient.go | 4 |
6 files changed, 31 insertions, 26 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index 9574e13..b474208 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -39,8 +39,7 @@ type baseClient struct { } func (c *baseClient) init() { - dlog.Client.Debug("Initiating base client") - dlog.Client.Debug(c.Args.String()) + dlog.Client.Debug("Initiating base client", c.Args.String()) flag := regex.Default if c.Args.RegexInvert { @@ -48,15 +47,16 @@ func (c *baseClient) init() { } regex, err := regex.New(c.Args.RegexStr, flag) if err != nil { - dlog.Client.FatalPanic(c.Regex, "invalid regex!", err, regex) + dlog.Client.FatalPanic(c.Regex, "Invalid regex!", err, regex) } c.Regex = regex - dlog.Client.Debug("Regex", c.Regex) if c.Args.Serverless { return } - c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods(c.Args.SSHAuthMethods, c.Args.SSHHostKeyCallback, c.Args.TrustAllHosts, c.throttleCh, c.Args.PrivateKeyPathFile) + c.sshAuthMethods, c.hostKeyCallback = client.InitSSHAuthMethods( + c.Args.SSHAuthMethods, c.Args.SSHHostKeyCallback, c.Args.TrustAllHosts, + c.throttleCh, c.Args.PrivateKeyPathFile) } func (c *baseClient) makeConnections(maker maker) { @@ -71,6 +71,7 @@ func (c *baseClient) makeConnections(maker maker) { } func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status int) { + dlog.Client.Trace("Starting base client") // Can be nil when serverless. if c.hostKeyCallback != nil { // Periodically check for unknown hosts, and ask the user whether to trust them or not. @@ -81,13 +82,12 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i var wg sync.WaitGroup wg.Add(len(c.connections)) - var mutex sync.Mutex + for i, conn := range c.connections { go func(i int, conn connectors.Connector) { defer wg.Done() connStatus := c.startConnection(ctx, i, conn) - // Update global status. mutex.Lock() defer mutex.Unlock() diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go index 2a1cec4..768a5ce 100644 --- a/internal/clients/connectors/serverless.go +++ b/internal/clients/connectors/serverless.go @@ -37,6 +37,7 @@ func (s *Serverless) Handler() handlers.Handler { } func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) { + dlog.Client.Debug("Starting serverless connector") go func() { defer cancel() @@ -44,7 +45,6 @@ func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throt dlog.Client.Warn(err) } }() - <-ctx.Done() } @@ -58,9 +58,11 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro var serverHandler serverHandlers.Handler switch s.userName { - case config.ControlUser: - serverHandler = serverHandlers.NewControlHandler(user) + case config.HealthUser: + dlog.Client.Debug("Creating serverless health handler") + serverHandler = serverHandlers.NewHealthHandler(user) default: + dlog.Client.Debug("Creating serverless server handler") serverHandler = serverHandlers.NewServerHandler( user, make(chan struct{}, config.Server.MaxConcurrentCats), @@ -76,29 +78,34 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro go func() { io.Copy(serverHandler, s.handler) + dlog.Client.Trace("io.Copy(serverHandler, s.handler) => done") terminate() }() go func() { io.Copy(s.handler, serverHandler) + dlog.Client.Trace("io.Copy(s.handler, serverHandler) => done") terminate() }() go func() { select { case <-s.handler.Done(): + dlog.Client.Trace("<-s.handler.Done()") case <-ctx.Done(): + dlog.Client.Trace("<-ctx.Done()") } terminate() }() // Send all commands to client. for _, command := range s.commands { - dlog.Client.Debug(command) + dlog.Client.Debug("Sending command to serverless server", command) s.handler.SendMessage(command) } <-ctx.Done() + dlog.Client.Trace("s.handler.Shutdown()") s.handler.Shutdown() return nil diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 04124e7..b520c25 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -69,7 +69,7 @@ func (h *baseHandler) Write(p []byte) (n int, err error) { */ case '\n', protocol.MessageDelimiter: message := h.receiveBuf.String() - h.handleMessageType(message) + h.handleMessage(message) h.receiveBuf.Reset() default: h.receiveBuf.WriteByte(b) @@ -90,9 +90,7 @@ func (h *baseHandler) Read(p []byte) (n int, err error) { return } -// Handle various message types. -func (h *baseHandler) handleMessageType(message string) { - // Hidden server commands starti with a dot "." +func (h *baseHandler) handleMessage(message string) { if len(message) > 0 && message[0] == '.' { h.handleHiddenMessage(message) return @@ -106,7 +104,7 @@ func (h *baseHandler) handleMessageType(message string) { func (h *baseHandler) handleHiddenMessage(message string) { switch { case strings.HasPrefix(message, ".syn close connection"): - h.SendMessage(".ack close connection") + go h.SendMessage(".ack close connection") h.Shutdown() } } diff --git a/internal/clients/handlers/healthhandler.go b/internal/clients/handlers/healthhandler.go index 4949985..4b16ce4 100644 --- a/internal/clients/handlers/healthhandler.go +++ b/internal/clients/handlers/healthhandler.go @@ -12,7 +12,6 @@ import ( // HealthHandler is the handler used on the client side for running mapreduce aggregations. type HealthHandler struct { baseHandler - HealthStatusCh chan<- int } // NewHealthHandler returns a new health client handler. @@ -26,7 +25,6 @@ func NewHealthHandler(server string) *HealthHandler { status: -1, done: internal.NewDone(), }, - HealthStatusCh: make(chan int), } } @@ -34,12 +32,10 @@ func NewHealthHandler(server string) *HealthHandler { func (h *HealthHandler) Write(p []byte) (n int, err error) { for _, b := range p { switch b { - case '\n': - continue - case protocol.MessageDelimiter: + case '\n', protocol.MessageDelimiter: message := h.baseHandler.receiveBuf.String() dlog.Client.Debug(message) - h.handleHealthMessage(message) + h.handleMessage(message) h.baseHandler.receiveBuf.Reset() default: h.baseHandler.receiveBuf.WriteByte(b) @@ -49,7 +45,11 @@ func (h *HealthHandler) Write(p []byte) (n int, err error) { return len(p), nil } -func (h *HealthHandler) handleHealthMessage(message string) { +func (h *HealthHandler) handleMessage(message string) { + if len(message) > 0 && message[0] == '.' { + h.baseHandler.handleHiddenMessage(message) + return + } s := strings.Split(message, protocol.FieldDelimiter) message = s[len(s)-1] status := strings.Split(message, ":") diff --git a/internal/clients/handlers/maprhandler.go b/internal/clients/handlers/maprhandler.go index 848e7f0..d1acfbd 100644 --- a/internal/clients/handlers/maprhandler.go +++ b/internal/clients/handlers/maprhandler.go @@ -44,7 +44,7 @@ func (h *MaprHandler) Write(p []byte) (n int, err error) { if message[0] == 'A' { h.handleAggregateMessage(message) } else { - h.baseHandler.handleMessageType(message) + h.baseHandler.handleMessage(message) } h.baseHandler.receiveBuf.Reset() default: diff --git a/internal/clients/healthclient.go b/internal/clients/healthclient.go index df919ae..e2c8ccb 100644 --- a/internal/clients/healthclient.go +++ b/internal/clients/healthclient.go @@ -18,7 +18,7 @@ type HealthClient struct { // NewHealthClient returns a new health client. func NewHealthClient(args config.Args) (*HealthClient, error) { args.Mode = omode.HealthClient - args.UserName = config.ControlUser + args.UserName = config.HealthUser c := HealthClient{ baseClient: baseClient{ Args: args, @@ -28,7 +28,7 @@ func NewHealthClient(args config.Args) (*HealthClient, error) { } c.init() - c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.ControlUser)) + c.sshAuthMethods = append(c.sshAuthMethods, gossh.Password(config.HealthUser)) c.makeConnections(c) return &c, nil |
