Spaces:
Sleeping
Sleeping
| # loader.py — Public proxy to a PRIVATE Static Space (auth via resolve/raw) + preflight checks | |
| import os | |
| import mimetypes | |
| from urllib.parse import urljoin | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import Response, HTMLResponse, PlainTextResponse, JSONResponse | |
| import httpx | |
| app = FastAPI() | |
| # ===== Config (from Space Secrets) ===== | |
| SPACE_ID = os.getenv("PRIVATE_SPACE_ID") | |
| HF_TOKEN = os.getenv("HF_TOKEN") # token with READ access | |
| REVISION = os.getenv("REVISION", "main") # branch/tag/commit | |
| # ====================================== | |
| if not SPACE_ID: | |
| raise RuntimeError("Set PRIVATE_SPACE_ID in Settings → Repository secrets.") | |
| if not HF_TOKEN: | |
| raise RuntimeError("Set HF_TOKEN (READ access to the private Space) in Settings → Repository secrets.") | |
| HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"} | |
| BASE_RESOLVE = f"https://huggingface.co/spaces/{SPACE_ID}/resolve/{REVISION}/" | |
| BASE_RAW = f"https://huggingface.co/spaces/{SPACE_ID}/raw/{REVISION}/" | |
| # ---------- Preflight: validate token and repo access ---------- | |
| async def _preflight(): | |
| async with httpx.AsyncClient(timeout=20) as client: | |
| # 1) whoami (confirms token is valid) | |
| w = await client.get("https://huggingface.co/api/whoami-v2", headers=HEADERS) | |
| if w.status_code != 200: | |
| raise RuntimeError(f"Token invalid: whoami {w.status_code} {w.text}") | |
| ident = w.json().get("name") or w.json() | |
| print(f"[preflight] whoami = {ident}") | |
| # 2) repo info (confirms you can read the private Space) | |
| ri = await client.get(f"https://huggingface.co/api/spaces/{SPACE_ID}", headers=HEADERS) | |
| if ri.status_code != 200: | |
| raise RuntimeError( | |
| f"Cannot access space '{SPACE_ID}': {ri.status_code} {ri.text}. " | |
| "Check PRIVATE_SPACE_ID spelling/case and that this token has access." | |
| ) | |
| info = ri.json() | |
| print(f"[preflight] space='{SPACE_ID}' private={info.get('private')} repo_owner={info.get('author')}") | |
| # Optional: verify the revision exists by probing index.html under resolve | |
| test_url = f"{BASE_RESOLVE}index.html" | |
| tr = await client.get(test_url, headers=HEADERS) | |
| print(f"[preflight] probe {test_url} -> {tr.status_code}") | |
| # Run preflight once on startup | |
| async def _startup(): | |
| try: | |
| await _preflight() | |
| except Exception as e: | |
| # Don't crash the server; surface the error in /health | |
| global _preflight_error | |
| _preflight_error = str(e) | |
| print(f"[preflight] ERROR: {_preflight_error}") | |
| else: | |
| _preflight_error = None | |
| def _join(base: str, path: str) -> str: | |
| return urljoin(base, path.lstrip("/")) | |
| def _with_q(url: str, req: Request) -> str: | |
| q = str(req.url.query or "") | |
| return f"{url}?{q}" if q else url | |
| def _looks_file(path: str) -> bool: | |
| return "." in path.split("/")[-1] | |
| def _is_html_path(path: str) -> bool: | |
| return (not _looks_file(path)) or path.lower().endswith(".html") or path == "" | |
| def _mime(path: str, default="application/octet-stream") -> str: | |
| if _is_html_path(path): | |
| return "text/html; charset=utf-8" | |
| m, _ = mimetypes.guess_type(path) | |
| return m or default | |
| async def _get(client: httpx.AsyncClient, url: str) -> httpx.Response: | |
| r = await client.get(url, headers=HEADERS) | |
| print(f"[proxy] GET {url} -> {r.status_code}") | |
| return r | |
| async def _fetch(client: httpx.AsyncClient, path: str, req: Request): | |
| """ | |
| Order (first 200 wins): | |
| resolve:path | |
| resolve:static/path | |
| (dir) resolve:path/index.html | |
| (dir) resolve:static/path/index.html | |
| raw:path | |
| raw:static/path | |
| (dir) raw:path/index.html | |
| (dir) raw:static/path/index.html | |
| """ | |
| as_dir = not _looks_file(path) | |
| tried = [] | |
| candidates = [ | |
| _join(BASE_RESOLVE, path), | |
| _join(BASE_RESOLVE, f"static/{path.lstrip('/')}"), | |
| ] | |
| if as_dir: | |
| candidates += [ | |
| _join(BASE_RESOLVE, path.rstrip("/") + "/index.html"), | |
| _join(BASE_RESOLVE, f"static/{path.rstrip('/')}/index.html"), | |
| ] | |
| candidates += [ | |
| _join(BASE_RAW, path), | |
| _join(BASE_RAW, f"static/{path.lstrip('/')}"), | |
| ] | |
| if as_dir: | |
| candidates += [ | |
| _join(BASE_RAW, path.rstrip("/") + "/index.html"), | |
| _join(BASE_RAW, f"static/{path.rstrip('/')}/index.html"), | |
| ] | |
| last = None | |
| for base_u in candidates: | |
| url = _with_q(base_u, req) | |
| r = await _get(client, url) | |
| tried.append(base_u) | |
| if r.status_code == 200: | |
| return r, " -> ".join(tried) | |
| last = r | |
| return last, " -> ".join(tried) | |
| async def health(): | |
| return { | |
| "status": "ok" if _preflight_error is None else "error", | |
| "space": SPACE_ID, | |
| "revision": REVISION, | |
| "preflight_error": _preflight_error, | |
| } | |
| async def debug_fetch(path: str, request: Request): | |
| try: | |
| async with httpx.AsyncClient(timeout=None, follow_redirects=True) as client: | |
| r, tried = await _fetch(client, path, request) | |
| return JSONResponse({"status": r.status_code, "content_type": r.headers.get("content-type"), "tried": tried}) | |
| except Exception as e: | |
| return JSONResponse({"error": str(e)}, status_code=500) | |
| async def root(request: Request): | |
| try: | |
| async with httpx.AsyncClient(timeout=None, follow_redirects=True) as client: | |
| r, tried = await _fetch(client, "index.html", request) | |
| if r.status_code == 404: | |
| r, tried2 = await _fetch(client, "", request) | |
| print(f"[root] tried: {tried} || {tried2}") | |
| else: | |
| print(f"[root] tried: {tried}") | |
| except Exception as e: | |
| return PlainTextResponse(f"Error fetching root: {e}", status_code=500) | |
| # Always render root as HTML | |
| return HTMLResponse(r.text, status_code=r.status_code, media_type="text/html; charset=utf-8") | |
| async def proxy(path: str, request: Request): | |
| try: | |
| async with httpx.AsyncClient(timeout=None, follow_redirects=True) as client: | |
| r, tried = await _fetch(client, path, request) | |
| print(f"[proxy] path={path} status={r.status_code}, upstream-ctype={r.headers.get('content-type')}") | |
| except Exception as e: | |
| return PlainTextResponse(f"Error fetching {path}: {e}", status_code=500) | |
| if _is_html_path(path): | |
| return HTMLResponse(r.text, status_code=r.status_code, media_type="text/html; charset=utf-8") | |
| ctype = r.headers.get("content-type") or _mime(path) | |
| return Response(content=r.content, media_type=ctype, status_code=r.status_code) |