|
|
import httpx |
|
|
from fastapi import FastAPI, Request |
|
|
from starlette.responses import HTMLResponse, StreamingResponse |
|
|
|
|
|
app = FastAPI() |
|
|
TARGET = "https://ewt360.com" |
|
|
|
|
|
|
|
|
INJECT_JS = """ |
|
|
<script> |
|
|
function clickZa() { |
|
|
document.querySelectorAll('button').forEach(btn => { |
|
|
if (btn.textContent.trim() === '在') btn.click(); |
|
|
}); |
|
|
} |
|
|
document.addEventListener('DOMContentLoaded', clickZa); |
|
|
|
|
|
// 处理 SPA / 懒加载:DOM 变化时再检测一次 |
|
|
new MutationObserver(clickZa).observe(document.documentElement, { |
|
|
childList: true, |
|
|
subtree: true |
|
|
}); |
|
|
</script> |
|
|
""" |
|
|
|
|
|
def _inject(html: str) -> str: |
|
|
"""把注入脚本塞到 </body> 前;若无 </body> 则直接追加""" |
|
|
return html.replace("</body>", INJECT_JS + "</body>") if "</body>" in html else html + INJECT_JS |
|
|
|
|
|
async def _fetch_upstream(req: Request, path: str) -> httpx.Response: |
|
|
"""把客户端请求转发到目标站""" |
|
|
async with httpx.AsyncClient(follow_redirects=True, timeout=30) as client: |
|
|
upstream_url = f"{TARGET}/{path}" |
|
|
headers = {k: v for k, v in req.headers.items() |
|
|
if k.lower() not in {"host", "content-length", "accept-encoding", "connection"}} |
|
|
return await client.request( |
|
|
req.method, |
|
|
upstream_url, |
|
|
params=req.query_params, |
|
|
content=await req.body(), |
|
|
headers=headers |
|
|
) |
|
|
|
|
|
|
|
|
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "PATCH", "DELETE"]) |
|
|
async def proxy(path: str, request: Request): |
|
|
upstream = await _fetch_upstream(request, path) |
|
|
ct = upstream.headers.get("content-type", "") |
|
|
|
|
|
|
|
|
if "text/html" in ct: |
|
|
html = _inject(upstream.text) |
|
|
|
|
|
safe_headers = {k: v for k, v in upstream.headers.items() |
|
|
if k.lower() not in {"content-length", "content-encoding"}} |
|
|
return HTMLResponse(html, status_code=upstream.status_code, headers=safe_headers) |
|
|
|
|
|
|
|
|
return StreamingResponse( |
|
|
upstream.aiter_bytes(), |
|
|
status_code=upstream.status_code, |
|
|
media_type=ct or None, |
|
|
headers={k: v for k, v in upstream.headers.items() |
|
|
if k.lower() not in {"content-length", "content-encoding"}} |
|
|
) |
|
|
|