diff options
| author | Paul Buetow <paul@buetow.org> | 2021-10-05 22:39:58 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2021-10-05 22:39:58 +0300 |
| commit | 9f395a03f25941d8ed98ec43035688daa1e8877f (patch) | |
| tree | bf3a2a97012c876cced1cc03c006af1a86113a61 | |
| parent | f70622f307629a2542ea5eb128dea8c1043d3a40 (diff) | |
more on this
| -rw-r--r-- | cmd/dcat/main.go | 14 | ||||
| -rw-r--r-- | internal/clients/baseclient.go | 47 | ||||
| -rw-r--r-- | internal/clients/connectors/serverless.go | 1 | ||||
| -rw-r--r-- | internal/clients/handlers/basehandler.go | 23 | ||||
| -rw-r--r-- | internal/clients/maprclient.go | 2 | ||||
| -rw-r--r-- | internal/config/client.go | 4 | ||||
| -rw-r--r-- | internal/server/handlers/basehandler.go | 2 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 16 |
8 files changed, 39 insertions, 70 deletions
diff --git a/cmd/dcat/main.go b/cmd/dcat/main.go index 9fe776b..ee851ab 100644 --- a/cmd/dcat/main.go +++ b/cmd/dcat/main.go @@ -3,9 +3,14 @@ package main import ( "context" "flag" + "fmt" "os" "sync" + "net/http" + _ "net/http" + _ "net/http/pprof" + "github.com/mimecast/dtail/internal/clients" "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/io/dlog" @@ -19,6 +24,7 @@ import ( func main() { var args config.Args var displayVersion bool + var pprof int userName := user.Name() @@ -29,6 +35,7 @@ func main() { flag.BoolVar(&displayVersion, "version", false, "Display version") flag.IntVar(&args.ConnectionsPerCPU, "cpc", config.DefaultConnectionsPerCPU, "How many connections established per CPU core concurrently") flag.IntVar(&args.SSHPort, "port", config.DefaultSSHPort, "SSH server port") + flag.IntVar(&pprof, "pprof", -1, "Start PProf server this port") flag.StringVar(&args.ConfigFile, "cfg", "", "Config file path") flag.StringVar(&args.Discovery, "discovery", "", "Server discovery method") flag.StringVar(&args.LogDir, "logDir", "~/log", "Log dir") @@ -53,6 +60,13 @@ func main() { wg.Add(1) dlog.Start(ctx, &wg, source.Client, config.Common.LogLevel) + if pprof > -1 { + // For debugging purposes only + pprofArgs := fmt.Sprintf("0.0.0.0:%d", pprof) + go http.ListenAndServe(pprofArgs, nil) + dlog.Client.Info("Started PProf", pprofArgs) + } + client, err := clients.NewCatClient(args) if err != nil { panic(err) diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index 5ac298f..9574e13 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -9,7 +9,6 @@ import ( "github.com/mimecast/dtail/internal/config" "github.com/mimecast/dtail/internal/discovery" "github.com/mimecast/dtail/internal/io/dlog" - "github.com/mimecast/dtail/internal/omode" "github.com/mimecast/dtail/internal/regex" "github.com/mimecast/dtail/internal/ssh/client" @@ -80,13 +79,14 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i // Print client stats every time something on statsCh is recieved. go c.stats.Start(ctx, c.throttleCh, statsCh, c.Args.Quiet) - // Keep count of active connections - active := make(chan struct{}, len(c.connections)) + var wg sync.WaitGroup + wg.Add(len(c.connections)) var mutex sync.Mutex for i, conn := range c.connections { go func(i int, conn connectors.Connector) { - connStatus := c.startConnection(ctx, active, i, conn) + defer wg.Done() + connStatus := c.startConnection(ctx, i, conn) // Update global status. mutex.Lock() @@ -97,17 +97,11 @@ func (c *baseClient) Start(ctx context.Context, statsCh <-chan string) (status i }(i, conn) } - time.Sleep(time.Second * 2) - c.waitUntilDone(ctx, active) + wg.Wait() return } -func (c *baseClient) startConnection(ctx context.Context, active chan struct{}, i int, conn connectors.Connector) (status int) { - // Increment connection count - active <- struct{}{} - // Derement connection count - defer func() { <-active }() - +func (c *baseClient) startConnection(ctx context.Context, i int, conn connectors.Connector) (status int) { for { connCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -127,33 +121,12 @@ func (c *baseClient) startConnection(ctx context.Context, active chan struct{}, } } -func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback client.HostKeyCallback) connectors.Connector { +func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, + hostKeyCallback client.HostKeyCallback) connectors.Connector { if c.Args.Serverless { - return connectors.NewServerless(c.UserName, c.maker.makeHandler(server), c.maker.makeCommands()) + return connectors.NewServerless(c.UserName, c.maker.makeHandler(server), + c.maker.makeCommands()) } return connectors.NewServerConnection(server, c.UserName, sshAuthMethods, hostKeyCallback, c.maker.makeHandler(server), c.maker.makeCommands()) } - -func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) { - defer dlog.Client.Debug("Terminated connection") - - // We want to have at least one active connection - <-active - // Put it back on the channel - active <- struct{}{} - - if c.Mode == omode.TailClient && c.retry { - <-ctx.Done() - } - - // TODO: Rewrite this to use a wait group. - for { - numActive := len(active) - if numActive == 0 { - return - } - dlog.Client.Debug("Active connections", numActive) - time.Sleep(time.Second * time.Millisecond * 100) - } -} diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go index ae72c9b..2a1cec4 100644 --- a/internal/clients/connectors/serverless.go +++ b/internal/clients/connectors/serverless.go @@ -69,6 +69,7 @@ func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) erro } terminate := func() { + dlog.Client.Debug("Terminating serverless connection") serverHandler.Shutdown() cancel() } diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 8acb45f..04124e7 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -40,14 +40,6 @@ func (h *baseHandler) Status() int { return h.status } -func (h *baseHandler) Done() <-chan struct{} { - return h.done.Done() -} - -func (h *baseHandler) Shutdown() { - h.done.Shutdown() -} - // SendMessage to the server. func (h *baseHandler) SendMessage(command string) error { encoded := base64.StdEncoding.EncodeToString([]byte(command)) @@ -77,12 +69,6 @@ func (h *baseHandler) Write(p []byte) (n int, err error) { */ case '\n', protocol.MessageDelimiter: message := h.receiveBuf.String() - /* - // dcat/grep should actually display empty lines. - if len(message) == 0 { - continue - } - */ h.handleMessageType(message) h.receiveBuf.Reset() default: @@ -121,5 +107,14 @@ func (h *baseHandler) handleHiddenMessage(message string) { switch { case strings.HasPrefix(message, ".syn close connection"): h.SendMessage(".ack close connection") + h.Shutdown() } } + +func (h *baseHandler) Done() <-chan struct{} { + return h.done.Done() +} + +func (h *baseHandler) Shutdown() { + h.done.Shutdown() +} diff --git a/internal/clients/maprclient.go b/internal/clients/maprclient.go index 19fe119..92bbe39 100644 --- a/internal/clients/maprclient.go +++ b/internal/clients/maprclient.go @@ -110,6 +110,8 @@ func (c *MaprClient) Start(ctx context.Context, statsCh <-chan string) (status i return } +// NEXT: Make this a callback function rather trying to use polymorphism to call this. +// This applies to all clients. func (c MaprClient) makeHandler(server string) handlers.Handler { return handlers.NewMaprHandler(server, c.query, c.globalGroup) } diff --git a/internal/config/client.go b/internal/config/client.go index 152ab15..ecd05c5 100644 --- a/internal/config/client.go +++ b/internal/config/client.go @@ -162,8 +162,8 @@ func newDefaultClientConfig() *ClientConfig { HostnameBg: color.BgCyan, HostnameFg: color.FgBlack, TextAttr: color.AttrNone, - TextBg: color.BgCyan, - TextFg: color.FgBlack, + TextBg: color.BgBlack, + TextFg: color.FgWhite, }, Common: commonTermColors{ SeverityErrorAttr: color.AttrBold, diff --git a/internal/server/handlers/basehandler.go b/internal/server/handlers/basehandler.go index 12fb2b3..b683578 100644 --- a/internal/server/handlers/basehandler.go +++ b/internal/server/handlers/basehandler.go @@ -242,7 +242,7 @@ func (h *baseHandler) flush() { for i := 0; i < 3; i++ { if numUnsentMessages() == 0 { - dlog.Server.Debug(h.user, "All lines sent") + dlog.Server.Debug(h.user, "All lines sent", fmt.Sprintf("%p", h)) return } dlog.Server.Debug(h.user, "Still lines to be sent") diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 2ec4fbf..25cb8ba 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -21,22 +21,6 @@ type ServerHandler struct { catLimiter chan struct{} tailLimiter chan struct{} regex string - /* - done *internal.Done - lines chan line.Line - aggregate *server.Aggregate - maprMessages chan string - serverMessages chan string - hostname string - user *user.User - ackCloseReceived chan struct{} - activeCommands int32 - quiet bool - spartan bool - serverless bool - readBuf bytes.Buffer - writeBuf bytes.Buffer - */ } // NewServerHandler returns the server handler. |
