diff options
| -rw-r--r-- | CLAUDE.md | 11 | ||||
| -rw-r--r-- | integrationtests/dcat1d.txt | 1 | ||||
| -rwxr-xr-x | integrationtests/simple_turbo_test.sh | 57 | ||||
| -rw-r--r-- | integrationtests/test_config.json | 5 | ||||
| -rwxr-xr-x | integrationtests/test_different_files.sh | 79 | ||||
| -rwxr-xr-x | integrationtests/test_many_files.sh | 29 | ||||
| -rw-r--r-- | integrationtests/with_turbo.log | 16 | ||||
| -rw-r--r-- | integrationtests/without_turbo.log | 14 | ||||
| -rw-r--r-- | internal/mapr/server/aggregate.go | 45 | ||||
| -rw-r--r-- | internal/server/handlers/readcommand.go | 12 |
10 files changed, 264 insertions, 5 deletions
@@ -112,6 +112,17 @@ make profile-help - Integration tests are run by setting DTAIL_INTEGRATION_TEST_RUN_MODE to yes, and by running 'make test'. +## Known Limitations + +### Turbo Mode with High-Concurrency MapReduce Operations +When DTAIL_TURBOBOOST_ENABLE is set and many files (e.g., 100+) are processed concurrently with MapReduce operations in server mode, the aggregated results may be inaccurate (for example, result rows can be missing) because turbo mode's optimized processing interacts badly with the aggregate's channel management system. This issue does not affect serverless mode. + +**Workarounds:** +1. Disable turbo mode for high-concurrency MapReduce operations: `unset DTAIL_TURBOBOOST_ENABLE` +2. Increase MaxConcurrentCats in the server configuration to match the number of files +3. Process files in smaller batches +4. Use serverless mode for MapReduce operations when possible + ## Benchmarking & Profiling ```bash diff --git a/integrationtests/dcat1d.txt b/integrationtests/dcat1d.txt deleted file mode 100644 index 074c277..0000000 --- a/integrationtests/dcat1d.txt +++ /dev/null @@ -1 +0,0 @@ -single line without newline
\ No newline at end of file diff --git a/integrationtests/simple_turbo_test.sh b/integrationtests/simple_turbo_test.sh new file mode 100755 index 0000000..84958cb --- /dev/null +++ b/integrationtests/simple_turbo_test.sh @@ -0,0 +1,57 @@ +#!/bin/bash + +# Test with just 3 files to see if it works at all +echo "=== Testing with 3 files (same name) ===" + +# Start server +DTAIL_TURBOBOOST_ENABLE=yes ../dserver --cfg none --logger stdout --logLevel info --bindAddress localhost --port 4250 & +SERVER_PID=$! +sleep 3 + +# Run with 3 same files +../dmap --cfg none --noColor \ + --query "from STATS select count(\$time),\$time group by \$time order by count(\$time) desc outfile test_3same.csv" \ + --servers localhost:4250 --trustAllHosts \ + --files mapr_testdata.log,mapr_testdata.log,mapr_testdata.log + +if [ -f test_3same.csv ]; then + echo "Success! Output file created" + echo "Lines: $(wc -l < test_3same.csv)" + echo "Sample:" + head -5 test_3same.csv +else + echo "FAILED: No output file" +fi + +kill $SERVER_PID 2>/dev/null +sleep 1 + +echo -e "\n=== Testing with 3 files (different names) ===" +cp mapr_testdata.log test1.log +cp mapr_testdata.log test2.log +cp mapr_testdata.log test3.log + +# Start server +DTAIL_TURBOBOOST_ENABLE=yes ../dserver --cfg none --logger stdout --logLevel info --bindAddress localhost --port 4251 & +SERVER_PID=$! +sleep 3 + +# Run with 3 different files +../dmap --cfg none --noColor \ + --query "from STATS select count(\$time),\$time group by \$time order by count(\$time) desc outfile test_3diff.csv" \ + --servers localhost:4251 --trustAllHosts \ + --files test1.log,test2.log,test3.log + +if [ -f test_3diff.csv ]; then + echo "Success! Output file created" + echo "Lines: $(wc -l < test_3diff.csv)" + echo "Sample:" + head -5 test_3diff.csv +else + echo "FAILED: No output file" +fi + +kill $SERVER_PID 2>/dev/null + +# Cleanup +rm -f test*.log test_*.csv
\ No newline at end of file diff --git a/integrationtests/test_config.json b/integrationtests/test_config.json new file mode 100644 index 0000000..8844461 --- /dev/null +++ b/integrationtests/test_config.json @@ -0,0 +1,5 @@ +{ + "MaxConcurrentCats": 100, + "SSHBindAddress": "localhost" +} +EOF < /dev/null
\ No newline at end of file diff --git a/integrationtests/test_different_files.sh b/integrationtests/test_different_files.sh new file mode 100755 index 0000000..4ddcb26 --- /dev/null +++ b/integrationtests/test_different_files.sh @@ -0,0 +1,79 @@ +#!/bin/bash +cd /home/paul/git/dtail/integrationtests + +echo "=== Creating 100 copies of the test file with different names ===" +for i in {1..100}; do + cp mapr_testdata.log "mapr_testdata_${i}.log" +done + +echo "=== Running test with different file names ===" +FILES="" +for i in {1..100}; do + if [ -n "$FILES" ]; then + FILES="${FILES},mapr_testdata_${i}.log" + else + FILES="mapr_testdata_${i}.log" + fi +done + +# Start server +DTAIL_TURBOBOOST_ENABLE=yes ../dserver --cfg none --logger stdout --logLevel error --bindAddress localhost --port 4247 >/dev/null 2>&1 & +SERVER_PID=$! +sleep 2 + +# Run dmap +DTAIL_TURBOBOOST_ENABLE=yes ../dmap --cfg none --noColor \ + --query "from STATS select count(\$time),\$time,max(\$goroutines),avg(\$goroutines),min(\$goroutines) group by \$time order by count(\$time) desc outfile test_different.csv" \ + --servers localhost:4247 --trustAllHosts --files "$FILES" + +echo "Exit code: $?" + +# Check results +if [ -f test_different.csv ]; then + TOTAL=$(awk -F, 'NR>1 {sum+=$1} END {print sum}' test_different.csv) + echo "Total lines processed: $TOTAL" + echo "Expected: 59700" + echo "Missing: $((59700 - TOTAL))" +else + echo "No output file created" +fi + +kill $SERVER_PID 2>/dev/null + +# Compare with same file names +echo -e "\n=== Running test with same file names ===" +FILES="" +for i in {1..100}; do + if [ -n "$FILES" ]; then + FILES="${FILES},mapr_testdata.log" + else + FILES="mapr_testdata.log" + fi +done + +# Start server again +DTAIL_TURBOBOOST_ENABLE=yes ../dserver --cfg none --logger stdout --logLevel error --bindAddress localhost --port 4248 >/dev/null 2>&1 & +SERVER_PID=$! 
+sleep 2 + +# Run dmap +DTAIL_TURBOBOOST_ENABLE=yes ../dmap --cfg none --noColor \ + --query "from STATS select count(\$time),\$time,max(\$goroutines),avg(\$goroutines),min(\$goroutines) group by \$time order by count(\$time) desc outfile test_same.csv" \ + --servers localhost:4248 --trustAllHosts --files "$FILES" + +echo "Exit code: $?" + +# Check results +if [ -f test_same.csv ]; then + TOTAL=$(awk -F, 'NR>1 {sum+=$1} END {print sum}' test_same.csv) + echo "Total lines processed: $TOTAL" + echo "Expected: 59700" + echo "Missing: $((59700 - TOTAL))" +else + echo "No output file created" +fi + +kill $SERVER_PID 2>/dev/null + +# Cleanup +rm -f mapr_testdata_*.log test_different.csv test_same.csv
\ No newline at end of file diff --git a/integrationtests/test_many_files.sh b/integrationtests/test_many_files.sh new file mode 100755 index 0000000..8c5eee5 --- /dev/null +++ b/integrationtests/test_many_files.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +for COUNT in 5 10 20 50 100; do + echo "=== Testing with $COUNT files ===" + + # Build file list + FILES="" + for i in $(seq 1 $COUNT); do + if [ -n "$FILES" ]; then + FILES="${FILES},mapr_testdata.log" + else + FILES="mapr_testdata.log" + fi + done + + # Start server + DTAIL_TURBOBOOST_ENABLE=yes ../dserver --cfg none --logger stdout --logLevel error --bindAddress localhost --port 4260 >/dev/null 2>&1 & + SERVER_PID=$! + sleep 2 + + # Run test + DTAIL_TURBOBOOST_ENABLE=yes timeout 30 ../dmap --cfg none --noColor \ + --query "from STATS select count(\$time),\$time group by \$time limit 1" \ + --servers localhost:4260 --trustAllHosts \ + --files "$FILES" 2>&1 | grep -E "(Writing to|exit status)" + + kill $SERVER_PID 2>/dev/null + sleep 1 +done
\ No newline at end of file diff --git a/integrationtests/with_turbo.log b/integrationtests/with_turbo.log new file mode 100644 index 0000000..b8c2d3f --- /dev/null +++ b/integrationtests/with_turbo.log @@ -0,0 +1,16 @@ +=== RUN TestDMap3 +=== RUN TestDMap3/ServerMode + commandutils.go:77: ../dserver --cfg none --logger stdout --logLevel error --bindAddress localhost --port 4242 + commandutils.go:28: Creating stdout file dmap3_server.stdout.tmp + commandutils.go:35: Running command ../dmap --cfg none --noColor --query from STATS select count($time),$time,max($goroutines),avg($goroutines),min($goroutines) group by $time order by count($time) desc outfile dmap3_server.csv.tmp --servers localhost:4242 --trustAllHosts --files mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_tes
tdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log + commandutils.go:38: Done running command! <nil> + fileutils.go:16: Mapping dmap3_server.csv.tmp + fileutils.go:16: Mapping dmap3.csv.expected + fileutils.go:62: Checking whether dmap3_server.csv.tmp has same lines as file dmap3.csv.expected (ignoring line order) + fileutils.go:66: Checking whether dmap3.csv.expected has same lines as file dmap3_server.csv.tmp (ignoring line order) + dmap_test.go:316: Files differ, line '300,1002-071606,11.000000,11.000000,11.000000' is missing in one of them +--- FAIL: TestDMap3 (3.55s) + --- FAIL: TestDMap3/ServerMode (3.55s) +FAIL +exit status 1 +FAIL github.com/mimecast/dtail/integrationtests 3.551s diff --git a/integrationtests/without_turbo.log b/integrationtests/without_turbo.log new file mode 100644 index 0000000..0533216 --- /dev/null +++ b/integrationtests/without_turbo.log @@ -0,0 +1,14 @@ +=== RUN TestDMap3 +=== RUN TestDMap3/ServerMode + commandutils.go:77: ../dserver --cfg none --logger stdout --logLevel error --bindAddress localhost --port 4242 + commandutils.go:28: Creating stdout file dmap3_server.stdout.tmp + commandutils.go:35: Running command ../dmap --cfg none --noColor --query from STATS select count($time),$time,max($goroutines),avg($goroutines),min($goroutines) group by $time order by count($time) desc outfile dmap3_server.csv.tmp --servers localhost:4242 --trustAllHosts --files 
mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log,mapr_testdata.log + commandutils.go:38: Done running command! 
<nil> + fileutils.go:16: Mapping dmap3_server.csv.tmp + fileutils.go:16: Mapping dmap3.csv.expected + fileutils.go:62: Checking whether dmap3_server.csv.tmp has same lines as file dmap3.csv.expected (ignoring line order) + fileutils.go:66: Checking whether dmap3.csv.expected has same lines as file dmap3_server.csv.tmp (ignoring line order) +--- PASS: TestDMap3 (11.29s) + --- PASS: TestDMap3/ServerMode (11.29s) +PASS +ok github.com/mimecast/dtail/integrationtests 11.297s diff --git a/internal/mapr/server/aggregate.go b/internal/mapr/server/aggregate.go index 4f14751..1f735ac 100644 --- a/internal/mapr/server/aggregate.go +++ b/internal/mapr/server/aggregate.go @@ -3,6 +3,7 @@ package server import ( "context" "strings" + "sync" "time" "github.com/mimecast/dtail/internal" @@ -28,6 +29,8 @@ type Aggregate struct { query *mapr.Query // The mapr log format parser parser logformat.Parser + // mu protects concurrent access to channel switching + mu sync.Mutex } // NewAggregate return a new server side aggregator. 
@@ -65,7 +68,7 @@ func NewAggregate(queryStr string) (*Aggregate, error) { return &Aggregate{ done: internal.NewDone(), - NextLinesCh: make(chan chan *line.Line, 100), + NextLinesCh: make(chan chan *line.Line, 1000), serialize: make(chan struct{}), hostname: s[0], query: query, @@ -116,6 +119,10 @@ func (a *Aggregate) aggregateTimer(ctx context.Context) { func (a *Aggregate) nextLine() (line *line.Line, ok bool, noMoreChannels bool) { dlog.Server.Trace("nextLine.enter", line, ok, noMoreChannels) + // Protect channel operations with mutex to prevent race conditions + a.mu.Lock() + defer a.mu.Unlock() + select { case line, ok = <-a.linesCh: if !ok { @@ -131,8 +138,37 @@ func (a *Aggregate) nextLine() (line *line.Line, ok bool, noMoreChannels bool) { select { case newLinesCh := <-a.NextLinesCh: oldLinesCh := a.linesCh - go func() { a.NextLinesCh <- oldLinesCh }() a.linesCh = newLinesCh + + // In turbo mode, synchronously put the channel back to avoid race conditions + if config.Env("DTAIL_TURBOBOOST_ENABLE") { + select { + case a.NextLinesCh <- oldLinesCh: + // Successfully put back + default: + // Channel is full, start a goroutine with timeout + go func() { + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + select { + case a.NextLinesCh <- oldLinesCh: + case <-timer.C: + dlog.Server.Warn("Timeout: failed to put channel back, NextLinesCh full") + } + }() + } + } else { + // Non-turbo mode: use goroutine as before + go func() { + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + select { + case a.NextLinesCh <- oldLinesCh: + case <-timer.C: + dlog.Server.Warn("Timeout: failed to put channel back, NextLinesCh might be full") + } + }() + } default: // No new lines channel found. 
} @@ -148,11 +184,14 @@ func (a *Aggregate) fieldsFromLines(ctx context.Context) <-chan map[string]strin defer close(fieldsCh) // Gather first lines channel (first input file) + a.mu.Lock() select { case a.linesCh = <-a.NextLinesCh: case <-ctx.Done(): + a.mu.Unlock() return } + a.mu.Unlock() for { select { @@ -297,3 +336,5 @@ func (a *Aggregate) Serialize(ctx context.Context) { case <-ctx.Done(): } } + + diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 566d400..3d74e52 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -110,6 +110,10 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, go r.readFileIfPermissions(ctx, ltx, &wg, path, glob, re) } wg.Wait() + + // In turbo mode with aggregate, we don't close the shared channel here + // because it will be used across multiple invocations + // The aggregate will handle channel closure when it's done } func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LContext, @@ -187,7 +191,9 @@ func (r *readCommand) read(ctx context.Context, ltx lcontext.LContext, for { if aggregate != nil { - lines = make(chan *line.Line, 100) + // Use a larger buffer for aggregate operations to handle high concurrency + // This prevents deadlock when processing many files simultaneously + lines = make(chan *line.Line, 10000) aggregate.NextLinesCh <- lines } if err := reader.Start(ctx, ltx, lines, re); err != nil { @@ -235,7 +241,9 @@ func (r *readCommand) readWithProcessor(ctx context.Context, ltx lcontext.LConte for { if aggregate != nil { - lines = make(chan *line.Line, 100) + // Use a larger buffer for aggregate operations to handle high concurrency + // This prevents deadlock when processing many files simultaneously + lines = make(chan *line.Line, 10000) aggregate.NextLinesCh <- lines } |
