File size: 4,905 Bytes
5a34c96 be0f3f7 2b936df b02512e 5e0ba5a be0f3f7 b02512e 2083ff0 be0f3f7 b02512e 1957ca3 b02512e 2b936df b02512e 2083ff0 3a5de32 5a34c96 b02512e 3a5de32 5a34c96 2b936df b02512e 2b936df b02512e 2b936df b02512e 5e0ba5a 5a34c96 2083ff0 b02512e 5a34c96 b02512e 2083ff0 b02512e 2083ff0 b02512e 3a5de32 2083ff0 b02512e 2083ff0 b02512e 2083ff0 be0f3f7 2083ff0 be0f3f7 2083ff0 3a5de32 b02512e 5a34c96 b02512e be0f3f7 2083ff0 be0f3f7 5a34c96 b02512e 2083ff0 b02512e be0f3f7 b02512e 2083ff0 b02512e 2083ff0 b02512e 5a34c96 b02512e 5a34c96 b02512e 2083ff0 5a34c96 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# loader.py — Public proxy loader for a private Static Space (CSP-safe + modal fix)
import os
import re
from typing import Optional
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import Response, HTMLResponse, PlainTextResponse, JSONResponse
app = FastAPI()

# -------------------------
# Config via Space Secrets:
# -------------------------
# Identifier of the private Space in "owner/repo" form. Surrounding whitespace
# is stripped (as is done for HF_TOKEN) so a stray newline in the secret does
# not end up inside the derived hostname.
PRIVATE_SPACE_ID = (os.getenv("PRIVATE_SPACE_ID") or "").strip()
# Optional access token; when non-empty it is sent upstream as a Bearer token.
HF_TOKEN = os.getenv("HF_TOKEN", "").strip()
# Timeout in seconds handed to the httpx client used for upstream requests.
TIMEOUT = float(os.getenv("UPSTREAM_TIMEOUT", "30"))

if not PRIVATE_SPACE_ID:
    raise RuntimeError("PRIVATE_SPACE_ID is not set. Go to Settings → Repository secrets and add it.")

# Split once and validate up front: a value without "/" (or with an empty
# owner/repo part) would otherwise fail with a bare IndexError or silently
# produce a malformed hostname.
_space_parts = PRIVATE_SPACE_ID.split("/")
if len(_space_parts) < 2 or not _space_parts[0] or not _space_parts[1]:
    raise RuntimeError(
        f"PRIVATE_SPACE_ID must look like 'owner/repo', got {PRIVATE_SPACE_ID!r}."
    )
_space_owner, _space_name = _space_parts[0], _space_parts[1]

# Private static endpoint derived from owner/repo
# (repo name is lower-cased and has "_" replaced by "-"; owner is used as-is).
PRIVATE_STATIC_URL = (
    f"https://{_space_owner}-"
    f"{_space_name.replace('_', '-').lower()}.static.hf.space"
)

# Headers attached to every upstream request; empty when no token is configured.
AUTH_HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
# ——— Tailwind replacement (CSP-safe) ———
# Matches the Tailwind CDN <script> tag so it can be swapped for a static
# stylesheet link. The src value may be single- or double-quoted; the closing
# quote must be the same character as the opening one.
_TW_SCRIPT_RE = re.compile(
    r'<script\s+src\s*=\s*(?P<q>["\'])https://cdn\.tailwindcss\.com(?P=q)\s*>\s*</script>',
    flags=re.IGNORECASE,
)

# Stylesheet tag substituted for the Tailwind script tag.
_TW_CSS_TAG = (
    '<link rel="stylesheet" '
    'href="https://cdn.jsdelivr.net/npm/tailwindcss@2.2.19/dist/tailwind.min.css">'
)

# Inline rules injected into every rewritten page: force `.hidden` to hide
# elements and keep #confirmationModal hidden unless it carries `.show`.
_INLINE_SAFETY_CSS = """
<style id="loader-inline-safety">
.hidden { display: none !important; }
#confirmationModal.hidden,
#confirmationModal:not(.show) { display: none !important; }
</style>
""".strip()

# Opening <head> tag, with or without attributes. The lookahead requires
# whitespace, "/" or ">" right after "head" so that tags which merely start
# with those letters (e.g. <header>) are not mistaken for the document head.
_HEAD_OPEN_RE = re.compile(r"<head(?=[\s/>])[^>]*>", re.IGNORECASE)
def _rewrite_html_for_csp(html: str) -> str:
    """Return *html* with the Tailwind script swapped out and safety CSS added.

    Every match of ``_TW_SCRIPT_RE`` is replaced by ``_TW_CSS_TAG``. The
    ``_INLINE_SAFETY_CSS`` block is then inserted directly after the first
    match of ``_HEAD_OPEN_RE``; when there is no such match the CSS is
    prepended to the document instead.
    """
    swapped = _TW_SCRIPT_RE.sub(_TW_CSS_TAG, html)

    def _with_safety_css(match):
        # A callable replacement keeps the CSS text literal (no escape
        # processing of the replacement string).
        return f"{match.group(0)}\n{_INLINE_SAFETY_CSS}\n"

    patched, hits = _HEAD_OPEN_RE.subn(_with_safety_css, swapped, count=1)
    if hits:
        return patched
    return f"{_INLINE_SAFETY_CSS}\n{swapped}"
