summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--TODO.md7
-rw-r--r--docker/.gitignore1
-rw-r--r--docker/Makefile9
-rw-r--r--internal/clients/args.go43
-rw-r--r--internal/clients/baseclient.go9
-rw-r--r--internal/clients/connectors/serverconnection.go38
-rw-r--r--internal/clients/connectors/serverless.go90
-rw-r--r--internal/clients/handlers/basehandler.go10
-rw-r--r--internal/done.go9
-rw-r--r--internal/server/handlers/serverhandler.go6
10 files changed, 156 insertions, 66 deletions
diff --git a/TODO.md b/TODO.md
index 7674577..156c745 100644
--- a/TODO.md
+++ b/TODO.md
@@ -22,6 +22,13 @@ This is a loose list of what to do. Maybe for the next releae or maybe for a lat
[ ] Document the two things above
[x] Implement spartan mode
[x] Make sure that diff is the same (plain file fs dcatted-file) in spartan mode
+[ ] Document servless mode
+[x] Implement serverless mode
+[ ] Fix serverless mode (e.g. dmap doesn't aggregate all lines)
[ ] document spartan mode
[ ] Default client log dir is ~/log not ./log
[ ] Make sure dmap results aren't in color in local log file
+[ ] Unit test for dcat in serverless mode
+[ ] Unit test for dgrep in serverless mode
+[ ] Unit test for dmap in serverless mode
+[ ] Separate logger into server logger and client logger for serverless operation (e.g. server info logs are all Debug)
diff --git a/docker/.gitignore b/docker/.gitignore
index 1ea3b1c..ef60a31 100644
--- a/docker/.gitignore
+++ b/docker/.gitignore
@@ -2,3 +2,4 @@ dserver
mapr_testdata.log
log
*.csv
+*.out
diff --git a/docker/Makefile b/docker/Makefile
index 75aed79..71fd249 100644
--- a/docker/Makefile
+++ b/docker/Makefile
@@ -18,12 +18,21 @@ dgrep:
../dgrep --servers serverlist.txt --files '/var/log/dserver/*' --regex MAPREDUCE --trustAllHosts
dcat:
../dcat --servers serverlist.txt --files '/etc/passwd' --trustAllHosts
+dcat2:
+ ../dcat /etc/passwd
dmap:
../dmap --servers serverlist.txt --files '/var/log/dserver/*' --trustAllHosts --query 'from stats select avg(goroutines),max(goroutines),min(goroutines),last(goroutines),count($$hostname),$$hostname group by $$hostname order by avg(goroutines)'
dmap2:
../dmap --servers serverlist.txt --files '/var/log/mapr_testdata.log' --trustAllHosts --query 'from stats select count($$time),last($$time) group by $$time order by count($$time) outfile dmap2-A.csv'
../dmap --servers serverlist.txt --files '/var/log/mapr_testdata.log' --trustAllHosts --query 'from stats select count($$time),last($$time) group by $$time order by count($$time) outfile dmap2-B.csv'
+ ../dmap --query 'from stats select count($$time),last($$time) group by $$time order by count($$time) outfile dmap2-serverless.csv' ./mapr_testdata.log
@echo Expecting zero diff!
diff -u <(sort dmap2-A.csv) <(sort dmap2-B.csv)
+ diff -u <(sort dmap2-A.csv) <(sort dmap2-serverless.csv)
+dmap3:
+ ../dmap --servers serverlist.txt --files '/var/log/mapr_testdata.log' --trustAllHosts --query 'from stats select count($$time),last($$time) group by $$time order by count($$time) outfile dmap2-A.csv'
+ ../dmap --query 'from stats select count($$time),last($$time) group by $$time order by count($$time) outfile dmap2-serverless.csv' ./mapr_testdata.log
+ @echo Expecting zero diff!
+ diff -u <(sort dmap2-A.csv) <(sort dmap2-serverless.csv)
spinup1:
docker run -p 2222:2222 dserver:develop
diff --git a/internal/clients/args.go b/internal/clients/args.go
index 081b911..dc71e83 100644
--- a/internal/clients/args.go
+++ b/internal/clients/args.go
@@ -3,12 +3,8 @@ package clients
import (
"flag"
"fmt"
- "os"
- "path/filepath"
"strings"
- "github.com/mimecast/dtail/internal/config"
- "github.com/mimecast/dtail/internal/io/logger"
"github.com/mimecast/dtail/internal/omode"
gossh "golang.org/x/crypto/ssh"
@@ -33,6 +29,7 @@ type Args struct {
Quiet bool
Spartan bool
NoColor bool
+ Serverless bool
}
// Transform the arguments based on certain conditions.
@@ -50,45 +47,15 @@ func (a *Args) Transform(args []string) {
a.Quiet = true
a.NoColor = true
}
-}
-// TransformAfterConfigFile same as Transform, but after the config file has been read.
-func (a *Args) TransformAfterConfigFile() {
if a.Discovery == "" && a.ServersStr == "" {
- a.handleEmptyServer()
+ a.Serverless = true
}
}
-func (a *Args) handleEmptyServer() {
- fqdn, err := os.Hostname()
- if err != nil {
- logger.FatalExit(err)
- }
- a.ServersStr = fmt.Sprintf("%s:%d", fqdn, config.Common.SSHPort)
- // I am trusting my own hostname.
- a.TrustAllHosts = true
- logger.Debug("Will connect to local server", a.ServersStr)
-
- cleanPath := func(dirtyPath string) string {
- cleanPath, err := filepath.EvalSymlinks(dirtyPath)
- if err != nil {
- logger.FatalExit("Unable to evaluate symlinks", dirtyPath, err)
- }
- cleanPath, err = filepath.Abs(cleanPath)
- if err != nil {
- logger.FatalExit("Unable to make file path absolute", dirtyPath, cleanPath, err)
- }
- return cleanPath
- }
-
- logger.Debug("Dirty file paths", a.What)
- var filePaths []string
- for _, dirtyPath := range strings.Split(a.What, ",") {
- filePaths = append(filePaths, cleanPath(dirtyPath))
- }
-
- a.What = strings.Join(filePaths, ",")
- logger.Debug("Clean file paths", a.What)
+// TransformAfterConfigFile same as Transform, but after the config file has been read.
+func (a *Args) TransformAfterConfigFile() {
+ // TODO: Remove this method. It's not used.
}
func (a *Args) SerializeOptions() string {
diff --git a/internal/clients/baseclient.go b/internal/clients/baseclient.go
index d0631fc..5523052 100644
--- a/internal/clients/baseclient.go
+++ b/internal/clients/baseclient.go
@@ -113,12 +113,17 @@ func (c *baseClient) start(ctx context.Context, active chan struct{}, i int, con
time.Sleep(time.Second * 2)
logger.Debug(conn.Server(), "Reconnecting")
- c.connections[i] = c.makeConnection(conn.Server(), c.sshAuthMethods, c.hostKeyCallback)
+ conn = c.makeConnection(conn.Server(), c.sshAuthMethods, c.hostKeyCallback)
+ c.connections[i] = conn
}
}
func (c *baseClient) makeConnection(server string, sshAuthMethods []gossh.AuthMethod, hostKeyCallback client.HostKeyCallback) connectors.Connector {
- return connectors.NewServerConnection(server, c.UserName, sshAuthMethods, hostKeyCallback, c.maker.makeHandler(server), c.maker.makeCommands())
+ if c.Args.Serverless {
+ return connectors.NewServerless(c.UserName, c.maker.makeHandler(server), c.maker.makeCommands())
+ }
+ return connectors.NewServerConnection(server, c.UserName, sshAuthMethods,
+ hostKeyCallback, c.maker.makeHandler(server), c.maker.makeCommands())
}
func (c *baseClient) waitUntilDone(ctx context.Context, active chan struct{}) {
diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go
index fab2f87..0904ba1 100644
--- a/internal/clients/connectors/serverconnection.go
+++ b/internal/clients/connectors/serverconnection.go
@@ -16,31 +16,21 @@ import (
"golang.org/x/crypto/ssh"
)
-// ServerConnection represents a client connection connection to a single server.
+// ServerConnection represents a connection to a single remote dtail server via SSH protocol.
type ServerConnection struct {
- // The remote server's hostname connected to.
- server string
- // The remote server's port connected to.
- port int
- // The SSH client configuration used.
- config *ssh.ClientConfig
- // The SSH client handler to use.
- handler handlers.Handler
- // DTail commands sent from client to server. When client loses
- // connection to the server it re-connects automatically and sends the
- // same commands again.
- commands []string
- // Is it a persistent connection or a one-off?
- isOneOff bool
- // To deal with SSH server host keys
+ server string
+ port int
+ config *ssh.ClientConfig
+ handler handlers.Handler
+ commands []string
+ isOneOff bool
hostKeyCallback client.HostKeyCallback
- // To determine if connection throttling has finished or not
- throttlingDone bool
+ throttlingDone bool
}
-// NewServerConnection returns a new connection.
+// NewServerConnection returns a new DTail SSH server connection.
func NewServerConnection(server string, userName string, authMethods []ssh.AuthMethod, hostKeyCallback client.HostKeyCallback, handler handlers.Handler, commands []string) *ServerConnection {
- logger.Debug(server, "Creating new connection")
+ logger.Debug(server, "Creating new connection", server, handler, commands)
c := ServerConnection{
hostKeyCallback: hostKeyCallback,
@@ -77,12 +67,10 @@ func NewOneOffServerConnection(server string, userName string, authMethods []ssh
return &c
}
-// Server hostname
func (c *ServerConnection) Server() string {
return c.server
}
-// Handler for the connection
func (c *ServerConnection) Handler() handlers.Handler {
return c.handler
}
@@ -103,7 +91,6 @@ func (c *ServerConnection) initServerPort() {
}
}
-// Start the server connection. Build up SSH session and send some DTail commands.
func (c *ServerConnection) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) {
// Throttle how many connections can be established concurrently (based on ch length)
logger.Debug(c.server, "Throttling connection", len(throttleCh), cap(throttleCh))
@@ -161,7 +148,7 @@ func (c *ServerConnection) dial(ctx context.Context, cancel context.CancelFunc,
// Create the SSH session. Close the session in case of an error.
func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFunc, client *ssh.Client, throttleCh chan struct{}) error {
- logger.Debug(c.server, "session")
+ logger.Debug(c.server, "Creating SSH session")
session, err := client.NewSession()
if err != nil {
@@ -173,7 +160,7 @@ func (c *ServerConnection) session(ctx context.Context, cancel context.CancelFun
}
func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc, session *ssh.Session, throttleCh chan struct{}) error {
- logger.Debug(c.server, "handle")
+ logger.Debug(c.server, "Creating handler for SSH session")
stdinPipe, err := session.StdinPipe()
if err != nil {
@@ -221,5 +208,6 @@ func (c *ServerConnection) handle(ctx context.Context, cancel context.CancelFunc
<-ctx.Done()
c.handler.Shutdown()
+
return nil
}
diff --git a/internal/clients/connectors/serverless.go b/internal/clients/connectors/serverless.go
new file mode 100644
index 0000000..0500645
--- /dev/null
+++ b/internal/clients/connectors/serverless.go
@@ -0,0 +1,90 @@
+package connectors
+
+import (
+ "context"
+ "io"
+
+ "github.com/mimecast/dtail/internal/clients/handlers"
+ "github.com/mimecast/dtail/internal/config"
+ "github.com/mimecast/dtail/internal/io/logger"
+ serverHandlers "github.com/mimecast/dtail/internal/server/handlers"
+ user "github.com/mimecast/dtail/internal/user/server"
+)
+
+// Serverless creates a server object directly without TCP.
+type Serverless struct {
+ handler handlers.Handler
+ commands []string
+ userName string
+}
+
+// NewServerConnection returns a new connection.
+func NewServerless(userName string, handler handlers.Handler, commands []string) *Serverless {
+ s := Serverless{
+ userName: userName,
+ handler: handler,
+ commands: commands,
+ }
+
+ logger.Debug("Creating new serverless connector", handler, commands)
+ return &s
+}
+
+func (s *Serverless) Server() string {
+ return "local(serverless)"
+}
+
+func (s *Serverless) Handler() handlers.Handler {
+ return s.handler
+}
+
+func (s *Serverless) Start(ctx context.Context, cancel context.CancelFunc, throttleCh, statsCh chan struct{}) {
+ go func() {
+ defer cancel()
+
+ if err := s.handle(ctx, cancel); err != nil {
+ logger.Warn(err)
+ }
+ }()
+
+ <-ctx.Done()
+}
+
+func (s *Serverless) handle(ctx context.Context, cancel context.CancelFunc) error {
+ logger.Debug("Creating server handler for a serverless session")
+
+ serverHandler := serverHandlers.NewServerHandler(
+ user.New(s.userName, s.Server()),
+ make(chan struct{}, config.Server.MaxConcurrentCats),
+ make(chan struct{}, config.Server.MaxConcurrentTails),
+ )
+
+ go func() {
+ io.Copy(serverHandler, s.handler)
+ cancel()
+ }()
+
+ go func() {
+ io.Copy(s.handler, serverHandler)
+ cancel()
+ }()
+
+ go func() {
+ select {
+ case <-s.handler.Done():
+ case <-ctx.Done():
+ }
+ cancel()
+ }()
+
+ // Send all commands to client.
+ for _, command := range s.commands {
+ logger.Debug(command)
+ s.handler.SendMessage(command)
+ }
+
+ <-ctx.Done()
+ s.handler.Shutdown()
+
+ return nil
+}
diff --git a/internal/clients/handlers/basehandler.go b/internal/clients/handlers/basehandler.go
index 0f2d1b5..af1ad62 100644
--- a/internal/clients/handlers/basehandler.go
+++ b/internal/clients/handlers/basehandler.go
@@ -22,6 +22,16 @@ type baseHandler struct {
status int
}
+func (h *baseHandler) String() string {
+ return fmt.Sprintf("baseHandler(%s,server:%s,shellStarted:%v,status:%d)@%p",
+ h.done,
+ h.server,
+ h.shellStarted,
+ h.status,
+ h,
+ )
+}
+
func (h *baseHandler) Server() string {
return h.server
}
diff --git a/internal/done.go b/internal/done.go
index 54e5e8e..5ea22a0 100644
--- a/internal/done.go
+++ b/internal/done.go
@@ -17,6 +17,15 @@ func NewDone() *Done {
}
}
+func (d *Done) String() string {
+ select {
+ case <-d.Done():
+ return "Done(yes)"
+ default:
+ return "Done(no)"
+ }
+}
+
// Done returns the done channel (closed when done)
func (d *Done) Done() <-chan struct{} {
return d.ch
diff --git a/internal/server/handlers/serverhandler.go b/internal/server/handlers/serverhandler.go
index 2f3b73b..4820476 100644
--- a/internal/server/handlers/serverhandler.go
+++ b/internal/server/handlers/serverhandler.go
@@ -331,7 +331,11 @@ func (h *ServerHandler) handleAckCommand(argc int, args []string) {
return
}
if args[1] == "close" && args[2] == "connection" {
- close(h.ackCloseReceived)
+ select {
+ case <-h.ackCloseReceived:
+ default:
+ close(h.ackCloseReceived)
+ }
}
}