"""Anonymizing reverse proxy in front of the Google Generative Language API.

Incoming requests are optionally authenticated with a shared password,
stripped of headers that could reveal the caller or the hosting platform,
given a browser-like set of headers, and forwarded to ``TARGET_URL``.
"""

import os
import random
import re
import secrets

import httpx
from fastapi import FastAPI, Request, Response
from fastapi.responses import StreamingResponse

app = FastAPI()

TARGET_URL = "https://generativelanguage.googleapis.com"
TIMEOUT = 120.0
ACCESS_PASSWORD = os.getenv("ACCESS_PASSWORD", "")

# ============== Anonymization settings ==============

# Request headers that are never forwarded upstream (compared in lowercase).
HEADERS_TO_REMOVE = {
    # Source-IP related
    "x-forwarded-for",
    "x-real-ip",
    "x-originating-ip",
    "x-remote-ip",
    "x-remote-addr",
    "x-client-ip",
    "x-host",
    "x-forwarded-host",
    "x-forwarded-proto",
    "x-forwarded-port",
    "x-forwarded-server",
    "forwarded",
    "via",
    "true-client-ip",
    "cf-connecting-ip",
    "cf-ipcountry",
    "cf-ray",
    "cf-visitor",
    # Hosting / cloud-provider fingerprints
    "x-request-id",
    "x-trace-id",
    "x-amzn-trace-id",
    "x-cloud-trace-context",
    "x-appengine-country",
    "x-appengine-city",
    "x-appengine-region",
    "x-vercel-id",
    "x-vercel-ip-country",
    "x-vercel-ip-city",
    "x-railway-request-id",
    # Browser / client fingerprints
    "x-correlation-id",
    "x-session-id",
    "x-device-id",
    # Hop-by-hop headers and the proxy's own credentials
    "host",
    "content-length",
    "x-proxy-password",
    "connection",
    "keep-alive",
    "proxy-connection",
    "proxy-authorization",
    "te",
    "trailer",
    "transfer-encoding",
    "upgrade",
}

# Any header whose lowercase name contains one of these fragments is dropped.
_SUSPICIOUS_HEADER_FRAGMENTS = (
    "ip", "forward", "real", "client", "trace", "ray", "cf-",
)

# Headers that anonymize_headers() always overrides. The client's own copies
# must be dropped case-insensitively first: the incoming mapping uses
# lowercase keys, so "user-agent" and "User-Agent" would otherwise coexist
# and both be sent upstream.
_FORCED_HEADERS = frozenset(
    {"user-agent", "accept-language", "accept", "accept-encoding"}
)

# Pool of User-Agent strings to present upstream.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:134.0) Gecko/20100101 Firefox/134.0",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.2 Safari/605.1.15",
]

# Pool of Accept-Language values to present upstream.
ACCEPT_LANGUAGES = [
    "en-US,en;q=0.9",
    "en-GB,en;q=0.9",
    "en-US,en;q=0.9,zh-CN;q=0.8",
    "en,zh-CN;q=0.9,zh;q=0.8",
]

# Response headers that are not relayed back to the caller.
_RESPONSE_HEADERS_TO_SKIP = frozenset(
    {
        # The body is re-framed by this proxy, so upstream framing is invalid.
        "content-encoding",
        "transfer-encoding",
        "content-length",
        # May reveal details about the serving infrastructure.
        "alt-svc",
        "server",
        "x-served-by",
        "x-cache",
        "x-cache-hits",
        "x-timer",
    }
)

# Matches a JSON `"stream": true` member, tolerating whitespace around the
# colon. Case-insensitive to stay a superset of the previous lowercase
# substring check.
_STREAM_FLAG_RE = re.compile(rb'"stream"\s*:\s*true', re.IGNORECASE)


def anonymize_headers(original_headers: dict) -> dict:
    """Return a cleaned copy of the request headers for the upstream call.

    Drops every header listed in ``HEADERS_TO_REMOVE``, every header whose
    name contains a suspicious fragment, and the caller's own values for the
    headers that are overridden below. Then sets a randomly chosen
    ``User-Agent`` and ``Accept-Language`` plus fixed ``Accept`` and
    ``Accept-Encoding`` values.

    Args:
        original_headers: Mapping of header name to value, in any casing.

    Returns:
        A new dict; ``original_headers`` is not modified.
    """
    clean_headers = {}
    for key, value in original_headers.items():
        key_lower = key.lower()
        if key_lower in HEADERS_TO_REMOVE or key_lower in _FORCED_HEADERS:
            continue
        if any(fragment in key_lower for fragment in _SUSPICIOUS_HEADER_FRAGMENTS):
            continue
        clean_headers[key] = value

    # Force a clean, browser-like header set. "Forwarded" and "Via" need no
    # extra removal here: the case-insensitive filter above already drops them.
    clean_headers["User-Agent"] = random.choice(USER_AGENTS)
    clean_headers["Accept-Language"] = random.choice(ACCEPT_LANGUAGES)
    clean_headers["Accept"] = "application/json, text/plain, */*"
    clean_headers["Accept-Encoding"] = "gzip, deflate, br"
    return clean_headers


