|
|
|
|
|
import os |
|
|
import re |
|
|
from typing import Optional |
|
|
|
|
|
import httpx |
|
|
from fastapi import FastAPI, Request |
|
|
from fastapi.responses import Response, HTMLResponse, PlainTextResponse, JSONResponse |
|
|
|
|
|
# ASGI application object; the route handlers in this module register on it.
app = FastAPI()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Runtime configuration, read once at import time -------------------------

# "owner/space-name" id of the private Space to proxy. Stripped (like HF_TOKEN)
# so stray whitespace pasted into the secret cannot leak into the hostname.
PRIVATE_SPACE_ID = os.getenv("PRIVATE_SPACE_ID", "").strip()
# Optional access token; when non-empty it is sent upstream as a Bearer token.
HF_TOKEN = os.getenv("HF_TOKEN", "").strip()
# Timeout (seconds) passed to the upstream HTTP client.
TIMEOUT = float(os.getenv("UPSTREAM_TIMEOUT", "30"))

if not PRIVATE_SPACE_ID:
    raise RuntimeError("PRIVATE_SPACE_ID is not set. Go to Settings -> Repository secrets and add it.")


def _static_url_for(space_id: str) -> str:
    """Build the ``*.static.hf.space`` base URL for an ``owner/name`` Space id.

    The owner segment is used verbatim; the name segment is lower-cased with
    underscores turned into hyphens. Segments after the second are ignored.

    Raises:
        RuntimeError: if ``space_id`` lacks a non-empty owner or name, which
            would otherwise surface as an ``IndexError`` or an invalid host.
    """
    parts = space_id.split("/")
    if len(parts) < 2 or not parts[0] or not parts[1]:
        raise RuntimeError(
            f"PRIVATE_SPACE_ID must look like 'owner/space-name', got {space_id!r}."
        )
    owner, name = parts[0], parts[1]
    return f"https://{owner}-{name.replace('_', '-').lower()}.static.hf.space"


# Base URL every upstream request is resolved against.
PRIVATE_STATIC_URL = _static_url_for(PRIVATE_SPACE_ID)
# Headers attached to every upstream request; empty when no token is configured.
AUTH_HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
|
|
|
|
|
|
|
|
# Matches the Tailwind CDN <script> tag (double-quoted src, any letter case,
# optional whitespace around "=" and before/after ">").
_TW_SCRIPT_RE = re.compile(
    r'<script\s+src\s*=\s*"https://cdn\.tailwindcss\.com"\s*>\s*</script>',
    re.IGNORECASE,
)

# Stylesheet <link> that is substituted for the script tag matched above.
_TW_CSS_TAG = " ".join(
    (
        '<link rel="stylesheet"',
        'href="https://cdn.jsdelivr.net/npm/tailwindcss@2.2.19/dist/tailwind.min.css">',
    )
)
|
|
|
|
|
# Inline <style> block injected into every rewritten page: forces `.hidden`
# elements, and #confirmationModal unless it carries `.show`, to stay hidden.
_INLINE_SAFETY_CSS = "\n".join(
    (
        '<style id="loader-inline-safety">',
        ".hidden { display: none !important; }",
        "#confirmationModal.hidden,",
        "#confirmationModal:not(.show) { display: none !important; }",
        "</style>",
    )
)
|
|
|
|
|
# Matches an opening <head ...> tag, case-insensitively. The negative lookahead
# stops the pattern from also matching longer tag names such as <header>.
_HEAD_OPEN_RE = re.compile(r"<head(?![\w-])[^>]*>", re.IGNORECASE)
|
|
|
|
|
|
|
|
def _rewrite_html_for_csp(html: str) -> str:
    """Return ``html`` with the Tailwind script swapped out and safety CSS added.

    Every match of ``_TW_SCRIPT_RE`` is replaced by ``_TW_CSS_TAG``. The
    ``_INLINE_SAFETY_CSS`` block is then inserted right after the first match
    of ``_HEAD_OPEN_RE``; if there is no such match it is prepended to the
    document instead.
    """
    page = _TW_SCRIPT_RE.sub(_TW_CSS_TAG, html)

    def _tag_plus_css(match):
        # A callable replacement keeps the CSS text literal (no backslash
        # escape processing by ``re``).
        return f"{match.group(0)}\n{_INLINE_SAFETY_CSS}\n"

    page, injected = _HEAD_OPEN_RE.subn(_tag_plus_css, page, count=1)
    if injected == 0:
        page = f"{_INLINE_SAFETY_CSS}\n{page}"
    return page
