Spaces:
Running on Zero
Running on Zero
| """Subprocess pytest and coverage runner.""" | |
| from __future__ import annotations | |
| import re | |
| import subprocess | |
| import sys | |
| from dataclasses import dataclass | |
| from importlib.util import find_spec | |
| from pathlib import Path | |
| class RunResult: | |
| ok: bool | |
| passed: int | |
| failed: int | |
| errors: int | |
| coverage: float | None | |
| stdout: str | |
| stderr: str | |
| returncode: int | |
| def summary(self) -> str: | |
| parts = [] | |
| if self.passed: | |
| parts.append(f"{self.passed} passed") | |
| if self.failed: | |
| parts.append(f"{self.failed} failed") | |
| if self.errors: | |
| parts.append(f"{self.errors} errors") | |
| return ", ".join(parts) or "no tests" | |
| def run_pytest( | |
| workdir: str | Path, | |
| package_parent: str | Path | None = None, | |
| timeout: int = 30, | |
| with_coverage: bool = True, | |
| package_name: str = "legacy_repo", | |
| ) -> RunResult: | |
| """Run pytest in a subprocess and parse pass/fail/error counts.""" | |
| cwd = Path(workdir).resolve() | |
| command = [sys.executable, "-m", "pytest", "tests", "-q"] | |
| coverage_available = find_spec("pytest_cov") is not None | |
| if with_coverage and coverage_available: | |
| command.extend([f"--cov={package_name}", "--cov-report=term-missing"]) | |
| env = None | |
| if package_parent is not None: | |
| env = dict(__import__("os").environ) | |
| parent = str(Path(package_parent).resolve()) | |
| current = env.get("PYTHONPATH", "") | |
| env["PYTHONPATH"] = parent if not current else f"{parent}{__import__('os').pathsep}{current}" | |
| try: | |
| completed = subprocess.run( | |
| command, | |
| cwd=cwd, | |
| env=env, | |
| text=True, | |
| capture_output=True, | |
| timeout=timeout, | |
| check=False, | |
| ) | |
| except subprocess.TimeoutExpired as exc: | |
| return RunResult( | |
| ok=False, | |
| passed=0, | |
| failed=0, | |
| errors=1, | |
| coverage=None, | |
| stdout=exc.stdout or "", | |
| stderr=(exc.stderr or "") + f"\nTimed out after {timeout}s", | |
| returncode=124, | |
| ) | |
| stdout = completed.stdout | |
| stderr = completed.stderr | |
| passed, failed, errors = _parse_counts(stdout + "\n" + stderr) | |
| coverage = _parse_coverage(stdout) | |
| ok = completed.returncode == 0 and failed == 0 and errors == 0 | |
| return RunResult( | |
| ok=ok, | |
| passed=passed, | |
| failed=failed, | |
| errors=errors, | |
| coverage=coverage, | |
| stdout=stdout, | |
| stderr=stderr, | |
| returncode=completed.returncode, | |
| ) | |
| def _parse_counts(output: str) -> tuple[int, int, int]: | |
| passed = failed = errors = 0 | |
| for pattern, name in [ | |
| (r"(\d+)\s+passed", "passed"), | |
| (r"(\d+)\s+failed", "failed"), | |
| (r"(\d+)\s+errors?", "errors"), | |
| (r"(\d+)\s+error", "errors"), | |
| ]: | |
| matches = re.findall(pattern, output) | |
| if not matches: | |
| continue | |
| value = int(matches[-1]) | |
| if name == "passed": | |
| passed = value | |
| elif name == "failed": | |
| failed = value | |
| else: | |
| errors = value | |
| return passed, failed, errors | |
| def _parse_coverage(output: str) -> float | None: | |
| total_lines = [line for line in output.splitlines() if line.strip().startswith("TOTAL")] | |
| if not total_lines: | |
| return None | |
| match = re.search(r"(\d+(?:\.\d+)?)%", total_lines[-1]) | |
| return float(match.group(1)) if match else None | |