diff options
| author | Paul Buetow <paul@buetow.org> | 2025-07-04 12:01:34 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2025-07-04 12:01:34 +0300 |
| commit | cd8466c2397361a2e4d6b236ac2dd9f9b76ffa49 (patch) | |
| tree | 8ca842cfabf8123e7fd3abefdfe2ba0d1db02959 /internal/clients | |
| parent | 4e7abc300e4c4607511a781ac5f67b44f00a7644 (diff) | |
fix: remove unnecessary delays in turbo mode for serverless operation
In serverless mode (when dcat runs locally), data is written directly to stdout
and doesn't need network transmission delays. This fix eliminates the 500ms+
exit delay by skipping unnecessary sleep calls when running in serverless mode.
Changes:
- Skip 500ms wait in readFiles() when serverless
- Skip 50ms wait in readWithTurboProcessor() when serverless
- Skip aggregate serialization waits when serverless
- Fix turbo benchmark test compilation errors
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
Diffstat (limited to 'internal/clients')
| -rw-r--r-- | internal/clients/turbo_benchmark_test.go | 231 |
1 files changed, 231 insertions, 0 deletions
diff --git a/internal/clients/turbo_benchmark_test.go b/internal/clients/turbo_benchmark_test.go new file mode 100644 index 0000000..0021393 --- /dev/null +++ b/internal/clients/turbo_benchmark_test.go @@ -0,0 +1,231 @@ +package clients + +import ( + "bytes" + "context" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/source" + "sync" +) + +func setupBenchmarkData(b *testing.B, lines int) string { + b.Helper() + + tmpDir := b.TempDir() + testFile := filepath.Join(tmpDir, "benchmark_data.log") + + f, err := os.Create(testFile) + if err != nil { + b.Fatalf("Failed to create test file: %v", err) + } + defer f.Close() + + // Create test data + for i := 0; i < lines; i++ { + line := fmt.Sprintf("INFO|1002-071143|1|test.go:%d|8|%d|7|0.21|471h0m21s|MAPREDUCE:STATS|currentConnections=%d|lifetimeConnections=%d|pattern=test-%d|data=%s\n", + i%100, i%50, i%10, i, i%5, "some-test-data-that-makes-the-line-longer") + f.WriteString(line) + } + + return testFile +} + +func BenchmarkDGrepTurboEnabled(b *testing.B) { + benchmarkDGrep(b, false) +} + +func BenchmarkDGrepTurboDisabled(b *testing.B) { + benchmarkDGrep(b, true) +} + +func benchmarkDGrep(b *testing.B, disableTurbo bool) { + // Setup config + config.Server = &config.ServerConfig{ + TurboBoostDisable: disableTurbo, + MaxConcurrentCats: 10, + MaxConcurrentTails: 50, + MaxLineLength: 1024 * 1024, + } + + config.Common = &config.CommonConfig{ + Logger: "none", + LogLevel: "error", + } + + config.Client = &config.ClientConfig{ + TermColorsEnable: false, + } + + // Initialize logging + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + wg := &sync.WaitGroup{} + wg.Add(1) + dlog.Start(ctx, wg, source.Client) + + // Create test data + testFile := setupBenchmarkData(b, 100000) // 100k lines + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // Create grep client + args := config.Args{ + ServersStr: "serverless", + QueryStr: "", + What: testFile, + RegexStr: "pattern=test-1", + Serverless: true, + Plain: true, + } + + client, err := NewGrepClient(args) + if err != nil { + b.Fatalf("Failed to create grep client: %v", err) + } + + // Capture output + oldStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + // Run grep + statusCh := make(chan int, 1) + go func() { + status := client.Start(ctx, nil) // nil for statsCh + statusCh <- status + }() + + // Wait for completion or timeout + select { + case status := <-statusCh: + if status != 0 { + b.Errorf("Grep failed with status: %d", status) + } + case <-time.After(30 * time.Second): + b.Error("Grep timed out") + } + + // Restore stdout + w.Close() + os.Stdout = oldStdout + + // Read captured output + var buf bytes.Buffer + buf.ReadFrom(r) + } +} + +// Benchmark with different file sizes +func BenchmarkDGrepTurboSmallFile(b *testing.B) { + benchmarkDGrepWithSize(b, false, 1000) // 1k lines +} + +func BenchmarkDGrepTurboDisabledSmallFile(b *testing.B) { + benchmarkDGrepWithSize(b, true, 1000) // 1k lines +} + +func BenchmarkDGrepTurboMediumFile(b *testing.B) { + benchmarkDGrepWithSize(b, false, 50000) // 50k lines +} + +func BenchmarkDGrepTurboDisabledMediumFile(b *testing.B) { + benchmarkDGrepWithSize(b, true, 50000) // 50k lines +} + +func BenchmarkDGrepTurboLargeFile(b *testing.B) { + benchmarkDGrepWithSize(b, false, 500000) // 500k lines +} + +func BenchmarkDGrepTurboDisabledLargeFile(b *testing.B) { + benchmarkDGrepWithSize(b, true, 500000) // 500k lines +} + +func benchmarkDGrepWithSize(b *testing.B, disableTurbo bool, lines int) { + // Setup config + config.Server = &config.ServerConfig{ + TurboBoostDisable: disableTurbo, + MaxConcurrentCats: 10, + MaxConcurrentTails: 50, + MaxLineLength: 1024 * 1024, + } + + config.Common = &config.CommonConfig{ + Logger: "none", + LogLevel: "error", + } + + config.Client = &config.ClientConfig{ + TermColorsEnable: false, + } + + // Initialize logging + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + wg := &sync.WaitGroup{} + wg.Add(1) + dlog.Start(ctx, wg, source.Client) + + // Create test data + testFile := setupBenchmarkData(b, lines) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // Create grep client + args := config.Args{ + ServersStr: "serverless", + QueryStr: "", + What: testFile, + RegexStr: "pattern=test-1", + Serverless: true, + Plain: true, + } + + client, err := NewGrepClient(args) + if err != nil { + b.Fatalf("Failed to create grep client: %v", err) + } + + // Capture output + oldStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + // Run grep + statusCh := make(chan int, 1) + go func() { + status := client.Start(ctx, nil) // nil for statsCh + statusCh <- status + }() + + // Wait for completion or timeout + select { + case status := <-statusCh: + if status != 0 { + b.Errorf("Grep failed with status: %d", status) + } + case <-time.After(30 * time.Second): + b.Error("Grep timed out") + } + + // Restore stdout + w.Close() + os.Stdout = oldStdout + + // Read captured output + var buf bytes.Buffer + buf.ReadFrom(r) + } + + // Report custom metrics + b.ReportMetric(float64(lines), "lines/op") + b.ReportMetric(float64(lines)/b.Elapsed().Seconds(), "lines/sec") +}
\ No newline at end of file |
