import httpx import logging import os from contextlib import asynccontextmanager from fastapi import FastAPI, Request from fastapi.responses import StreamingResponse logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) PROXY_URL = os.getenv("PROXY_URL", "https://api.x.ai") CONNECT_TIMEOUT = float(os.getenv("CONNECT_TIMEOUT", "5.0")) READ_TIMEOUT = float(os.getenv("READ_TIMEOUT", "60.0")) WRITE_TIMEOUT = float(os.getenv("WRITE_TIMEOUT", "30.0")) POOL_TIMEOUT = float(os.getenv("POOL_TIMEOUT", "5.0")) MAX_CONNECTIONS = int(os.getenv("MAX_CONNECTIONS", "100")) MAX_KEEPALIVE = int(os.getenv("MAX_KEEPALIVE", "20")) client: httpx.AsyncClient @asynccontextmanager async def lifespan(app: FastAPI): global client limits = httpx.Limits(max_connections=MAX_CONNECTIONS, max_keepalive_connections=MAX_KEEPALIVE) timeout_config = httpx.Timeout(connect=CONNECT_TIMEOUT, read=READ_TIMEOUT, write=WRITE_TIMEOUT, pool=POOL_TIMEOUT) client = httpx.AsyncClient(base_url=PROXY_URL, limits=limits, timeout=timeout_config, http2=True) logger.info(f"启动 httpx 客户端,目标: {PROXY_URL}") yield await client.aclose() logger.info("关闭 httpx 客户端") app = FastAPI(lifespan=lifespan) @app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]) async def proxy(request: Request, path: str): url = httpx.URL(path=f"/{path}", query=request.url.query.encode("utf-8")) target_url_log = client.base_url.join(url) logger.info(f"收到请求: {request.method} {request.url}") logger.info(f"转发目标: {request.method} {target_url_log}") headers_to_forward = { key: value for key, value in request.headers.items() if key.lower() not in ["host", "connection", "transfer-encoding", "content-length"] } request_body = await request.body() try: proxy_request = client.build_request( method=request.method, url=url, headers=headers_to_forward, content=request_body, ) response = await client.send(proxy_request, stream=True) except httpx.TimeoutException: logger.error(f"请求 Proxy API 超时: {request.method} {target_url_log}") error_content = b'{"error": {"code": 504, "message": "Gateway Timeout"}}' return StreamingResponse(content=error_content, status_code=504, media_type="application/json") except httpx.RequestError as e: logger.error(f"请求 Proxy API 出错: {request.method} {target_url_log}, Error: {e.__class__.__name__} - {e}") error_content = b'{"error": {"code": 502, "message": "Bad Gateway"}}' return StreamingResponse(content=error_content, status_code=502, media_type="application/json") except Exception as e: logger.error(f"代理内部错误: {request.method} {target_url_log}, Error: {e.__class__.__name__} - {e}", exc_info=True) error_content = b'{"error": {"code": 500, "message": "Internal Server Error"}}' return StreamingResponse(content=error_content, status_code=500, media_type="application/json") response_headers_to_forward = { key: value for key, value in response.headers.items() if key.lower() not in ["content-encoding", "transfer-encoding", "connection"] } return StreamingResponse( content=response.aiter_raw(), status_code=response.status_code, headers=response_headers_to_forward, media_type=response.headers.get("content-type") ) @app.get("/") async def root(): return {"message": "Generic Proxy is running. Use /{path} to forward requests."} if __name__ == "__main__": import uvicorn port = int(os.getenv("PORT", "7860")) host = os.getenv("HOST", "0.0.0.0") log_level = os.getenv("LOG_LEVEL", "info").lower() uvicorn.run(app, host=host, port=port, log_level=log_level)