TestForge / runner.py
perceptron01's picture
Upload 24 files
c4f3819 verified
Raw
History Blame Contribute Delete
3.5 kB
"""Subprocess pytest and coverage runner."""
from __future__ import annotations
import re
import subprocess
import sys
from dataclasses import dataclass
from importlib.util import find_spec
from pathlib import Path
@dataclass(frozen=True)
class RunResult:
ok: bool
passed: int
failed: int
errors: int
coverage: float | None
stdout: str
stderr: str
returncode: int
@property
def summary(self) -> str:
parts = []
if self.passed:
parts.append(f"{self.passed} passed")
if self.failed:
parts.append(f"{self.failed} failed")
if self.errors:
parts.append(f"{self.errors} errors")
return ", ".join(parts) or "no tests"
def run_pytest(
workdir: str | Path,
package_parent: str | Path | None = None,
timeout: int = 30,
with_coverage: bool = True,
package_name: str = "legacy_repo",
) -> RunResult:
"""Run pytest in a subprocess and parse pass/fail/error counts."""
cwd = Path(workdir).resolve()
command = [sys.executable, "-m", "pytest", "tests", "-q"]
coverage_available = find_spec("pytest_cov") is not None
if with_coverage and coverage_available:
command.extend([f"--cov={package_name}", "--cov-report=term-missing"])
env = None
if package_parent is not None:
env = dict(__import__("os").environ)
parent = str(Path(package_parent).resolve())
current = env.get("PYTHONPATH", "")
env["PYTHONPATH"] = parent if not current else f"{parent}{__import__('os').pathsep}{current}"
try:
completed = subprocess.run(
command,
cwd=cwd,
env=env,
text=True,
capture_output=True,
timeout=timeout,
check=False,
)
except subprocess.TimeoutExpired as exc:
return RunResult(
ok=False,
passed=0,
failed=0,
errors=1,
coverage=None,
stdout=exc.stdout or "",
stderr=(exc.stderr or "") + f"\nTimed out after {timeout}s",
returncode=124,
)
stdout = completed.stdout
stderr = completed.stderr
passed, failed, errors = _parse_counts(stdout + "\n" + stderr)
coverage = _parse_coverage(stdout)
ok = completed.returncode == 0 and failed == 0 and errors == 0
return RunResult(
ok=ok,
passed=passed,
failed=failed,
errors=errors,
coverage=coverage,
stdout=stdout,
stderr=stderr,
returncode=completed.returncode,
)
def _parse_counts(output: str) -> tuple[int, int, int]:
passed = failed = errors = 0
for pattern, name in [
(r"(\d+)\s+passed", "passed"),
(r"(\d+)\s+failed", "failed"),
(r"(\d+)\s+errors?", "errors"),
(r"(\d+)\s+error", "errors"),
]:
matches = re.findall(pattern, output)
if not matches:
continue
value = int(matches[-1])
if name == "passed":
passed = value
elif name == "failed":
failed = value
else:
errors = value
return passed, failed, errors
def _parse_coverage(output: str) -> float | None:
total_lines = [line for line in output.splitlines() if line.strip().startswith("TOTAL")]
if not total_lines:
return None
match = re.search(r"(\d+(?:\.\d+)?)%", total_lines[-1])
return float(match.group(1)) if match else None