# proxy / app.py
# dpv007's picture
# Update app.py
# 2651019 verified
from html import escape
from urllib.parse import urljoin, quote

import httpx
from bs4 import BeautifulSoup
from fastapi import FastAPI, Request, Response
from fastapi.responses import HTMLResponse
# ASGI application object; the route decorators below register handlers on it.
app = FastAPI()

# Static landing page returned by the "/" route.
# Layout: a top bar with a URL text input and a "Go" button, followed by a
# full-width iframe. The inline script:
#   * trims the input and ignores empty values,
#   * prepends "https://" when the value has no http:// or https:// prefix,
#   * points the iframe at "/proxy?url=<encodeURIComponent(url)>",
#   * triggers the same load on button click and on the Enter key.
HTML_INDEX = """
<!doctype html>
<html>
<head>
<meta charset="utf-8" />
<title>HF Proxy Browser</title>
<style>
body { font-family: sans-serif; margin: 0; padding: 0; }
#bar {
padding: 10px;
background: #111827;
color: #e5e7eb;
display: flex;
gap: 8px;
align-items: center;
}
input[type="text"] {
flex: 1;
padding: 6px 8px;
border-radius: 4px;
border: 1px solid #4b5563;
background: #111827;
color: #e5e7eb;
}
button {
padding: 6px 12px;
border-radius: 4px;
border: none;
cursor: pointer;
}
#go {
background: #3b82f6;
color: white;
}
#frame {
width: 100%;
height: calc(100vh - 48px);
border: none;
}
</style>
</head>
<body>
<div id="bar">
<span>Proxy URL:</span>
<input id="url" type="text" placeholder="https://example.com" />
<button id="go">Go</button>
</div>
<iframe id="frame"></iframe>
<script>
const input = document.getElementById('url');
const frame = document.getElementById('frame');
const btn = document.getElementById('go');
function load() {
let url = input.value.trim();
if (!url) return;
if (!url.startsWith('http://') && !url.startsWith('https://')) {
url = 'https://' + url;
}
frame.src = '/proxy?url=' + encodeURIComponent(url);
}
btn.addEventListener('click', load);
input.addEventListener('keydown', e => {
if (e.key === 'Enter') {
e.preventDefault();
load();
}
});
</script>
</body>
</html>
"""
@app.get("/", response_class=HTMLResponse)
async def index():
    """Serve the landing page (URL bar plus iframe) stored in HTML_INDEX."""
    # The string is returned as-is; response_class makes FastAPI send it
    # as an HTML response.
    return HTML_INDEX
async def fetch_url(url: str, request: Request) -> httpx.Response:
    """
    Download ``url`` with httpx on behalf of the incoming request.

    A small set of client headers is forwarded (User-Agent, Accept,
    Accept-Language), each with a fallback when the client did not send it.
    A non-empty Range header is forwarded too so media seeking keeps working.

    Args:
        url: Target URL to fetch.
        request: Incoming request whose headers are consulted.

    Returns:
        The httpx response obtained after following redirects.
    """
    incoming = request.headers

    # (outgoing header name, value used when the client sent none)
    forwarded_defaults = (
        (
            "User-Agent",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/120.0 Safari/537.36",
        ),
        ("Accept", "*/*"),
        ("Accept-Language", "en-US,en;q=0.9"),
    )
    outgoing = {
        name: incoming.get(name.lower(), fallback)
        for name, fallback in forwarded_defaults
    }

    # Only forward Range when the client actually supplied a value.
    byte_range = incoming.get("range")
    if byte_range:
        outgoing["Range"] = byte_range

    async with httpx.AsyncClient(follow_redirects=True, timeout=30) as session:
        return await session.get(url, headers=outgoing)
