api / app.py
roseforyou's picture
Create app.py
b39a46e verified
import httpx
import logging
import os
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
PROXY_URL = os.getenv("PROXY_URL", "https://api.x.ai")
CONNECT_TIMEOUT = float(os.getenv("CONNECT_TIMEOUT", "5.0"))
READ_TIMEOUT = float(os.getenv("READ_TIMEOUT", "60.0"))
WRITE_TIMEOUT = float(os.getenv("WRITE_TIMEOUT", "30.0"))
POOL_TIMEOUT = float(os.getenv("POOL_TIMEOUT", "5.0"))
MAX_CONNECTIONS = int(os.getenv("MAX_CONNECTIONS", "100"))
MAX_KEEPALIVE = int(os.getenv("MAX_KEEPALIVE", "20"))
client: httpx.AsyncClient
@asynccontextmanager
async def lifespan(app: FastAPI):
global client
limits = httpx.Limits(max_connections=MAX_CONNECTIONS, max_keepalive_connections=MAX_KEEPALIVE)
timeout_config = httpx.Timeout(connect=CONNECT_TIMEOUT, read=READ_TIMEOUT, write=WRITE_TIMEOUT, pool=POOL_TIMEOUT)
client = httpx.AsyncClient(base_url=PROXY_URL, limits=limits, timeout=timeout_config, http2=True)
logger.info(f"启动 httpx 客户端,目标: {PROXY_URL}")
yield
await client.aclose()
logger.info("关闭 httpx 客户端")
app = FastAPI(lifespan=lifespan)
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"])
async def proxy(request: Request, path: str):
url = httpx.URL(path=f"/{path}", query=request.url.query.encode("utf-8"))
target_url_log = client.base_url.join(url)
logger.info(f"收到请求: {request.method} {request.url}")
logger.info(f"转发目标: {request.method} {target_url_log}")
headers_to_forward = {
key: value
for key, value in request.headers.items()
if key.lower() not in ["host", "connection", "transfer-encoding", "content-length"]
}
request_body = await request.body()
try:
proxy_request = client.build_request(
method=request.method,
url=url,
headers=headers_to_forward,
content=request_body,
)
response = await client.send(proxy_request, stream=True)
except httpx.TimeoutException:
logger.error(f"请求 Proxy API 超时: {request.method} {target_url_log}")
error_content = b'{"error": {"code": 504, "message": "Gateway Timeout"}}'
return StreamingResponse(content=error_content, status_code=504, media_type="application/json")
except httpx.RequestError as e:
logger.error(f"请求 Proxy API 出错: {request.method} {target_url_log}, Error: {e.__class__.__name__} - {e}")
error_content = b'{"error": {"code": 502, "message": "Bad Gateway"}}'
return StreamingResponse(content=error_content, status_code=502, media_type="application/json")
except Exception as e:
logger.error(f"代理内部错误: {request.method} {target_url_log}, Error: {e.__class__.__name__} - {e}", exc_info=True)
error_content = b'{"error": {"code": 500, "message": "Internal Server Error"}}'
return StreamingResponse(content=error_content, status_code=500, media_type="application/json")
response_headers_to_forward = {
key: value
for key, value in response.headers.items()
if key.lower() not in ["content-encoding", "transfer-encoding", "connection"]
}
return StreamingResponse(
content=response.aiter_raw(),
status_code=response.status_code,
headers=response_headers_to_forward,
media_type=response.headers.get("content-type")
)
@app.get("/")
async def root():
return {"message": "Generic Proxy is running. Use /{path} to forward requests."}
if __name__ == "__main__":
import uvicorn
port = int(os.getenv("PORT", "7860"))
host = os.getenv("HOST", "0.0.0.0")
log_level = os.getenv("LOG_LEVEL", "info").lower()
uvicorn.run(app, host=host, port=port, log_level=log_level)