diff options
| author | Paul Buetow <paul@buetow.org> | 2026-03-20 11:06:50 +0200 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2026-03-20 11:06:50 +0200 |
| commit | 13b21feb07c86f65760f7338f284f3b492364cd9 (patch) | |
| tree | c9fa6fc4fb0c7fe8b927297d26e5f3b1448a3518 /benchmarks | |
| parent | da8e581617a0240626d2bc922916416440e65bae (diff) | |
Optimize mapr parsing and stabilize aggregate shutdown
Diffstat (limited to 'benchmarks')
| -rw-r--r-- | benchmarks/dmap_benchmark_test.go | 258 |
1 files changed, 165 insertions, 93 deletions
diff --git a/benchmarks/dmap_benchmark_test.go b/benchmarks/dmap_benchmark_test.go index 0ae3f62..11a1c3c 100644 --- a/benchmarks/dmap_benchmark_test.go +++ b/benchmarks/dmap_benchmark_test.go @@ -12,9 +12,9 @@ import ( func BenchmarkDMapSimpleAggregation(b *testing.B) { cleanup := SetupBenchmark(b) defer cleanup() - + sizes := GetBenchmarkSizes() - + queries := []struct { name string query string @@ -24,7 +24,7 @@ func BenchmarkDMapSimpleAggregation(b *testing.B) { {"min_max", "from STATS select min(currentConnections),max(lifetimeConnections) group by $hostname"}, {"multi", "from STATS select count($line),last($time),avg($goroutines),min(currentConnections),max(lifetimeConnections) group by $hostname"}, } - + for _, size := range sizes { for _, q := range queries { b.Run(fmt.Sprintf("Size=%s/Query=%s", size, q.name), func(b *testing.B) { @@ -35,29 +35,29 @@ func BenchmarkDMapSimpleAggregation(b *testing.B) { Compression: NoCompression, LineVariation: 50, } - + testFile := GenerateTestFile(b, config) defer os.Remove(testFile) - + fileSize, _ := GetFileSize(testFile) lineCount, _ := CountFileLines(testFile) - + // Output file outputFile := fmt.Sprintf("benchmark_%s_%s.csv.tmp", q.name, size) defer os.Remove(outputFile) - + // Build query with output file fullQuery := fmt.Sprintf("%s outfile %s", q.query, outputFile) - + // Warmup WarmupCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", fullQuery, testFile) os.Remove(outputFile) - + b.ResetTimer() - + // Run benchmark totalDuration := time.Duration(0) - + for i := 0; i < b.N; i++ { result, err := RunBenchmarkCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", fullQuery, testFile) if err != nil { @@ -66,15 +66,15 @@ func BenchmarkDMapSimpleAggregation(b *testing.B) { totalDuration += result.Duration os.Remove(outputFile) } - + avgDuration := totalDuration / time.Duration(b.N) throughput := CalculateThroughput(fileSize, avgDuration) recordsPerSec := CalculateLinesPerSecond(lineCount, avgDuration) - + // Report metrics b.ReportMetric(throughput, "MB/sec") b.ReportMetric(recordsPerSec, "records/sec") - + // Save result benchResult := BenchmarkResult{ Timestamp: time.Now(), @@ -95,24 +95,24 @@ func BenchmarkDMapSimpleAggregation(b *testing.B) { func BenchmarkDMapGroupByCardinality(b *testing.B) { cleanup := SetupBenchmark(b) defer cleanup() - + sizes := GetBenchmarkSizes() if IsQuickMode() { sizes = []FileSize{Small} } - + // Different group by scenarios groupBys := []struct { - name string - groupBy string + name string + groupBy string approxGroups int }{ - {"low", "$hostname", 10}, // Few unique values - {"medium", "$time", 100}, // Moderate unique values - {"high", "$goroutines", 50}, // Many unique values + {"low", "$hostname", 10}, // Few unique values + {"medium", "$time", 100}, // Moderate unique values + {"high", "$goroutines", 50}, // Many unique values {"composite", "$hostname,$goroutines", 500}, // Composite key } - + for _, size := range sizes { for _, gb := range groupBys { b.Run(fmt.Sprintf("Size=%s/GroupBy=%s", size, gb.name), func(b *testing.B) { @@ -123,30 +123,30 @@ func BenchmarkDMapGroupByCardinality(b *testing.B) { Compression: NoCompression, LineVariation: gb.approxGroups, } - + testFile := GenerateTestFile(b, config) defer os.Remove(testFile) - + fileSize, _ := GetFileSize(testFile) lineCount, _ := CountFileLines(testFile) - + // Output file outputFile := fmt.Sprintf("benchmark_groupby_%s_%s.csv.tmp", gb.name, size) defer os.Remove(outputFile) - + // Build query - query := fmt.Sprintf("from STATS select count($line),avg($goroutines) group by %s outfile %s", + query := fmt.Sprintf("from STATS select count($line),avg($goroutines) group by %s outfile %s", gb.groupBy, outputFile) - + // Warmup WarmupCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", query, testFile) os.Remove(outputFile) - + b.ResetTimer() - + // Run benchmark totalDuration := time.Duration(0) - + for i := 0; i < b.N; i++ { result, err := RunBenchmarkCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", query, testFile) if err != nil { @@ -155,16 +155,16 @@ func BenchmarkDMapGroupByCardinality(b *testing.B) { totalDuration += result.Duration os.Remove(outputFile) } - + avgDuration := totalDuration / time.Duration(b.N) throughput := CalculateThroughput(fileSize, avgDuration) recordsPerSec := CalculateLinesPerSecond(lineCount, avgDuration) - + // Report metrics b.ReportMetric(throughput, "MB/sec") b.ReportMetric(recordsPerSec, "records/sec") b.ReportMetric(float64(gb.approxGroups), "approx_groups") - + // Save result benchResult := BenchmarkResult{ Timestamp: time.Now(), @@ -185,12 +185,12 @@ func BenchmarkDMapGroupByCardinality(b *testing.B) { func BenchmarkDMapComplexQueries(b *testing.B) { cleanup := SetupBenchmark(b) defer cleanup() - + sizes := GetBenchmarkSizes() if IsQuickMode() { sizes = []FileSize{Small} } - + queries := []struct { name string query string @@ -200,7 +200,7 @@ func BenchmarkDMapComplexQueries(b *testing.B) { {"time_filter", "from STATS select count($line),avg($goroutines) group by $hostname where $time >= \"1002-071200\" and $time <= \"1002-071300\""}, {"order_limit", "from STATS select $hostname,count($line),avg($goroutines) group by $hostname order by count($line) desc limit 10"}, } - + for _, size := range sizes { for _, q := range queries { b.Run(fmt.Sprintf("Size=%s/Query=%s", size, q.name), func(b *testing.B) { @@ -211,29 +211,29 @@ func BenchmarkDMapComplexQueries(b *testing.B) { Compression: NoCompression, LineVariation: 50, } - + testFile := GenerateTestFile(b, config) defer os.Remove(testFile) - + fileSize, _ := GetFileSize(testFile) lineCount, _ := CountFileLines(testFile) - + // Output file outputFile := fmt.Sprintf("benchmark_complex_%s_%s.csv.tmp", q.name, size) defer os.Remove(outputFile) - + // Build query with output file fullQuery := fmt.Sprintf("%s outfile %s", q.query, outputFile) - + // Warmup WarmupCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", fullQuery, testFile) os.Remove(outputFile) - + b.ResetTimer() - + // Run benchmark totalDuration := time.Duration(0) - + for i := 0; i < b.N; i++ { result, err := RunBenchmarkCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", fullQuery, testFile) if err != nil { @@ -242,15 +242,15 @@ func BenchmarkDMapComplexQueries(b *testing.B) { totalDuration += result.Duration os.Remove(outputFile) } - + avgDuration := totalDuration / time.Duration(b.N) throughput := CalculateThroughput(fileSize, avgDuration) recordsPerSec := CalculateLinesPerSecond(lineCount, avgDuration) - + // Report metrics b.ReportMetric(throughput, "MB/sec") b.ReportMetric(recordsPerSec, "records/sec") - + // Save result benchResult := BenchmarkResult{ Timestamp: time.Now(), @@ -267,16 +267,88 @@ func BenchmarkDMapComplexQueries(b *testing.B) { } } +// BenchmarkDMapSetClauseHighCardinality benchmarks set-clause queries that +// create many groups and stress the aggregation hot path. +func BenchmarkDMapSetClauseHighCardinality(b *testing.B) { + cleanup := SetupBenchmark(b) + defer cleanup() + + sizes := GetBenchmarkSizes() + if IsQuickMode() { + sizes = []FileSize{Small} + } + + query := "from STATS select count($line),avg($goroutines) " + + "group by $masked,$goroutines " + + "set $masked = maskdigits($time)" + + for _, size := range sizes { + b.Run(fmt.Sprintf("Size=%s", size), func(b *testing.B) { + config := TestDataConfig{ + Size: size, + Format: MapReduceLogFormat, + Compression: NoCompression, + LineVariation: 1000, + } + + testFile := GenerateTestFile(b, config) + defer os.Remove(testFile) + + fileSize, _ := GetFileSize(testFile) + lineCount, _ := CountFileLines(testFile) + + outputFile := fmt.Sprintf("benchmark_set_high_cardinality_%s.csv.tmp", size) + defer os.Remove(outputFile) + + fullQuery := fmt.Sprintf("%s outfile %s", query, outputFile) + + WarmupCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", fullQuery, testFile) + os.Remove(outputFile) + + b.ResetTimer() + + totalDuration := time.Duration(0) + for i := 0; i < b.N; i++ { + result, err := RunBenchmarkCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", fullQuery, testFile) + if err != nil { + b.Fatalf("Command failed: %v", err) + } + totalDuration += result.Duration + os.Remove(outputFile) + } + + avgDuration := totalDuration / time.Duration(b.N) + throughput := CalculateThroughput(fileSize, avgDuration) + recordsPerSec := CalculateLinesPerSecond(lineCount, avgDuration) + + b.ReportMetric(throughput, "MB/sec") + b.ReportMetric(recordsPerSec, "records/sec") + b.ReportMetric(1000, "approx_groups") + + benchResult := BenchmarkResult{ + Timestamp: time.Now(), + Tool: "dmap", + Operation: fmt.Sprintf("SetClauseHighCardinality_%s", size), + FileSize: fileSize, + Duration: avgDuration, + Throughput: throughput, + LinesPerSec: recordsPerSec, + } + SaveResults([]BenchmarkResult{benchResult}) + }) + } +} + // BenchmarkDMapTimeInterval benchmarks time-based interval queries func BenchmarkDMapTimeInterval(b *testing.B) { cleanup := SetupBenchmark(b) defer cleanup() - + sizes := GetBenchmarkSizes() if IsQuickMode() { sizes = []FileSize{Small} } - + intervals := []struct { name string interval int @@ -285,7 +357,7 @@ func BenchmarkDMapTimeInterval(b *testing.B) { {"10s", 10}, {"60s", 60}, } - + for _, size := range sizes { for _, interval := range intervals { b.Run(fmt.Sprintf("Size=%s/Interval=%s", size, interval.name), func(b *testing.B) { @@ -296,30 +368,30 @@ func BenchmarkDMapTimeInterval(b *testing.B) { Compression: NoCompression, LineVariation: 50, } - + testFile := GenerateTestFile(b, config) defer os.Remove(testFile) - + fileSize, _ := GetFileSize(testFile) lineCount, _ := CountFileLines(testFile) - + // Output file outputFile := fmt.Sprintf("benchmark_interval_%s_%s.csv.tmp", interval.name, size) defer os.Remove(outputFile) - + // Build query - query := fmt.Sprintf("from STATS select count($line),avg($goroutines) group by $hostname interval %d outfile %s", + query := fmt.Sprintf("from STATS select count($line),avg($goroutines) group by $hostname interval %d outfile %s", interval.interval, outputFile) - + // Warmup WarmupCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", query, testFile) os.Remove(outputFile) - + b.ResetTimer() - + // Run benchmark totalDuration := time.Duration(0) - + for i := 0; i < b.N; i++ { result, err := RunBenchmarkCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", query, testFile) if err != nil { @@ -328,16 +400,16 @@ func BenchmarkDMapTimeInterval(b *testing.B) { totalDuration += result.Duration os.Remove(outputFile) } - + avgDuration := totalDuration / time.Duration(b.N) throughput := CalculateThroughput(fileSize, avgDuration) recordsPerSec := CalculateLinesPerSecond(lineCount, avgDuration) - + // Report metrics b.ReportMetric(throughput, "MB/sec") b.ReportMetric(recordsPerSec, "records/sec") b.ReportMetric(float64(interval.interval), "interval_seconds") - + // Save result benchResult := BenchmarkResult{ Timestamp: time.Now(), @@ -358,7 +430,7 @@ func BenchmarkDMapTimeInterval(b *testing.B) { func BenchmarkDMapCompressed(b *testing.B) { cleanup := SetupBenchmark(b) defer cleanup() - + compressions := []struct { name string typ CompressionType @@ -367,12 +439,12 @@ func BenchmarkDMapCompressed(b *testing.B) { {"gzip", GzipCompression}, {"zstd", ZstdCompression}, } - + sizes := GetBenchmarkSizes() if IsQuickMode() { sizes = []FileSize{Small} } - + for _, size := range sizes { for _, comp := range compressions { b.Run(fmt.Sprintf("Size=%s/Compression=%s", size, comp.name), func(b *testing.B) { @@ -383,34 +455,34 @@ func BenchmarkDMapCompressed(b *testing.B) { Compression: comp.typ, LineVariation: 50, } - + testFile := GenerateTestFile(b, config) defer os.Remove(testFile) - + // Get uncompressed size for throughput calculation uncompressedSize := int64(size) compressedSize, _ := GetFileSize(testFile) compressionRatio := float64(uncompressedSize) / float64(compressedSize) - + // Estimate line count approxLineCount := int(size) / 150 - + // Output file outputFile := fmt.Sprintf("benchmark_compressed_%s_%s.csv.tmp", comp.name, size) defer os.Remove(outputFile) - + // Query query := fmt.Sprintf("from STATS select count($line),avg($goroutines) group by $hostname outfile %s", outputFile) - + // Warmup WarmupCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", query, testFile) os.Remove(outputFile) - + b.ResetTimer() - + // Run benchmark totalDuration := time.Duration(0) - + for i := 0; i < b.N; i++ { result, err := RunBenchmarkCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", query, testFile) if err != nil { @@ -419,16 +491,16 @@ func BenchmarkDMapCompressed(b *testing.B) { totalDuration += result.Duration os.Remove(outputFile) } - + avgDuration := totalDuration / time.Duration(b.N) throughput := CalculateThroughput(uncompressedSize, avgDuration) recordsPerSec := CalculateLinesPerSecond(approxLineCount, avgDuration) - + // Report metrics b.ReportMetric(throughput, "MB/sec") b.ReportMetric(recordsPerSec, "records/sec") b.ReportMetric(compressionRatio, "compression_ratio") - + // Save result benchResult := BenchmarkResult{ Timestamp: time.Now(), @@ -449,12 +521,12 @@ func BenchmarkDMapCompressed(b *testing.B) { func BenchmarkDMapCustomFunctions(b *testing.B) { cleanup := SetupBenchmark(b) defer cleanup() - + sizes := GetBenchmarkSizes() if IsQuickMode() { sizes = []FileSize{Small} } - + queries := []struct { name string query string @@ -463,7 +535,7 @@ func BenchmarkDMapCustomFunctions(b *testing.B) { {"md5sum", "from STATS select $hash,count($line) set $hash = md5sum($hostname) group by $hash"}, {"multi_set", "from STATS select $mask,$md5,count($line) set $mask = maskdigits($time), $md5 = md5sum($hostname) group by $hostname"}, } - + for _, size := range sizes { for _, q := range queries { b.Run(fmt.Sprintf("Size=%s/Function=%s", size, q.name), func(b *testing.B) { @@ -474,29 +546,29 @@ func BenchmarkDMapCustomFunctions(b *testing.B) { Compression: NoCompression, LineVariation: 50, } - + testFile := GenerateTestFile(b, config) defer os.Remove(testFile) - + fileSize, _ := GetFileSize(testFile) lineCount, _ := CountFileLines(testFile) - + // Output file outputFile := fmt.Sprintf("benchmark_func_%s_%s.csv.tmp", q.name, size) defer os.Remove(outputFile) - + // Build query with output file fullQuery := fmt.Sprintf("%s outfile %s", q.query, outputFile) - + // Warmup WarmupCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", fullQuery, testFile) os.Remove(outputFile) - + b.ResetTimer() - + // Run benchmark totalDuration := time.Duration(0) - + for i := 0; i < b.N; i++ { result, err := RunBenchmarkCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", fullQuery, testFile) if err != nil && !strings.Contains(err.Error(), "exit status") { @@ -507,15 +579,15 @@ func BenchmarkDMapCustomFunctions(b *testing.B) { totalDuration += result.Duration os.Remove(outputFile) } - + avgDuration := totalDuration / time.Duration(b.N) throughput := CalculateThroughput(fileSize, avgDuration) recordsPerSec := CalculateLinesPerSecond(lineCount, avgDuration) - + // Report metrics b.ReportMetric(throughput, "MB/sec") b.ReportMetric(recordsPerSec, "records/sec") - + // Save result benchResult := BenchmarkResult{ Timestamp: time.Now(), @@ -530,4 +602,4 @@ func BenchmarkDMapCustomFunctions(b *testing.B) { }) } } -}
\ No newline at end of file +} |
