lean-migrate / lean_backend /stdin_backend.py
Hrushi's picture
Upload folder using huggingface_hub
bf9c466 verified
"""StdinBackend: local Lean verification via `lean --stdin`.
Verification needs the .olean files for all spec modules, normally produced
by a prior `cd lean && lake build`. If any of them are missing,
_ensure_lake_build() runs the build on first use. In Docker, where the build
already runs at image build time, this guard is a no-op.
"""
from __future__ import annotations
import os
import shutil
import subprocess
import textwrap
import time
from pathlib import Path
from .interface import LeanBackend, LeanResult
# Parent of the directory that contains this file.
REPO_ROOT = Path(__file__).resolve().parents[1]

# Built-in defaults: the Lean project lives in <repo>/lean and its compiled
# artifacts under .lake/build/lib inside that project.
DEFAULT_LEAN_CWD = REPO_ROOT / "lean"
DEFAULT_LEAN_PATH = DEFAULT_LEAN_CWD / ".lake" / "build" / "lib"

# Prefer executables found on PATH; otherwise fall back to ~/.elan/bin.
DEFAULT_LEAN_BIN = Path(shutil.which("lean") or Path.home() / ".elan" / "bin" / "lean")
DEFAULT_LAKE_BIN = Path(shutil.which("lake") or Path.home() / ".elan" / "bin" / "lake")

# Effective settings: each one may be overridden through an environment
# variable of the same name.
LEAN_BIN = Path(os.getenv("LEAN_BIN", str(DEFAULT_LEAN_BIN)))
LAKE_BIN = Path(os.getenv("LAKE_BIN", str(DEFAULT_LAKE_BIN)))
LEAN_CWD = Path(os.getenv("LEAN_CWD", str(DEFAULT_LEAN_CWD)))
LEAN_PATH = os.getenv("LEAN_PATH", str(DEFAULT_LEAN_PATH))

# An override that points at a non-existent location is ignored in favour of
# the built-in default.
LEAN_CWD = LEAN_CWD if LEAN_CWD.exists() else DEFAULT_LEAN_CWD
LEAN_PATH = LEAN_PATH if Path(LEAN_PATH).exists() else str(DEFAULT_LEAN_PATH)

# Flipped to True by _ensure_lake_build() once the spec artifacts are known
# to exist, so the check runs at most once per process.
_LEAN_BUILD_READY = False
def _get_spec_modules() -> list[str]:
    """Return each task's ``lean_spec_module`` once, in first-seen order."""
    # Imported lazily, at call time rather than module import time.
    from lean_migrate.env.tasks import _TASKS

    ordered_unique: list[str] = []
    already_seen: set[str] = set()
    for task in _TASKS.values():
        spec_module = task.lean_spec_module
        if spec_module in already_seen:
            continue
        already_seen.add(spec_module)
        ordered_unique.append(spec_module)
    return ordered_unique
def _ensure_lake_build() -> None:
    """Make sure the compiled spec modules exist, building them if needed.

    The outcome is cached in the module-level ``_LEAN_BUILD_READY`` flag, so
    after the first successful call this function returns immediately.

    Raises:
        RuntimeError: if ``lake build`` exits with a non-zero status; the
            message carries the captured stdout and stderr.
    """
    global _LEAN_BUILD_READY
    if _LEAN_BUILD_READY:
        return

    spec_modules = _get_spec_modules()
    olean_dir = Path(LEAN_PATH)
    artifact_missing = any(
        not (olean_dir / f"{name}.olean").exists() for name in spec_modules
    )

    if artifact_missing:
        build_env = dict(os.environ)
        build_env["LEAN_PATH"] = LEAN_PATH
        build = subprocess.run(
            [str(LAKE_BIN), "build", *spec_modules],
            capture_output=True,
            text=True,
            cwd=str(LEAN_CWD),
            env=build_env,
        )
        if build.returncode != 0:
            raise RuntimeError(
                "Lean build failed before verification.\n"
                f"stdout:\n{build.stdout}\n"
                f"stderr:\n{build.stderr}"
            )

    # Reached only when every artifact was already present or the build
    # succeeded.
    _LEAN_BUILD_READY = True
