summaryrefslogtreecommitdiff
path: root/internal/clients
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-10-05 22:39:58 +0300
committerPaul Buetow <paul@buetow.org>2021-10-05 22:39:58 +0300
commit9f395a03f25941d8ed98ec43035688daa1e8877f (patch)
treebf3a2a97012c876cced1cc03c006af1a86113a61 /internal/clients
parentf70622f307629a2542ea5eb128dea8c1043d3a40 (diff)
more on this
Diffstat (limited to 'internal/clients')
-rw-r--r--internal/clients/baseclient.go47
-rw-r--r--internal/clients/connectors/serverless.go1
-rw-r--r--internal/clients/handlers/basehandler.go23
-rw-r--r--internal/clients/maprclient.go2
4 files changed, 22 insertions, 51 deletions
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index 5ac298f..9574e13 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -9,7 +9,6 @@ import (
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/discovery"
"github.com/mimecast/dtail/internal/io/dlog"
- "github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/regex"
"github.com/mimecast/dtail/internal/ssh/client"
@@ -80,13 +79,14 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i
// Print client stats every time something on statsCh is recieved.
go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Quiet)
- // Keep count of active connections
- active := make(chan struct{}, len(c.connections))
+ var wg sync.WaitGroup
+ wg.Add(len(c.connections))
var mutex sync.Mutex
for i, conn := range c.connections {
go func(i int, conn connectors.Connector) {
- connStatus := c.startConnection(ctx, active, i, conn)
+ defer wg.Done()
+ connStatus := c.startConnection(ctx, i, conn)
// Update global status.
mutex.Lock()
@@ -97,17 +97,11 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i
}(i, conn)
}
- time.Sleep(time.Second * 2)
- c.waitUntilDone(ctx, active)
+ wg.Wait()
return
}
-func (c *baseClient) startConnection(ctx context.Context, active chan struct{}, i int, conn connectors.Connector) (status int) {
- // Increment connection count
- active <- struct{}{}
- // Derement connection count
- defer func() { <-active }()
-
+func (c *baseClient) startConnection(ctx context.Context, i int, conn connectors.Connector) (status int) {
for {
connCtx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -127,33 +121,12 @@ func (c *baseClient) startConnection(ctx context.Context, active chan struct{},
}
}
-func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback client.HostKeyCallback) connectors.Connector {
+func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod,
+ hostKeyCallback client.HostKeyCallback) connectors.Connector {
if c.Args.Serverless {
- return connectors.NewServerless(c.UserName, c.maker.makeHandler(server), c.maker.makeCommands())
+ return connectors.NewServerless(c.UserName, c.maker.makeHandler(server),
+ c.maker.makeCommands())
}
return connectors.NewServerConnection(server, c.UserName, sshAuthMethods,
hostKeyCallback, c.maker.makeHandler(server), c.maker.makeCommands())
}
-
-func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) {
- defer dlog.Client.Debug("Terminated connection")
-
- // We want to have at least one active connection
- <-active
- // Put it back on the channel
- active <- struct{}{}
-
- if c.Mode == omode.TailClient && c.retry {
- <-ctx.Done()
- }
-
- // TODO: Rewrite this to use a wait group.
- for {
- numActive := len(active)
- if numActive == 0 {
- return
- }
- dlog.Client.Debug("Active connections", numActive)
- time.Sleep(time.Second * time.Millisecond * 100)
- }
-}
diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go
index ae72c9b..2a1cec4 100644
--- a/internal/clients/connectors/serverless.go
+++ b/internal/clients/connectors/serverless.go
@@ -69,6 +69,7 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
}
terminate := func() {
+ dlog.Client.Debug("Terminating serverless connection")
serverHandler.Shutdown()
cancel()
}
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index 8acb45f..04124e7 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -40,14 +40,6 @@ func (h *baseHandler) Status() int {
return h.status
}
-func (h *baseHandler) Done() <-chan struct{} {
- return h.done.Done()
-}
-
-func (h *baseHandler) Shutdown() {
- h.done.Shutdown()
-}
-
// SendMessage to the server.
func (h *baseHandler) SendMessage(command string) error {
encoded := base64.StdEncoding.EncodeToString([]byte(command))
@@ -77,12 +69,6 @@ func (h *baseHandler) Write(p []byte) (n int, err error) {
*/
case '\n', protocol.MessageDelimiter:
message := h.receiveBuf.String()
- /*
- // dcat/grep should actually display empty lines.
- if len(message) == 0 {
- continue
- }
- */
h.handleMessageType(message)
h.receiveBuf.Reset()
default:
@@ -121,5 +107,14 @@ func (h *baseHandler) handleHiddenMessage(message string) {
switch {
case strings.HasPrefix(message, ".syn close connection"):
h.SendMessage(".ack close connection")
+ h.Shutdown()
}
}
+
+func (h *baseHandler) Done() <-chan struct{} {
+ return h.done.Done()
+}
+
+func (h *baseHandler) Shutdown() {
+ h.done.Shutdown()
+}
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index 19fe119..92bbe39 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -110,6 +110,8 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i
return
}
+// NEXT: Make this a callback function rather trying to use polymorphism to call this.
+// This applies to all clients.
func (c MaprClient) makeHandler(server string) handlers.Handler {
return handlers.NewMaprHandler(server, c.query, c.globalGroup)
}