ewt / main.py
sehsapneb's picture
Create main.py
ae8b72a verified
import re

import httpx
from fastapi import FastAPI, Request
from starlette.responses import HTMLResponse, StreamingResponse
# FastAPI application instance; the proxy route is registered on it.
app = FastAPI()
TARGET = "https://ewt360.com" # upstream site that requests are forwarded to
# -------- Helper functions --------------------------------------------------
# Script appended to every proxied HTML page. It clicks every <button> whose
# trimmed text is exactly '在', once on DOMContentLoaded and again on every
# DOM mutation (to cover SPA / lazily rendered content).
INJECT_JS = """
<script>
function clickZa() {
document.querySelectorAll('button').forEach(btn => {
if (btn.textContent.trim() === '在') btn.click();
});
}
document.addEventListener('DOMContentLoaded', clickZa);
// 处理 SPA / 懒加载:DOM 变化时再检测一次
new MutationObserver(clickZa).observe(document.documentElement, {
childList: true,
subtree: true
});
</script>
"""

# Closing body tag, in any letter case and with optional whitespace before '>'.
_BODY_CLOSE_RE = re.compile(r"</body\s*>", re.IGNORECASE)


def _inject(html: str) -> str:
    """Return *html* with INJECT_JS inserted before its final closing body tag.

    The tag is matched case-insensitively and only the last occurrence is
    used, so the script is injected exactly once even when ``</body>`` also
    appears earlier in the document (for example inside a script string).
    If the document has no closing body tag, the script is appended.
    """
    closing = None
    for closing in _BODY_CLOSE_RE.finditer(html):
        pass  # keep only the last match
    if closing is None:
        return html + INJECT_JS
    cut = closing.start()
    return html[:cut] + INJECT_JS + html[cut:]
# Client request headers that are not copied to the upstream request:
# - host / content-length / accept-encoding are regenerated by httpx for the
#   new request;
# - the rest are hop-by-hop headers that only describe the client<->proxy
#   connection (the body is re-sent fully buffered, never chunked).
_SKIPPED_REQUEST_HEADERS = frozenset({
    "accept-encoding",
    "connection",
    "content-length",
    "host",
    "keep-alive",
    "proxy-authorization",
    "proxy-connection",
    "te",
    "trailer",
    "transfer-encoding",
    "upgrade",
})


async def _fetch_upstream(req: Request, path: str) -> httpx.Response:
    """Forward the client request to TARGET and return the upstream response.

    Args:
        req: Incoming request; its method, query string, body and
            (filtered) headers are replayed against the upstream site.
        path: Path below the site root, without a leading slash.

    Returns:
        The upstream response after redirects have been followed. The
        request is sent with ``client.request`` (non-streaming), so the
        body has already been read when this returns.

    Raises:
        httpx.RequestError: Propagated from httpx when the upstream request
            fails (connection error, timeout, too many redirects).
    """
    headers = {
        name: value
        for name, value in req.headers.items()
        if name.lower() not in _SKIPPED_REQUEST_HEADERS
    }
    body = await req.body()
    async with httpx.AsyncClient(follow_redirects=True, timeout=30) as client:
        return await client.request(
            req.method,
            f"{TARGET}/{path}",
            # multi_items() keeps repeated keys (?a=1&a=2); passing the
            # multi-dict itself as a mapping would keep only the last value.
            params=req.query_params.multi_items(),
            content=body,
            headers=headers,
        )
# -------- Unified proxy entry point -----------------------------------------
# Upstream response headers that are not copied onto the proxied response:
# - content-length / content-encoding no longer match, because httpx hands
#   back decoded content and HTML pages are modified;
# - hop-by-hop headers describe the proxy<->upstream connection only;
# - set-cookie is re-added one header per cookie (see proxy()).
_SKIPPED_RESPONSE_HEADERS = frozenset({
    "connection",
    "content-encoding",
    "content-length",
    "keep-alive",
    "proxy-authenticate",
    "proxy-connection",
    "set-cookie",
    "trailer",
    "transfer-encoding",
    "upgrade",
})


@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "PATCH", "DELETE"])
async def proxy(path: str, request: Request):
    """Proxy any request to the upstream site, injecting a script into HTML.

    HTML responses are passed through ``_inject`` and returned as an
    ``HTMLResponse``; every other content type is returned as a
    ``StreamingResponse`` over the upstream bytes. The upstream status code
    is preserved. If the upstream request fails at the transport level, a
    502 response is returned.
    """
    try:
        upstream = await _fetch_upstream(request, path)
    except httpx.RequestError:
        # Upstream unreachable / timed out: answer as a gateway instead of
        # letting the exception surface as an unhandled 500.
        return HTMLResponse("Bad Gateway", status_code=502)

    ct = upstream.headers.get("content-type", "")
    is_html = "text/html" in ct.lower()

    skipped = _SKIPPED_RESPONSE_HEADERS
    if is_html:
        # The page is re-encoded as UTF-8 by HTMLResponse, so the upstream
        # content-type (which may name another charset) must not be reused.
        skipped = skipped | {"content-type"}
    headers = {
        name: value
        for name, value in upstream.headers.items()
        if name.lower() not in skipped
    }

    if is_html:
        response = HTMLResponse(
            _inject(upstream.text),
            status_code=upstream.status_code,
            headers=headers,
        )
    else:
        # Other resources (JS / CSS / images, ...) are passed through as-is.
        response = StreamingResponse(
            upstream.aiter_bytes(),
            status_code=upstream.status_code,
            media_type=ct or None,
            headers=headers,
        )

    # headers.items() joins repeated headers with ", ", which corrupts
    # Set-Cookie; emit one header per cookie instead.
    for cookie in upstream.headers.get_list("set-cookie"):
        response.headers.append("set-cookie", cookie)
    return response