def _media_type_from_headers(h: httpx.Headers) -> str:
    """Return the ``content-type`` value from *h*, or ``"text/plain"`` if it is missing or empty."""
    return h.get("content-type") or "text/plain"
async def _fetch_upstream(path: Optional[str] = "", *, as_text=False) -> httpx.Response:
    """GET a resource from the private static endpoint.

    Args:
        path: Path relative to ``PRIVATE_STATIC_URL``; leading slashes are
            dropped. A falsy value requests the endpoint root.
        as_text: Accepted for call-site compatibility; not used by the body.

    Returns:
        The ``httpx.Response`` for the request. Redirects are followed, and
        ``AUTH_HEADERS`` is sent with the request. The status is not checked
        here; callers decide how to treat error responses.
    """
    base = PRIVATE_STATIC_URL.rstrip("/")
    target = f"{base}/{path.lstrip('/')}" if path else base
    # A short-lived client per call; it is closed when the context exits.
    async with httpx.AsyncClient(timeout=TIMEOUT, follow_redirects=True) as client:
        return await client.get(target, headers=AUTH_HEADERS)
# -------------------------
# Lightweight health & ping endpoints
# -------------------------
@app.get("/ping", response_class=PlainTextResponse)
async def ping():
    """Answer with the plain-text body ``OK``.

    Cheap probe for cron-job.org or other uptime checkers; it never contacts
    the upstream Space.
    """
    return "OK"
@app.get("/health", response_class=JSONResponse)
async def health():
    """Report a JSON status document for debugging.

    The payload contains a fixed ``"status": "ok"`` plus the derived upstream
    URL; no upstream request is made.
    """
    # NOTE(review): this publishes the upstream host name to any caller —
    # confirm that is acceptable for a public proxy.
    payload = {"status": "ok", "private_static_url": PRIVATE_STATIC_URL}
    return payload
# -------------------------
# Main proxy routes
# -------------------------
@app.get("/")
async def root(request: Request):
    """Serve the upstream landing page with the CSP rewrite applied.

    Requests whose User-Agent contains ``cron-job`` or ``uptime`` get a bare
    plain-text ``OK`` without touching the upstream. Otherwise ``index.html``
    is fetched (falling back to the upstream root on a 404), passed through
    ``_rewrite_html_for_csp`` and returned with status 200. Any httpx error,
    including an upstream error status, yields a 502 plain-text response.
    """
    # Monitoring agents only need a cheap success response.
    agent = request.headers.get("user-agent", "").lower()
    if any(marker in agent for marker in ("cron-job", "uptime")):
        return PlainTextResponse("OK")

    try:
        upstream = await _fetch_upstream("index.html", as_text=True)
        if upstream.status_code == 404:
            # No index.html upstream: retry against the bare root URL.
            upstream = await _fetch_upstream("", as_text=True)
        upstream.raise_for_status()
    except httpx.HTTPError as exc:
        return PlainTextResponse(f"Error fetching root: {exc}", status_code=502)

    media_type = _media_type_from_headers(upstream.headers)
    page = _rewrite_html_for_csp(upstream.text)
    return HTMLResponse(content=page, status_code=200, media_type=media_type)
@app.get("/{path:path}")
async def proxy(path: str, request: Request):
    """Proxy an arbitrary path to the upstream Space.

    An empty path (after trimming whitespace and leading slashes) is handled
    by ``root``. An upstream error status is relayed with the same status
    code and a short plain-text message; any other httpx failure becomes a
    502. HTML responses go through ``_rewrite_html_for_csp``; everything else
    is passed through as raw bytes together with the upstream ``etag``,
    ``last-modified`` and ``cache-control`` headers when present.
    """
    safe_path = path.strip().lstrip("/")
    if not safe_path:
        return await root(request)

    try:
        upstream = await _fetch_upstream(safe_path)
        upstream.raise_for_status()
    except httpx.HTTPStatusError as exc:
        # Must precede the generic handler: HTTPStatusError is an HTTPError.
        code = exc.response.status_code
        return PlainTextResponse(f"{code} upstream for /{safe_path}", status_code=code)
    except httpx.HTTPError as exc:
        return PlainTextResponse(
            f"Upstream fetch error for /{safe_path}: {exc}",
            status_code=502,
        )

    media_type = _media_type_from_headers(upstream.headers)

    # If HTML, apply the same rewrite + safety CSS
    if "text/html" in media_type.lower():
        page = _rewrite_html_for_csp(upstream.text)
        return HTMLResponse(content=page, status_code=upstream.status_code, media_type=media_type)

    # Pass-through for non-HTML: forward only the caching-related headers.
    passthrough = {}
    for name in ("etag", "last-modified", "cache-control"):
        if upstream.headers.get(name):
            passthrough[name] = upstream.headers[name]
    return Response(
        content=upstream.content,
        status_code=upstream.status_code,
        media_type=media_type,
        headers=passthrough,
    )
|