cheekeong2025's picture
Update loader.py
2083ff0 verified
# loader.py β€” Public proxy loader for a private Static Space (CSP-safe + modal fix)
import os
import re
from typing import Optional
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import Response, HTMLResponse, PlainTextResponse, JSONResponse
app = FastAPI()
# -------------------------
# Config via Space Secrets:
# -------------------------
PRIVATE_SPACE_ID = os.getenv("PRIVATE_SPACE_ID")
HF_TOKEN = os.getenv("HF_TOKEN", "").strip()
TIMEOUT = float(os.getenv("UPSTREAM_TIMEOUT", "30"))
if not PRIVATE_SPACE_ID:
raise RuntimeError("PRIVATE_SPACE_ID is not set. Go to Settings β†’ Repository secrets and add it.")
# Private static endpoint derived from owner/repo
PRIVATE_STATIC_URL = (
f"https://{PRIVATE_SPACE_ID.split('/')[0]}-"
f"{PRIVATE_SPACE_ID.split('/')[1].replace('_','-').lower()}.static.hf.space"
)
AUTH_HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
# β€”β€”β€” Tailwind replacement (CSP-safe) β€”β€”β€”
_TW_SCRIPT_RE = re.compile(
r'<script\s+src\s*=\s*"https://cdn\.tailwindcss\.com"\s*>\s*</script>',
flags=re.IGNORECASE
)
_TW_CSS_TAG = (
'<link rel="stylesheet" '
'href="https://cdn.jsdelivr.net/npm/tailwindcss@2.2.19/dist/tailwind.min.css">'
)
_INLINE_SAFETY_CSS = """
<style id="loader-inline-safety">
.hidden { display: none !important; }
#confirmationModal.hidden,
#confirmationModal:not(.show) { display: none !important; }
</style>
""".strip()
_HEAD_OPEN_RE = re.compile(r"<head[^>]*>", re.IGNORECASE)
def _rewrite_html_for_csp(html: str) -> str:
html2 = _TW_SCRIPT_RE.sub(_TW_CSS_TAG, html)
if _HEAD_OPEN_RE.search(html2):
html2 = _HEAD_OPEN_RE.sub(lambda m: f"{m.group(0)}\n{_INLINE_SAFETY_CSS}\n", html2, count=1)
else:
html2 = f"{_INLINE_SAFETY_CSS}\n{html2}"
return html2
def _media_type_from_headers(h: httpx.Headers) -> str:
ctype = h.get("content-type")
return ctype if ctype else "text/plain"
async def _fetch_upstream(path: Optional[str] = "", *, as_text=False) -> httpx.Response:
url = PRIVATE_STATIC_URL.rstrip("/")
if path:
url = f"{url}/{path.lstrip('/')}"
async with httpx.AsyncClient(timeout=TIMEOUT, follow_redirects=True) as client:
r = await client.get(url, headers=AUTH_HEADERS)
return r
# -------------------------
# Lightweight health & ping endpoints
# -------------------------
@app.get("/ping", response_class=PlainTextResponse)
async def ping():
"""Lightweight health check for cron-job.org or uptime services."""
return "OK"
@app.get("/health", response_class=JSONResponse)
async def health():
"""JSON status endpoint for debugging."""
return {"status": "ok", "private_static_url": PRIVATE_STATIC_URL}
# -------------------------
# Main proxy routes
# -------------------------
@app.get("/")
async def root(request: Request):
"""Serve the root HTML, rewriting Tailwind and injecting safety CSS.
Returns minimal text for cron-job.org."""
# Optional: if request from cron-job.org, skip heavy content
user_agent = request.headers.get("user-agent", "").lower()
if "cron-job" in user_agent or "uptime" in user_agent:
return PlainTextResponse("OK")
try:
r = await _fetch_upstream("index.html", as_text=True)
if r.status_code == 404:
r = await _fetch_upstream("", as_text=True)
r.raise_for_status()
except httpx.HTTPError as e:
return PlainTextResponse(f"Error fetching root: {e}", status_code=502)
media_type = _media_type_from_headers(r.headers)
text = _rewrite_html_for_csp(r.text)
return HTMLResponse(content=text, status_code=200, media_type=media_type)
@app.get("/{path:path}")
async def proxy(path: str, request: Request):
"""Proxy all static assets, rewriting HTML if needed."""
safe_path = path.strip().lstrip("/")
if not safe_path:
return await root(request)
try:
r = await _fetch_upstream(safe_path)
r.raise_for_status()
except httpx.HTTPStatusError as e:
return PlainTextResponse(f"{e.response.status_code} upstream for /{safe_path}",
status_code=e.response.status_code)
except httpx.HTTPError as e:
return PlainTextResponse(f"Upstream fetch error for /{safe_path}: {e}",
status_code=502)
media_type = _media_type_from_headers(r.headers)
# If HTML, apply the same rewrite + safety CSS
if "text/html" in media_type.lower():
text = _rewrite_html_for_csp(r.text)
return HTMLResponse(content=text, status_code=r.status_code, media_type=media_type)
# Pass-through for non-HTML
headers = {h: r.headers[h] for h in ("etag", "last-modified", "cache-control") if r.headers.get(h)}
return Response(content=r.content, status_code=r.status_code, media_type=media_type, headers=headers)