from __future__ import annotations import json import os import tarfile import tempfile import httpx from config import ( THEOREM_SEARCH_BASE_URL, LEAN_EXPLORE_BASE_URL, AXLE_API_KEY, AXLE_BASE_URL, LEAN_ENVIRONMENT, ARISTOTLE_API_KEY, ) # --------------------------------------------------------------------------- # TheoremSearch # --------------------------------------------------------------------------- async def search_theorems(query: str, n_results: int = 5) -> str: try: async with httpx.AsyncClient(timeout=30) as client: response = await client.post( f"{THEOREM_SEARCH_BASE_URL}/search", json={"query": query, "n_results": n_results}, ) if response.status_code != 200: return json.dumps({"error": f"TheoremSearch unavailable (HTTP {response.status_code})"}) data = response.json() results = [] for thm in data.get("theorems", []): results.append({ "name": thm.get("name", ""), "body": thm.get("body", "")[:500], "slogan": thm.get("slogan", ""), "paper_title": thm.get("paper", {}).get("title", ""), "link": thm.get("link", ""), }) return json.dumps(results, indent=2) except Exception as e: return json.dumps({"error": f"TheoremSearch error: {e}"}) # --------------------------------------------------------------------------- # LeanExplore # --------------------------------------------------------------------------- async def search_lean_library(query: str) -> str: try: async with httpx.AsyncClient(timeout=30) as client: response = await client.get( f"{LEAN_EXPLORE_BASE_URL}/search", params={"q": query}, ) if response.status_code != 200: return json.dumps({"error": f"LeanExplore unavailable (HTTP {response.status_code})"}) data = response.json() results = [] for decl in data.get("results", [])[:10]: results.append({ "name": decl.get("name", ""), "module": decl.get("module", ""), "source_text": decl.get("source_text", "")[:400], "informalization": decl.get("informalization", "")[:300], }) return json.dumps(results, indent=2) except Exception as e: return json.dumps({"error": f"LeanExplore error: {e}"}) # --------------------------------------------------------------------------- # Loogle (type-based Mathlib search) # --------------------------------------------------------------------------- async def search_loogle(query: str) -> str: """Search Mathlib by type signature pattern via Loogle.""" try: async with httpx.AsyncClient(timeout=30) as client: response = await client.get( "https://loogle.lean-lang.org/json", params={"q": query}, ) if response.status_code != 200: return json.dumps({"error": f"Loogle unavailable (HTTP {response.status_code})"}) data = response.json() if "error" in data: return json.dumps({"error": data["error"], "suggestions": data.get("suggestions", [])}) hits = data.get("hits", [])[:10] results = [] for h in hits: results.append({ "name": h.get("name", ""), "type": h.get("type", ""), "module": h.get("module", ""), "doc": (h.get("doc") or "")[:200], }) return json.dumps({"count": data.get("count", 0), "results": results}, indent=2) except Exception as e: return json.dumps({"error": f"Loogle error: {e}"}) # --------------------------------------------------------------------------- # Axle (Lean verification) # --------------------------------------------------------------------------- async def check_lean_code(code: str) -> str: try: async with httpx.AsyncClient(timeout=180) as client: response = await client.post( f"{AXLE_BASE_URL}/check", headers={ "Authorization": f"Bearer {AXLE_API_KEY}", "Content-Type": "application/json", }, json={ "content": code, "environment": LEAN_ENVIRONMENT, "timeout_seconds": 120, }, ) if response.status_code != 200: return json.dumps({"error": f"Axle HTTP {response.status_code}: {response.text[:500]}"}) data = response.json() if "user_error" in data: return json.dumps({"error": data["user_error"]}) return json.dumps({ "okay": data.get("okay", False), "errors": data.get("lean_messages", {}).get("errors", []), "warnings": data.get("lean_messages", {}).get("warnings", []), "tool_errors": data.get("tool_messages", {}).get("errors", []), "tool_infos": data.get("tool_messages", {}).get("infos", []), }, indent=2) except Exception as e: return json.dumps({"error": f"Axle error: {e}"}) async def repair_lean_proofs(code: str) -> str: """Use Axle repair_proofs to automatically fill sorries with grind/simp/omega.""" try: async with httpx.AsyncClient(timeout=180) as client: response = await client.post( f"{AXLE_BASE_URL}/repair_proofs", headers={ "Authorization": f"Bearer {AXLE_API_KEY}", "Content-Type": "application/json", }, json={ "content": code, "environment": LEAN_ENVIRONMENT, "terminal_tactics": ["grind", "simp_all", "omega", "norm_num", "nlinarith", "aesop"], "timeout_seconds": 60, }, ) if response.status_code != 200: return json.dumps({"error": f"Axle HTTP {response.status_code}: {response.text[:500]}"}) data = response.json() repaired_code = data.get("content", code) stats = data.get("repair_stats", {}) has_sorry = "sorry" in repaired_code return json.dumps({ "repaired_code": repaired_code, "repairs_applied": stats.get("apply_terminal_tactics", 0), "still_has_sorry": has_sorry, "okay": data.get("okay", False), }, indent=2) except Exception as e: return json.dumps({"error": f"repair_proofs error: {e}"}) async def extract_sorry_lemmas(code: str) -> str: """Use Axle sorry2lemma to extract sorry'd subgoals into standalone lemma stubs.""" try: async with httpx.AsyncClient(timeout=180) as client: response = await client.post( f"{AXLE_BASE_URL}/sorry2lemma", headers={ "Authorization": f"Bearer {AXLE_API_KEY}", "Content-Type": "application/json", }, json={ "content": code, "environment": LEAN_ENVIRONMENT, "extract_sorries": True, "extract_errors": False, "reconstruct_callsite": True, "timeout_seconds": 120, }, ) if response.status_code != 200: return json.dumps({"error": f"Axle HTTP {response.status_code}: {response.text[:500]}"}) data = response.json() lemma_names = data.get("lemma_names", []) content = data.get("content", "") return json.dumps({ "lemma_names": lemma_names, "decomposed_code": content, "num_lemmas": len(lemma_names), }, indent=2) except Exception as e: return json.dumps({"error": f"sorry2lemma error: {e}"}) # --------------------------------------------------------------------------- # Aristotle (Lean formalization & proving) # --------------------------------------------------------------------------- async def submit_to_aristotle(prompt: str) -> str: """Submit a prompt to Aristotle. Returns project info JSON.""" import aristotlelib from aristotlelib import Project aristotlelib.set_api_key(ARISTOTLE_API_KEY) try: project = await Project.create(prompt=prompt) return json.dumps({ "project_id": project.project_id, "status": project.status.value, }) except Exception as e: return json.dumps({"error": f"Aristotle submit error: {e}"}) async def check_aristotle_status(project_id: str) -> dict: """Check the status of an Aristotle project. Returns a dict (non-blocking).""" import aristotlelib from aristotlelib import Project aristotlelib.set_api_key(ARISTOTLE_API_KEY) try: project = await Project.from_id(project_id) return { "project_id": project.project_id, "status": project.status.value, "percent_complete": project.percent_complete, } except Exception as e: return {"error": str(e)} async def get_aristotle_result(project_id: str) -> str: """Download and return the Lean code from a completed Aristotle project.""" import aristotlelib from aristotlelib import Project aristotlelib.set_api_key(ARISTOTLE_API_KEY) try: project = await Project.from_id(project_id) with tempfile.TemporaryDirectory() as tmpdir: result_path = await project.get_solution( destination=os.path.join(tmpdir, "result") ) result_path = str(result_path) if result_path.endswith(".tar.gz") or result_path.endswith(".tgz"): extract_dir = os.path.join(tmpdir, "extracted") os.makedirs(extract_dir, exist_ok=True) with tarfile.open(result_path) as tar: tar.extractall(extract_dir) lean_contents = [] for root, _dirs, files in os.walk(extract_dir): for f in sorted(files): if f.endswith(".lean"): with open(os.path.join(root, f)) as fh: lean_contents.append(fh.read()) if not lean_contents: return "No Lean files found in Aristotle result." return "\n\n".join(lean_contents) else: with open(result_path) as f: return f.read() except Exception as e: return f"Error downloading Aristotle result: {e}" # --------------------------------------------------------------------------- # Tool definitions for OpenAI function calling # --------------------------------------------------------------------------- TOOL_DEFINITIONS = [ { "type": "function", "function": { "name": "search_theorems", "description": ( "Search for mathematical theorems on arXiv using TheoremSearch. " "Returns theorem statements, slogans, and paper references." ), "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "Natural language search query about the mathematical topic", } }, "required": ["query"], }, }, }, { "type": "function", "function": { "name": "search_lean_library", "description": ( "Search Mathlib (Lean 4 math library) for relevant declarations, " "theorems, and definitions. Use to find existing Lean names and API." ), "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": ( "Search query - a Lean name like 'Nat.add_comm' " "or natural language like 'sum of even numbers is even'" ), } }, "required": ["query"], }, }, }, { "type": "function", "function": { "name": "search_loogle", "description": ( "Search Mathlib by TYPE SIGNATURE pattern using Loogle. " "Use this to find lemmas by their type shape. Examples:\n" "- 'List.map' (by name)\n" "- 'Nat -> Nat -> Nat' (by type)\n" "- '_ + _ = _ + _' (by pattern)\n" "- 'List.map (_ ∘ _) _ = _' (mixed)\n" "Returns declaration names, types, and modules." ), "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "Loogle query — a name, type pattern, or expression pattern", } }, "required": ["query"], }, }, }, { "type": "function", "function": { "name": "check_lean_code", "description": ( "Quickly check if Lean 4 code compiles using the Axle verifier (seconds). " "Code MUST start with 'import Mathlib'. Use for fast verification." ), "parameters": { "type": "object", "properties": { "code": { "type": "string", "description": "Complete Lean 4 code. Must start with 'import Mathlib'.", } }, "required": ["code"], }, }, }, { "type": "function", "function": { "name": "submit_to_aristotle", "description": ( "Submit a proof request to Aristotle (automated theorem prover). " "Aristotle proves graduate/research-level math in Lean 4. " "Returns a project_id immediately — use check_aristotle_status to poll " "and get_aristotle_result when COMPLETE.\n\n" "PREFERRED FORMAT: Lean 4 code with sorry placeholders. This preserves " "the exact theorem signature and lets Aristotle focus on filling proofs:\n" " 'Fill in the sorries:\\n```lean\\nimport Mathlib\\n\\ntheorem my_thm ... := by\\n sorry\\n```'\n\n" "ALTERNATIVE FORMATS (when you don't have a Lean statement yet):\n" "- Natural language description of the result to prove\n" "- Sub-lemmas that the result decomposes into\n\n" "SUBMIT MULTIPLE JOBS for different sub-lemmas or strategies.\n" "After submitting, continue researching — do NOT wait immediately." ), "parameters": { "type": "object", "properties": { "prompt": { "type": "string", "description": ( "PREFERRED: Lean 4 code with `sorry` in place of proofs. " "ALTERNATIVE: Natural language description of the theorem to prove." ), } }, "required": ["prompt"], }, }, }, { "type": "function", "function": { "name": "check_aristotle_status", "description": ( "Non-blocking check of an Aristotle project's status. " "Returns status and percent_complete. Use this to poll while " "doing other work." ), "parameters": { "type": "object", "properties": { "project_id": { "type": "string", "description": "The project_id from submit_to_aristotle.", } }, "required": ["project_id"], }, }, }, # wait_for_aristotle REMOVED — it blocked the agent for 2+ hours. # Agent must use check_aristotle_status (non-blocking) + get_aristotle_result instead. { "type": "function", "function": { "name": "get_aristotle_result", "description": ( "Download the Lean code from a COMPLETED Aristotle project. " "Only call after check_aristotle_status shows COMPLETE." ), "parameters": { "type": "object", "properties": { "project_id": { "type": "string", "description": "The project_id of a completed Aristotle project.", } }, "required": ["project_id"], }, }, }, { "type": "function", "function": { "name": "repair_lean_proofs", "description": ( "Automatically attempt to fill sorry placeholders with automation tactics " "(grind, simp, omega, norm_num, nlinarith, aesop). FAST (~1-2 seconds). " "Call this BEFORE submitting to Aristotle — it may resolve simple sorries instantly. " "Returns repaired code and whether sorry remains." ), "parameters": { "type": "object", "properties": { "code": { "type": "string", "description": "Lean 4 code with sorry placeholders to attempt repair.", } }, "required": ["code"], }, }, }, { "type": "function", "function": { "name": "extract_sorry_lemmas", "description": ( "Extract sorry'd subgoals from Lean code into standalone lemma stubs. " "Use this when you have code that compiles with sorry — it decomposes the sorry " "into independent lemma statements that can each be submitted to Aristotle. " "Returns the lemma names and decomposed code." ), "parameters": { "type": "object", "properties": { "code": { "type": "string", "description": "Lean 4 code containing sorry placeholders.", } }, "required": ["code"], }, }, }, { "type": "function", "function": { "name": "final_answer", "description": ( "Submit the final answer to the user. Call this when you have " "a complete natural language answer and Lean code." ), "parameters": { "type": "object", "properties": { "answer": { "type": "string", "description": "Clear natural language explanation. Use LaTeX ($...$ and $$...$$) for math.", }, "lean_code": { "type": "string", "description": "Complete Lean 4 code (should start with 'import Mathlib').", }, "verified": { "type": "boolean", "description": "Whether the Lean code was verified successfully by Axle.", }, }, "required": ["answer", "lean_code", "verified"], }, }, }, ]