# loader.py — Public proxy to a PRIVATE Static Space (auth via resolve/raw) + preflight checks import os import mimetypes from urllib.parse import urljoin from fastapi import FastAPI, Request from fastapi.responses import Response, HTMLResponse, PlainTextResponse, JSONResponse import httpx app = FastAPI() # ===== Config (from Space Secrets) ===== SPACE_ID = os.getenv("PRIVATE_SPACE_ID") HF_TOKEN = os.getenv("HF_TOKEN") # token with READ access REVISION = os.getenv("REVISION", "main") # branch/tag/commit # ====================================== if not SPACE_ID: raise RuntimeError("Set PRIVATE_SPACE_ID in Settings → Repository secrets.") if not HF_TOKEN: raise RuntimeError("Set HF_TOKEN (READ access to the private Space) in Settings → Repository secrets.") HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"} BASE_RESOLVE = f"https://huggingface.co/spaces/{SPACE_ID}/resolve/{REVISION}/" BASE_RAW = f"https://huggingface.co/spaces/{SPACE_ID}/raw/{REVISION}/" # ---------- Preflight: validate token and repo access ---------- async def _preflight(): async with httpx.AsyncClient(timeout=20) as client: # 1) whoami (confirms token is valid) w = await client.get("https://huggingface.co/api/whoami-v2", headers=HEADERS) if w.status_code != 200: raise RuntimeError(f"Token invalid: whoami {w.status_code} {w.text}") ident = w.json().get("name") or w.json() print(f"[preflight] whoami = {ident}") # 2) repo info (confirms you can read the private Space) ri = await client.get(f"https://huggingface.co/api/spaces/{SPACE_ID}", headers=HEADERS) if ri.status_code != 200: raise RuntimeError( f"Cannot access space '{SPACE_ID}': {ri.status_code} {ri.text}. " "Check PRIVATE_SPACE_ID spelling/case and that this token has access." ) info = ri.json() print(f"[preflight] space='{SPACE_ID}' private={info.get('private')} repo_owner={info.get('author')}") # Optional: verify the revision exists by probing index.html under resolve test_url = f"{BASE_RESOLVE}index.html" tr = await client.get(test_url, headers=HEADERS) print(f"[preflight] probe {test_url} -> {tr.status_code}") # Run preflight once on startup @app.on_event("startup") async def _startup(): try: await _preflight() except Exception as e: # Don't crash the server; surface the error in /health global _preflight_error _preflight_error = str(e) print(f"[preflight] ERROR: {_preflight_error}") else: _preflight_error = None def _join(base: str, path: str) -> str: return urljoin(base, path.lstrip("/")) def _with_q(url: str, req: Request) -> str: q = str(req.url.query or "") return f"{url}?{q}" if q else url def _looks_file(path: str) -> bool: return "." in path.split("/")[-1] def _is_html_path(path: str) -> bool: return (not _looks_file(path)) or path.lower().endswith(".html") or path == "" def _mime(path: str, default="application/octet-stream") -> str: if _is_html_path(path): return "text/html; charset=utf-8" m, _ = mimetypes.guess_type(path) return m or default async def _get(client: httpx.AsyncClient, url: str) -> httpx.Response: r = await client.get(url, headers=HEADERS) print(f"[proxy] GET {url} -> {r.status_code}") return r async def _fetch(client: httpx.AsyncClient, path: str, req: Request): """ Order (first 200 wins): resolve:path resolve:static/path (dir) resolve:path/index.html (dir) resolve:static/path/index.html raw:path raw:static/path (dir) raw:path/index.html (dir) raw:static/path/index.html """ as_dir = not _looks_file(path) tried = [] candidates = [ _join(BASE_RESOLVE, path), _join(BASE_RESOLVE, f"static/{path.lstrip('/')}"), ] if as_dir: candidates += [ _join(BASE_RESOLVE, path.rstrip("/") + "/index.html"), _join(BASE_RESOLVE, f"static/{path.rstrip('/')}/index.html"), ] candidates += [ _join(BASE_RAW, path), _join(BASE_RAW, f"static/{path.lstrip('/')}"), ] if as_dir: candidates += [ _join(BASE_RAW, path.rstrip("/") + "/index.html"), _join(BASE_RAW, f"static/{path.rstrip('/')}/index.html"), ] last = None for base_u in candidates: url = _with_q(base_u, req) r = await _get(client, url) tried.append(base_u) if r.status_code == 200: return r, " -> ".join(tried) last = r return last, " -> ".join(tried) @app.get("/health") async def health(): return { "status": "ok" if _preflight_error is None else "error", "space": SPACE_ID, "revision": REVISION, "preflight_error": _preflight_error, } @app.get("/_debug/fetch/{path:path}") async def debug_fetch(path: str, request: Request): try: async with httpx.AsyncClient(timeout=None, follow_redirects=True) as client: r, tried = await _fetch(client, path, request) return JSONResponse({"status": r.status_code, "content_type": r.headers.get("content-type"), "tried": tried}) except Exception as e: return JSONResponse({"error": str(e)}, status_code=500) @app.get("/") async def root(request: Request): try: async with httpx.AsyncClient(timeout=None, follow_redirects=True) as client: r, tried = await _fetch(client, "index.html", request) if r.status_code == 404: r, tried2 = await _fetch(client, "", request) print(f"[root] tried: {tried} || {tried2}") else: print(f"[root] tried: {tried}") except Exception as e: return PlainTextResponse(f"Error fetching root: {e}", status_code=500) # Always render root as HTML return HTMLResponse(r.text, status_code=r.status_code, media_type="text/html; charset=utf-8") @app.get("/{path:path}") async def proxy(path: str, request: Request): try: async with httpx.AsyncClient(timeout=None, follow_redirects=True) as client: r, tried = await _fetch(client, path, request) print(f"[proxy] path={path} status={r.status_code}, upstream-ctype={r.headers.get('content-type')}") except Exception as e: return PlainTextResponse(f"Error fetching {path}: {e}", status_code=500) if _is_html_path(path): return HTMLResponse(r.text, status_code=r.status_code, media_type="text/html; charset=utf-8") ctype = r.headers.get("content-type") or _mime(path) return Response(content=r.content, media_type=ctype, status_code=r.status_code)