diff options
| author | Paul Buetow <paul@buetow.org> | 2025-06-24 18:38:57 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-06-24 18:38:57 +0300 |
| commit | 480dcf59cff0e2329cee80c456dfe4da2b7d4269 (patch) | |
| tree | 7072ee09343b1468383382afcf06b3b8b159e9a2 | |
| parent | 61b2a90aefee82da19ea5b388fb6112760833d97 (diff) | |
Update DMap integration tests to run in both serverless and server modes
- Add server mode variants for all DMap tests (TestDMap1 through TestDMap5CSV)
- Use comma-separated file lists for server mode when multiple files needed
- Follow same pattern as dcat tests with separate serverless/server functions
- All DMap tests now pass in both execution modes
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
| -rw-r--r-- | integrationtests/dmap_test.go | 474 |
1 files changed, 453 insertions, 21 deletions
diff --git a/integrationtests/dmap_test.go b/integrationtests/dmap_test.go index f772243..920a167 100644 --- a/integrationtests/dmap_test.go +++ b/integrationtests/dmap_test.go @@ -4,7 +4,9 @@ import ( "context" "fmt" "os" + "strings" "testing" + "time" "github.com/mimecast/dtail/internal/config" ) @@ -30,21 +32,40 @@ func TestDMap1(t *testing.T) { "$foo = 42, $bar = \"baz\", $baz = $time group by $hostname", } - for subtestName, query := range testTable { - t.Log("Testing dmap with input file") - if err := testDmap1(t, query, subtestName, false); err != nil { - t.Error(err) - return + // Test in serverless mode + t.Run("Serverless", func(t *testing.T) { + for subtestName, query := range testTable { + t.Log("Testing dmap with input file") + if err := testDmap1Serverless(t, query, subtestName, false); err != nil { + t.Error(err) + return + } + t.Log("Testing dmap with stdin input pipe") + if err := testDmap1Serverless(t, query, subtestName, true); err != nil { + t.Error(err) + return + } } - t.Log("Testing dmap with stdin input pipe") - if err := testDmap1(t, query, subtestName, true); err != nil { - t.Error(err) - return + }) + + // Test in server mode + t.Run("ServerMode", func(t *testing.T) { + for subtestName, query := range testTable { + t.Log("Testing dmap with input file in server mode") + if err := testDmap1WithServer(t, query, subtestName, false); err != nil { + t.Error(err) + return + } + t.Log("Testing dmap with stdin input pipe in server mode") + if err := testDmap1WithServer(t, query, subtestName, true); err != nil { + t.Error(err) + return + } } - } + }) } -func testDmap1(t *testing.T, query, subtestName string, usePipe bool) error { +func testDmap1Serverless(t *testing.T, query, subtestName string, usePipe bool) error { inFile := "mapr_testdata.log" csvFile := fmt.Sprintf("dmap1%s.csv.tmp", subtestName) expectedCsvFile := fmt.Sprintf("dmap1%s.csv.expected", subtestName) @@ -96,11 +117,98 @@ func testDmap1(t *testing.T, query, subtestName string, usePipe bool) error { return nil } +func testDmap1WithServer(t *testing.T, query, subtestName string, usePipe bool) error { + inFile := "mapr_testdata.log" + csvFile := fmt.Sprintf("dmap1%s.csv.tmp", subtestName) + expectedCsvFile := fmt.Sprintf("dmap1%s.csv.expected", subtestName) + queryFile := fmt.Sprintf("%s.query", csvFile) + expectedQueryFile := fmt.Sprintf("dmap1%s.csv.query.expected", subtestName) + query = fmt.Sprintf("%s outfile %s", query, csvFile) + port := getUniquePortNumber() + bindAddress := "localhost" + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start dserver + _, _, _, err := startCommand(ctx, t, + "", "../dserver", + "--cfg", "none", + "--logger", "stdout", + "--logLevel", "error", + "--bindAddress", bindAddress, + "--port", fmt.Sprintf("%d", port), + ) + if err != nil { + return err + } + + // Give server time to start + time.Sleep(500 * time.Millisecond) + + var stdoutCh, stderrCh <-chan string + var cmdErrCh <-chan error + + if usePipe { + stdoutCh, stderrCh, cmdErrCh, err = startCommand(ctx, t, + inFile, "../dmap", + "--cfg", "none", + "--query", query, + "--logger", "stdout", + "--logLevel", "info", + "--noColor", + "--servers", fmt.Sprintf("%s:%d", bindAddress, port), + "--trustAllHosts") + } else { + stdoutCh, stderrCh, cmdErrCh, err = startCommand(ctx, t, + "", "../dmap", + "--cfg", "none", + "--query", query, + "--logger", "stdout", + "--logLevel", "info", + "--noColor", + "--servers", fmt.Sprintf("%s:%d", bindAddress, port), + "--trustAllHosts", + "--files", inFile) + } + + if err != nil { + return err + } + + waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh) + cancel() + + if err := compareFiles(t, csvFile, expectedCsvFile); err != nil { + return err + } + if err := compareFiles(t, queryFile, expectedQueryFile); err != nil { + return err + } + + os.Remove(csvFile) + os.Remove(queryFile) + return nil +} + func TestDMap2(t *testing.T) { if !config.Env("DTAIL_INTEGRATION_TEST_RUN_MODE") { t.Log("Skipping") return } + + // Test in serverless mode + t.Run("Serverless", func(t *testing.T) { + testDMap2Serverless(t) + }) + + // Test in server mode + t.Run("ServerMode", func(t *testing.T) { + testDMap2WithServer(t) + }) +} + +func testDMap2Serverless(t *testing.T) { inFile := "mapr_testdata.log" outFile := "dmap2.stdout.tmp" csvFile := "dmap2.csv.tmp" @@ -133,11 +241,84 @@ func TestDMap2(t *testing.T) { os.Remove(queryFile) } +func testDMap2WithServer(t *testing.T) { + inFile := "mapr_testdata.log" + outFile := "dmap2.stdout.tmp" + csvFile := "dmap2.csv.tmp" + expectedCsvFile := "dmap2.csv.expected" + queryFile := fmt.Sprintf("%s.query", csvFile) + expectedQueryFile := "dmap2.csv.query.expected" + port := getUniquePortNumber() + bindAddress := "localhost" + + query := fmt.Sprintf("from STATS select count($time),$time,max($goroutines),"+ + "avg($goroutines),min($goroutines) group by $time order by count($time) "+ + "outfile %s", csvFile) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start dserver + _, _, _, err := startCommand(ctx, t, + "", "../dserver", + "--cfg", "none", + "--logger", "stdout", + "--logLevel", "error", + "--bindAddress", bindAddress, + "--port", fmt.Sprintf("%d", port), + ) + if err != nil { + t.Error(err) + return + } + + // Give server time to start + time.Sleep(500 * time.Millisecond) + + _, err = runCommand(ctx, t, outFile, + "../dmap", "--query", query, "--cfg", "none", + "--servers", fmt.Sprintf("%s:%d", bindAddress, port), + "--trustAllHosts", + "--files", inFile) + if err != nil { + t.Error(err) + return + } + + cancel() + + if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil { + t.Error(err) + return + } + if err := compareFiles(t, queryFile, expectedQueryFile); err != nil { + t.Error(err) + return + } + + os.Remove(outFile) + os.Remove(csvFile) + os.Remove(queryFile) +} + func TestDMap3(t *testing.T) { if !config.Env("DTAIL_INTEGRATION_TEST_RUN_MODE") { t.Log("Skipping") return } + + // Test in serverless mode + t.Run("Serverless", func(t *testing.T) { + testDMap3Serverless(t) + }) + + // Test in server mode + t.Run("ServerMode", func(t *testing.T) { + testDMap3WithServer(t) + }) +} + +func testDMap3Serverless(t *testing.T) { inFile := "mapr_testdata.log" outFile := "dmap3.stdout.tmp" csvFile := "dmap3.csv.tmp" @@ -152,6 +333,84 @@ func TestDMap3(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + // Create 100 copies of the input file + files := make([]string, 100) + for i := 0; i < 100; i++ { + files[i] = inFile + } + + args := []string{ + "--query", query, + "--cfg", "none", + "--logger", "stdout", + "--logLevel", "info", + "--noColor", + } + args = append(args, files...) + + stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t, + "", "../dmap", args...) + + if err != nil { + t.Error(err) + return + } + waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh) + + if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil { + t.Error(err) + return + } + if err := compareFiles(t, queryFile, expectedQueryFile); err != nil { + t.Error(err) + return + } + + os.Remove(outFile) + os.Remove(csvFile) + os.Remove(queryFile) +} + +func testDMap3WithServer(t *testing.T) { + inFile := "mapr_testdata.log" + outFile := "dmap3.stdout.tmp" + csvFile := "dmap3.csv.tmp" + expectedCsvFile := "dmap3.csv.expected" + queryFile := fmt.Sprintf("%s.query", csvFile) + expectedQueryFile := "dmap3.csv.query.expected" + port := getUniquePortNumber() + bindAddress := "localhost" + + query := fmt.Sprintf("from STATS select count($time),$time,max($goroutines),"+ + "avg($goroutines),min($goroutines) group by $time order by count($time) "+ + "outfile %s", csvFile) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start dserver + _, _, _, err := startCommand(ctx, t, + "", "../dserver", + "--cfg", "none", + "--logger", "stdout", + "--logLevel", "error", + "--bindAddress", bindAddress, + "--port", fmt.Sprintf("%d", port), + ) + if err != nil { + t.Error(err) + return + } + + // Give server time to start + time.Sleep(500 * time.Millisecond) + + // Create 100 copies of the input file + files := make([]string, 100) + for i := 0; i < 100; i++ { + files[i] = inFile + } + stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t, "", "../dmap", "--query", query, @@ -159,22 +418,16 @@ func TestDMap3(t *testing.T) { "--logger", "stdout", "--logLevel", "info", "--noColor", - inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, - inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, - inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, - inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, - inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, - inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, - inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, - inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, - inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, - inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile, inFile) + "--servers", fmt.Sprintf("%s:%d", bindAddress, port), + "--trustAllHosts", + "--files", strings.Join(files, ",")) if err != nil { t.Error(err) return } waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh) + cancel() if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil { t.Error(err) @@ -195,6 +448,19 @@ func TestDMap4Append(t *testing.T) { t.Log("Skipping") return } + + // Test in serverless mode + t.Run("Serverless", func(t *testing.T) { + testDMap4AppendServerless(t) + }) + + // Test in server mode + t.Run("ServerMode", func(t *testing.T) { + testDMap4AppendWithServer(t) + }) +} + +func testDMap4AppendServerless(t *testing.T) { inFile := "mapr_testdata.log" outFile := "dmap4.stdout.tmp" csvFile := "dmap4.csv.tmp" @@ -246,11 +512,100 @@ func TestDMap4Append(t *testing.T) { os.Remove(queryFile) } +func testDMap4AppendWithServer(t *testing.T) { + inFile := "mapr_testdata.log" + outFile := "dmap4.stdout.tmp" + csvFile := "dmap4.csv.tmp" + expectedCsvFile := "dmap4.csv.expected" + queryFile := fmt.Sprintf("%s.query", csvFile) + expectedQueryFile := "dmap4.csv.query.expected" + port := getUniquePortNumber() + bindAddress := "localhost" + + // Delete in case it exists already. Otherwise, test will fail. + os.Remove(csvFile) + + query := fmt.Sprintf("from STATS select count($time),$time,max($goroutines),"+ + "avg($goroutines),min($goroutines) group by $time order by count($time) "+ + "outfile append %s", csvFile) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start dserver + _, _, _, err := startCommand(ctx, t, + "", "../dserver", + "--cfg", "none", + "--logger", "stdout", + "--logLevel", "error", + "--bindAddress", bindAddress, + "--port", fmt.Sprintf("%d", port), + ) + if err != nil { + t.Error(err) + return + } + + // Give server time to start + time.Sleep(500 * time.Millisecond) + + // Run dmap command twice, it should append in the 2nd iteration the new results to the already existing + // file as we specified "outfile append". That works transparently for any mapreduce query + // (e.g. also for the dtail command in streaming mode). But it is easier to test with the dmap + // command. + for i := 0; i < 2; i++ { + stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t, + "", "../dmap", + "--query", query, + "--cfg", "none", + "--logger", "stdout", + "--logLevel", "info", + "--noColor", + "--servers", fmt.Sprintf("%s:%d", bindAddress, port), + "--trustAllHosts", + "--files", inFile) + + if err != nil { + t.Error(err) + return + } + waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh) + } + + cancel() + + if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil { + t.Error(err) + return + } + if err := compareFiles(t, queryFile, expectedQueryFile); err != nil { + t.Error(err) + return + } + + os.Remove(outFile) + os.Remove(csvFile) + os.Remove(queryFile) +} + func TestDMap5CSV(t *testing.T) { if !config.Env("DTAIL_INTEGRATION_TEST_RUN_MODE") { t.Log("Skipping") return } + + // Test in serverless mode + t.Run("Serverless", func(t *testing.T) { + testDMap5CSVServerless(t) + }) + + // Test in server mode + t.Run("ServerMode", func(t *testing.T) { + testDMap5CSVWithServer(t) + }) +} + +func testDMap5CSVServerless(t *testing.T) { inFile := "dmap5.csv.in" outFile := "dmap5.stdout.tmp" csvFile := "dmap5.csv.tmp" @@ -302,3 +657,80 @@ func TestDMap5CSV(t *testing.T) { os.Remove(csvFile) os.Remove(queryFile) } + +func testDMap5CSVWithServer(t *testing.T) { + inFile := "dmap5.csv.in" + outFile := "dmap5.stdout.tmp" + csvFile := "dmap5.csv.tmp" + expectedCsvFile := "dmap5.csv.expected" + queryFile := fmt.Sprintf("%s.query", csvFile) + expectedQueryFile := "dmap5.csv.query.expected" + port := getUniquePortNumber() + bindAddress := "localhost" + + // Delete in case it exists already. Otherwise, test will fail. + os.Remove(csvFile) + + query := fmt.Sprintf("select sum($timecount),last($time),min($min_goroutines),"+ + " group by $hostname"+ + " set $timecount = `count($time)`, $time = `$time`, $min_goroutines = `min($goroutines)`"+ + " logformat csv outfile %s", csvFile) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start dserver + _, _, _, err := startCommand(ctx, t, + "", "../dserver", + "--cfg", "none", + "--logger", "stdout", + "--logLevel", "error", + "--bindAddress", bindAddress, + "--port", fmt.Sprintf("%d", port), + ) + if err != nil { + t.Error(err) + return + } + + // Give server time to start + time.Sleep(500 * time.Millisecond) + + // Run dmap command twice, it should append in the 2nd iteration the new results to the already existing + // file as we specified "outfile append". That works transparently for any mapreduce query + // (e.g. also for the dtail command in streaming mode). But it is easier to test with the dmap + // command. + for i := 0; i < 2; i++ { + stdoutCh, stderrCh, cmdErrCh, err := startCommand(ctx, t, + "", "../dmap", + "--query", query, + "--cfg", "none", + "--logger", "stdout", + "--logLevel", "info", + "--noColor", + "--servers", fmt.Sprintf("%s:%d", bindAddress, port), + "--trustAllHosts", + "--files", inFile) + + if err != nil { + t.Error(err) + return + } + waitForCommand(ctx, t, stdoutCh, stderrCh, cmdErrCh) + } + + cancel() + + if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil { + t.Error(err) + return + } + if err := compareFiles(t, queryFile, expectedQueryFile); err != nil { + t.Error(err) + return + } + + os.Remove(outFile) + os.Remove(csvFile) + os.Remove(queryFile) +} |