def anonymize_response_headers(headers: dict) -> dict:
    """Return the upstream response headers minus framing/infrastructure ones.

    Args:
        headers: Mapping of response header name to value, in any casing.

    Returns:
        A new dict without any header named in ``_RESPONSE_HEADERS_TO_SKIP``.
    """
    return {
        key: value
        for key, value in headers.items()
        if key.lower() not in _RESPONSE_HEADERS_TO_SKIP
    }


def _password_matches(supplied: str) -> bool:
    """Compare the supplied password to ACCESS_PASSWORD in constant time."""
    # compare_digest rejects non-ASCII str, so compare the encoded bytes.
    return secrets.compare_digest(
        supplied.encode("utf-8", "surrogateescape"),
        ACCESS_PASSWORD.encode("utf-8", "surrogateescape"),
    )


def _wants_stream(path: str, body: bytes) -> bool:
    """Tell whether the request should be relayed as a streamed response."""
    if "stream" in path.lower():
        return True
    return bool(body) and _STREAM_FLAG_RE.search(body) is not None


def _new_client() -> httpx.AsyncClient:
    """Create a fresh httpx client for one upstream call."""
    return httpx.AsyncClient(
        timeout=TIMEOUT,
        follow_redirects=True,
        http2=True,
    )


def _bad_gateway() -> Response:
    """Build the response returned when the upstream call fails in transport."""
    return Response(
        content='{"error": "Bad Gateway"}',
        status_code=502,
        media_type="application/json",
    )


# NOTE: the fixed routes below must be registered before the catch-all proxy
# route; routes are matched in registration order and "/{path:path}" would
# otherwise shadow them.
@app.get("/")
async def root():
    """Return a minimal status payload that reveals nothing about the service."""
    return {"status": "ok"}


@app.get("/health")
async def health():
    """Health check; returns the same minimal status payload as the root."""
    return {"status": "ok"}


@app.api_route(
    "/{path:path}",
    methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"],
)
async def proxy(request: Request, path: str):
    """Forward the request to ``TARGET_URL`` with anonymized headers.

    Returns 401 when ``ACCESS_PASSWORD`` is set and the ``X-Proxy-Password``
    header does not match, and 502 when the upstream request fails at the
    transport level. Otherwise relays the upstream status, filtered headers
    and body, streaming the body when the request asks for streaming.
    """
    if ACCESS_PASSWORD and not _password_matches(
        request.headers.get("X-Proxy-Password", "")
    ):
        return Response(
            content='{"error": "Unauthorized"}',
            status_code=401,
            media_type="application/json",
        )

    target_url = f"{TARGET_URL}/{path}"
    if request.url.query:
        target_url += f"?{request.url.query}"

    body = await request.body()
    headers = anonymize_headers(dict(request.headers))

    if not _wants_stream(path, body):
        try:
            async with _new_client() as client:
                # request() reads the whole body, so it stays available
                # after the client is closed.
                response = await client.request(
                    method=request.method,
                    url=target_url,
                    headers=headers,
                    content=body,
                )
        except httpx.RequestError:
            return _bad_gateway()
        return Response(
            content=response.content,
            status_code=response.status_code,
            headers=anonymize_response_headers(dict(response.headers)),
            media_type=response.headers.get("content-type"),
        )

    # Streaming: the client must outlive this function because the body is
    # consumed after the StreamingResponse is returned. It is therefore
    # closed by the generator, not by an `async with` block here.
    client = _new_client()
    try:
        upstream_request = client.build_request(
            method=request.method,
            url=target_url,
            headers=headers,
            content=body,
        )
        response = await client.send(upstream_request, stream=True)
    except httpx.RequestError:
        await client.aclose()
        return _bad_gateway()
    except BaseException:
        # Includes task cancellation: release the client, then propagate.
        await client.aclose()
        raise

    async def generate():
        try:
            async for chunk in response.aiter_bytes():
                yield chunk
        finally:
            try:
                await response.aclose()
            finally:
                await client.aclose()

    return StreamingResponse(
        generate(),
        status_code=response.status_code,
        headers=anonymize_response_headers(dict(response.headers)),
        media_type=response.headers.get("content-type"),
    )