grouter / app.py
StarrySkyWorld's picture
Update app.py
2d3d21c verified
from fastapi import FastAPI, Request, Response
from fastapi.responses import StreamingResponse
import httpx
import os
import random
app = FastAPI()
TARGET_URL = "https://generativelanguage.googleapis.com"
TIMEOUT = 120.0
ACCESS_PASSWORD = os.getenv("ACCESS_PASSWORD", "")
# ============== 匿名化配置 ==============
# 需要移除的泄露源 IP 的请求头
HEADERS_TO_REMOVE = {
# IP 相关
"x-forwarded-for",
"x-real-ip",
"x-originating-ip",
"x-remote-ip",
"x-remote-addr",
"x-client-ip",
"x-host",
"x-forwarded-host",
"x-forwarded-proto",
"x-forwarded-port",
"x-forwarded-server",
"forwarded",
"via",
"true-client-ip",
"cf-connecting-ip",
"cf-ipcountry",
"cf-ray",
"cf-visitor",
# HF/云服务商特征
"x-request-id",
"x-trace-id",
"x-amzn-trace-id",
"x-cloud-trace-context",
"x-appengine-country",
"x-appengine-city",
"x-appengine-region",
"x-vercel-id",
"x-vercel-ip-country",
"x-vercel-ip-city",
"x-railway-request-id",
# 浏览器/客户端指纹
"x-correlation-id",
"x-session-id",
"x-device-id",
# 其他
"host",
"content-length",
"x-proxy-password",
"connection",
"keep-alive",
"proxy-connection",
"proxy-authorization",
"te",
"trailer",
"transfer-encoding",
"upgrade",
}
# 伪装的 User-Agent 池
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:134.0) Gecko/20100101 Firefox/134.0",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.2 Safari/605.1.15",
]
# 伪装的 Accept-Language 池
ACCEPT_LANGUAGES = [
"en-US,en;q=0.9",
"en-GB,en;q=0.9",
"en-US,en;q=0.9,zh-CN;q=0.8",
"en,zh-CN;q=0.9,zh;q=0.8",
]
def anonymize_headers(original_headers: dict) -> dict:
"""清理并匿名化请求头"""
clean_headers = {}
for key, value in original_headers.items():
key_lower = key.lower()
# 跳过需要移除的头
if key_lower in HEADERS_TO_REMOVE:
continue
# 跳过任何包含 IP 特征的自定义头
if any(x in key_lower for x in ["ip", "forward", "real", "client", "trace", "ray", "cf-"]):
continue
clean_headers[key] = value
# 强制设置干净的头
clean_headers["User-Agent"] = random.choice(USER_AGENTS)
clean_headers["Accept-Language"] = random.choice(ACCEPT_LANGUAGES)
clean_headers["Accept"] = "application/json, text/plain, */*"
clean_headers["Accept-Encoding"] = "gzip, deflate, br"
# 确保不暴露是代理请求
clean_headers.pop("Forwarded", None)
clean_headers.pop("Via", None)
return clean_headers
def anonymize_response_headers(headers: dict) -> dict:
"""清理响应头,移除可能暴露代理信息的字段"""
skip_headers = {
"content-encoding",
"transfer-encoding",
"content-length",
"alt-svc", # 可能暴露服务信息
"server", # 服务器信息
"x-served-by",
"x-cache",
"x-cache-hits",
"x-timer",
}
clean = {}
for key, value in headers.items():
if key.lower() not in skip_headers:
clean[key] = value
return clean
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"])
async def proxy(request: Request, path: str):
# 密码验证
if ACCESS_PASSWORD:
auth_header = request.headers.get("X-Proxy-Password", "")
if auth_header != ACCESS_PASSWORD:
return Response(
content='{"error": "Unauthorized"}',
status_code=401,
media_type="application/json"
)
# 构建目标 URL
target_url = f"{TARGET_URL}/{path}"
if request.url.query:
target_url += f"?{request.url.query}"
# 获取请求体
body = await request.body()
# 匿名化请求头
headers = anonymize_headers(dict(request.headers))
# 使用干净的 httpx 客户端(不传递任何默认头)
async with httpx.AsyncClient(
timeout=TIMEOUT,
follow_redirects=True,
http2=True, # 使用 HTTP/2 更难被识别
) as client:
is_stream = "stream" in path.lower() or (body and b'"stream":true' in body.lower())
if is_stream:
req = client.build_request(
method=request.method,
url=target_url,
headers=headers,
content=body
)
response = await client.send(req, stream=True)
async def generate():
async for chunk in response.aiter_bytes():
yield chunk
await response.aclose()
return StreamingResponse(
generate(),
status_code=response.status_code,
headers=anonymize_response_headers(dict(response.headers)),
media_type=response.headers.get("content-type")
)
else:
response = await client.request(
method=request.method,
url=target_url,
headers=headers,
content=body
)
return Response(
content=response.content,
status_code=response.status_code,
headers=anonymize_response_headers(dict(response.headers)),
media_type=response.headers.get("content-type")
)
@app.get("/")
async def root():
# 不暴露任何有用信息
return {"status": "ok"}
# 健康检查也保持匿名
@app.get("/health")
async def health():
return {"status": "ok"}