"""StdinBackend: local Lean verification via `lean --stdin`. Requires a prior `cd lean && lake build` so the .olean files for all spec modules exist. _ensure_lake_build() auto-triggers the build on first use if the artifacts are absent — useful in Docker where the build runs at image build time and this guard is a no-op. """ from __future__ import annotations import os import shutil import subprocess import textwrap import time from pathlib import Path from .interface import LeanBackend, LeanResult REPO_ROOT = Path(__file__).resolve().parents[1] DEFAULT_LEAN_BIN = Path(shutil.which("lean") or Path.home() / ".elan/bin/lean") DEFAULT_LEAN_CWD = REPO_ROOT / "lean" DEFAULT_LEAN_PATH = DEFAULT_LEAN_CWD / ".lake" / "build" / "lib" DEFAULT_LAKE_BIN = Path(shutil.which("lake") or Path.home() / ".elan/bin/lake") LEAN_BIN = Path(os.environ.get("LEAN_BIN", str(DEFAULT_LEAN_BIN))) LEAN_CWD = Path(os.environ.get("LEAN_CWD", str(DEFAULT_LEAN_CWD))) LEAN_PATH = os.environ.get("LEAN_PATH", str(DEFAULT_LEAN_PATH)) LAKE_BIN = Path(os.environ.get("LAKE_BIN", str(DEFAULT_LAKE_BIN))) if not LEAN_CWD.exists(): LEAN_CWD = DEFAULT_LEAN_CWD if not Path(LEAN_PATH).exists(): LEAN_PATH = str(DEFAULT_LEAN_PATH) _LEAN_BUILD_READY = False def _get_spec_modules() -> list[str]: from lean_migrate.env.tasks import _TASKS return list(dict.fromkeys(t.lean_spec_module for t in _TASKS.values())) def _ensure_lake_build() -> None: global _LEAN_BUILD_READY if _LEAN_BUILD_READY: return modules = _get_spec_modules() lean_path = Path(LEAN_PATH) if all((lean_path / f"{m}.olean").exists() for m in modules): _LEAN_BUILD_READY = True return process = subprocess.run( [str(LAKE_BIN), "build"] + modules, capture_output=True, text=True, cwd=str(LEAN_CWD), env={**os.environ, "LEAN_PATH": LEAN_PATH}, ) if process.returncode != 0: raise RuntimeError( "Lean build failed before verification.\n" f"stdout:\n{process.stdout}\n" f"stderr:\n{process.stderr}" ) _LEAN_BUILD_READY = True class StdinBackend(LeanBackend): def _run_lean(self, code: str, timeout: int = 15) -> LeanResult: _ensure_lake_build() start_time = time.monotonic() try: process = subprocess.run( [str(LEAN_BIN), "--stdin"], input=code, capture_output=True, text=True, timeout=timeout, cwd=str(LEAN_CWD), env={**os.environ, "LEAN_PATH": LEAN_PATH}, ) elapsed_ms = int((time.monotonic() - start_time) * 1000) passed = process.returncode == 0 and "error:" not in process.stderr.lower() error_output = process.stderr.strip() or process.stdout.strip() return LeanResult( passed=passed, error="" if passed else error_output, latency_ms=elapsed_ms, raw_stderr=process.stderr, ) except subprocess.TimeoutExpired: elapsed_ms = int((time.monotonic() - start_time) * 1000) return LeanResult( passed=False, error=f"LEAN verification timed out after {timeout} seconds.", latency_ms=elapsed_ms, raw_stderr="", ) except Exception as error: elapsed_ms = int((time.monotonic() - start_time) * 1000) return LeanResult( passed=False, error=f"Backend error: {error}", latency_ms=elapsed_ms, raw_stderr="", ) def verify( self, spec_module: str, function_name: str, code: str, symbol_name: str | None = None, extra_imports: list[str] | None = None, sample_checks: list[str] | None = None, ) -> LeanResult: module_name = f"lean.{spec_module}" if not spec_module.startswith("lean.") else spec_module import_lines = [f"import {module_name}"] for module_name in extra_imports or []: prefixed_name = ( module_name if module_name.startswith("lean.") else f"lean.{module_name}" ) import_lines.append(f"import {prefixed_name}") sections = [ "\n".join(import_lines), code, ] if sample_checks: sections.extend(sample_checks) sections.append(f"#check {symbol_name or f'_root_.{function_name}'}") lean_code = "\n\n".join(section for section in sections if section.strip()) return self._run_lean(lean_code) def verify_proof(self, spec_module: str, proof_code: str) -> LeanResult: if "sorry" in proof_code: return LeanResult( passed=False, error="Proof contains 'sorry' and cannot be accepted.", latency_ms=0, raw_stderr="", ) lean_code = textwrap.dedent( f""" import lean.{spec_module} open {spec_module} {proof_code} """ ).strip() return self._run_lean(lean_code, timeout=30)