| | from fastapi import FastAPI, Request, Response |
| | from fastapi.responses import StreamingResponse |
| | from fastapi.middleware.cors import CORSMiddleware |
| | import httpx |
| | import asyncio |
| | from typing import Optional |
| |
|
| | app = FastAPI() |
| |
|
| | |
| | app.add_middleware( |
| | CORSMiddleware, |
| | allow_origins=["*"], |
| | allow_credentials=True, |
| | allow_methods=["*"], |
| | allow_headers=["*"], |
| | ) |
| |
|
| | |
| | TARGET_URL = "http://beibeioo.top" |
| |
|
| | |
| | async def get_http_client(): |
| | return httpx.AsyncClient( |
| | timeout=30.0, |
| | follow_redirects=True, |
| | http2=True |
| | ) |
| |
|
| | @app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "HEAD", "PATCH"]) |
| | async def proxy(path: str, request: Request): |
| | try: |
| | |
| | url = f"{TARGET_URL}/{path}" |
| | |
| | |
| | headers = dict(request.headers) |
| | |
| | headers.pop('host', None) |
| | headers.pop('connection', None) |
| | headers.pop('content-length', None) |
| | headers.pop('transfer-encoding', None) |
| | |
| | |
| | if 'referer' in headers: |
| | headers['referer'] = headers['referer'].replace(str(request.base_url), TARGET_URL) |
| | |
| | |
| | params = dict(request.query_params) |
| | |
| | |
| | body = await request.body() |
| | |
| | async with await get_http_client() as client: |
| | response = await client.request( |
| | method=request.method, |
| | url=url, |
| | params=params, |
| | headers=headers, |
| | content=body if body else None, |
| | ) |
| | |
| | |
| | response_headers = dict(response.headers) |
| | |
| | response_headers.pop('transfer-encoding', None) |
| | response_headers.pop('content-encoding', None) |
| | |
| | |
| | for key, value in response_headers.items(): |
| | if isinstance(value, str) and TARGET_URL in value: |
| | response_headers[key] = value.replace(TARGET_URL, str(request.base_url).rstrip('/')) |
| | |
| | |
| | return StreamingResponse( |
| | response.iter_bytes(), |
| | status_code=response.status_code, |
| | headers=response_headers, |
| | media_type=response.headers.get('content-type') |
| | ) |
| | |
| | except httpx.TimeoutException: |
| | return Response( |
| | content="请求超时", |
| | status_code=504, |
| | media_type="text/plain" |
| | ) |
| | except httpx.RequestError: |
| | return Response( |
| | content="无法连接到目标服务器", |
| | status_code=502, |
| | media_type="text/plain" |
| | ) |
| | except Exception as e: |
| | return Response( |
| | content=f"服务器错误: {str(e)}", |
| | status_code=500, |
| | media_type="text/plain" |
| | ) |
| |
|
| | @app.get("/healthcheck") |
| | async def healthcheck(): |
| | """健康检查端点""" |
| | return {"status": "healthy"} |
| |
|
| | if __name__ == "__main__": |
| | import uvicorn |
| | uvicorn.run(app, host="0.0.0.0", port=7860) |