From 9f395a03f25941d8ed98ec43035688daa1e8877f Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Tue, 5 Oct 2021 22:39:58 +0300 Subject: more on this --- internal/clients/baseclient.go | 47 +++++++++--------------------------------- 1 file changed, 10 insertions(+), 37 deletions(-) (limited to 'internal/clients/baseclient.go') diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index 5ac298f..9574e13 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -9,7 +9,6 @@ import ( "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/discovery" "github.com/mimecast/dtail/internal/io/dlog" - "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/regex" "github.com/mimecast/dtail/internal/ssh/client" @@ -80,13 +79,14 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i // Print client stats every time something on statsCh is recieved. go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Quiet) - // Keep count of active connections - active := make(chan struct{}, len(c.connections)) + var wg sync.WaitGroup + wg.Add(len(c.connections)) var mutex sync.Mutex for i, conn := range c.connections { go func(i int, conn connectors.Connector) { - connStatus := c.startConnection(ctx, active, i, conn) + defer wg.Done() + connStatus := c.startConnection(ctx, i, conn) // Update global status. mutex.Lock() @@ -97,17 +97,11 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i }(i, conn) } - time.Sleep(time.Second * 2) - c.waitUntilDone(ctx, active) + wg.Wait() return } -func (c *baseClient) startConnection(ctx context.Context, active chan struct{}, i int, conn connectors.Connector) (status int) { - // Increment connection count - active <- struct{}{} - // Derement connection count - defer func() { <-active }() - +func (c *baseClient) startConnection(ctx context.Context, i int, conn connectors.Connector) (status int) { for { connCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -127,33 +121,12 @@ func (c *baseClient) startConnection(ctx context.Context, active chan struct{}, } } -func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback client.HostKeyCallback) connectors.Connector { +func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, + hostKeyCallback client.HostKeyCallback) connectors.Connector { if c.Args.Serverless { - return connectors.NewServerless(c.UserName, c.maker.makeHandler(server), c.maker.makeCommands()) + return connectors.NewServerless(c.UserName, c.maker.makeHandler(server), + c.maker.makeCommands()) } return connectors.NewServerConnection(server, c.UserName, sshAuthMethods, hostKeyCallback, c.maker.makeHandler(server), c.maker.makeCommands()) } - -func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) { - defer dlog.Client.Debug("Terminated connection") - - // We want to have at least one active connection - <-active - // Put it back on the channel - active <- struct{}{} - - if c.Mode == omode.TailClient && c.retry { - <-ctx.Done() - } - - // TODO: Rewrite this to use a wait group. - for { - numActive := len(active) - if numActive == 0 { - return - } - dlog.Client.Debug("Active connections", numActive) - time.Sleep(time.Second * time.Millisecond * 100) - } -} -- cgit v1.2.3