summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2021-10-03 13:32:20 +0300
committerPaul Buetow <paul@buetow.org>2021-10-03 13:39:03 +0300
commitb2dbe133347ef220ff781ffeb1f8137245f5235f (patch)
treed2ff5f5cc0b02159a14f51b1850e4427877ef9a7
parent07e1470892beacf0722276f94bacbd822b002540 (diff)
when a mapreduce outfile is specified also always write a outfile.query file
-rw-r--r--docker/Makefile13
-rw-r--r--integrationtests/commons.go3
-rw-r--r--integrationtests/dmap.csv.query.expected1
-rw-r--r--integrationtests/dmap2.csv.query.expected1
-rw-r--r--integrationtests/dmap_test.go16
-rw-r--r--internal/mapr/groupset.go36
6 files changed, 53 insertions, 17 deletions
diff --git a/docker/Makefile b/docker/Makefile
index 921ad28..4ffa423 100644
--- a/docker/Makefile
+++ b/docker/Makefile
@@ -6,10 +6,13 @@ build:
cp ../dserver .
docker build . -t dserver:develop
rm ./dserver
+ rm ./mapr_testdata.log
spinup:
./spinup.sh 10
spindown:
./spindown.sh 10
+spinup1:
+ docker run -p 2222:2222 dserver:develop
dtail:
../dtail --servers serverlist.txt --files '/var/log/dserver/*' --trustAllHosts --logLevel DEBUG
dtail2:
@@ -20,20 +23,16 @@ dcat:
../dcat --servers serverlist.txt --files '/etc/passwd' --trustAllHosts
dcat_notrust:
../dcat --servers serverlist.txt --files '/etc/passwd'
-dcat2:
- # TODO: All serverless tests in this Makefile have to move to actual unit tests
- ../dcat /etc/passwd
dmap:
../dmap --servers serverlist.txt --files '/var/log/dserver/*' --trustAllHosts --query 'from stats select avg($$goroutines),max($$goroutines),min($$goroutines),last($$goroutines),count($$hostname),$$hostname group by $$hostname order by avg($$goroutines)'
-dmap2:
+test: dmap_test dmap2_test
+dmap_test:
../dmap --servers serverlist.txt --files '/var/log/mapr_testdata.log' --trustAllHosts --query 'from stats select count($$time),last($$time) group by $$time order by count($$time) outfile dmap2-A.csv'
../dmap --servers serverlist.txt --files '/var/log/mapr_testdata.log' --trustAllHosts --query 'from stats select count($$time),last($$time) group by $$time order by count($$time) outfile dmap2-B.csv'
@echo Expecting zero diff!
diff -u <(sort dmap2-A.csv) <(sort dmap2-B.csv)
-dmap3:
+dmap2_test:
../dmap --servers <(head -n 1 serverlist.txt) --files '/var/log/mapr_testdata.log' --trustAllHosts --query 'from stats select count($$time),last($$time) group by $$time order by count($$time) outfile dmap2-A.csv'
../dmap --query 'from stats select count($$time),last($$time) group by $$time order by count($$time) outfile dmap2-serverless.csv' ./mapr_testdata.log
@echo Expecting zero diff!
diff -u <(sort dmap2-A.csv) <(sort dmap2-serverless.csv)
-spinup1:
- docker run -p 2222:2222 dserver:develop
diff --git a/integrationtests/commons.go b/integrationtests/commons.go
index 74eeac5..f789322 100644
--- a/integrationtests/commons.go
+++ b/integrationtests/commons.go
@@ -78,9 +78,12 @@ func compareFilesContents(t *testing.T, fileA, fileB string) error {
return err
}
+ // The mapreduce result can be in a different order each time (Golang maps are not sorted).
+ t.Log(fmt.Sprintf("Checking whether %s has same lines as file %s (ignoring line order)", fileA, fileB))
if err := compareMaps(a, b); err != nil {
return err
}
+ t.Log(fmt.Sprintf("Checking whether %s has same lines as file %s (ignoring line order)", fileB, fileA))
if err := compareMaps(b, a); err != nil {
return err
}
diff --git a/integrationtests/dmap.csv.query.expected b/integrationtests/dmap.csv.query.expected
new file mode 100644
index 0000000..2bb2a52
--- /dev/null
+++ b/integrationtests/dmap.csv.query.expected
@@ -0,0 +1 @@
+from STATS select count($line),last($time),avg($goroutines),min(concurrentConnections),max(lifetimeConnections) group by $hostname outfile dmap.csv.tmp \ No newline at end of file
diff --git a/integrationtests/dmap2.csv.query.expected b/integrationtests/dmap2.csv.query.expected
new file mode 100644
index 0000000..b15de3a
--- /dev/null
+++ b/integrationtests/dmap2.csv.query.expected
@@ -0,0 +1 @@
+from STATS select count($time),$time,max($goroutines),avg($goroutines),min($goroutines) group by $time order by count($time) outfile dmap2.csv.tmp \ No newline at end of file
diff --git a/integrationtests/dmap_test.go b/integrationtests/dmap_test.go
index bfca039..dc508e2 100644
--- a/integrationtests/dmap_test.go
+++ b/integrationtests/dmap_test.go
@@ -11,6 +11,8 @@ func TestDMap(t *testing.T) {
stdoutFile := "dmap.stdout.tmp"
csvFile := "dmap.csv.tmp"
expectedCsvFile := "dmap.csv.expected"
+ queryFile := fmt.Sprintf("%s.query", csvFile)
+ expectedQueryFile := "dmap.csv.query.expected"
query := fmt.Sprintf("from STATS select count($line),last($time),avg($goroutines),min(concurrentConnections),max(lifetimeConnections) group by $hostname outfile %s", csvFile)
@@ -18,14 +20,18 @@ func TestDMap(t *testing.T) {
t.Error(err)
return
}
-
if err := compareFiles(t, csvFile, expectedCsvFile); err != nil {
t.Error(err)
return
}
+ if err := compareFiles(t, queryFile, expectedQueryFile); err != nil {
+ t.Error(err)
+ return
+ }
os.Remove(stdoutFile)
os.Remove(csvFile)
+ os.Remove(queryFile)
}
func TestDMap2(t *testing.T) {
@@ -33,6 +39,8 @@ func TestDMap2(t *testing.T) {
stdoutFile := "dmap2.stdout.tmp"
csvFile := "dmap2.csv.tmp"
expectedCsvFile := "dmap2.csv.expected"
+ queryFile := fmt.Sprintf("%s.query", csvFile)
+ expectedQueryFile := "dmap2.csv.query.expected"
query := fmt.Sprintf("from STATS select count($time),$time,max($goroutines),avg($goroutines),min($goroutines) group by $time order by count($time) outfile %s", csvFile)
@@ -40,12 +48,16 @@ func TestDMap2(t *testing.T) {
t.Error(err)
return
}
-
if err := compareFilesContents(t, csvFile, expectedCsvFile); err != nil {
t.Error(err)
return
}
+ if err := compareFiles(t, queryFile, expectedQueryFile); err != nil {
+ t.Error(err)
+ return
+ }
os.Remove(stdoutFile)
os.Remove(csvFile)
+ os.Remove(queryFile)
}
diff --git a/internal/mapr/groupset.go b/internal/mapr/groupset.go
index df8c603..ce7630d 100644
--- a/internal/mapr/groupset.go
+++ b/internal/mapr/groupset.go
@@ -178,11 +178,31 @@ func (g *GroupSet) Result(query *Query, rowsLimit int) (string, int, error) {
return sb.String(), len(rows), nil
}
+func (*GroupSet) writeQueryFile(query *Query) error {
+ queryFile := fmt.Sprintf("%s.query", query.Outfile)
+ tmpQueryFile := fmt.Sprintf("%s.tmp", queryFile)
+ dlog.Common.Debug("Writing query file", queryFile)
+
+ fd, err := os.Create(tmpQueryFile)
+ if err != nil {
+ return err
+ }
+ defer fd.Close()
+
+ fd.WriteString(query.RawQuery)
+ os.Rename(tmpQueryFile, queryFile)
+
+ return nil
+}
+
// WriteResult writes the result to an CSV outfile.
func (g *GroupSet) WriteResult(query *Query) error {
if !query.HasOutfile() {
return errors.New("No outfile specified")
}
+ if err := g.writeQueryFile(query); err != nil {
+ return err
+ }
rows, _, err := g.result(query, false)
if err != nil {
@@ -192,22 +212,22 @@ func (g *GroupSet) WriteResult(query *Query) error {
dlog.Common.Info("Writing outfile", query.Outfile)
tmpOutfile := fmt.Sprintf("%s.tmp", query.Outfile)
- file, err := os.Create(tmpOutfile)
+ fd, err := os.Create(tmpOutfile)
if err != nil {
return err
}
- defer file.Close()
+ defer fd.Close()
// Generate header now
lastIndex := len(query.Select) - 1
for i, sc := range query.Select {
- file.WriteString(sc.FieldStorage)
+ fd.WriteString(sc.FieldStorage)
if i == lastIndex {
continue
}
- file.WriteString(protocol.CSVDelimiter)
+ fd.WriteString(protocol.CSVDelimiter)
}
- file.WriteString("\n")
+ fd.WriteString("\n")
// And now write the data
for i, r := range rows {
@@ -215,13 +235,13 @@ func (g *GroupSet) WriteResult(query *Query) error {
break
}
for j, value := range r.values {
- file.WriteString(value)
+ fd.WriteString(value)
if j == lastIndex {
continue
}
- file.WriteString(protocol.CSVDelimiter)
+ fd.WriteString(protocol.CSVDelimiter)
}
- file.WriteString("\n")
+ fd.WriteString("\n")
}
if err := os.Rename(tmpOutfile, query.Outfile); err != nil {