def rewrite_html(html: str, base_url: str) -> str:
    """
    Rewrite links in HTML so sub-resources (scripts, css, images, video, etc.)
    go through /proxy as well.

    Each URL-bearing attribute is resolved against ``base_url`` and replaced
    with ``/proxy?url=<percent-encoded absolute URL>``. Values that cannot be
    fetched over HTTP are left untouched: fragment-only links (``#top``) and
    non-http(s) schemes such as ``javascript:``, ``mailto:``, ``data:`` and
    ``tel:``. Rewriting those would turn working links into proxy errors.

    Args:
        html: Markup of the fetched page.
        base_url: URL the page was loaded from; used to resolve relative links.

    Returns:
        The rewritten markup, with a small "proxied" banner appended to
        ``<body>`` when the document has one.
    """
    soup = BeautifulSoup(html, "html.parser")

    # Which attributes carry a URL for each tag we rewrite.
    url_attrs = {
        "a": ("href",),
        "link": ("href",),
        "img": ("src",),
        "script": ("src",),
        "iframe": ("src",),
        "video": ("src", "poster"),  # poster is the video thumbnail
        "audio": ("src",),
        "source": ("src",),
        "form": ("action",),
    }

    def proxify(attr: str, tag) -> None:
        original = tag.attrs.get(attr)
        if not original or not isinstance(original, str):
            return
        original = original.strip()
        # Fragment-only links must stay in-page; the browser resolves them
        # against the current (already proxied) document.
        if not original or original.startswith("#"):
            return
        absolute = urljoin(base_url, original)
        # urljoin returns javascript:, mailto:, data:, tel: ... unchanged;
        # only http(s) targets can be fetched by the proxy.
        if not absolute.lower().startswith(("http://", "https://")):
            return
        tag.attrs[attr] = f"/proxy?url={quote(absolute, safe='')}"

    for tag in soup.find_all(list(url_attrs)):
        for attr in url_attrs[tag.name]:
            proxify(attr, tag)

    # Optional: add a small banner so you know it's proxied
    banner = soup.new_tag("div")
    banner.string = f"Proxied via HF Space — {base_url}"
    banner["style"] = (
        "position:fixed;bottom:0;left:0;right:0;"
        "background:#111827;color:#e5e7eb;"
        "font-size:12px;padding:4px 8px;z-index:9999;"
    )
    if soup.body:
        soup.body.append(banner)

    return str(soup)
@app.get("/proxy")
async def proxy(url: str, request: Request):
    """
    Reverse-proxy endpoint: /proxy?url=https://example.com

    Supports:
    - HTML (rewritten so links keep going through /proxy)
    - Images
    - JS / CSS
    - Video / audio (with Range header forwarded)

    Args:
        url: Target URL taken from the query string (untrusted input).
        request: Incoming request; its headers are partially forwarded.

    Returns:
        An HTML response with rewritten links for HTML upstreams, a
        pass-through response for everything else, or a 502 HTML error page
        when the upstream fetch raises.
    """
    # NOTE(review): the target host is not restricted in any way, so this
    # endpoint can be pointed at internal addresses (SSRF). Consider an
    # allow-list before exposing it publicly.
    try:
        upstream = await fetch_url(url, request)
    except Exception as e:
        # Both the URL and the exception text can contain attacker-chosen
        # markup; escape them before embedding in the error page.
        return HTMLResponse(
            f"<h1>Error</h1><p>Could not fetch {escape(url)}</p>"
            f"<pre>{escape(str(e))}</pre>",
            status_code=502,
        )

    content_type = upstream.headers.get("content-type", "")

    # HTML: rewrite links so further requests go via /proxy
    if "text/html" in content_type.lower():
        # Resolve relative links against the final URL: after redirects it
        # may differ from the URL that was requested.
        rewritten = rewrite_html(upstream.text, base_url=str(upstream.url))
        return HTMLResponse(content=rewritten, status_code=upstream.status_code)

    # Non-HTML (images, videos, audio, JS, CSS, fonts...): pass through.
    # Dropped headers:
    #  - hop-by-hop headers, which describe the upstream connection only;
    #  - content-encoding / content-length, because httpx has already decoded
    #    the body, so the upstream values may no longer match the bytes we
    #    send. The response object recomputes the length.
    dropped = {
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
        "content-encoding",
        "content-length",
    }
    safe_headers = {
        k: v for k, v in upstream.headers.items() if k.lower() not in dropped
    }

    return Response(
        content=upstream.content,
        status_code=upstream.status_code,
        headers=safe_headers,
        media_type=content_type or None,
    )