diff options
| -rw-r--r-- | Magefile.go | 133 |
1 files changed, 82 insertions, 51 deletions
diff --git a/Magefile.go b/Magefile.go index 540515c..6f24e9d 100644 --- a/Magefile.go +++ b/Magefile.go @@ -840,50 +840,36 @@ type goTestEvent struct { Output string `json:"Output"` } -func runGoTestWithProgress(env map[string]string, args ...string) error { - cmdArgs := append([]string{"test"}, args...) - - var cmd *exec.Cmd +// buildGoTestCmd constructs the exec.Cmd for running `go test -json` with the +// given env vars. When not root it wraps the command with `sudo env KEY=VAL …` +// so elevated integration tests inherit the correct CGO/LIBBPFGO environment. +func buildGoTestCmd(env map[string]string, cmdArgs []string) *exec.Cmd { if os.Geteuid() == 0 { - cmd = exec.Command("go", cmdArgs...) + cmd := exec.Command("go", cmdArgs...) cmd.Env = append(os.Environ(), envToList(env)...) - } else { - keys := make([]string, 0, len(env)) - for k := range env { - keys = append(keys, k) - } - slices.Sort(keys) - sudoArgs := make([]string, 0, 1+len(keys)+1+len(cmdArgs)) - sudoArgs = append(sudoArgs, "env") - for _, k := range keys { - sudoArgs = append(sudoArgs, k+"="+env[k]) - } - sudoArgs = append(sudoArgs, "go") - sudoArgs = append(sudoArgs, cmdArgs...) - cmd = exec.Command("sudo", sudoArgs...) - } - - stdout, err := cmd.StdoutPipe() - if err != nil { - return err + return cmd } - stderr, err := cmd.StderrPipe() - if err != nil { - return err + keys := make([]string, 0, len(env)) + for k := range env { + keys = append(keys, k) } - - if err := cmd.Start(); err != nil { - return err + slices.Sort(keys) + sudoArgs := make([]string, 0, 1+len(keys)+1+len(cmdArgs)) + sudoArgs = append(sudoArgs, "env") + for _, k := range keys { + sudoArgs = append(sudoArgs, k+"="+env[k]) } + sudoArgs = append(sudoArgs, "go") + sudoArgs = append(sudoArgs, cmdArgs...) + return exec.Command("sudo", sudoArgs...) +} - scanner := bufio.NewScanner(stdout) - scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) - - running := map[string]time.Time{} +// startProgressTicker prints the set of currently-running tests every 15 s. +// Call close(done) to stop the ticker goroutine. +func startProgressTicker(running map[string]time.Time, done <-chan struct{}) { ticker := time.NewTicker(15 * time.Second) - defer ticker.Stop() - done := make(chan struct{}) go func() { + defer ticker.Stop() for { select { case <-done: @@ -902,11 +888,11 @@ func runGoTestWithProgress(env map[string]string, args ...string) error { } } }() +} - go func() { - _, _ = io.Copy(os.Stderr, stderr) - }() - +// drainTestEvents reads JSON test events from scanner, updates the running map, +// and prints human-readable RUN/PASS/FAIL/SKIP/LOG lines. +func drainTestEvents(scanner *bufio.Scanner, running map[string]time.Time) { for scanner.Scan() { line := scanner.Bytes() var ev goTestEvent @@ -938,7 +924,38 @@ func runGoTestWithProgress(env map[string]string, args ...string) error { } } } +} + +// runGoTestWithProgress runs `go test -json` (via sudo when not root), streams +// progress to stdout every 15 s, and returns a non-nil error on test failure. +func runGoTestWithProgress(env map[string]string, args ...string) error { + cmd := buildGoTestCmd(env, append([]string{"test"}, args...)) + + stdout, err := cmd.StdoutPipe() + if err != nil { + return err + } + stderr, err := cmd.StderrPipe() + if err != nil { + return err + } + if err := cmd.Start(); err != nil { + return err + } + + // Forward stderr from the test binary so build errors are always visible. + go func() { _, _ = io.Copy(os.Stderr, stderr) }() + + scanner := bufio.NewScanner(stdout) + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + + running := map[string]time.Time{} + done := make(chan struct{}) + startProgressTicker(running, done) + + drainTestEvents(scanner, running) close(done) + if err := scanner.Err(); err != nil { return err } @@ -1091,12 +1108,9 @@ var expectedParquetColumns = []string{ "bytes", "file", "is_error", "filter_epoch", } -// runParquetChecks runs schema, row-count, and sanity checks against the parquet file. -// dir is the absolute directory containing file (mounted at /data in the container). -func runParquetChecks(dir, file string) error { - dataFile := "/data/" + file - - // --- Schema check --- +// parquetSchemaCheck verifies that all expectedParquetColumns appear in the +// DESCRIBE output of the parquet file, returning an error listing any missing ones. +func parquetSchemaCheck(dir, file, dataFile string) error { fmt.Println("--- Schema check ---") schemaOut, err := runClickHouseQuery(dir, file, fmt.Sprintf("DESCRIBE TABLE file('%s', Parquet)", dataFile)) @@ -1104,8 +1118,7 @@ func runParquetChecks(dir, file string) error { return err } fmt.Println(schemaOut) - // Verify all expected column names appear somewhere in the DESCRIBE output. - missing := []string{} + var missing []string for _, col := range expectedParquetColumns { if !strings.Contains(schemaOut, col) { missing = append(missing, col) @@ -1115,8 +1128,11 @@ func runParquetChecks(dir, file string) error { return fmt.Errorf("schema check FAIL: missing columns: %s", strings.Join(missing, ", ")) } fmt.Printf("Schema check PASS: all %d expected columns present\n\n", len(expectedParquetColumns)) + return nil +} - // --- Row count check --- +// parquetRowCountCheck asserts the file contains at least one row. +func parquetRowCountCheck(dir, file, dataFile string) error { fmt.Println("--- Row count check ---") countOut, err := runClickHouseQuery(dir, file, fmt.Sprintf("SELECT count(*) FROM file('%s', Parquet)", dataFile)) @@ -1131,8 +1147,11 @@ func runParquetChecks(dir, file string) error { return fmt.Errorf("row count check FAIL: got %d rows, expected > 0", rowCount) } fmt.Printf("Row count check PASS: %d rows\n\n", rowCount) + return nil +} - // --- Sanity check: seq and time_ns ranges --- +// parquetSanityCheck validates seq monotonicity and a non-zero min(time_ns). +func parquetSanityCheck(dir, file, dataFile string) error { fmt.Println("--- Sanity check ---") sanityOut, err := runClickHouseQuery(dir, file, fmt.Sprintf("SELECT min(seq), max(seq), min(time_ns), countIf(is_error) FROM file('%s', Parquet)", dataFile)) @@ -1158,10 +1177,22 @@ func runParquetChecks(dir, file string) error { } fmt.Printf("Sanity check PASS: seq range [%d, %d], min time_ns=%d, error_count=%s\n", minSeq, maxSeq, minTimeNs, strings.TrimSpace(parts[3])) - return nil } +// runParquetChecks runs schema, row-count, and sanity checks against the parquet file. +// dir is the absolute directory containing file (mounted at /data in the container). +func runParquetChecks(dir, file string) error { + dataFile := "/data/" + file + if err := parquetSchemaCheck(dir, file, dataFile); err != nil { + return err + } + if err := parquetRowCountCheck(dir, file, dataFile); err != nil { + return err + } + return parquetSanityCheck(dir, file, dataFile) +} + // --- Demo (VHS-driven TUI recordings) --------------------------------------- // // Demo regenerates GIFs and PNG screenshots under docs/tutorial/assets/ by |
