summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-10-05 22:39:58 +0300
committerPaul Buetow <paul@buetow.org>2021-10-05 22:39:58 +0300
commit9f395a03f25941d8ed98ec43035688daa1e8877f (patch)
treebf3a2a97012c876cced1cc03c006af1a86113a61
parentf70622f307629a2542ea5eb128dea8c1043d3a40 (diff)
more on this
-rw-r--r--cmd/dcat/main.go14
-rw-r--r--internal/clients/baseclient.go47
-rw-r--r--internal/clients/connectors/serverless.go1
-rw-r--r--internal/clients/handlers/basehandler.go23
-rw-r--r--internal/clients/maprclient.go2
-rw-r--r--internal/config/client.go4
-rw-r--r--internal/server/handlers/basehandler.go2
-rw-r--r--internal/server/handlers/serverhandler.go16
8 files changed, 39 insertions, 70 deletions
diff --git a/cmd/dcat/main.go b/cmd/dcat/main.go
index 9fe776b..ee851ab 100644
--- a/cmd/dcat/main.go
+++ b/cmd/dcat/main.go
@@ -3,9 +3,14 @@ package main
import (
"context"
"flag"
+ "fmt"
"os"
"sync"
+ "net/http"
+ _ "net/http"
+ _ "net/http/pprof"
+
"github.com/mimecast/dtail/internal/clients"
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/io/dlog"
@@ -19,6 +24,7 @@ import (
func main() {
var args config.Args
var displayVersion bool
+ var pprof int
userName := user.Name()
@@ -29,6 +35,7 @@ func main() {
flag.BoolVar(&displayVersion, "version", false, "Display version")
flag.IntVar(&args.ConnectionsPerCPU, "cpc", config.DefaultConnectionsPerCPU, "How many connections established per CPU core concurrently")
flag.IntVar(&args.SSHPort, "port", config.DefaultSSHPort, "SSH server port")
+ flag.IntVar(&pprof, "pprof", -1, "Start PProf server this port")
flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path")
flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method")
flag.StringVar(&args.LogDir, "logDir", "~/log", "Log dir")
@@ -53,6 +60,13 @@ func main() {
wg.Add(1)
dlog.Start(ctx, &wg, source.Client, config.Common.LogLevel)
+ if pprof > -1 {
+ // For debugging purposes only
+ pprofArgs := fmt.Sprintf("0.0.0.0:%d", pprof)
+ go http.ListenAndServe(pprofArgs, nil)
+ dlog.Client.Info("Started PProf", pprofArgs)
+ }
+
client, err := clients.NewCatClient(args)
if err != nil {
panic(err)
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index 5ac298f..9574e13 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -9,7 +9,6 @@ import (
"github.com/mimecast/dtail/internal/config"
"github.com/mimecast/dtail/internal/discovery"
"github.com/mimecast/dtail/internal/io/dlog"
- "github.com/mimecast/dtail/internal/omode"
"github.com/mimecast/dtail/internal/regex"
"github.com/mimecast/dtail/internal/ssh/client"
@@ -80,13 +79,14 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i
// Print client stats every time something on statsCh is recieved.
go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Quiet)
- // Keep count of active connections
- active := make(chan struct{}, len(c.connections))
+ var wg sync.WaitGroup
+ wg.Add(len(c.connections))
var mutex sync.Mutex
for i, conn := range c.connections {
go func(i int, conn connectors.Connector) {
- connStatus := c.startConnection(ctx, active, i, conn)
+ defer wg.Done()
+ connStatus := c.startConnection(ctx, i, conn)
// Update global status.
mutex.Lock()
@@ -97,17 +97,11 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i
}(i, conn)
}
- time.Sleep(time.Second * 2)
- c.waitUntilDone(ctx, active)
+ wg.Wait()
return
}
-func (c *baseClient) startConnection(ctx context.Context, active chan struct{}, i int, conn connectors.Connector) (status int) {
- // Increment connection count
- active <- struct{}{}
- // Derement connection count
- defer func() { <-active }()
-
+func (c *baseClient) startConnection(ctx context.Context, i int, conn connectors.Connector) (status int) {
for {
connCtx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -127,33 +121,12 @@ func (c *baseClient) startConnection(ctx context.Context, active chan struct{},
}
}
-func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback client.HostKeyCallback) connectors.Connector {
+func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod,
+ hostKeyCallback client.HostKeyCallback) connectors.Connector {
if c.Args.Serverless {
- return connectors.NewServerless(c.UserName, c.maker.makeHandler(server), c.maker.makeCommands())
+ return connectors.NewServerless(c.UserName, c.maker.makeHandler(server),
+ c.maker.makeCommands())
}
return connectors.NewServerConnection(server, c.UserName, sshAuthMethods,
hostKeyCallback, c.maker.makeHandler(server), c.maker.makeCommands())
}
-
-func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) {
- defer dlog.Client.Debug("Terminated connection")
-
- // We want to have at least one active connection
- <-active
- // Put it back on the channel
- active <- struct{}{}
-
- if c.Mode == omode.TailClient && c.retry {
- <-ctx.Done()
- }
-
- // TODO: Rewrite this to use a wait group.
- for {
- numActive := len(active)
- if numActive == 0 {
- return
- }
- dlog.Client.Debug("Active connections", numActive)
- time.Sleep(time.Second * time.Millisecond * 100)
- }
-}
diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go
index ae72c9b..2a1cec4 100644
--- a/internal/clients/connectors/serverless.go
+++ b/internal/clients/connectors/serverless.go
@@ -69,6 +69,7 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro
}
terminate := func() {
+ dlog.Client.Debug("Terminating serverless connection")
serverHandler.Shutdown()
cancel()
}
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index 8acb45f..04124e7 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -40,14 +40,6 @@ func (h *baseHandler) Status() int {
return h.status
}
-func (h *baseHandler) Done() <-chan struct{} {
- return h.done.Done()
-}
-
-func (h *baseHandler) Shutdown() {
- h.done.Shutdown()
-}
-
// SendMessage to the server.
func (h *baseHandler) SendMessage(command string) error {
encoded := base64.StdEncoding.EncodeToString([]byte(command))
@@ -77,12 +69,6 @@ func (h *baseHandler) Write(p []byte) (n int, err error) {
*/
case '\n', protocol.MessageDelimiter:
message := h.receiveBuf.String()
- /*
- // dcat/grep should actually display empty lines.
- if len(message) == 0 {
- continue
- }
- */
h.handleMessageType(message)
h.receiveBuf.Reset()
default:
@@ -121,5 +107,14 @@ func (h *baseHandler) handleHiddenMessage(message string) {
switch {
case strings.HasPrefix(message, ".syn close connection"):
h.SendMessage(".ack close connection")
+ h.Shutdown()
}
}
+
+func (h *baseHandler) Done() <-chan struct{} {
+ return h.done.Done()
+}
+
+func (h *baseHandler) Shutdown() {
+ h.done.Shutdown()
+}
diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go
index 19fe119..92bbe39 100644
--- a/internal/clients/maprclient.go
+++ b/internal/clients/maprclient.go
@@ -110,6 +110,8 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i
return
}
+// NEXT: Make this a callback function rather trying to use polymorphism to call this.
+// This applies to all clients.
func (c MaprClient) makeHandler(server string) handlers.Handler {
return handlers.NewMaprHandler(server, c.query, c.globalGroup)
}
diff --git a/internal/config/client.go b/internal/config/client.go
index 152ab15..ecd05c5 100644
--- a/internal/config/client.go
+++ b/internal/config/client.go
@@ -162,8 +162,8 @@ func newDefaultClientConfig() *ClientConfig {
HostnameBg: color.BgCyan,
HostnameFg: color.FgBlack,
TextAttr: color.AttrNone,
- TextBg: color.BgCyan,
- TextFg: color.FgBlack,
+ TextBg: color.BgBlack,
+ TextFg: color.FgWhite,
},
Common: commonTermColors{
SeverityErrorAttr: color.AttrBold,
diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go
index 12fb2b3..b683578 100644
--- a/internal/server/handlers/basehandler.go
+++ b/internal/server/handlers/basehandler.go
@@ -242,7 +242,7 @@ func (h *baseHandler) flush() {
for i := 0; i < 3; i++ {
if numUnsentMessages() == 0 {
- dlog.Server.Debug(h.user, "All lines sent")
+ dlog.Server.Debug(h.user, "All lines sent", fmt.Sprintf("%p", h))
return
}
dlog.Server.Debug(h.user, "Still lines to be sent")
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 2ec4fbf..25cb8ba 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -21,22 +21,6 @@ type ServerHandler struct {
catLimiter chan struct{}
tailLimiter chan struct{}
regex string
- /*
- done *internal.Done
- lines chan line.Line
- aggregate *server.Aggregate
- maprMessages chan string
- serverMessages chan string
- hostname string
- user *user.User
- ackCloseReceived chan struct{}
- activeCommands int32
- quiet bool
- spartan bool
- serverless bool
- readBuf bytes.Buffer
- writeBuf bytes.Buffer
- */
}
// NewServerHandler returns the server handler.