Spaces:
Running
Running
ship: SZLHOLDINGS/lean-kernel — live Lean v4.13.0 + Mathlib kernel, 7 API endpoints, honest build status
5520f8e verified | #!/usr/bin/env python3 | |
| """SZLHOLDINGS/lean-kernel — live Lean/Lake verification kernel. | |
| FastAPI service that: | |
| - clones / uses the pinned lutar-lean repo, | |
| - runs `lake build` on demand (streaming), | |
| - serves canonical numbers LIVE from lean_numbers.py, | |
| - exposes the theorem table (PROVEN/AXIOM/SORRY + file:line), | |
| - verifies Λ-receipts against the canonical geometric-mean definition, | |
| - serves + exercises the 10 golden reference vectors. | |
| HONESTY: if `lake build` fails (e.g. no Mathlib cache / low disk on the box), | |
| healthz and the UI report the failure verbatim. Nothing is faked green. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import math | |
| import os | |
| import subprocess | |
| import time | |
| from pathlib import Path | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse | |
| APP_DIR = Path(__file__).resolve().parent | |
| REPO_DIR = Path(os.environ.get("LUTAR_REPO", "/opt/lutar-lean")) | |
| DATA_DIR = APP_DIR / "data" | |
| ELAN_BIN = os.environ.get("ELAN_BIN", "/root/.elan/bin") | |
| os.environ["PATH"] = f"{ELAN_BIN}:{os.environ.get('PATH','')}" | |
| REF_VECTORS_PATH = REPO_DIR / "reference-vectors.json" | |
| if not REF_VECTORS_PATH.exists(): | |
| REF_VECTORS_PATH = DATA_DIR / "reference-vectors.json" | |
| # Cache of last build result (build is expensive; numbers are recomputed live). | |
| _BUILD_STATE: dict = {"status": "unknown", "ran_at": None, "duration_s": None, | |
| "exit_code": None, "tail": ""} | |
| app = FastAPI(title="SZLHOLDINGS lean-kernel", version="1.0.0") | |
| # -------------------------------------------------------------------------- | |
| # Canonical Λ — geometric mean (matches Lutar/Invariant.lean & lambda-spec.md) | |
| # -------------------------------------------------------------------------- | |
| def compute_lambda(values: list[float]) -> float: | |
| if not values: | |
| return 0.0 | |
| for v in values: | |
| if not math.isfinite(v) or v < 0: | |
| return 0.0 | |
| if v == 0: | |
| return 0.0 | |
| k = len(values) | |
| log_sum = sum(math.log(v) for v in values) | |
| return math.exp(log_sum / k) | |
| def _run(cmd: list[str], cwd: Path, timeout: int = 60) -> subprocess.CompletedProcess: | |
| return subprocess.run(cmd, cwd=str(cwd), capture_output=True, text=True, | |
| timeout=timeout) | |
| def _repo_sha() -> str: | |
| try: | |
| r = _run(["git", "rev-parse", "HEAD"], REPO_DIR, timeout=15) | |
| return r.stdout.strip() or "unknown" | |
| except Exception: | |
| return "unknown" | |
| # -------------------------------------------------------------------------- | |
| # GET /api/lean/healthz | |
| # -------------------------------------------------------------------------- | |
| def healthz(): | |
| return JSONResponse({ | |
| "ok": True, | |
| "service": "SZLHOLDINGS/lean-kernel", | |
| "repo": "szl-holdings/lutar-lean", | |
| "repo_present": REPO_DIR.exists(), | |
| "repo_sha": _repo_sha(), | |
| "toolchain": _toolchain(), | |
| "build": _BUILD_STATE, | |
| "ts": int(time.time()), | |
| }) | |
| def _toolchain() -> str: | |
| tc = REPO_DIR / "lean-toolchain" | |
| if tc.exists(): | |
| return tc.read_text().strip() | |
| return "unknown" | |
| # -------------------------------------------------------------------------- | |
| # GET /api/lean/numbers — LIVE from lean_numbers.py against current commit | |
| # -------------------------------------------------------------------------- | |
| def numbers(): | |
| try: | |
| r = _run(["python3", str(APP_DIR / "lean_numbers.py"), | |
| "--repo-path", str(REPO_DIR)], APP_DIR, timeout=60) | |
| data = json.loads(r.stdout) | |
| data["live"] = True | |
| return JSONResponse(data) | |
| except Exception as e: | |
| return JSONResponse({"error": str(e), "live": False}, status_code=500) | |
| # -------------------------------------------------------------------------- | |
| # GET /api/lean/theorems — full classified table | |
| # -------------------------------------------------------------------------- | |
| def theorems(): | |
| try: | |
| r = _run(["python3", str(APP_DIR / "theorem_scan.py"), str(REPO_DIR)], | |
| APP_DIR, timeout=60) | |
| return JSONResponse(json.loads(r.stdout)) | |
| except Exception as e: | |
| return JSONResponse({"error": str(e)}, status_code=500) | |
| # -------------------------------------------------------------------------- | |
| # POST /api/lean/verify — verify a Λ-receipt against the canonical definition | |
| # -------------------------------------------------------------------------- | |
| async def verify(request: Request): | |
| try: | |
| body = await request.json() | |
| except Exception: | |
| return JSONResponse({"verified": False, "reason": "invalid JSON"}, | |
| status_code=400) | |
| axes = body.get("axes") | |
| claimed = body.get("lambda", body.get("Lambda")) | |
| if axes is None: | |
| return JSONResponse({"verified": False, "reason": "missing 'axes'"}, | |
| status_code=400) | |
| if isinstance(axes, dict): | |
| values = [float(v) for v in axes.values()] | |
| elif isinstance(axes, list): | |
| values = [float(v) for v in axes] | |
| else: | |
| return JSONResponse({"verified": False, "reason": "'axes' must be list or object"}, | |
| status_code=400) | |
| recomputed = compute_lambda(values) | |
| if claimed is None: | |
| return JSONResponse({ | |
| "verified": None, | |
| "recomputed_lambda": recomputed, | |
| "reason": "no claimed lambda to check; returning canonical recompute", | |
| "theorem": "Lutar.Invariant.Λ_def (closed form (∏xᵢ)^(1/k))", | |
| }) | |
| tol_abs, tol_rel = 1e-12, 1e-9 | |
| diff = abs(recomputed - float(claimed)) | |
| ok = diff <= tol_abs + tol_rel * max(abs(recomputed), abs(float(claimed))) | |
| # which theorem confirms it | |
| if all(v == values[0] for v in values): | |
| thm = "Lutar.a3_normalize_proof (Λ k (const c) = c)" | |
| elif min(values) == 0: | |
| thm = "Lutar.Invariant.Λ_def + zero-pinning (any axis 0 ⇒ Λ=0)" | |
| else: | |
| thm = "Lutar.Invariant.Λ_def (closed-form geomean) + Lutar.min_le_Λ/Λ_le_max bound" | |
| return JSONResponse({ | |
| "verified": bool(ok), | |
| "claimed_lambda": float(claimed), | |
| "recomputed_lambda": recomputed, | |
| "abs_diff": diff, | |
| "tolerance": {"abs": tol_abs, "rel": tol_rel}, | |
| "theorem": thm, | |
| "definition": "Λ_k(x) = (∏ xᵢ)^(1/k) (unweighted geometric mean)", | |
| }) | |
| # -------------------------------------------------------------------------- | |
| # GET /api/lean/build — trigger `lake build`, stream output (SSE) | |
| # -------------------------------------------------------------------------- | |
| async def build(stream: bool = True): | |
| if not stream: | |
| res = await _do_build_collect() | |
| return JSONResponse(res) | |
| async def gen(): | |
| start = time.time() | |
| yield _sse({"event": "start", "msg": "lake build starting", "cwd": str(REPO_DIR)}) | |
| if not REPO_DIR.exists(): | |
| yield _sse({"event": "error", "msg": "repo not present"}) | |
| return | |
| proc = await asyncio.create_subprocess_exec( | |
| f"{ELAN_BIN}/lake", "build", | |
| cwd=str(REPO_DIR), | |
| stdout=asyncio.subprocess.PIPE, | |
| stderr=asyncio.subprocess.STDOUT, | |
| env={**os.environ}, | |
| ) | |
| tail = [] | |
| assert proc.stdout | |
| async for raw in proc.stdout: | |
| line = raw.decode(errors="replace").rstrip("\n") | |
| tail.append(line) | |
| tail[:] = tail[-200:] | |
| yield _sse({"event": "line", "line": line}) | |
| rc = await proc.wait() | |
| dur = round(time.time() - start, 2) | |
| status = "pass" if rc == 0 else "fail" | |
| _BUILD_STATE.update({"status": status, "ran_at": int(time.time()), | |
| "duration_s": dur, "exit_code": rc, | |
| "tail": "\n".join(tail[-40:])}) | |
| yield _sse({"event": "done", "status": status, "exit_code": rc, | |
| "duration_s": dur}) | |
| return StreamingResponse(gen(), media_type="text/event-stream") | |
| async def _do_build_collect() -> dict: | |
| start = time.time() | |
| if not REPO_DIR.exists(): | |
| return {"status": "fail", "reason": "repo not present"} | |
| proc = await asyncio.create_subprocess_exec( | |
| f"{ELAN_BIN}/lake", "build", | |
| cwd=str(REPO_DIR), | |
| stdout=asyncio.subprocess.PIPE, | |
| stderr=asyncio.subprocess.STDOUT, | |
| env={**os.environ}, | |
| ) | |
| out, _ = await proc.communicate() | |
| rc = proc.returncode | |
| dur = round(time.time() - start, 2) | |
| status = "pass" if rc == 0 else "fail" | |
| tail = "\n".join(out.decode(errors="replace").splitlines()[-40:]) | |
| _BUILD_STATE.update({"status": status, "ran_at": int(time.time()), | |
| "duration_s": dur, "exit_code": rc, "tail": tail}) | |
| return {"status": status, "exit_code": rc, "duration_s": dur, "tail": tail} | |
| def _sse(obj: dict) -> str: | |
| return f"data: {json.dumps(obj)}\n\n" | |
| # -------------------------------------------------------------------------- | |
| # GET /api/lean/vectors — the 10 golden reference vectors | |
| # -------------------------------------------------------------------------- | |
| def vectors(): | |
| try: | |
| return JSONResponse(json.loads(REF_VECTORS_PATH.read_text())) | |
| except Exception as e: | |
| return JSONResponse({"error": str(e)}, status_code=500) | |
| # -------------------------------------------------------------------------- | |
| # POST /api/lean/vectors/exercise — recompute each vector's Λ, pass/fail vs pinned | |
| # -------------------------------------------------------------------------- | |
| def exercise_vectors(): | |
| try: | |
| spec = json.loads(REF_VECTORS_PATH.read_text()) | |
| except Exception as e: | |
| return JSONResponse({"error": str(e)}, status_code=500) | |
| tol_abs = spec.get("toleranceAbs", 1e-12) | |
| tol_rel = spec.get("toleranceRel", 1e-9) | |
| results = [] | |
| all_pass = True | |
| for v in spec.get("vectors", []): | |
| recomputed = compute_lambda([float(a) for a in v["axes"]]) | |
| pinned = float(v["lambda"]) | |
| diff = abs(recomputed - pinned) | |
| ok = diff <= tol_abs + tol_rel * max(abs(recomputed), abs(pinned)) | |
| all_pass = all_pass and ok | |
| results.append({"id": v["id"], "pinned_lambda": pinned, | |
| "recomputed_lambda": recomputed, "abs_diff": diff, | |
| "pass": ok}) | |
| return JSONResponse({ | |
| "definition": spec.get("formula", "Λ_k(x) = (∏ xᵢ)^(1/k)"), | |
| "tolerance": {"abs": tol_abs, "rel": tol_rel}, | |
| "all_pass": all_pass, | |
| "count": len(results), | |
| "results": results, | |
| }) | |
| # -------------------------------------------------------------------------- | |
| # UI | |
| # -------------------------------------------------------------------------- | |
| def ui(): | |
| return (APP_DIR / "index.html").read_text() | |