VeriDeepResearch / tools.py
Vilin97's picture
Add repair_lean_proofs tool for instant sorry-filling
f3ef14f
from __future__ import annotations
import json
import os
import tarfile
import tempfile
import httpx
from config import (
THEOREM_SEARCH_BASE_URL,
LEAN_EXPLORE_BASE_URL,
AXLE_API_KEY,
AXLE_BASE_URL,
LEAN_ENVIRONMENT,
ARISTOTLE_API_KEY,
)
# ---------------------------------------------------------------------------
# TheoremSearch
# ---------------------------------------------------------------------------
async def search_theorems(query: str, n_results: int = 5) -> str:
try:
async with httpx.AsyncClient(timeout=30) as client:
response = await client.post(
f"{THEOREM_SEARCH_BASE_URL}/search",
json={"query": query, "n_results": n_results},
)
if response.status_code != 200:
return json.dumps({"error": f"TheoremSearch unavailable (HTTP {response.status_code})"})
data = response.json()
results = []
for thm in data.get("theorems", []):
results.append({
"name": thm.get("name", ""),
"body": thm.get("body", "")[:500],
"slogan": thm.get("slogan", ""),
"paper_title": thm.get("paper", {}).get("title", ""),
"link": thm.get("link", ""),
})
return json.dumps(results, indent=2)
except Exception as e:
return json.dumps({"error": f"TheoremSearch error: {e}"})
# ---------------------------------------------------------------------------
# LeanExplore
# ---------------------------------------------------------------------------
async def search_lean_library(query: str) -> str:
try:
async with httpx.AsyncClient(timeout=30) as client:
response = await client.get(
f"{LEAN_EXPLORE_BASE_URL}/search",
params={"q": query},
)
if response.status_code != 200:
return json.dumps({"error": f"LeanExplore unavailable (HTTP {response.status_code})"})
data = response.json()
results = []
for decl in data.get("results", [])[:10]:
results.append({
"name": decl.get("name", ""),
"module": decl.get("module", ""),
"source_text": decl.get("source_text", "")[:400],
"informalization": decl.get("informalization", "")[:300],
})
return json.dumps(results, indent=2)
except Exception as e:
return json.dumps({"error": f"LeanExplore error: {e}"})
# ---------------------------------------------------------------------------
# Loogle (type-based Mathlib search)
# ---------------------------------------------------------------------------
async def search_loogle(query: str) -> str:
"""Search Mathlib by type signature pattern via Loogle."""
try:
async with httpx.AsyncClient(timeout=30) as client:
response = await client.get(
"https://loogle.lean-lang.org/json",
params={"q": query},
)
if response.status_code != 200:
return json.dumps({"error": f"Loogle unavailable (HTTP {response.status_code})"})
data = response.json()
if "error" in data:
return json.dumps({"error": data["error"], "suggestions": data.get("suggestions", [])})
hits = data.get("hits", [])[:10]
results = []
for h in hits:
results.append({
"name": h.get("name", ""),
"type": h.get("type", ""),
"module": h.get("module", ""),
"doc": (h.get("doc") or "")[:200],
})
return json.dumps({"count": data.get("count", 0), "results": results}, indent=2)
except Exception as e:
return json.dumps({"error": f"Loogle error: {e}"})
# ---------------------------------------------------------------------------
# Axle (Lean verification)
# ---------------------------------------------------------------------------
async def check_lean_code(code: str) -> str:
try:
async with httpx.AsyncClient(timeout=180) as client:
response = await client.post(
f"{AXLE_BASE_URL}/check",
headers={
"Authorization": f"Bearer {AXLE_API_KEY}",
"Content-Type": "application/json",
},
json={
"content": code,
"environment": LEAN_ENVIRONMENT,
"timeout_seconds": 120,
},
)
if response.status_code != 200:
return json.dumps({"error": f"Axle HTTP {response.status_code}: {response.text[:500]}"})
data = response.json()
if "user_error" in data:
return json.dumps({"error": data["user_error"]})
return json.dumps({
"okay": data.get("okay", False),
"errors": data.get("lean_messages", {}).get("errors", []),
"warnings": data.get("lean_messages", {}).get("warnings", []),
"tool_errors": data.get("tool_messages", {}).get("errors", []),
"tool_infos": data.get("tool_messages", {}).get("infos", []),
}, indent=2)
except Exception as e:
return json.dumps({"error": f"Axle error: {e}"})
async def repair_lean_proofs(code: str) -> str:
"""Use Axle repair_proofs to automatically fill sorries with grind/simp/omega."""
try:
async with httpx.AsyncClient(timeout=180) as client:
response = await client.post(
f"{AXLE_BASE_URL}/repair_proofs",
headers={
"Authorization": f"Bearer {AXLE_API_KEY}",
"Content-Type": "application/json",
},
json={
"content": code,
"environment": LEAN_ENVIRONMENT,
"terminal_tactics": ["grind", "simp_all", "omega", "norm_num", "nlinarith", "aesop"],
"timeout_seconds": 60,
},
)
if response.status_code != 200:
return json.dumps({"error": f"Axle HTTP {response.status_code}: {response.text[:500]}"})
data = response.json()
repaired_code = data.get("content", code)
stats = data.get("repair_stats", {})
has_sorry = "sorry" in repaired_code
return json.dumps({
"repaired_code": repaired_code,
"repairs_applied": stats.get("apply_terminal_tactics", 0),
"still_has_sorry": has_sorry,
"okay": data.get("okay", False),
}, indent=2)
except Exception as e:
return json.dumps({"error": f"repair_proofs error: {e}"})
async def extract_sorry_lemmas(code: str) -> str:
"""Use Axle sorry2lemma to extract sorry'd subgoals into standalone lemma stubs."""
try:
async with httpx.AsyncClient(timeout=180) as client:
response = await client.post(
f"{AXLE_BASE_URL}/sorry2lemma",
headers={
"Authorization": f"Bearer {AXLE_API_KEY}",
"Content-Type": "application/json",
},
json={
"content": code,
"environment": LEAN_ENVIRONMENT,
"extract_sorries": True,
"extract_errors": False,
"reconstruct_callsite": True,
"timeout_seconds": 120,
},
)
if response.status_code != 200:
return json.dumps({"error": f"Axle HTTP {response.status_code}: {response.text[:500]}"})
data = response.json()
lemma_names = data.get("lemma_names", [])
content = data.get("content", "")
return json.dumps({
"lemma_names": lemma_names,
"decomposed_code": content,
"num_lemmas": len(lemma_names),
}, indent=2)
except Exception as e:
return json.dumps({"error": f"sorry2lemma error: {e}"})
# ---------------------------------------------------------------------------
# Aristotle (Lean formalization & proving)
# ---------------------------------------------------------------------------
async def submit_to_aristotle(prompt: str) -> str:
"""Submit a prompt to Aristotle. Returns project info JSON."""
import aristotlelib
from aristotlelib import Project
aristotlelib.set_api_key(ARISTOTLE_API_KEY)
try:
project = await Project.create(prompt=prompt)
return json.dumps({
"project_id": project.project_id,
"status": project.status.value,
})
except Exception as e:
return json.dumps({"error": f"Aristotle submit error: {e}"})
async def check_aristotle_status(project_id: str) -> dict:
"""Check the status of an Aristotle project. Returns a dict (non-blocking)."""
import aristotlelib
from aristotlelib import Project
aristotlelib.set_api_key(ARISTOTLE_API_KEY)
try:
project = await Project.from_id(project_id)
return {
"project_id": project.project_id,
"status": project.status.value,
"percent_complete": project.percent_complete,
}
except Exception as e:
return {"error": str(e)}
async def get_aristotle_result(project_id: str) -> str:
"""Download and return the Lean code from a completed Aristotle project."""
import aristotlelib
from aristotlelib import Project
aristotlelib.set_api_key(ARISTOTLE_API_KEY)
try:
project = await Project.from_id(project_id)
with tempfile.TemporaryDirectory() as tmpdir:
result_path = await project.get_solution(
destination=os.path.join(tmpdir, "result")
)
result_path = str(result_path)
if result_path.endswith(".tar.gz") or result_path.endswith(".tgz"):
extract_dir = os.path.join(tmpdir, "extracted")
os.makedirs(extract_dir, exist_ok=True)
with tarfile.open(result_path) as tar:
tar.extractall(extract_dir)
lean_contents = []
for root, _dirs, files in os.walk(extract_dir):
for f in sorted(files):
if f.endswith(".lean"):
with open(os.path.join(root, f)) as fh:
lean_contents.append(fh.read())
if not lean_contents:
return "No Lean files found in Aristotle result."
return "\n\n".join(lean_contents)
else:
with open(result_path) as f:
return f.read()
except Exception as e:
return f"Error downloading Aristotle result: {e}"
# ---------------------------------------------------------------------------
# Tool definitions for OpenAI function calling
# ---------------------------------------------------------------------------
TOOL_DEFINITIONS = [
{
"type": "function",
"function": {
"name": "search_theorems",
"description": (
"Search for mathematical theorems on arXiv using TheoremSearch. "
"Returns theorem statements, slogans, and paper references."
),
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Natural language search query about the mathematical topic",
}
},
"required": ["query"],
},
},
},
{
"type": "function",
"function": {
"name": "search_lean_library",
"description": (
"Search Mathlib (Lean 4 math library) for relevant declarations, "
"theorems, and definitions. Use to find existing Lean names and API."
),
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": (
"Search query - a Lean name like 'Nat.add_comm' "
"or natural language like 'sum of even numbers is even'"
),
}
},
"required": ["query"],
},
},
},
{
"type": "function",
"function": {
"name": "search_loogle",
"description": (
"Search Mathlib by TYPE SIGNATURE pattern using Loogle. "
"Use this to find lemmas by their type shape. Examples:\n"
"- 'List.map' (by name)\n"
"- 'Nat -> Nat -> Nat' (by type)\n"
"- '_ + _ = _ + _' (by pattern)\n"
"- 'List.map (_ ∘ _) _ = _' (mixed)\n"
"Returns declaration names, types, and modules."
),
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Loogle query — a name, type pattern, or expression pattern",
}
},
"required": ["query"],
},
},
},
{
"type": "function",
"function": {
"name": "check_lean_code",
"description": (
"Quickly check if Lean 4 code compiles using the Axle verifier (seconds). "
"Code MUST start with 'import Mathlib'. Use for fast verification."
),
"parameters": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "Complete Lean 4 code. Must start with 'import Mathlib'.",
}
},
"required": ["code"],
},
},
},
{
"type": "function",
"function": {
"name": "submit_to_aristotle",
"description": (
"Submit a proof request to Aristotle (automated theorem prover). "
"Aristotle proves graduate/research-level math in Lean 4. "
"Returns a project_id immediately — use check_aristotle_status to poll "
"and get_aristotle_result when COMPLETE.\n\n"
"PREFERRED FORMAT: Lean 4 code with sorry placeholders. This preserves "
"the exact theorem signature and lets Aristotle focus on filling proofs:\n"
" 'Fill in the sorries:\\n```lean\\nimport Mathlib\\n\\ntheorem my_thm ... := by\\n sorry\\n```'\n\n"
"ALTERNATIVE FORMATS (when you don't have a Lean statement yet):\n"
"- Natural language description of the result to prove\n"
"- Sub-lemmas that the result decomposes into\n\n"
"SUBMIT MULTIPLE JOBS for different sub-lemmas or strategies.\n"
"After submitting, continue researching — do NOT wait immediately."
),
"parameters": {
"type": "object",
"properties": {
"prompt": {
"type": "string",
"description": (
"PREFERRED: Lean 4 code with `sorry` in place of proofs. "
"ALTERNATIVE: Natural language description of the theorem to prove."
),
}
},
"required": ["prompt"],
},
},
},
{
"type": "function",
"function": {
"name": "check_aristotle_status",
"description": (
"Non-blocking check of an Aristotle project's status. "
"Returns status and percent_complete. Use this to poll while "
"doing other work."
),
"parameters": {
"type": "object",
"properties": {
"project_id": {
"type": "string",
"description": "The project_id from submit_to_aristotle.",
}
},
"required": ["project_id"],
},
},
},
# wait_for_aristotle REMOVED — it blocked the agent for 2+ hours.
# Agent must use check_aristotle_status (non-blocking) + get_aristotle_result instead.
{
"type": "function",
"function": {
"name": "get_aristotle_result",
"description": (
"Download the Lean code from a COMPLETED Aristotle project. "
"Only call after check_aristotle_status shows COMPLETE."
),
"parameters": {
"type": "object",
"properties": {
"project_id": {
"type": "string",
"description": "The project_id of a completed Aristotle project.",
}
},
"required": ["project_id"],
},
},
},
{
"type": "function",
"function": {
"name": "repair_lean_proofs",
"description": (
"Automatically attempt to fill sorry placeholders with automation tactics "
"(grind, simp, omega, norm_num, nlinarith, aesop). FAST (~1-2 seconds). "
"Call this BEFORE submitting to Aristotle — it may resolve simple sorries instantly. "
"Returns repaired code and whether sorry remains."
),
"parameters": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "Lean 4 code with sorry placeholders to attempt repair.",
}
},
"required": ["code"],
},
},
},
{
"type": "function",
"function": {
"name": "extract_sorry_lemmas",
"description": (
"Extract sorry'd subgoals from Lean code into standalone lemma stubs. "
"Use this when you have code that compiles with sorry — it decomposes the sorry "
"into independent lemma statements that can each be submitted to Aristotle. "
"Returns the lemma names and decomposed code."
),
"parameters": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "Lean 4 code containing sorry placeholders.",
}
},
"required": ["code"],
},
},
},
{
"type": "function",
"function": {
"name": "final_answer",
"description": (
"Submit the final answer to the user. Call this when you have "
"a complete natural language answer and Lean code."
),
"parameters": {
"type": "object",
"properties": {
"answer": {
"type": "string",
"description": "Clear natural language explanation. Use LaTeX ($...$ and $$...$$) for math.",
},
"lean_code": {
"type": "string",
"description": "Complete Lean 4 code (should start with 'import Mathlib').",
},
"verified": {
"type": "boolean",
"description": "Whether the Lean code was verified successfully by Axle.",
},
},
"required": ["answer", "lean_code", "verified"],
},
},
},
]