summaryrefslogtreecommitdiff
path: root/benchmarks
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-03-20 11:06:50 +0200
committerPaul Buetow <paul@buetow.org>2026-03-20 11:06:50 +0200
commit13b21feb07c86f65760f7338f284f3b492364cd9 (patch)
treec9fa6fc4fb0c7fe8b927297d26e5f3b1448a3518 /benchmarks
parentda8e581617a0240626d2bc922916416440e65bae (diff)
Optimize mapr parsing and stabilize aggregate shutdown
Diffstat (limited to 'benchmarks')
-rw-r--r--benchmarks/dmap_benchmark_test.go258
1 files changed, 165 insertions, 93 deletions
diff --git a/benchmarks/dmap_benchmark_test.go b/benchmarks/dmap_benchmark_test.go
index 0ae3f62..11a1c3c 100644
--- a/benchmarks/dmap_benchmark_test.go
+++ b/benchmarks/dmap_benchmark_test.go
@@ -12,9 +12,9 @@ import (
func BenchmarkDMapSimpleAggregation(b *testing.B) {
cleanup := SetupBenchmark(b)
defer cleanup()
-
+
sizes := GetBenchmarkSizes()
-
+
queries := []struct {
name string
query string
@@ -24,7 +24,7 @@ func BenchmarkDMapSimpleAggregation(b *testing.B) {
{"min_max", "from STATS select min(currentConnections),max(lifetimeConnections) group by $hostname"},
{"multi", "from STATS select count($line),last($time),avg($goroutines),min(currentConnections),max(lifetimeConnections) group by $hostname"},
}
-
+
for _, size := range sizes {
for _, q := range queries {
b.Run(fmt.Sprintf("Size=%s/Query=%s", size, q.name), func(b *testing.B) {
@@ -35,29 +35,29 @@ func BenchmarkDMapSimpleAggregation(b *testing.B) {
Compression: NoCompression,
LineVariation: 50,
}
-
+
testFile := GenerateTestFile(b, config)
defer os.Remove(testFile)
-
+
fileSize, _ := GetFileSize(testFile)
lineCount, _ := CountFileLines(testFile)
-
+
// Output file
outputFile := fmt.Sprintf("benchmark_%s_%s.csv.tmp", q.name, size)
defer os.Remove(outputFile)
-
+
// Build query with output file
fullQuery := fmt.Sprintf("%s outfile %s", q.query, outputFile)
-
+
// Warmup
WarmupCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", fullQuery, testFile)
os.Remove(outputFile)
-
+
b.ResetTimer()
-
+
// Run benchmark
totalDuration := time.Duration(0)
-
+
for i := 0; i < b.N; i++ {
result, err := RunBenchmarkCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", fullQuery, testFile)
if err != nil {
@@ -66,15 +66,15 @@ func BenchmarkDMapSimpleAggregation(b *testing.B) {
totalDuration += result.Duration
os.Remove(outputFile)
}
-
+
avgDuration := totalDuration / time.Duration(b.N)
throughput := CalculateThroughput(fileSize, avgDuration)
recordsPerSec := CalculateLinesPerSecond(lineCount, avgDuration)
-
+
// Report metrics
b.ReportMetric(throughput, "MB/sec")
b.ReportMetric(recordsPerSec, "records/sec")
-
+
// Save result
benchResult := BenchmarkResult{
Timestamp: time.Now(),
@@ -95,24 +95,24 @@ func BenchmarkDMapSimpleAggregation(b *testing.B) {
func BenchmarkDMapGroupByCardinality(b *testing.B) {
cleanup := SetupBenchmark(b)
defer cleanup()
-
+
sizes := GetBenchmarkSizes()
if IsQuickMode() {
sizes = []FileSize{Small}
}
-
+
// Different group by scenarios
groupBys := []struct {
- name string
- groupBy string
+ name string
+ groupBy string
approxGroups int
}{
- {"low", "$hostname", 10}, // Few unique values
- {"medium", "$time", 100}, // Moderate unique values
- {"high", "$goroutines", 50}, // Many unique values
+ {"low", "$hostname", 10}, // Few unique values
+ {"medium", "$time", 100}, // Moderate unique values
+ {"high", "$goroutines", 50}, // Many unique values
{"composite", "$hostname,$goroutines", 500}, // Composite key
}
-
+
for _, size := range sizes {
for _, gb := range groupBys {
b.Run(fmt.Sprintf("Size=%s/GroupBy=%s", size, gb.name), func(b *testing.B) {
@@ -123,30 +123,30 @@ func BenchmarkDMapGroupByCardinality(b *testing.B) {
Compression: NoCompression,
LineVariation: gb.approxGroups,
}
-
+
testFile := GenerateTestFile(b, config)
defer os.Remove(testFile)
-
+
fileSize, _ := GetFileSize(testFile)
lineCount, _ := CountFileLines(testFile)
-
+
// Output file
outputFile := fmt.Sprintf("benchmark_groupby_%s_%s.csv.tmp", gb.name, size)
defer os.Remove(outputFile)
-
+
// Build query
- query := fmt.Sprintf("from STATS select count($line),avg($goroutines) group by %s outfile %s",
+ query := fmt.Sprintf("from STATS select count($line),avg($goroutines) group by %s outfile %s",
gb.groupBy, outputFile)
-
+
// Warmup
WarmupCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", query, testFile)
os.Remove(outputFile)
-
+
b.ResetTimer()
-
+
// Run benchmark
totalDuration := time.Duration(0)
-
+
for i := 0; i < b.N; i++ {
result, err := RunBenchmarkCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", query, testFile)
if err != nil {
@@ -155,16 +155,16 @@ func BenchmarkDMapGroupByCardinality(b *testing.B) {
totalDuration += result.Duration
os.Remove(outputFile)
}
-
+
avgDuration := totalDuration / time.Duration(b.N)
throughput := CalculateThroughput(fileSize, avgDuration)
recordsPerSec := CalculateLinesPerSecond(lineCount, avgDuration)
-
+
// Report metrics
b.ReportMetric(throughput, "MB/sec")
b.ReportMetric(recordsPerSec, "records/sec")
b.ReportMetric(float64(gb.approxGroups), "approx_groups")
-
+
// Save result
benchResult := BenchmarkResult{
Timestamp: time.Now(),
@@ -185,12 +185,12 @@ func BenchmarkDMapGroupByCardinality(b *testing.B) {
func BenchmarkDMapComplexQueries(b *testing.B) {
cleanup := SetupBenchmark(b)
defer cleanup()
-
+
sizes := GetBenchmarkSizes()
if IsQuickMode() {
sizes = []FileSize{Small}
}
-
+
queries := []struct {
name string
query string
@@ -200,7 +200,7 @@ func BenchmarkDMapComplexQueries(b *testing.B) {
{"time_filter", "from STATS select count($line),avg($goroutines) group by $hostname where $time >= \"1002-071200\" and $time <= \"1002-071300\""},
{"order_limit", "from STATS select $hostname,count($line),avg($goroutines) group by $hostname order by count($line) desc limit 10"},
}
-
+
for _, size := range sizes {
for _, q := range queries {
b.Run(fmt.Sprintf("Size=%s/Query=%s", size, q.name), func(b *testing.B) {
@@ -211,29 +211,29 @@ func BenchmarkDMapComplexQueries(b *testing.B) {
Compression: NoCompression,
LineVariation: 50,
}
-
+
testFile := GenerateTestFile(b, config)
defer os.Remove(testFile)
-
+
fileSize, _ := GetFileSize(testFile)
lineCount, _ := CountFileLines(testFile)
-
+
// Output file
outputFile := fmt.Sprintf("benchmark_complex_%s_%s.csv.tmp", q.name, size)
defer os.Remove(outputFile)
-
+
// Build query with output file
fullQuery := fmt.Sprintf("%s outfile %s", q.query, outputFile)
-
+
// Warmup
WarmupCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", fullQuery, testFile)
os.Remove(outputFile)
-
+
b.ResetTimer()
-
+
// Run benchmark
totalDuration := time.Duration(0)
-
+
for i := 0; i < b.N; i++ {
result, err := RunBenchmarkCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", fullQuery, testFile)
if err != nil {
@@ -242,15 +242,15 @@ func BenchmarkDMapComplexQueries(b *testing.B) {
totalDuration += result.Duration
os.Remove(outputFile)
}
-
+
avgDuration := totalDuration / time.Duration(b.N)
throughput := CalculateThroughput(fileSize, avgDuration)
recordsPerSec := CalculateLinesPerSecond(lineCount, avgDuration)
-
+
// Report metrics
b.ReportMetric(throughput, "MB/sec")
b.ReportMetric(recordsPerSec, "records/sec")
-
+
// Save result
benchResult := BenchmarkResult{
Timestamp: time.Now(),
@@ -267,16 +267,88 @@ func BenchmarkDMapComplexQueries(b *testing.B) {
}
}
+// BenchmarkDMapSetClauseHighCardinality benchmarks set-clause queries that
+// create many groups and stress the aggregation hot path.
+func BenchmarkDMapSetClauseHighCardinality(b *testing.B) {
+ cleanup := SetupBenchmark(b)
+ defer cleanup()
+
+ sizes := GetBenchmarkSizes()
+ if IsQuickMode() {
+ sizes = []FileSize{Small}
+ }
+
+ query := "from STATS select count($line),avg($goroutines) " +
+ "group by $masked,$goroutines " +
+ "set $masked = maskdigits($time)"
+
+ for _, size := range sizes {
+ b.Run(fmt.Sprintf("Size=%s", size), func(b *testing.B) {
+ config := TestDataConfig{
+ Size: size,
+ Format: MapReduceLogFormat,
+ Compression: NoCompression,
+ LineVariation: 1000,
+ }
+
+ testFile := GenerateTestFile(b, config)
+ defer os.Remove(testFile)
+
+ fileSize, _ := GetFileSize(testFile)
+ lineCount, _ := CountFileLines(testFile)
+
+ outputFile := fmt.Sprintf("benchmark_set_high_cardinality_%s.csv.tmp", size)
+ defer os.Remove(outputFile)
+
+ fullQuery := fmt.Sprintf("%s outfile %s", query, outputFile)
+
+ WarmupCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", fullQuery, testFile)
+ os.Remove(outputFile)
+
+ b.ResetTimer()
+
+ totalDuration := time.Duration(0)
+ for i := 0; i < b.N; i++ {
+ result, err := RunBenchmarkCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", fullQuery, testFile)
+ if err != nil {
+ b.Fatalf("Command failed: %v", err)
+ }
+ totalDuration += result.Duration
+ os.Remove(outputFile)
+ }
+
+ avgDuration := totalDuration / time.Duration(b.N)
+ throughput := CalculateThroughput(fileSize, avgDuration)
+ recordsPerSec := CalculateLinesPerSecond(lineCount, avgDuration)
+
+ b.ReportMetric(throughput, "MB/sec")
+ b.ReportMetric(recordsPerSec, "records/sec")
+ b.ReportMetric(1000, "approx_groups")
+
+ benchResult := BenchmarkResult{
+ Timestamp: time.Now(),
+ Tool: "dmap",
+ Operation: fmt.Sprintf("SetClauseHighCardinality_%s", size),
+ FileSize: fileSize,
+ Duration: avgDuration,
+ Throughput: throughput,
+ LinesPerSec: recordsPerSec,
+ }
+ SaveResults([]BenchmarkResult{benchResult})
+ })
+ }
+}
+
// BenchmarkDMapTimeInterval benchmarks time-based interval queries
func BenchmarkDMapTimeInterval(b *testing.B) {
cleanup := SetupBenchmark(b)
defer cleanup()
-
+
sizes := GetBenchmarkSizes()
if IsQuickMode() {
sizes = []FileSize{Small}
}
-
+
intervals := []struct {
name string
interval int
@@ -285,7 +357,7 @@ func BenchmarkDMapTimeInterval(b *testing.B) {
{"10s", 10},
{"60s", 60},
}
-
+
for _, size := range sizes {
for _, interval := range intervals {
b.Run(fmt.Sprintf("Size=%s/Interval=%s", size, interval.name), func(b *testing.B) {
@@ -296,30 +368,30 @@ func BenchmarkDMapTimeInterval(b *testing.B) {
Compression: NoCompression,
LineVariation: 50,
}
-
+
testFile := GenerateTestFile(b, config)
defer os.Remove(testFile)
-
+
fileSize, _ := GetFileSize(testFile)
lineCount, _ := CountFileLines(testFile)
-
+
// Output file
outputFile := fmt.Sprintf("benchmark_interval_%s_%s.csv.tmp", interval.name, size)
defer os.Remove(outputFile)
-
+
// Build query
- query := fmt.Sprintf("from STATS select count($line),avg($goroutines) group by $hostname interval %d outfile %s",
+ query := fmt.Sprintf("from STATS select count($line),avg($goroutines) group by $hostname interval %d outfile %s",
interval.interval, outputFile)
-
+
// Warmup
WarmupCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", query, testFile)
os.Remove(outputFile)
-
+
b.ResetTimer()
-
+
// Run benchmark
totalDuration := time.Duration(0)
-
+
for i := 0; i < b.N; i++ {
result, err := RunBenchmarkCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", query, testFile)
if err != nil {
@@ -328,16 +400,16 @@ func BenchmarkDMapTimeInterval(b *testing.B) {
totalDuration += result.Duration
os.Remove(outputFile)
}
-
+
avgDuration := totalDuration / time.Duration(b.N)
throughput := CalculateThroughput(fileSize, avgDuration)
recordsPerSec := CalculateLinesPerSecond(lineCount, avgDuration)
-
+
// Report metrics
b.ReportMetric(throughput, "MB/sec")
b.ReportMetric(recordsPerSec, "records/sec")
b.ReportMetric(float64(interval.interval), "interval_seconds")
-
+
// Save result
benchResult := BenchmarkResult{
Timestamp: time.Now(),
@@ -358,7 +430,7 @@ func BenchmarkDMapTimeInterval(b *testing.B) {
func BenchmarkDMapCompressed(b *testing.B) {
cleanup := SetupBenchmark(b)
defer cleanup()
-
+
compressions := []struct {
name string
typ CompressionType
@@ -367,12 +439,12 @@ func BenchmarkDMapCompressed(b *testing.B) {
{"gzip", GzipCompression},
{"zstd", ZstdCompression},
}
-
+
sizes := GetBenchmarkSizes()
if IsQuickMode() {
sizes = []FileSize{Small}
}
-
+
for _, size := range sizes {
for _, comp := range compressions {
b.Run(fmt.Sprintf("Size=%s/Compression=%s", size, comp.name), func(b *testing.B) {
@@ -383,34 +455,34 @@ func BenchmarkDMapCompressed(b *testing.B) {
Compression: comp.typ,
LineVariation: 50,
}
-
+
testFile := GenerateTestFile(b, config)
defer os.Remove(testFile)
-
+
// Get uncompressed size for throughput calculation
uncompressedSize := int64(size)
compressedSize, _ := GetFileSize(testFile)
compressionRatio := float64(uncompressedSize) / float64(compressedSize)
-
+
// Estimate line count
approxLineCount := int(size) / 150
-
+
// Output file
outputFile := fmt.Sprintf("benchmark_compressed_%s_%s.csv.tmp", comp.name, size)
defer os.Remove(outputFile)
-
+
// Query
query := fmt.Sprintf("from STATS select count($line),avg($goroutines) group by $hostname outfile %s", outputFile)
-
+
// Warmup
WarmupCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", query, testFile)
os.Remove(outputFile)
-
+
b.ResetTimer()
-
+
// Run benchmark
totalDuration := time.Duration(0)
-
+
for i := 0; i < b.N; i++ {
result, err := RunBenchmarkCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", query, testFile)
if err != nil {
@@ -419,16 +491,16 @@ func BenchmarkDMapCompressed(b *testing.B) {
totalDuration += result.Duration
os.Remove(outputFile)
}
-
+
avgDuration := totalDuration / time.Duration(b.N)
throughput := CalculateThroughput(uncompressedSize, avgDuration)
recordsPerSec := CalculateLinesPerSecond(approxLineCount, avgDuration)
-
+
// Report metrics
b.ReportMetric(throughput, "MB/sec")
b.ReportMetric(recordsPerSec, "records/sec")
b.ReportMetric(compressionRatio, "compression_ratio")
-
+
// Save result
benchResult := BenchmarkResult{
Timestamp: time.Now(),
@@ -449,12 +521,12 @@ func BenchmarkDMapCompressed(b *testing.B) {
func BenchmarkDMapCustomFunctions(b *testing.B) {
cleanup := SetupBenchmark(b)
defer cleanup()
-
+
sizes := GetBenchmarkSizes()
if IsQuickMode() {
sizes = []FileSize{Small}
}
-
+
queries := []struct {
name string
query string
@@ -463,7 +535,7 @@ func BenchmarkDMapCustomFunctions(b *testing.B) {
{"md5sum", "from STATS select $hash,count($line) set $hash = md5sum($hostname) group by $hash"},
{"multi_set", "from STATS select $mask,$md5,count($line) set $mask = maskdigits($time), $md5 = md5sum($hostname) group by $hostname"},
}
-
+
for _, size := range sizes {
for _, q := range queries {
b.Run(fmt.Sprintf("Size=%s/Function=%s", size, q.name), func(b *testing.B) {
@@ -474,29 +546,29 @@ func BenchmarkDMapCustomFunctions(b *testing.B) {
Compression: NoCompression,
LineVariation: 50,
}
-
+
testFile := GenerateTestFile(b, config)
defer os.Remove(testFile)
-
+
fileSize, _ := GetFileSize(testFile)
lineCount, _ := CountFileLines(testFile)
-
+
// Output file
outputFile := fmt.Sprintf("benchmark_func_%s_%s.csv.tmp", q.name, size)
defer os.Remove(outputFile)
-
+
// Build query with output file
fullQuery := fmt.Sprintf("%s outfile %s", q.query, outputFile)
-
+
// Warmup
WarmupCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", fullQuery, testFile)
os.Remove(outputFile)
-
+
b.ResetTimer()
-
+
// Run benchmark
totalDuration := time.Duration(0)
-
+
for i := 0; i < b.N; i++ {
result, err := RunBenchmarkCommand(b, "dmap", "--cfg", "none", "--noColor", "--query", fullQuery, testFile)
if err != nil && !strings.Contains(err.Error(), "exit status") {
@@ -507,15 +579,15 @@ func BenchmarkDMapCustomFunctions(b *testing.B) {
totalDuration += result.Duration
os.Remove(outputFile)
}
-
+
avgDuration := totalDuration / time.Duration(b.N)
throughput := CalculateThroughput(fileSize, avgDuration)
recordsPerSec := CalculateLinesPerSecond(lineCount, avgDuration)
-
+
// Report metrics
b.ReportMetric(throughput, "MB/sec")
b.ReportMetric(recordsPerSec, "records/sec")
-
+
// Save result
benchResult := BenchmarkResult{
Timestamp: time.Now(),
@@ -530,4 +602,4 @@ func BenchmarkDMapCustomFunctions(b *testing.B) {
})
}
}
-} \ No newline at end of file
+}