summaryrefslogtreecommitdiff
path: root/integrationtests/harness.go
diff options
context:
space:
mode:
Diffstat (limited to 'integrationtests/harness.go')
-rw-r--r--integrationtests/harness.go185
1 files changed, 170 insertions, 15 deletions
diff --git a/integrationtests/harness.go b/integrationtests/harness.go
index b69e5d2..6633332 100644
--- a/integrationtests/harness.go
+++ b/integrationtests/harness.go
@@ -2,6 +2,7 @@ package integrationtests
import (
"bufio"
+ "bytes"
"fmt"
"io"
"os"
@@ -9,13 +10,18 @@ import (
"path/filepath"
"strconv"
"strings"
+ "sync"
"time"
)
const (
workloadStartupTimeout = 5 * time.Second
+ iorReadyTimeout = 30 * time.Second
+ iorReadySettleDelay = time.Second
iorShutdownGrace = 30 * time.Second
bpfObjectOverrideEnv = "IOR_BPF_OBJECT"
+ workloadStartupFileEnv = "IOR_WORKLOAD_STARTUP_FILE"
+ iorReadyLine = "Probing for "
)
// TestHarness orchestrates integration tests by starting an ior trace
@@ -37,17 +43,21 @@ func (h *TestHarness) Run(scenario string, duration int) (TestResult, int, error
// RunWithIorArgs behaves like Run but forwards additional args to ior.
func (h *TestHarness) RunWithIorArgs(scenario string, duration int, extraIorArgs []string) (TestResult, int, error) {
- workloadCmd, workloadPID, err := h.startWorkload(scenario)
+ startupFile := h.workloadStartupFile(scenario)
+ workloadCmd, workloadPID, workloadStderr, err := h.startWorkload(scenario, startupFile)
if err != nil {
return TestResult{}, 0, err
}
- iorCmd, err := h.startIor(workloadPID, scenario, duration, extraIorArgs)
+ iorCmd, readyCh, err := h.startIorForRun(workloadPID, scenario, duration, extraIorArgs)
if err != nil {
workloadCmd.Process.Kill()
workloadCmd.Wait()
return TestResult{}, workloadPID, err
}
+ if err := releaseWorkloadWhenIorReady(startupFile, workloadCmd, iorCmd, readyCh); err != nil {
+ return TestResult{}, workloadPID, err
+ }
workloadErr, iorErr := waitBoth(workloadCmd, iorCmd, duration, iorShutdownGrace)
@@ -55,7 +65,7 @@ func (h *TestHarness) RunWithIorArgs(scenario string, duration int, extraIorArgs
return TestResult{}, workloadPID, fmt.Errorf("ior: %w", iorErr)
}
if workloadErr != nil {
- return TestResult{}, workloadPID, fmt.Errorf("workload: %w", workloadErr)
+ return TestResult{}, workloadPID, workloadCommandError(workloadErr, workloadStderr.String())
}
iorFile, err := findIorZstFile(h.OutputDir, scenario)
@@ -74,43 +84,65 @@ func (h *TestHarness) RunWithIorArgs(scenario string, duration int, extraIorArgs
// RunParquet executes a scenario in headless Parquet mode and returns the
// recorded Parquet path.
func (h *TestHarness) RunParquet(scenario string, duration int) (string, int, error) {
+ return h.RunParquetWithIorArgs(scenario, duration, nil)
+}
+
+// RunParquetWithIorArgs behaves like RunParquet but forwards additional args
+// to ior.
+func (h *TestHarness) RunParquetWithIorArgs(scenario string, duration int, extraIorArgs []string) (string, int, error) {
parquetPath := filepath.Join(h.OutputDir, scenario+".parquet")
- workloadCmd, workloadPID, err := h.startWorkload(scenario)
+ startupFile := h.workloadStartupFile(scenario)
+ workloadCmd, workloadPID, workloadStderr, err := h.startWorkload(scenario, startupFile)
if err != nil {
return "", 0, err
}
- iorCmd, err := h.startIorParquet(workloadPID, parquetPath, duration)
+ iorCmd, readyCh, err := h.startIorParquetForRun(workloadPID, parquetPath, duration, extraIorArgs)
if err != nil {
workloadCmd.Process.Kill()
workloadCmd.Wait()
return "", workloadPID, err
}
+ if err := releaseWorkloadWhenIorReady(startupFile, workloadCmd, iorCmd, readyCh); err != nil {
+ return "", workloadPID, err
+ }
workloadErr, iorErr := waitBoth(workloadCmd, iorCmd, duration, iorShutdownGrace)
if iorErr != nil {
return "", workloadPID, fmt.Errorf("ior: %w", iorErr)
}
if workloadErr != nil {
- return "", workloadPID, fmt.Errorf("workload: %w", workloadErr)
+ return "", workloadPID, workloadCommandError(workloadErr, workloadStderr.String())
}
return parquetPath, workloadPID, nil
}
-func (h *TestHarness) startWorkload(scenario string) (*exec.Cmd, int, error) {
+func (h *TestHarness) workloadStartupFile(scenario string) string {
+ if filepath.Base(h.WorkloadBinary) != "ioworkload" {
+ return ""
+ }
+ return filepath.Join(h.OutputDir, scenario+".startup")
+}
+
+func (h *TestHarness) startWorkload(scenario, startupFile string) (*exec.Cmd, int, *bytes.Buffer, error) {
cmd := exec.Command(h.WorkloadBinary, "--scenario="+scenario)
- cmd.Stderr = os.Stderr
- if len(h.WorkloadEnv) > 0 {
+ stderr := &bytes.Buffer{}
+ cmd.Stderr = io.MultiWriter(os.Stderr, stderr)
+ if len(h.WorkloadEnv) > 0 || startupFile != "" {
cmd.Env = append(os.Environ(), h.WorkloadEnv...)
+ if startupFile != "" {
+ os.Remove(startupFile) //nolint:errcheck
+ cmd.Env = append(cmd.Env, workloadStartupFileEnv+"="+startupFile)
+ }
}
stdout, err := cmd.StdoutPipe()
if err != nil {
- return nil, 0, fmt.Errorf("workload stdout pipe: %w", err)
+ return nil, 0, nil, fmt.Errorf("workload stdout pipe: %w", err)
}
if err := cmd.Start(); err != nil {
- return nil, 0, fmt.Errorf("start workload: %w", err)
+ return nil, 0, nil, fmt.Errorf("start workload: %w", err)
}
pidCh := make(chan int, 1)
@@ -138,18 +170,26 @@ func (h *TestHarness) startWorkload(scenario string) (*exec.Cmd, int, error) {
select {
case pid := <-pidCh:
- return cmd, pid, nil
+ return cmd, pid, stderr, nil
case err := <-errCh:
cmd.Process.Kill()
cmd.Wait()
- return nil, 0, err
+ return nil, 0, nil, err
case <-startupTimer.C:
cmd.Process.Kill()
cmd.Wait()
- return nil, 0, fmt.Errorf("timeout waiting for workload PID")
+ return nil, 0, nil, fmt.Errorf("timeout waiting for workload PID")
}
}
+func workloadCommandError(err error, stderr string) error {
+ stderr = strings.TrimSpace(stderr)
+ if stderr == "" {
+ return fmt.Errorf("workload: %w", err)
+ }
+ return fmt.Errorf("workload: %w: %s", err, stderr)
+}
+
func (h *TestHarness) startIor(pid int, scenario string, duration int, extraArgs []string) (*exec.Cmd, error) {
args := []string{
"-pid", strconv.Itoa(pid),
@@ -161,15 +201,37 @@ func (h *TestHarness) startIor(pid int, scenario string, duration int, extraArgs
return h.startIorArgs(args)
}
-func (h *TestHarness) startIorParquet(pid int, parquetPath string, duration int) (*exec.Cmd, error) {
+func (h *TestHarness) startIorForRun(pid int, scenario string, duration int, extraArgs []string) (*exec.Cmd, <-chan error, error) {
+ args := []string{
+ "-pid", strconv.Itoa(pid),
+ "-flamegraph",
+ "-name", scenario,
+ "-duration", strconv.Itoa(duration),
+ }
+ args = append(args, extraArgs...)
+ return h.startIorArgsWithReady(args)
+}
+
+func (h *TestHarness) startIorParquet(pid int, parquetPath string, duration int, extraArgs []string) (*exec.Cmd, error) {
args := []string{
"-pid", strconv.Itoa(pid),
"-parquet", parquetPath,
"-duration", strconv.Itoa(duration),
}
+ args = append(args, extraArgs...)
return h.startIorArgs(args)
}
+func (h *TestHarness) startIorParquetForRun(pid int, parquetPath string, duration int, extraArgs []string) (*exec.Cmd, <-chan error, error) {
+ args := []string{
+ "-pid", strconv.Itoa(pid),
+ "-parquet", parquetPath,
+ "-duration", strconv.Itoa(duration),
+ }
+ args = append(args, extraArgs...)
+ return h.startIorArgsWithReady(args)
+}
+
func (h *TestHarness) startIorArgs(args []string) (*exec.Cmd, error) {
cmd := exec.Command(h.IorBinary, args...)
cmd.Dir = h.OutputDir
@@ -185,6 +247,99 @@ func (h *TestHarness) startIorArgs(args []string) (*exec.Cmd, error) {
return cmd, nil
}
+func (h *TestHarness) startIorArgsWithReady(args []string) (*exec.Cmd, <-chan error, error) {
+ cmd := exec.Command(h.IorBinary, args...)
+ cmd.Dir = h.OutputDir
+ if h.BpfObject != "" {
+ cmd.Env = append(os.Environ(), bpfObjectOverrideEnv+"="+h.BpfObject)
+ }
+
+ stdout, err := cmd.StdoutPipe()
+ if err != nil {
+ return nil, nil, fmt.Errorf("ior stdout pipe: %w", err)
+ }
+ stderr, err := cmd.StderrPipe()
+ if err != nil {
+ return nil, nil, fmt.Errorf("ior stderr pipe: %w", err)
+ }
+
+ if err := cmd.Start(); err != nil {
+ return nil, nil, fmt.Errorf("start ior: %w", err)
+ }
+
+ readyCh := make(chan error, 1)
+ var once sync.Once
+ signalReady := func(err error) {
+ once.Do(func() {
+ readyCh <- err
+ close(readyCh)
+ })
+ }
+
+ var wg sync.WaitGroup
+ wg.Add(2)
+ go scanIorOutput(stdout, os.Stdout, signalReady, &wg)
+ go scanIorOutput(stderr, os.Stderr, signalReady, &wg)
+ go func() {
+ wg.Wait()
+ signalReady(fmt.Errorf("ior exited before readiness line"))
+ }()
+
+ return cmd, readyCh, nil
+}
+
+func scanIorOutput(r io.Reader, w io.Writer, signalReady func(error), wg *sync.WaitGroup) {
+ defer wg.Done()
+ scanner := bufio.NewScanner(r)
+ for scanner.Scan() {
+ line := scanner.Text()
+ fmt.Fprintln(w, line)
+ if strings.Contains(line, iorReadyLine) {
+ signalReady(nil)
+ }
+ }
+ if err := scanner.Err(); err != nil {
+ signalReady(fmt.Errorf("read ior output: %w", err))
+ }
+}
+
+func releaseWorkloadWhenIorReady(startupFile string, workloadCmd, iorCmd *exec.Cmd, readyCh <-chan error) error {
+ if startupFile == "" {
+ return nil
+ }
+
+ timer := time.NewTimer(iorReadyTimeout)
+ defer stopAndDrainTimer(timer)
+
+ select {
+ case err := <-readyCh:
+ if err != nil {
+ killAndWait(workloadCmd)
+ killAndWait(iorCmd)
+ return fmt.Errorf("wait for ior readiness: %w", err)
+ }
+ time.Sleep(iorReadySettleDelay)
+ if err := os.WriteFile(startupFile, []byte("ready\n"), 0o600); err != nil {
+ killAndWait(workloadCmd)
+ killAndWait(iorCmd)
+ return fmt.Errorf("release workload: %w", err)
+ }
+ return nil
+ case <-timer.C:
+ killAndWait(workloadCmd)
+ killAndWait(iorCmd)
+ return fmt.Errorf("timeout waiting for ior readiness")
+ }
+}
+
+func killAndWait(cmd *exec.Cmd) {
+ if cmd == nil || cmd.Process == nil {
+ return
+ }
+ cmd.Process.Kill() //nolint:errcheck
+ cmd.Wait() //nolint:errcheck
+}
+
// waitBoth waits for both the workload and ior commands concurrently.
// If ior does not finish within duration + grace period, it is killed.
func waitBoth(workloadCmd, iorCmd *exec.Cmd, duration int, grace time.Duration) (workloadErr, iorErr error) {