diff options
| -rw-r--r-- | TODO.md | 1 | ||||
| -rw-r--r-- | cmd/dtail/main.go | 1 | ||||
| -rw-r--r-- | integrationtests/commandutils.go | 112 | ||||
| -rw-r--r-- | integrationtests/dcat_test.go | 9 | ||||
| -rw-r--r-- | integrationtests/dgrep_test.go | 17 | ||||
| -rw-r--r-- | integrationtests/dmap_test.go | 16 | ||||
| -rw-r--r-- | integrationtests/dtail_test.go | 90 | ||||
| -rw-r--r-- | integrationtests/dtailhealthcheck_test.go | 33 | ||||
| -rw-r--r-- | integrationtests/fileutils.go (renamed from integrationtests/commons.go) | 38 | ||||
| -rw-r--r-- | internal/clients/connectors/serverconnection.go | 12 |
10 files changed, 248 insertions, 81 deletions
@@ -16,4 +16,3 @@ This is a loose list of what to do. Maybe for the next releae or maybe for a lat [ ] More integration test colors (via dcat?) [ ] Integration test for dtail in serverless mode [ ] Integration test for dtail normal mode -[ ] Fix the sync.Pools (they aren't concurrent as it seems and can cause a panic) diff --git a/cmd/dtail/main.go b/cmd/dtail/main.go index 4285476..bc4236c 100644 --- a/cmd/dtail/main.go +++ b/cmd/dtail/main.go @@ -85,6 +85,7 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) if shutdownAfter > 0 { + // TODO: This does not work (auto shutdown) ctx, cancel = context.WithTimeout(ctx, time.Duration(shutdownAfter)*time.Second) defer cancel() } diff --git a/integrationtests/commandutils.go b/integrationtests/commandutils.go new file mode 100644 index 0000000..d2f567f --- /dev/null +++ b/integrationtests/commandutils.go @@ -0,0 +1,112 @@ +package integrationtests + +import ( + "bufio" + "context" + "fmt" + "os" + "os/exec" + "sync" + "syscall" + "time" +) + +// The exit code and the Go error of the command terminated. +type exitPromise func() (int, error) + +func runCommand(ctx context.Context, stdoutFile, cmdStr string, + args ...string) (int, error) { + + stdinCh, _, exit, err := startCommand(ctx, cmdStr, args...) + if err != nil { + return -1, err + } + + fd, err := os.Create(stdoutFile) + if err != nil { + return -2, err + } + + var wg sync.WaitGroup + wg.Add(1) + defer wg.Wait() + + go func() { + defer fd.Close() + defer wg.Done() + for line := range stdinCh { + fd.WriteString(line) + fd.WriteString("\n") + } + }() + + return exit() +} + +func runCommandRetry(ctx context.Context, retries int, stdoutFile, cmd string, + args ...string) (exitCode int, err error) { + + for i := 0; i < retries; i++ { + time.Sleep(time.Second) + if exitCode, err = runCommand(ctx, stdoutFile, cmd, args...); exitCode == 0 { + return + } + } + return +} + +func startCommand(ctx context.Context, cmdStr string, + args ...string) (<-chan string, <-chan string, exitPromise, error) { + + stdoutCh := make(chan string) + stderrCh := make(chan string) + + if _, err := os.Stat(cmdStr); err != nil { + return stdoutCh, stderrCh, nil, + fmt.Errorf("no such executable '%s', please compile first: %v", cmdStr, err) + } + + cmd := exec.CommandContext(ctx, cmdStr, args...) + + cmdStdout, err := cmd.StdoutPipe() + if err != nil { + return stdoutCh, stderrCh, nil, err + } + cmdStderr, err := cmd.StderrPipe() + err = cmd.Start() + if err != nil { + return stdoutCh, stderrCh, nil, err + } + + go func() { + defer close(stdoutCh) + scanner := bufio.NewScanner(cmdStdout) + scanner.Split(bufio.ScanLines) + for scanner.Scan() { + stdoutCh <- scanner.Text() + } + }() + go func() { + close(stderrCh) + scanner := bufio.NewScanner(cmdStderr) + scanner.Split(bufio.ScanLines) + for scanner.Scan() { + stderrCh <- scanner.Text() + } + }() + + return stdoutCh, stderrCh, func() (int, error) { + err := cmd.Wait() + return exitCodeFromError(err), err + }, nil +} + +func exitCodeFromError(err error) int { + if err != nil { + if exitError, ok := err.(*exec.ExitError); ok { + ws := exitError.Sys().(syscall.WaitStatus) + return ws.ExitStatus() + } + } + return 0 +} diff --git a/integrationtests/dcat_test.go b/integrationtests/dcat_test.go index 342ebd0..e172bfa 100644 --- a/integrationtests/dcat_test.go +++ b/integrationtests/dcat_test.go @@ -1,6 +1,7 @@ package integrationtests import ( + "context" "os" "testing" ) @@ -8,15 +9,19 @@ import ( func TestDCat(t *testing.T) { testdataFile := "dcat.txt.expected" stdoutFile := "dcat.out" - args := []string{"-spartan", testdataFile} - if _, err := runCommand(t, "../dcat", args, stdoutFile); err != nil { + _, err := runCommand(context.TODO(), stdoutFile, + "../dcat", "--spartan", testdataFile) + + if err != nil { t.Error(err) return } + if err := compareFiles(t, stdoutFile, testdataFile); err != nil { t.Error(err) return } + os.Remove(stdoutFile) } diff --git a/integrationtests/dgrep_test.go b/integrationtests/dgrep_test.go index 4d54a2d..15519b3 100644 --- a/integrationtests/dgrep_test.go +++ b/integrationtests/dgrep_test.go @@ -1,6 +1,7 @@ package integrationtests import ( + "context" "os" "testing" ) @@ -9,16 +10,20 @@ func TestDGrep(t *testing.T) { inFile := "mapr_testdata.log" stdoutFile := "dgrep.stdout.tmp" expectedStdoutFile := "dgrep.txt.expected" - args := []string{"-spartan", "--grep", "20211002-071947", inFile} - if _, err := runCommand(t, "../dgrep", args, stdoutFile); err != nil { + _, err := runCommand(context.TODO(), stdoutFile, + "../dgrep", "--spartan", "--grep", "20211002-071947", inFile) + + if err != nil { t.Error(err) return } + if err := compareFiles(t, stdoutFile, expectedStdoutFile); err != nil { t.Error(err) return } + os.Remove(stdoutFile) } @@ -26,15 +31,19 @@ func TestDGrep2(t *testing.T) { inFile := "mapr_testdata.log" stdoutFile := "dgrep2.stdout.tmp" expectedStdoutFile := "dgrep2.txt.expected" - args := []string{"-spartan", "--grep", "20211002-071947", "--invert", inFile} - if _, err := runCommand(t, "../dgrep", args, stdoutFile); err != nil { + _, err := runCommand(context.TODO(), stdoutFile, + "../dgrep", "-spartan", "--grep", "20211002-071947", "--invert", inFile) + + if err != nil { t.Error(err) return } + if err := compareFiles(t, stdoutFile, expectedStdoutFile); err != nil { t.Error(err) return } + os.Remove(stdoutFile) } diff --git a/integrationtests/dmap_test.go b/integrationtests/dmap_test.go index f5c78e0..966944a 100644 --- a/integrationtests/dmap_test.go +++ b/integrationtests/dmap_test.go @@ -1,6 +1,7 @@ package integrationtests import ( + "context" "fmt" "os" "testing" @@ -17,12 +18,15 @@ func TestDMap(t *testing.T) { query := fmt.Sprintf("from STATS select count($line),last($time),"+ "avg($goroutines),min(concurrentConnections),max(lifetimeConnections) "+ "group by $hostname outfile %s", csvFile) - args := []string{"-query", query, inFile} - if _, err := runCommand(t, "../dmap", args, stdoutFile); err != nil { + _, err := runCommand(context.TODO(), stdoutFile, + "../dmap", "--query", query, inFile) + + if err != nil { t.Error(err) return } + if err := compareFiles(t, csvFile, expectedCsvFile); err != nil { t.Error(err) return @@ -49,11 +53,13 @@ func TestDMap2(t *testing.T) { "avg($goroutines),min($goroutines) group by $time order by count($time) "+ "outfile %s", csvFile) - args := []string{"-query", query, inFile} - if _, err := runCommand(t, "../dmap", args, stdoutFile); err != nil { + _, err := runCommand(context.TODO(), stdoutFile, + "../dmap", "--query", query, inFile) + if err != nil { t.Error(err) return } + if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil { t.Error(err) return @@ -86,7 +92,7 @@ func TestDMap3(t *testing.T) { args = append(args, inFile) } - if _, err := runCommand(t, "../dmap", args, stdoutFile); err != nil { + if _, err := runCommand(context.TODO(), stdoutFile, "../dmap", args...); err != nil { t.Error(err) return } diff --git a/integrationtests/dtail_test.go b/integrationtests/dtail_test.go index 36eadc0..267cd26 100644 --- a/integrationtests/dtail_test.go +++ b/integrationtests/dtail_test.go @@ -1,16 +1,102 @@ package integrationtests import ( + "context" "os" "testing" ) +func TestDTailWithServer(t *testing.T) { + followFile := "dtail.follow.tmp" + //serverStdoutFile := "dtail.dserver.stdout.tmp" + //greetings := []string{"world", "sol system", "milky way", "universe", "multiverse"} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + serverCh, _, _, err := startCommand(ctx, + "../dserver", + "--logger", "stdout", + "--logLevel", "info", + "--port", "4242", + "--relaxedAuth", + ) + if err != nil { + t.Error(err) + return + } + + clientCh, _, _, err := startCommand(ctx, + "../dtail", + "--logger", "stdout", + "--logLevel", "devel", + "--servers", "localhost:4242", + "--files", followFile, + "--grep", "Hello", + "--trustAllHosts", + "--noColor", + ) + if err != nil { + t.Error(err) + return + } + + for { + select { + case line := <-serverCh: + t.Log("server:", line) + case line := <-clientCh: + t.Log("client:", line) + case <-ctx.Done(): + t.Log("Done reading client and server pipes") + } + } + + /* + // Start dtail client, connect to the server and follow followFile. + + //clientStdoutFile := "dtail.stdout.tmp" + /* + + t.Log(clientArgs) + // TODO: Pipe with dtail command to read stdin stream. + // runCommandContextRetry(ctx, t, "../dtail", clientArgs, clientStdoutFile) + + // Write greetings to followFile + fd, err := os.Create(followFile) + if err != nil { + t.Error(err) + } + defer fd.Close() + + go func() { + var circular int + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Second): + fd.WriteString(time.Now().String()) + fd.WriteString(fmt.Sprintf(" - Hello %s!\n", greetings[circular])) + circular = (circular + 1) % len(greetings) + } + } + }() + */ + + /* + os.Remove(serverStdoutFile) + os.Remove(clientStdoutFile) + os.Remove(followFile) + */ +} + func TestDTailColorTable(t *testing.T) { stdoutFile := "dtailcolortable.stdout.tmp" expectedStdoutFile := "dtailcolortable.expected" - args := []string{"-colorTable"} - if _, err := runCommand(t, "../dtail", args, stdoutFile); err != nil { + _, err := runCommand(context.TODO(), stdoutFile, "../dtail", "--colorTable") + if err != nil { t.Error(err) return } diff --git a/integrationtests/dtailhealthcheck_test.go b/integrationtests/dtailhealthcheck_test.go index d562239..a99bfdc 100644 --- a/integrationtests/dtailhealthcheck_test.go +++ b/integrationtests/dtailhealthcheck_test.go @@ -5,7 +5,6 @@ import ( "fmt" "os" "testing" - "time" ) func TestDTailHealthCheck(t *testing.T) { @@ -13,7 +12,7 @@ func TestDTailHealthCheck(t *testing.T) { expectedStdoutFile := "dtailhealthcheck.expected" t.Log("Serverless check, is supposed to exit with warning state.") - exitCode, err := runCommand(t, "../dtailhealthcheck", []string{}, stdoutFile) + exitCode, err := runCommand(context.TODO(), stdoutFile, "../dtailhealthcheck") if exitCode != 1 { t.Error(fmt.Sprintf("Expected exit code '1' but got '%d': %v", exitCode, err)) return @@ -23,17 +22,17 @@ func TestDTailHealthCheck(t *testing.T) { t.Error(err) return } - os.Remove(stdoutFile) } func TestDTailHealthCheck2(t *testing.T) { stdoutFile := "dtailhealthcheck2.stdout.tmp" expectedStdoutFile := "dtailhealthcheck2.expected" - args := []string{"--server", "example:1"} t.Log("Negative test, is supposed to exit with a critical state.") - exitCode, err := runCommand(t, "../dtailhealthcheck", args, stdoutFile) + exitCode, err := runCommand(context.TODO(), stdoutFile, + "../dtailhealthcheck", "--server", "example:1") + if exitCode != 2 { t.Error(fmt.Sprintf("Expected exit code '2' but got '%d': %v", exitCode, err)) return @@ -49,27 +48,20 @@ func TestDTailHealthCheck2(t *testing.T) { func TestDTailHealthCheck3(t *testing.T) { stdoutFile := "dtailhealthcheck3.stdout.tmp" - serverStdoutFile := "dtailhealthcheck3.dserver.stdout.tmp" expectedStdoutFile := "dtailhealthcheck3.expected" ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go func() { - args := []string{"--logger", "stdout", "--logLevel", "trace", "--port", "4242"} - runCommandContext(ctx, t, "../dserver", args, serverStdoutFile) - }() + startCommand(ctx, + "../dserver", + "--logger", "stdout", + "--logLevel", "trace", + "--port", "4242", + ) - var err error - args := []string{"--server", "localhost:4242"} - for i := 0; i < 30; i++ { - t.Log("Waiting for dserver to start", i) - time.Sleep(time.Second) - var exitCode int - if exitCode, err = runCommand(t, "../dtailhealthcheck", args, stdoutFile); exitCode == 0 { - break - } - } + _, err := runCommandRetry(ctx, 10, stdoutFile, + "../dtailhealthcheck", "--server", "localhost:4242") if err != nil { t.Error(err) return @@ -80,6 +72,5 @@ func TestDTailHealthCheck3(t *testing.T) { return } - os.Remove(serverStdoutFile) os.Remove(stdoutFile) } diff --git a/integrationtests/commons.go b/integrationtests/fileutils.go index 2fdbfc3..d771607 100644 --- a/integrationtests/commons.go +++ b/integrationtests/fileutils.go @@ -2,53 +2,15 @@ package integrationtests import ( "bufio" - "context" "crypto/sha256" "encoding/base64" "fmt" "io/ioutil" "os" "os/exec" - "strings" - "syscall" "testing" ) -func runCommand(t *testing.T, cmd string, args []string, stdoutFile string) (int, error) { - return runCommandContext(context.TODO(), t, cmd, args, stdoutFile) -} - -func runCommandContext(ctx context.Context, t *testing.T, cmd string, args []string, - stdoutFile string) (int, error) { - - if _, err := os.Stat(cmd); err != nil { - return -1, fmt.Errorf("No such binary %s, please compile first (%v)", cmd, err) - } - - t.Log("Running command:", cmd, strings.Join(args, " ")) - bytes, cmdErr := exec.CommandContext(ctx, cmd, args...).Output() - - t.Log("Writing stdout to file", stdoutFile) - fd, err := os.Create(stdoutFile) - if err != nil { - return -1, err - } - defer fd.Close() - fd.Write(bytes) - - return exitCodeFromError(cmdErr), err -} - -func exitCodeFromError(err error) int { - if err != nil { - if exitError, ok := err.(*exec.ExitError); ok { - ws := exitError.Sys().(syscall.WaitStatus) - return ws.ExitStatus() - } - } - return 0 -} - // Checks whether both files have the same lines (order doesn't matter) func compareFilesContents(t *testing.T, fileA, fileB string) error { mapFile := func(file string) (map[string]int, error) { diff --git a/internal/clients/connectors/serverconnection.go b/internal/clients/connectors/serverconnection.go index 2d7b45a..2737ede 100644 --- a/internal/clients/connectors/serverconnection.go +++ b/internal/clients/connectors/serverconnection.go @@ -37,6 +37,7 @@ func NewServerConnection(server string, userName string, c := ServerConnection{ hostKeyCallback: hostKeyCallback, server: server, + port: config.Common.SSHPort, handler: handler, commands: commands, config: &ssh.ClientConfig{ @@ -47,25 +48,20 @@ func NewServerConnection(server string, userName string, }, } + // TODO: After reconnecting the port is wrong! Due to string slicing? c.initServerPort() return &c } // Server returns the server hostname connected to. -func (c *ServerConnection) Server() string { - return c.server -} +func (c *ServerConnection) Server() string { return c.server } // Handler returns the handler used for the connection. -func (c *ServerConnection) Handler() handlers.Handler { - return c.handler -} +func (c *ServerConnection) Handler() handlers.Handler { return c.handler } // Attempt to parse the server port address from the provided server FQDN. func (c *ServerConnection) initServerPort() { - c.port = config.Common.SSHPort parts := strings.Split(c.server, ":") - if len(parts) == 2 { dlog.Client.Debug("Parsing port from hostname", parts) port, err := strconv.Atoi(parts[1]) |
