summaryrefslogtreecommitdiff
path: root/integrationtests/dmap_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'integrationtests/dmap_test.go')
-rw-r--r--integrationtests/dmap_test.go474
1 files changed, 453 insertions, 21 deletions
diff --git a/integrationtests/dmap_test.go b/integrationtests/dmap_test.go
index f772243..920a167 100644
--- a/integrationtests/dmap_test.go
+++ b/integrationtests/dmap_test.go
@@ -4,7 +4,9 @@ import (
"context"
"fmt"
"os"
+ "strings"
"testing"
+ "time"
"github.com/mimecast/dtail/internal/config"
)
@@ -30,21 +32,40 @@ func TestDMap1(t *testing.T) {
"$foo = 42, $bar = \"baz\", $baz = $time group by $hostname",
}
- for subtestName, query := range testTable {
- t.Log("Testing dmap with input file")
- if err := testDmap1(t, query, subtestName, false); err != nil {
- t.Error(err)
- return
+ // Test in serverless mode
+ t.Run("Serverless", func(t *testing.T) {
+ for subtestName, query := range testTable {
+ t.Log("Testing dmap with input file")
+ if err := testDmap1Serverless(t, query, subtestName, false); err != nil {
+ t.Error(err)
+ return
+ }
+ t.Log("Testing dmap with stdin input pipe")
+ if err := testDmap1Serverless(t, query, subtestName, true); err != nil {
+ t.Error(err)
+ return
+ }
}
- t.Log("Testing dmap with stdin input pipe")
- if err := testDmap1(t, query, subtestName, true); err != nil {
- t.Error(err)
- return
+ })
+
+ // Test in server mode
+ t.Run("ServerMode", func(t *testing.T) {
+ for subtestName, query := range testTable {
+ t.Log("Testing dmap with input file in server mode")
+ if err := testDmap1WithServer(t, query, subtestName, false); err != nil {
+ t.Error(err)
+ return
+ }
+ t.Log("Testing dmap with stdin input pipe in server mode")
+ if err := testDmap1WithServer(t, query, subtestName, true); err != nil {
+ t.Error(err)
+ return
+ }
}
- }
+ })
}
-func testDmap1(t *testing.T, query, subtestName string, usePipe bool) error {
+func testDmap1Serverless(t *testing.T, query, subtestName string, usePipe bool) error {
inFile := "mapr_testdata.log"
csvFile := fmt.Sprintf("dmap1%s.csv.tmp", subtestName)
expectedCsvFile := fmt.Sprintf("dmap1%s.csv.expected", subtestName)
@@ -96,11 +117,98 @@ func testDmap1(t *testing.T, query, subtestName string, usePipe bool) error {
return nil
}
+func testDmap1WithServer(t *testing.T, query, subtestName string, usePipe bool) error {
+ inFile := "mapr_testdata.log"
+ csvFile := fmt.Sprintf("dmap1%s.csv.tmp", subtestName)
+ expectedCsvFile := fmt.Sprintf("dmap1%s.csv.expected", subtestName)
+ queryFile := fmt.Sprintf("%s.query", csvFile)
+ expectedQueryFile := fmt.Sprintf("dmap1%s.csv.query.expected", subtestName)
+ query = fmt.Sprintf("%s outfile %s", query, csvFile)
+ port := getUniquePortNumber()
+ bindAddress := "localhost"
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // Start dserver
+ _, _, _, err := startCommand(ctx, t,
+ "", "../dserver",
+ "--cfg", "none",
+ "--logger", "stdout",
+ "--logLevel", "error",
+ "--bindAddress", bindAddress,
+ "--port", fmt.Sprintf("%d", port),
+ )
+ if err != nil {
+ return err
+ }
+
+ // Give server time to start
+ time.Sleep(500 * time.Millisecond)
+
+ var stdoutCh, stderrCh <-chan string
+ var cmdErrCh <-chan error
+
+ if usePipe {
+ stdoutCh, stderrCh, cmdErrCh, err = startCommand(ctx, t,
+ inFile, "../dmap",
+ "--cfg", "none",
+ "--query", query,
+ "--logger", "stdout",
+ "--logLevel", "info",
+ "--noColor",
+ "--servers", fmt.Sprintf("%s:%d", bindAddress, port),
+ "--trustAllHosts")
+ } else {
+ stdoutCh, stderrCh, cmdErrCh, err = startCommand(ctx, t,
+ "", "../dmap",
+ "--cfg", "none",
+ "--query", query,
+ "--logger", "stdout",
+ "--logLevel", "info",
+ "--noColor",
+ "--servers", fmt.Sprintf("%s:%d", bindAddress, port),
+ "--trustAllHosts",
+ "--files", inFile)
+ }
+
+ if err != nil {
+ return err
+ }
+
+ waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh)
+ cancel()
+
+ if err := compareFiles(t, csvFile, expectedCsvFile); err != nil {
+ return err
+ }
+ if err := compareFiles(t, queryFile, expectedQueryFile); err != nil {
+ return err
+ }
+
+ os.Remove(csvFile)
+ os.Remove(queryFile)
+ return nil
+}
+
func TestDMap2(t *testing.T) {
if !config.Env("DTAIL_INTEGRATION_TEST_RUN_MODE") {
t.Log("Skipping")
return
}
+
+ // Test in serverless mode
+ t.Run("Serverless", func(t *testing.T) {
+ testDMap2Serverless(t)
+ })
+
+ // Test in server mode
+ t.Run("ServerMode", func(t *testing.T) {
+ testDMap2WithServer(t)
+ })
+}
+
+func testDMap2Serverless(t *testing.T) {
inFile := "mapr_testdata.log"
outFile := "dmap2.stdout.tmp"
csvFile := "dmap2.csv.tmp"
@@ -133,11 +241,84 @@ func TestDMap2(t *testing.T) {
os.Remove(queryFile)
}
+func testDMap2WithServer(t *testing.T) {
+ inFile := "mapr_testdata.log"
+ outFile := "dmap2.stdout.tmp"
+ csvFile := "dmap2.csv.tmp"
+ expectedCsvFile := "dmap2.csv.expected"
+ queryFile := fmt.Sprintf("%s.query", csvFile)
+ expectedQueryFile := "dmap2.csv.query.expected"
+ port := getUniquePortNumber()
+ bindAddress := "localhost"
+
+ query := fmt.Sprintf("from STATS select count($time),$time,max($goroutines),"+
+ "avg($goroutines),min($goroutines) group by $time order by count($time) "+
+ "outfile %s", csvFile)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // Start dserver
+ _, _, _, err := startCommand(ctx, t,
+ "", "../dserver",
+ "--cfg", "none",
+ "--logger", "stdout",
+ "--logLevel", "error",
+ "--bindAddress", bindAddress,
+ "--port", fmt.Sprintf("%d", port),
+ )
+ if err != nil {
+ t.Error(err)
+ return
+ }
+
+ // Give server time to start
+ time.Sleep(500 * time.Millisecond)
+
+ _, err = runCommand(ctx, t, outFile,
+ "../dmap", "--query", query, "--cfg", "none",
+ "--servers", fmt.Sprintf("%s:%d", bindAddress, port),
+ "--trustAllHosts",
+ "--files", inFile)
+ if err != nil {
+ t.Error(err)
+ return
+ }
+
+ cancel()
+
+ if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil {
+ t.Error(err)
+ return
+ }
+ if err := compareFiles(t, queryFile, expectedQueryFile); err != nil {
+ t.Error(err)
+ return
+ }
+
+ os.Remove(outFile)
+ os.Remove(csvFile)
+ os.Remove(queryFile)
+}
+
func TestDMap3(t *testing.T) {
if !config.Env("DTAIL_INTEGRATION_TEST_RUN_MODE") {
t.Log("Skipping")
return
}
+
+ // Test in serverless mode
+ t.Run("Serverless", func(t *testing.T) {
+ testDMap3Serverless(t)
+ })
+
+ // Test in server mode
+ t.Run("ServerMode", func(t *testing.T) {
+ testDMap3WithServer(t)
+ })
+}
+
+func testDMap3Serverless(t *testing.T) {
inFile := "mapr_testdata.log"
outFile := "dmap3.stdout.tmp"
csvFile := "dmap3.csv.tmp"
@@ -152,6 +333,84 @@ func TestDMap3(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
+ // Create 100 copies of the input file
+ files := make([]string, 100)
+ for i := 0; i < 100; i++ {
+ files[i] = inFile
+ }
+
+ args := []string{
+ "--query", query,
+ "--cfg", "none",
+ "--logger", "stdout",
+ "--logLevel", "info",
+ "--noColor",
+ }
+ args = append(args, files...)
+
+ stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t,
+ "", "../dmap", args...)
+
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh)
+
+ if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil {
+ t.Error(err)
+ return
+ }
+ if err := compareFiles(t, queryFile, expectedQueryFile); err != nil {
+ t.Error(err)
+ return
+ }
+
+ os.Remove(outFile)
+ os.Remove(csvFile)
+ os.Remove(queryFile)
+}
+
+func testDMap3WithServer(t *testing.T) {
+ inFile := "mapr_testdata.log"
+ outFile := "dmap3.stdout.tmp"
+ csvFile := "dmap3.csv.tmp"
+ expectedCsvFile := "dmap3.csv.expected"
+ queryFile := fmt.Sprintf("%s.query", csvFile)
+ expectedQueryFile := "dmap3.csv.query.expected"
+ port := getUniquePortNumber()
+ bindAddress := "localhost"
+
+ query := fmt.Sprintf("from STATS select count($time),$time,max($goroutines),"+
+ "avg($goroutines),min($goroutines) group by $time order by count($time) "+
+ "outfile %s", csvFile)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // Start dserver
+ _, _, _, err := startCommand(ctx, t,
+ "", "../dserver",
+ "--cfg", "none",
+ "--logger", "stdout",
+ "--logLevel", "error",
+ "--bindAddress", bindAddress,
+ "--port", fmt.Sprintf("%d", port),
+ )
+ if err != nil {
+ t.Error(err)
+ return
+ }
+
+ // Give server time to start
+ time.Sleep(500 * time.Millisecond)
+
+ // Create 100 copies of the input file
+ files := make([]string, 100)
+ for i := 0; i < 100; i++ {
+ files[i] = inFile
+ }
+
stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t,
"", "../dmap",
"--query", query,
@@ -159,22 +418,16 @@ func TestDMap3(t *testing.T) {
"--logger", "stdout",
"--logLevel", "info",
"--noColor",
- inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
- inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
- inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
- inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
- inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
- inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
- inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
- inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
- inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile,
- inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile)
+ "--servers", fmt.Sprintf("%s:%d", bindAddress, port),
+ "--trustAllHosts",
+ "--files", strings.Join(files, ","))
if err != nil {
t.Error(err)
return
}
waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh)
+ cancel()
if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil {
t.Error(err)
@@ -195,6 +448,19 @@ func TestDMap4Append(t *testing.T) {
t.Log("Skipping")
return
}
+
+ // Test in serverless mode
+ t.Run("Serverless", func(t *testing.T) {
+ testDMap4AppendServerless(t)
+ })
+
+ // Test in server mode
+ t.Run("ServerMode", func(t *testing.T) {
+ testDMap4AppendWithServer(t)
+ })
+}
+
+func testDMap4AppendServerless(t *testing.T) {
inFile := "mapr_testdata.log"
outFile := "dmap4.stdout.tmp"
csvFile := "dmap4.csv.tmp"
@@ -246,11 +512,100 @@ func TestDMap4Append(t *testing.T) {
os.Remove(queryFile)
}
+func testDMap4AppendWithServer(t *testing.T) {
+ inFile := "mapr_testdata.log"
+ outFile := "dmap4.stdout.tmp"
+ csvFile := "dmap4.csv.tmp"
+ expectedCsvFile := "dmap4.csv.expected"
+ queryFile := fmt.Sprintf("%s.query", csvFile)
+ expectedQueryFile := "dmap4.csv.query.expected"
+ port := getUniquePortNumber()
+ bindAddress := "localhost"
+
+ // Delete in case it exists already. Otherwise, test will fail.
+ os.Remove(csvFile)
+
+ query := fmt.Sprintf("from STATS select count($time),$time,max($goroutines),"+
+ "avg($goroutines),min($goroutines) group by $time order by count($time) "+
+ "outfile append %s", csvFile)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // Start dserver
+ _, _, _, err := startCommand(ctx, t,
+ "", "../dserver",
+ "--cfg", "none",
+ "--logger", "stdout",
+ "--logLevel", "error",
+ "--bindAddress", bindAddress,
+ "--port", fmt.Sprintf("%d", port),
+ )
+ if err != nil {
+ t.Error(err)
+ return
+ }
+
+ // Give server time to start
+ time.Sleep(500 * time.Millisecond)
+
+ // Run dmap command twice, it should append in the 2nd iteration the new results to the already existing
+ // file as we specified "outfile append". That works transparently for any mapreduce query
+ // (e.g. also for the dtail command in streaming mode). But it is easier to test with the dmap
+ // command.
+ for i := 0; i < 2; i++ {
+ stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t,
+ "", "../dmap",
+ "--query", query,
+ "--cfg", "none",
+ "--logger", "stdout",
+ "--logLevel", "info",
+ "--noColor",
+ "--servers", fmt.Sprintf("%s:%d", bindAddress, port),
+ "--trustAllHosts",
+ "--files", inFile)
+
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh)
+ }
+
+ cancel()
+
+ if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil {
+ t.Error(err)
+ return
+ }
+ if err := compareFiles(t, queryFile, expectedQueryFile); err != nil {
+ t.Error(err)
+ return
+ }
+
+ os.Remove(outFile)
+ os.Remove(csvFile)
+ os.Remove(queryFile)
+}
+
func TestDMap5CSV(t *testing.T) {
if !config.Env("DTAIL_INTEGRATION_TEST_RUN_MODE") {
t.Log("Skipping")
return
}
+
+ // Test in serverless mode
+ t.Run("Serverless", func(t *testing.T) {
+ testDMap5CSVServerless(t)
+ })
+
+ // Test in server mode
+ t.Run("ServerMode", func(t *testing.T) {
+ testDMap5CSVWithServer(t)
+ })
+}
+
+func testDMap5CSVServerless(t *testing.T) {
inFile := "dmap5.csv.in"
outFile := "dmap5.stdout.tmp"
csvFile := "dmap5.csv.tmp"
@@ -302,3 +657,80 @@ func TestDMap5CSV(t *testing.T) {
os.Remove(csvFile)
os.Remove(queryFile)
}
+
+func testDMap5CSVWithServer(t *testing.T) {
+ inFile := "dmap5.csv.in"
+ outFile := "dmap5.stdout.tmp"
+ csvFile := "dmap5.csv.tmp"
+ expectedCsvFile := "dmap5.csv.expected"
+ queryFile := fmt.Sprintf("%s.query", csvFile)
+ expectedQueryFile := "dmap5.csv.query.expected"
+ port := getUniquePortNumber()
+ bindAddress := "localhost"
+
+ // Delete in case it exists already. Otherwise, test will fail.
+ os.Remove(csvFile)
+
+ query := fmt.Sprintf("select sum($timecount),last($time),min($min_goroutines),"+
+ " group by $hostname"+
+ " set $timecount = `count($time)`, $time = `$time`, $min_goroutines = `min($goroutines)`"+
+ " logformat csv outfile %s", csvFile)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // Start dserver
+ _, _, _, err := startCommand(ctx, t,
+ "", "../dserver",
+ "--cfg", "none",
+ "--logger", "stdout",
+ "--logLevel", "error",
+ "--bindAddress", bindAddress,
+ "--port", fmt.Sprintf("%d", port),
+ )
+ if err != nil {
+ t.Error(err)
+ return
+ }
+
+ // Give server time to start
+ time.Sleep(500 * time.Millisecond)
+
+ // Run dmap command twice, it should append in the 2nd iteration the new results to the already existing
+ // file as we specified "outfile append". That works transparently for any mapreduce query
+ // (e.g. also for the dtail command in streaming mode). But it is easier to test with the dmap
+ // command.
+ for i := 0; i < 2; i++ {
+ stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t,
+ "", "../dmap",
+ "--query", query,
+ "--cfg", "none",
+ "--logger", "stdout",
+ "--logLevel", "info",
+ "--noColor",
+ "--servers", fmt.Sprintf("%s:%d", bindAddress, port),
+ "--trustAllHosts",
+ "--files", inFile)
+
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh)
+ }
+
+ cancel()
+
+ if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil {
+ t.Error(err)
+ return
+ }
+ if err := compareFiles(t, queryFile, expectedQueryFile); err != nil {
+ t.Error(err)
+ return
+ }
+
+ os.Remove(outFile)
+ os.Remove(csvFile)
+ os.Remove(queryFile)
+}