class StdinBackend(LeanBackend):
    """Lean backend that checks code by piping it to a local ``lean --stdin``."""

    @staticmethod
    def _qualify(module: str) -> str:
        """Return *module* carrying exactly one leading ``lean.`` prefix."""
        return module if module.startswith("lean.") else f"lean.{module}"

    def _run_lean(self, code: str, timeout: int = 15) -> LeanResult:
        """Feed *code* to ``lean --stdin`` and summarise the outcome.

        Args:
            code: Complete Lean source text to check.
            timeout: Seconds to wait for the Lean process before giving up.

        Returns:
            A ``LeanResult``. ``passed`` is true only when Lean exits with
            status 0 and its stderr contains no ``error:`` marker. A timeout
            or a failure to run the process yields ``passed=False`` with an
            explanatory ``error`` and an empty ``raw_stderr``.

        Raises:
            RuntimeError: propagated from ``_ensure_lake_build()`` when the
                preparatory build fails.
        """
        # Deliberately outside the try block: a failed build is raised to the
        # caller rather than reported as a verification result.
        _ensure_lake_build()
        start_time = time.monotonic()

        def elapsed_ms() -> int:
            return int((time.monotonic() - start_time) * 1000)

        try:
            process = subprocess.run(
                [str(LEAN_BIN), "--stdin"],
                input=code,
                capture_output=True,
                text=True,
                timeout=timeout,
                cwd=str(LEAN_CWD),
                env={**os.environ, "LEAN_PATH": LEAN_PATH},
            )
        except subprocess.TimeoutExpired:
            return LeanResult(
                passed=False,
                error=f"LEAN verification timed out after {timeout} seconds.",
                latency_ms=elapsed_ms(),
                raw_stderr="",
            )
        except Exception as error:
            # Boundary handler: any other failure to run the process (for
            # example a missing executable) is reported as a failed result.
            return LeanResult(
                passed=False,
                error=f"Backend error: {error}",
                latency_ms=elapsed_ms(),
                raw_stderr="",
            )

        latency = elapsed_ms()
        passed = process.returncode == 0 and "error:" not in process.stderr.lower()
        # Use stdout as the diagnostic text when stderr is empty.
        error_output = process.stderr.strip() or process.stdout.strip()
        return LeanResult(
            passed=passed,
            error="" if passed else error_output,
            latency_ms=latency,
            raw_stderr=process.stderr,
        )

    def verify(
        self,
        spec_module: str,
        function_name: str,
        code: str,
        symbol_name: str | None = None,
        extra_imports: list[str] | None = None,
        sample_checks: list[str] | None = None,
    ) -> LeanResult:
        """Check that *code* elaborates against a spec module.

        The generated Lean source consists of the imports, *code*, any
        *sample_checks*, and a final ``#check`` of the target symbol, separated
        by blank lines; sections that are empty or whitespace-only are dropped.

        Args:
            spec_module: Spec module to import, with or without the ``lean.``
                prefix.
            function_name: Name checked as ``_root_.<function_name>`` when
                *symbol_name* is not given.
            code: Lean source under verification.
            symbol_name: Explicit symbol to ``#check`` instead of
                ``_root_.<function_name>``.
            extra_imports: Additional modules to import, with or without the
                ``lean.`` prefix.
            sample_checks: Extra Lean snippets placed after *code*.

        Returns:
            The ``LeanResult`` produced by running Lean on the generated source.
        """
        import_lines = [f"import {self._qualify(spec_module)}"]
        import_lines.extend(
            f"import {self._qualify(extra)}" for extra in extra_imports or []
        )
        check_target = symbol_name or f"_root_.{function_name}"
        sections = [
            "\n".join(import_lines),
            code,
            *(sample_checks or []),
            f"#check {check_target}",
        ]
        lean_code = "\n\n".join(section for section in sections if section.strip())
        return self._run_lean(lean_code)

    def verify_proof(self, spec_module: str, proof_code: str) -> LeanResult:
        """Check a proof against a spec module, rejecting any use of ``sorry``.

        Args:
            spec_module: Spec module to import and open, with or without the
                ``lean.`` prefix.
            proof_code: Lean source of the proof.

        Returns:
            A failed ``LeanResult`` (without running Lean) if *proof_code*
            contains ``sorry``; otherwise the result of running Lean with a
            30-second timeout.
        """
        # NOTE: plain substring match, so it also rejects proofs in which
        # "sorry" appears only inside an identifier or a comment.
        if "sorry" in proof_code:
            return LeanResult(
                passed=False,
                error="Proof contains 'sorry' and cannot be accepted.",
                latency_ms=0,
                raw_stderr="",
            )
        qualified = self._qualify(spec_module)
        # The opened namespace is the module name without the ``lean.`` prefix,
        # matching what an unprefixed *spec_module* has always produced.
        namespace = qualified[len("lean."):]
        # Assemble line by line rather than dedenting an interpolated template:
        # a multi-line proof_code would otherwise defeat the common-indent
        # computation and leave the header lines indented.
        lean_code = "\n".join(
            [f"import {qualified}", f"open {namespace}", proof_code]
        ).strip()
        return self._run_lean(lean_code, timeout=30)