|
|
|
|
|
|
|
|
def _media_type_from_headers(h: httpx.Headers) -> str:
    """Return the ``content-type`` value from ``h``, or ``"text/plain"``.

    The fallback is used when the header is absent or empty.
    """
    return h.get("content-type") or "text/plain"
|
|
|
|
|
|
|
|
async def _fetch_upstream(path: Optional[str] = "", *, as_text=False) -> httpx.Response:
    """GET ``path`` from the private static host and return the response.

    Args:
        path: Location relative to ``PRIVATE_STATIC_URL``; leading slashes are
            ignored. An empty or ``None`` value requests the host root.
        as_text: Accepted from callers but not used by this function.

    Returns:
        The ``httpx.Response`` after redirects were followed. The status code
        is not checked here; callers decide how to handle error statuses.
    """
    base = PRIVATE_STATIC_URL.rstrip("/")
    target = f"{base}/{path.lstrip('/')}" if path else base
    # A fresh client is opened (and closed) for every call.
    async with httpx.AsyncClient(timeout=TIMEOUT, follow_redirects=True) as session:
        return await session.get(target, headers=AUTH_HEADERS)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/ping", response_class=PlainTextResponse)
async def ping():
    """Liveness probe: always answers with the plain-text body ``OK``.

    Meant for external pingers such as cron-job.org or uptime monitors; it
    never contacts the upstream host.
    """
    return "OK"
|
|
|
|
|
@app.get("/health", response_class=JSONResponse)
async def health():
    """Report service status as JSON, including the upstream base URL in use.

    Debugging aid; it does not contact the upstream host.
    """
    payload = {"status": "ok", "private_static_url": PRIVATE_STATIC_URL}
    return payload
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/")
async def root(request: Request):
    """Return the upstream landing page with the HTML rewrites applied.

    Requests whose User-Agent contains "cron-job" or "uptime" get a plain
    ``OK`` without any upstream call. Otherwise ``index.html`` is fetched,
    falling back to the bare upstream root when that returns 404. Any upstream
    failure (transport error or error status) becomes a 502 plain-text reply.
    """
    agent = request.headers.get("user-agent", "").lower()
    if any(marker in agent for marker in ("cron-job", "uptime")):
        return PlainTextResponse("OK")

    try:
        upstream = await _fetch_upstream("index.html", as_text=True)
        if upstream.status_code == 404:
            # Some hosts only serve the page at the bare root.
            upstream = await _fetch_upstream("", as_text=True)
        upstream.raise_for_status()
    except httpx.HTTPError as exc:
        return PlainTextResponse(f"Error fetching root: {exc}", status_code=502)

    content_type = _media_type_from_headers(upstream.headers)
    body = _rewrite_html_for_csp(upstream.text)
    return HTMLResponse(content=body, status_code=200, media_type=content_type)
|
|
|
|
|
|
|
|
@app.get("/{path:path}")
async def proxy(path: str, request: Request):
    """Relay any other path from the upstream static host.

    An empty path (after trimming whitespace and leading slashes) is handled
    by ``root``. Upstream error statuses are passed back with the same status
    code; transport errors become 502. HTML responses go through
    ``_rewrite_html_for_csp``; everything else is returned byte-for-byte along
    with the upstream ``etag``, ``last-modified`` and ``cache-control``
    headers when present.
    """
    safe_path = path.strip().lstrip("/")
    if safe_path == "":
        return await root(request)

    try:
        upstream = await _fetch_upstream(safe_path)
        upstream.raise_for_status()
    except httpx.HTTPStatusError as exc:
        # Must be caught before HTTPError, its base class.
        code = exc.response.status_code
        return PlainTextResponse(f"{code} upstream for /{safe_path}", status_code=code)
    except httpx.HTTPError as exc:
        return PlainTextResponse(
            f"Upstream fetch error for /{safe_path}: {exc}", status_code=502
        )

    content_type = _media_type_from_headers(upstream.headers)

    if "text/html" not in content_type.lower():
        # Non-HTML asset: forward the bytes untouched plus caching headers.
        passthrough = {}
        for name in ("etag", "last-modified", "cache-control"):
            if upstream.headers.get(name):
                passthrough[name] = upstream.headers[name]
        return Response(
            content=upstream.content,
            status_code=upstream.status_code,
            media_type=content_type,
            headers=passthrough,
        )

    rewritten = _rewrite_html_for_csp(upstream.text)
    return HTMLResponse(
        content=rewritten, status_code=upstream.status_code, media_type=content_type
    )
|
|
|