"""Mutation engine. Pure-Python AST-based mutation operators (no mutmut dependency at runtime because mutmut doesn't run cleanly on native Windows). Five operator classes are produced: NCR (numeric constant), BCR (boolean constant), ROR (relational op), LCR (logical connector), AOR (arithmetic op replacement). The engine is deterministic: identical input source produces identical mutant ordering and identical mutant ids. Each `Mutant.mutated_source` is the full mutated module source ready to be written to disk and re-tested. ``baseline_report`` runs the existing test suite against every mutant. It delegates to :meth:`mutant_hunter.safety.sandbox.Sandbox.make_workspace` so the per-mutant copy uses the same option-B layout as the live env: the package source lands at ``//``, tests are hoisted to ``//``, and ``PYTHONPATH=`` is prepended on every subprocess so the workspace beats any editable install. """ from __future__ import annotations import ast import copy import os import subprocess import sys from dataclasses import dataclass from pathlib import Path from typing import Callable from mutant_hunter.models import Mutant from mutant_hunter.safety.sandbox import Sandbox @dataclass(frozen=True) class MutationReport: total_mutants: int killed_mutants: list[str] survived_mutants: list[Mutant] @property def killed(self) -> int: return len(self.killed_mutants) @property def survived(self) -> int: return len(self.survived_mutants) @property def mutation_score(self) -> float: if self.total_mutants <= 0: return 0.0 return self.killed / self.total_mutants # --- Operator helpers ------------------------------------------------------- _COMPARE_FLIP: dict[type[ast.cmpop], type[ast.cmpop]] = { ast.Eq: ast.NotEq, ast.NotEq: ast.Eq, ast.Lt: ast.LtE, ast.LtE: ast.Lt, ast.Gt: ast.GtE, ast.GtE: ast.Gt, ast.In: ast.NotIn, ast.NotIn: ast.In, ast.Is: ast.IsNot, ast.IsNot: ast.Is, } _ARITH_FLIP: dict[type[ast.operator], type[ast.operator]] = { ast.Add: ast.Sub, ast.Sub: ast.Add, ast.Mult: ast.FloorDiv, ast.FloorDiv: ast.Mult, ast.Div: ast.Mult, ast.Mod: ast.Mult, } @dataclass(frozen=True) class _Candidate: operator: str original: str mutated: str line: int column: int apply: Callable[[ast.AST], None] """Function applied to the equivalent node in a fresh deep-copy of the tree.""" def _apply_constant(new_value: object) -> Callable[[ast.AST], None]: def _f(node: ast.AST) -> None: assert isinstance(node, ast.Constant) node.value = new_value return _f def _apply_compare_op(new_op_cls: type[ast.cmpop]) -> Callable[[ast.AST], None]: def _f(node: ast.AST) -> None: assert isinstance(node, ast.Compare) node.ops[0] = new_op_cls() return _f def _apply_boolop(new_op_cls: type[ast.boolop]) -> Callable[[ast.AST], None]: def _f(node: ast.AST) -> None: assert isinstance(node, ast.BoolOp) node.op = new_op_cls() return _f def _apply_binop(new_op_cls: type[ast.operator]) -> Callable[[ast.AST], None]: def _f(node: ast.AST) -> None: assert isinstance(node, ast.BinOp) node.op = new_op_cls() return _f # --- Engine ----------------------------------------------------------------- class MutationEngine: """Deterministic AST mutation engine.""" def generate_mutants(self, source: str) -> list[Mutant]: """Return one `Mutant` per single-point mutation. Each mutant's `mutated_source` is a complete, syntactically-valid Python source string ready to drop in place of the original module. """ original_tree = ast.parse(source) nodes_in_order = list(ast.walk(original_tree)) candidates: list[tuple[int, _Candidate]] = [] for idx, node in enumerate(nodes_in_order): for cand in self._candidates_for_node(node): candidates.append((idx, cand)) mutants: list[Mutant] = [] for mid, (idx, cand) in enumerate(candidates): new_tree = copy.deepcopy(original_tree) target = list(ast.walk(new_tree))[idx] cand.apply(target) ast.fix_missing_locations(new_tree) mutated_source = ast.unparse(new_tree) if mutated_source == source: # Defensive: if the mutation collapsed (e.g. no observable # change), skip it rather than emit a useless duplicate. continue mutants.append( Mutant( mutant_id=f"{cand.operator}-{cand.line}-{cand.column}-{mid}", operator=cand.operator, line=cand.line, column=cand.column, original=cand.original, mutated=cand.mutated, mutated_source=mutated_source, ) ) return mutants def _candidates_for_node(self, node: ast.AST) -> list[_Candidate]: line = getattr(node, "lineno", 1) col = getattr(node, "col_offset", 0) out: list[_Candidate] = [] if isinstance(node, ast.Constant): v = node.value # bool is a subclass of int, so check it first if isinstance(v, bool): out.append( _Candidate( operator="BCR", original=str(v), mutated=str(not v), line=line, column=col, apply=_apply_constant(not v), ) ) elif isinstance(v, int): out.append( _Candidate( operator="NCR", original=repr(v), mutated=repr(v + 1), line=line, column=col, apply=_apply_constant(v + 1), ) ) if v != 0: out.append( _Candidate( operator="NCR", original=repr(v), mutated="0", line=line, column=col, apply=_apply_constant(0), ) ) elif isinstance(v, float): out.append( _Candidate( operator="NCR", original=repr(v), mutated=repr(v + 1.0), line=line, column=col, apply=_apply_constant(v + 1.0), ) ) if v != 0.0: out.append( _Candidate( operator="NCR", original=repr(v), mutated="0.0", line=line, column=col, apply=_apply_constant(0.0), ) ) elif isinstance(node, ast.Compare) and len(node.ops) == 1: op_cls = type(node.ops[0]) new_cls = _COMPARE_FLIP.get(op_cls) if new_cls is not None: out.append( _Candidate( operator="ROR", original=op_cls.__name__, mutated=new_cls.__name__, line=line, column=col, apply=_apply_compare_op(new_cls), ) ) elif isinstance(node, ast.BoolOp): if isinstance(node.op, ast.And): out.append( _Candidate( operator="LCR", original="and", mutated="or", line=line, column=col, apply=_apply_boolop(ast.Or), ) ) elif isinstance(node.op, ast.Or): out.append( _Candidate( operator="LCR", original="or", mutated="and", line=line, column=col, apply=_apply_boolop(ast.And), ) ) elif isinstance(node, ast.BinOp): new_cls = _ARITH_FLIP.get(type(node.op)) if new_cls is not None: out.append( _Candidate( operator="AOR", original=type(node.op).__name__, mutated=new_cls.__name__, line=line, column=col, apply=_apply_binop(new_cls), ) ) return out # --- Test runners ------------------------------------------------------- def run_pytest( self, repo_dir: Path, *, timeout_s: float = 20.0, extra_pytest_args: list[str] | None = None, env: dict[str, str] | None = None, ) -> subprocess.CompletedProcess[str]: args = [sys.executable, "-m", "pytest", "-q", "--no-header", "-x"] if extra_pytest_args: args.extend(extra_pytest_args) return subprocess.run( args, cwd=str(repo_dir), text=True, capture_output=True, timeout=timeout_s, env=env, ) def baseline_report( self, *, repo_dir: Path, module_relpath: Path, test_dir: str = "tests", package_name: str | None = None, timeout_s: float = 20.0, progress_every: int = 10, ) -> MutationReport: """Run the existing suite against every mutant. Killed = exit != 0. ``package_name`` controls the workspace layout (see :meth:`Sandbox.make_workspace`). Pass the corpus repo name (which matches the dotted module's first segment) for real corpus modules; leave it ``None`` for the synthetic flat-``m.py`` fixtures used by the unit tests. """ source_path = repo_dir / module_relpath source = source_path.read_text(encoding="utf-8") mutants = self.generate_mutants(source) killed: list[str] = [] survived: list[Mutant] = [] workspace = Sandbox.make_workspace( repo_dir, package_name=package_name, test_dir=test_dir, ) try: if package_name is None: tmp_source = workspace / module_relpath else: tmp_source = workspace / package_name / module_relpath original_text = tmp_source.read_text(encoding="utf-8") base_env = os.environ.copy() base_env["PYTHONPATH"] = str(workspace) + ( os.pathsep + base_env["PYTHONPATH"] if base_env.get("PYTHONPATH") else "" ) base_env["PYTHONNOUSERSITE"] = "1" base_env["PYTHONDONTWRITEBYTECODE"] = "1" for i, m in enumerate(mutants, start=1): assert m.mutated_source is not None tmp_source.write_text(m.mutated_source, encoding="utf-8") try: res = self.run_pytest( workspace, timeout_s=timeout_s, extra_pytest_args=[test_dir], env=base_env, ) is_killed = res.returncode != 0 except subprocess.TimeoutExpired: is_killed = True finally: tmp_source.write_text(original_text, encoding="utf-8") if is_killed: killed.append(m.mutant_id) else: survived.append(m) if progress_every and i % progress_every == 0: print( f" mutants: {i}/{len(mutants)} killed={len(killed)} survived={len(survived)}", flush=True, ) finally: import shutil as _shutil _shutil.rmtree(workspace.parent, ignore_errors=True) return MutationReport( total_mutants=len(mutants), killed_mutants=killed, survived_mutants=survived, )