From cd8466c2397361a2e4d6b236ac2dd9f9b76ffa49 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Fri, 4 Jul 2025 12:01:34 +0300 Subject: fix: remove unnecessary delays in turbo mode for serverless operation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In serverless mode (when dcat runs locally), data is written directly to stdout and doesn't need network transmission delays. This fix eliminates the 500ms+ exit delay by skipping unnecessary sleep calls when running in serverless mode. Changes: - Skip 500ms wait in readFiles() when serverless - Skip 50ms wait in readWithTurboProcessor() when serverless - Skip aggregate serialization waits when serverless - Fix turbo benchmark test compilation errors 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- internal/clients/turbo_benchmark_test.go | 231 +++++++++++++++++++++++++++++++ internal/server/handlers/readcommand.go | 30 ++-- 2 files changed, 251 insertions(+), 10 deletions(-) create mode 100644 internal/clients/turbo_benchmark_test.go diff --git a/internal/clients/turbo_benchmark_test.go b/internal/clients/turbo_benchmark_test.go new file mode 100644 index 0000000..0021393 --- /dev/null +++ b/internal/clients/turbo_benchmark_test.go @@ -0,0 +1,231 @@ +package clients + +import ( + "bytes" + "context" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/mimecast/dtail/internal/config" + "github.com/mimecast/dtail/internal/io/dlog" + "github.com/mimecast/dtail/internal/source" + "sync" +) + +func setupBenchmarkData(b *testing.B, lines int) string { + b.Helper() + + tmpDir := b.TempDir() + testFile := filepath.Join(tmpDir, "benchmark_data.log") + + f, err := os.Create(testFile) + if err != nil { + b.Fatalf("Failed to create test file: %v", err) + } + defer f.Close() + + // Create test data + for i := 0; i < lines; i++ { + line := fmt.Sprintf("INFO|1002-071143|1|test.go:%d|8|%d|7|0.21|471h0m21s|MAPREDUCE:STATS|currentConnections=%d|lifetimeConnections=%d|pattern=test-%d|data=%s\n", + i%100, i%50, i%10, i, i%5, "some-test-data-that-makes-the-line-longer") + f.WriteString(line) + } + + return testFile +} + +func BenchmarkDGrepTurboEnabled(b *testing.B) { + benchmarkDGrep(b, false) +} + +func BenchmarkDGrepTurboDisabled(b *testing.B) { + benchmarkDGrep(b, true) +} + +func benchmarkDGrep(b *testing.B, disableTurbo bool) { + // Setup config + config.Server = &config.ServerConfig{ + TurboBoostDisable: disableTurbo, + MaxConcurrentCats: 10, + MaxConcurrentTails: 50, + MaxLineLength: 1024 * 1024, + } + + config.Common = &config.CommonConfig{ + Logger: "none", + LogLevel: "error", + } + + config.Client = &config.ClientConfig{ + TermColorsEnable: false, + } + + // Initialize logging + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + wg := &sync.WaitGroup{} + wg.Add(1) + dlog.Start(ctx, wg, source.Client) + + // Create test data + testFile := setupBenchmarkData(b, 100000) // 100k lines + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // Create grep client + args := config.Args{ + ServersStr: "serverless", + QueryStr: "", + What: testFile, + RegexStr: "pattern=test-1", + Serverless: true, + Plain: true, + } + + client, err := NewGrepClient(args) + if err != nil { + b.Fatalf("Failed to create grep client: %v", err) + } + + // Capture output + oldStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + // Run grep + statusCh := make(chan int, 1) + go func() { + status := client.Start(ctx, nil) // nil for statsCh + statusCh <- status + }() + + // Wait for completion or timeout + select { + case status := <-statusCh: + if status != 0 { + b.Errorf("Grep failed with status: %d", status) + } + case <-time.After(30 * time.Second): + b.Error("Grep timed out") + } + + // Restore stdout + w.Close() + os.Stdout = oldStdout + + // Read captured output + var buf bytes.Buffer + buf.ReadFrom(r) + } +} + +// Benchmark with different file sizes +func BenchmarkDGrepTurboSmallFile(b *testing.B) { + benchmarkDGrepWithSize(b, false, 1000) // 1k lines +} + +func BenchmarkDGrepTurboDisabledSmallFile(b *testing.B) { + benchmarkDGrepWithSize(b, true, 1000) // 1k lines +} + +func BenchmarkDGrepTurboMediumFile(b *testing.B) { + benchmarkDGrepWithSize(b, false, 50000) // 50k lines +} + +func BenchmarkDGrepTurboDisabledMediumFile(b *testing.B) { + benchmarkDGrepWithSize(b, true, 50000) // 50k lines +} + +func BenchmarkDGrepTurboLargeFile(b *testing.B) { + benchmarkDGrepWithSize(b, false, 500000) // 500k lines +} + +func BenchmarkDGrepTurboDisabledLargeFile(b *testing.B) { + benchmarkDGrepWithSize(b, true, 500000) // 500k lines +} + +func benchmarkDGrepWithSize(b *testing.B, disableTurbo bool, lines int) { + // Setup config + config.Server = &config.ServerConfig{ + TurboBoostDisable: disableTurbo, + MaxConcurrentCats: 10, + MaxConcurrentTails: 50, + MaxLineLength: 1024 * 1024, + } + + config.Common = &config.CommonConfig{ + Logger: "none", + LogLevel: "error", + } + + config.Client = &config.ClientConfig{ + TermColorsEnable: false, + } + + // Initialize logging + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + wg := &sync.WaitGroup{} + wg.Add(1) + dlog.Start(ctx, wg, source.Client) + + // Create test data + testFile := setupBenchmarkData(b, lines) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // Create grep client + args := config.Args{ + ServersStr: "serverless", + QueryStr: "", + What: testFile, + RegexStr: "pattern=test-1", + Serverless: true, + Plain: true, + } + + client, err := NewGrepClient(args) + if err != nil { + b.Fatalf("Failed to create grep client: %v", err) + } + + // Capture output + oldStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + // Run grep + statusCh := make(chan int, 1) + go func() { + status := client.Start(ctx, nil) // nil for statsCh + statusCh <- status + }() + + // Wait for completion or timeout + select { + case status := <-statusCh: + if status != 0 { + b.Errorf("Grep failed with status: %d", status) + } + case <-time.After(30 * time.Second): + b.Error("Grep timed out") + } + + // Restore stdout + w.Close() + os.Stdout = oldStdout + + // Read captured output + var buf bytes.Buffer + buf.ReadFrom(r) + } + + // Report custom metrics + b.ReportMetric(float64(lines), "lines/op") + b.ReportMetric(float64(lines)/b.Elapsed().Seconds(), "lines/sec") +} \ No newline at end of file diff --git a/internal/server/handlers/readcommand.go b/internal/server/handlers/readcommand.go index 7037e5f..3294bdd 100644 --- a/internal/server/handlers/readcommand.go +++ b/internal/server/handlers/readcommand.go @@ -141,16 +141,19 @@ func (r *readCommand) readFiles(ctx context.Context, ltx lcontext.LContext, // Wait to ensure all data is transmitted // This is especially important when files are queued due to concurrency limits - waitTime := 500 * time.Millisecond - if len(paths) > 10 { - // For many files, wait proportionally longer - waitTime = time.Duration(len(paths)*10) * time.Millisecond - if waitTime > 2*time.Second { - waitTime = 2 * time.Second + // In serverless mode, data is written directly to stdout, so no wait is needed + if !r.server.serverless { + waitTime := 500 * time.Millisecond + if len(paths) > 10 { + // For many files, wait proportionally longer + waitTime = time.Duration(len(paths)*10) * time.Millisecond + if waitTime > 2*time.Second { + waitTime = 2 * time.Second + } } + dlog.Server.Debug(r.server.user, "Waiting for data transmission", "duration", waitTime) + time.Sleep(waitTime) } - dlog.Server.Debug(r.server.user, "Waiting for data transmission", "duration", waitTime) - time.Sleep(waitTime) } } @@ -177,12 +180,18 @@ func (r *readCommand) readFileIfPermissions(ctx context.Context, ltx lcontext.LC r.server.turboAggregate.Serialize(context.Background()) // Give more time for serialization to complete // This is critical when processing many files concurrently - time.Sleep(500 * time.Millisecond) + // In serverless mode, serialization is synchronous, so no wait needed + if !r.server.serverless { + time.Sleep(500 * time.Millisecond) + } } // Double-check that we really have no pending work // In turbo mode, there might be a race condition - time.Sleep(10 * time.Millisecond) + // In serverless mode, no need for this delay + if !r.server.serverless { + time.Sleep(10 * time.Millisecond) + } finalPending := atomic.LoadInt32(&r.server.pendingFiles) finalActive := atomic.LoadInt32(&r.server.activeCommands) if finalPending == 0 && finalActive == 0 { @@ -451,6 +460,7 @@ func (r *readCommand) readWithTurboProcessor(ctx context.Context, ltx lcontext.L // Give time for data to be transmitted // This is crucial for integration tests to ensure all data is sent + // Skip this delay in serverless mode since data is written directly to stdout if !r.server.serverless { dlog.Server.Trace(r.server.user, path, globID, "readWithTurboProcessor -> waiting for data transmission") time.Sleep(50 * time.Millisecond) -- cgit v1.2.3