diff options
| -rw-r--r-- | TODO.md | 7 | ||||
| -rw-r--r-- | docker/.gitignore | 1 | ||||
| -rw-r--r-- | docker/Makefile | 9 | ||||
| -rw-r--r-- | internal/clients/args.go | 43 | ||||
| -rw-r--r-- | internal/clients/baseclient.go | 9 | ||||
| -rw-r--r-- | internal/clients/connectors/serverconnection.go | 38 | ||||
| -rw-r--r-- | internal/clients/connectors/serverless.go | 90 | ||||
| -rw-r--r-- | internal/clients/handlers/basehandler.go | 10 | ||||
| -rw-r--r-- | internal/done.go | 9 | ||||
| -rw-r--r-- | internal/server/handlers/serverhandler.go | 6 |
10 files changed, 156 insertions, 66 deletions
@@ -22,6 +22,13 @@ This is a loose list of what to do. Maybe for the next releae or maybe for a lat [ ] Document the two things above [x] Implement spartan mode [x] Make sure that diff is the same (plain file fs dcatted-file) in spartan mode +[ ] Document servless mode +[x] Implement serverless mode +[ ] Fix serverless mode (e.g. dmap doesn't aggregate all lines) [ ] document spartan mode [ ] Default client log dir is ~/log not ./log [ ] Make sure dmap results aren't in color in local log file +[ ] Unit test for dcat in serverless mode +[ ] Unit test for dgrep in serverless mode +[ ] Unit test for dmap in serverless mode +[ ] Separate logger into server logger and client logger for serverless operation (e.g. server info logs are all Debug) diff --git a/docker/.gitignore b/docker/.gitignore index 1ea3b1c..ef60a31 100644 --- a/docker/.gitignore +++ b/docker/.gitignore @@ -2,3 +2,4 @@ dserver mapr_testdata.log log *.csv +*.out diff --git a/docker/Makefile b/docker/Makefile index 75aed79..71fd249 100644 --- a/docker/Makefile +++ b/docker/Makefile @@ -18,12 +18,21 @@ dgrep: ../dgrep --servers serverlist.txt --files '/var/log/dserver/*' --regex MAPREDUCE --trustAllHosts dcat: ../dcat --servers serverlist.txt --files '/etc/passwd' --trustAllHosts +dcat2: + ../dcat /etc/passwd dmap: ../dmap --servers serverlist.txt --files '/var/log/dserver/*' --trustAllHosts --query 'from stats select avg(goroutines),max(goroutines),min(goroutines),last(goroutines),count($$hostname),$$hostname group by $$hostname order by avg(goroutines)' dmap2: ../dmap --servers serverlist.txt --files '/var/log/mapr_testdata.log' --trustAllHosts --query 'from stats select count($$time),last($$time) group by $$time order by count($$time) outfile dmap2-A.csv' ../dmap --servers serverlist.txt --files '/var/log/mapr_testdata.log' --trustAllHosts --query 'from stats select count($$time),last($$time) group by $$time order by count($$time) outfile dmap2-B.csv' + ../dmap --query 'from stats select count($$time),last($$time) group by $$time order by count($$time) outfile dmap2-serverless.csv' ./mapr_testdata.log @echo Expecting zero diff! diff -u <(sort dmap2-A.csv) <(sort dmap2-B.csv) + diff -u <(sort dmap2-A.csv) <(sort dmap2-serverless.csv) +dmap3: + ../dmap --servers serverlist.txt --files '/var/log/mapr_testdata.log' --trustAllHosts --query 'from stats select count($$time),last($$time) group by $$time order by count($$time) outfile dmap2-A.csv' + ../dmap --query 'from stats select count($$time),last($$time) group by $$time order by count($$time) outfile dmap2-serverless.csv' ./mapr_testdata.log + @echo Expecting zero diff! + diff -u <(sort dmap2-A.csv) <(sort dmap2-serverless.csv) spinup1: docker run -p 2222:2222 dserver:develop diff --git a/internal/clients/args.go b/internal/clients/args.go index 081b911..dc71e83 100644 --- a/internal/clients/args.go +++ b/internal/clients/args.go @@ -3,12 +3,8 @@ package clients import ( "flag" "fmt" - "os" - "path/filepath" "strings" - "github.com/mimecast/dtail/internal/config" - "github.com/mimecast/dtail/internal/io/logger" "github.com/mimecast/dtail/internal/omode" gossh "golang.org/x/crypto/ssh" @@ -33,6 +29,7 @@ type Args struct { Quiet bool Spartan bool NoColor bool + Serverless bool } // Transform the arguments based on certain conditions. @@ -50,45 +47,15 @@ func (a *Args) Transform(args []string) { a.Quiet = true a.NoColor = true } -} -// TransformAfterConfigFile same as Transform, but after the config file has been read. -func (a *Args) TransformAfterConfigFile() { if a.Discovery == "" && a.ServersStr == "" { - a.handleEmptyServer() + a.Serverless = true } } -func (a *Args) handleEmptyServer() { - fqdn, err := os.Hostname() - if err != nil { - logger.FatalExit(err) - } - a.ServersStr = fmt.Sprintf("%s:%d", fqdn, config.Common.SSHPort) - // I am trusting my own hostname. - a.TrustAllHosts = true - logger.Debug("Will connect to local server", a.ServersStr) - - cleanPath := func(dirtyPath string) string { - cleanPath, err := filepath.EvalSymlinks(dirtyPath) - if err != nil { - logger.FatalExit("Unable to evaluate symlinks", dirtyPath, err) - } - cleanPath, err = filepath.Abs(cleanPath) - if err != nil { - logger.FatalExit("Unable to make file path absolute", dirtyPath, cleanPath, err) - } - return cleanPath - } - - logger.Debug("Dirty file paths", a.What) - var filePaths []string - for _, dirtyPath := range strings.Split(a.What, ",") { - filePaths = append(filePaths, cleanPath(dirtyPath)) - } - - a.What = strings.Join(filePaths, ",") - logger.Debug("Clean file paths", a.What) +// TransformAfterConfigFile same as Transform, but after the config file has been read. +func (a *Args) TransformAfterConfigFile() { + // TODO: Remove this method. It's not used. } func (a *Args) SerializeOptions() string { diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go index d0631fc..5523052 100644 --- a/internal/clients/baseclient.go +++ b/internal/clients/baseclient.go @@ -113,12 +113,17 @@ func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, con time.Sleep(time.Second * 2) logger.Debug(conn.Server(), "Reconnecting") - c.connections[i] = c.makeConnection(conn.Server(), c.sshAuthMethods, c.hostKeyCallback) + conn = c.makeConnection(conn.Server(), c.sshAuthMethods, c.hostKeyCallback) + c.connections[i] = conn } } func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback client.HostKeyCallback) connectors.Connector { - return connectors.NewServerConnection(server, c.UserName, sshAuthMethods, hostKeyCallback, c.maker.makeHandler(server), c.maker.makeCommands()) + if c.Args.Serverless { + return connectors.NewServerless(c.UserName, c.maker.makeHandler(server), c.maker.makeCommands()) + } + return connectors.NewServerConnection(server, c.UserName, sshAuthMethods, + hostKeyCallback, c.maker.makeHandler(server), c.maker.makeCommands()) } func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) { diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go index fab2f87..0904ba1 100644 --- a/internal/clients/connectors/serverconnection.go +++ b/internal/clients/connectors/serverconnection.go @@ -16,31 +16,21 @@ import ( "golang.org/x/crypto/ssh" ) -// ServerConnection represents a client connection connection to a single server. +// ServerConnection represents a connection to a single remote dtail server via SSH protocol. type ServerConnection struct { - // The remote server's hostname connected to. - server string - // The remote server's port connected to. - port int - // The SSH client configuration used. - config *ssh.ClientConfig - // The SSH client handler to use. - handler handlers.Handler - // DTail commands sent from client to server. When client loses - // connection to the server it re-connects automatically and sends the - // same commands again. - commands []string - // Is it a persistent connection or a one-off? - isOneOff bool - // To deal with SSH server host keys + server string + port int + config *ssh.ClientConfig + handler handlers.Handler + commands []string + isOneOff bool hostKeyCallback client.HostKeyCallback - // To determine if connection throttling has finished or not - throttlingDone bool + throttlingDone bool } -// NewServerConnection returns a new connection. +// NewServerConnection returns a new DTail SSH server connection. func NewServerConnection(server string, userName string, authMethods []ssh.AuthMethod, hostKeyCallback client.HostKeyCallback, handler handlers.Handler, commands []string) *ServerConnection { - logger.Debug(server, "Creating new connection") + logger.Debug(server, "Creating new connection", server, handler, commands) c := ServerConnection{ hostKeyCallback: hostKeyCallback, @@ -77,12 +67,10 @@ func NewOneOffServerConnection(server string, userName string, authMethods []ssh return &c } -// Server hostname func (c *ServerConnection) Server() string { return c.server } -// Handler for the connection func (c *ServerConnection) Handler() handlers.Handler { return c.handler } @@ -103,7 +91,6 @@ func (c *ServerConnection) initServerPort() { } } -// Start the server connection. Build up SSH session and send some DTail commands. func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) { // Throttle how many connections can be established concurrently (based on ch length) logger.Debug(c.server, "Throttling connection", len(throttleCh), cap(throttleCh)) @@ -161,7 +148,7 @@ func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc, // Create the SSH session. Close the session in case of an error. func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFunc, client *ssh.Client, throttleCh chan struct{}) error { - logger.Debug(c.server, "session") + logger.Debug(c.server, "Creating SSH session") session, err := client.NewSession() if err != nil { @@ -173,7 +160,7 @@ func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFun } func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc, session *ssh.Session, throttleCh chan struct{}) error { - logger.Debug(c.server, "handle") + logger.Debug(c.server, "Creating handler for SSH session") stdinPipe, err := session.StdinPipe() if err != nil { @@ -221,5 +208,6 @@ func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc <-ctx.Done() c.handler.Shutdown() + return nil } diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go new file mode 100644 index 0000000..0500645 --- /dev/null +++ b/internal/clients/connectors/serverless.go @@ -0,0 +1,90 @@ +package connectors + +import ( + "context" + "io" + + "github.com/mimecast/dtail/internal/clients/handlers" + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/logger" + serverHandlers "github.com/mimecast/dtail/internal/server/handlers" + user "github.com/mimecast/dtail/internal/user/server" +) + +// Serverless creates a server object directly without TCP. +type Serverless struct { + handler handlers.Handler + commands []string + userName string +} + +// NewServerConnection returns a new connection. +func NewServerless(userName string, handler handlers.Handler, commands []string) *Serverless { + s := Serverless{ + userName: userName, + handler: handler, + commands: commands, + } + + logger.Debug("Creating new serverless connector", handler, commands) + return &s +} + +func (s *Serverless) Server() string { + return "local(serverless)" +} + +func (s *Serverless) Handler() handlers.Handler { + return s.handler +} + +func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) { + go func() { + defer cancel() + + if err := s.handle(ctx, cancel); err != nil { + logger.Warn(err) + } + }() + + <-ctx.Done() +} + +func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) error { + logger.Debug("Creating server handler for a serverless session") + + serverHandler := serverHandlers.NewServerHandler( + user.New(s.userName, s.Server()), + make(chan struct{}, config.Server.MaxConcurrentCats), + make(chan struct{}, config.Server.MaxConcurrentTails), + ) + + go func() { + io.Copy(serverHandler, s.handler) + cancel() + }() + + go func() { + io.Copy(s.handler, serverHandler) + cancel() + }() + + go func() { + select { + case <-s.handler.Done(): + case <-ctx.Done(): + } + cancel() + }() + + // Send all commands to client. + for _, command := range s.commands { + logger.Debug(command) + s.handler.SendMessage(command) + } + + <-ctx.Done() + s.handler.Shutdown() + + return nil +} diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go index 0f2d1b5..af1ad62 100644 --- a/internal/clients/handlers/basehandler.go +++ b/internal/clients/handlers/basehandler.go @@ -22,6 +22,16 @@ type baseHandler struct { status int } +func (h *baseHandler) String() string { + return fmt.Sprintf("baseHandler(%s,server:%s,shellStarted:%v,status:%d)@%p", + h.done, + h.server, + h.shellStarted, + h.status, + h, + ) +} + func (h *baseHandler) Server() string { return h.server } diff --git a/internal/done.go b/internal/done.go index 54e5e8e..5ea22a0 100644 --- a/internal/done.go +++ b/internal/done.go @@ -17,6 +17,15 @@ func NewDone() *Done { } } +func (d *Done) String() string { + select { + case <-d.Done(): + return "Done(yes)" + default: + return "Done(no)" + } +} + // Done returns the done channel (closed when done) func (d *Done) Done() <-chan struct{} { return d.ch diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go index 2f3b73b..4820476 100644 --- a/internal/server/handlers/serverhandler.go +++ b/internal/server/handlers/serverhandler.go @@ -331,7 +331,11 @@ func (h *ServerHandler) handleAckCommand(argc int, args []string) { return } if args[1] == "close" && args[2] == "connection" { - close(h.ackCloseReceived) + select { + case <-h.ackCloseReceived: + default: + close(h.ackCloseReceived) + } } } |
