Spaces:
Running
Running
File size: 3,695 Bytes
df04431 6cf7e7c 3ac681e 6cf7e7c 3ac681e df04431 3ac681e 6cf7e7c 3ac681e 6cf7e7c 3ac681e df04431 3ac681e b42c3ef 3ac681e b42c3ef 3ac681e b42c3ef df04431 b42c3ef df04431 b42c3ef 3ac681e 6cf7e7c 3ac681e 6cf7e7c 3ac681e 6cf7e7c 3ac681e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 | import threading
from typing import Any, Dict
from lean_interact import Command, LeanREPLConfig, LeanServer, TempRequireProject
class LeanEnvironment:
"""
Manages the Lean REPL environment for verifying Lean 4 proofs.
Safe to share across threads: `verify_proof` serializes concurrent calls
behind a lock, because the underlying LeanServer is a single-process
stdin/stdout pipe and would otherwise interleave commands.
"""
def __init__(self, use_mathlib: bool = True, lean_version: str = "v4.8.0"):
"""
Initializes the Lean environment.
Args:
use_mathlib (bool): If True, configures a TempRequireProject with Mathlib.
This may take a while to build on the first run.
lean_version (str): The Lean 4 version to use. Default is v4.8.0.
"""
self.lean_version = lean_version
self.use_mathlib = use_mathlib
if self.use_mathlib:
# We use TempRequireProject with mathlib as specified in lean_interact documentation
project = TempRequireProject(
lean_version=self.lean_version,
require="mathlib"
)
self.config = LeanREPLConfig(project=project)
else:
self.config = LeanREPLConfig(lean_version=self.lean_version)
self.server = LeanServer(self.config)
self._lock = threading.Lock()
def verify_proof(self, lean_code: str) -> Dict[str, Any]:
"""
Executes a block of Lean code and verifies if it is a correct proof.
Args:
lean_code (str): The full Lean 4 code string containing imports, theorem statement, and proof.
Returns:
dict: A dictionary containing the status, errors (if any), and goals (if open sorries remain).
"""
# Guard: empty/whitespace input would crash `Command(cmd=...)` with a
# pydantic ValidationError (min_length=1). Return a clean failure.
if not lean_code or not lean_code.strip():
return {
"status": "failure",
"errors": ["empty Lean code: nothing to verify"],
"goals": [],
"env": None,
}
# Guard: LeanServer can raise on disconnect / crash / OOM. Surface as a
# failure result so the agent's retry loop continues instead of dying.
# The lock serializes concurrent requests since LeanServer is single-process.
try:
with self._lock:
response = self.server.run(Command(cmd=lean_code))
except Exception as exc:
return {
"status": "failure",
"errors": [f"Lean server error: {exc}"],
"goals": [],
"env": None,
}
errors = []
goals = []
# Check for error or warning messages
if hasattr(response, 'messages') and response.messages:
for msg in response.messages:
if msg.severity in ['error', 'warning']:
# E.g., 'declaration uses 'sorry'' is a warning, but we might want to capture it
errors.append(msg.data)
# Check for open goals (sorries)
if hasattr(response, 'sorries') and response.sorries:
for sorry in response.sorries:
if sorry.goal:
goals.append(sorry.goal)
is_success = len(errors) == 0 and len(goals) == 0
return {
"status": "success" if is_success else "failure",
"errors": errors,
"goals": goals,
"env": getattr(response, "env", None)
}
|