superproxy-all / app.py
tanbushi's picture
update
0341228
# uvicorn app:app --host 0.0.0.0 --port 7860 --reload
import json
import os
import pprint
import secrets
from datetime import datetime, timezone, timedelta, time  # datetime and timezone helpers
from enum import Enum

import httpx  # async HTTP client; `requests` is synchronous while FastAPI is asynchronous
from dotenv import load_dotenv
from fastapi import FastAPI, Request, HTTPException, Depends, status
from fastapi.responses import Response  # Response class used to relay upstream replies
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials  # bearer-token auth scheme
from supabase import create_client, Client
# Load environment variables.
load_dotenv()
# The proxy's own API key, read from the environment (None when PROXY_API_KEY is unset).
proxy_api_key = os.getenv("PROXY_API_KEY")
# Supabase configuration.
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
# Initialise the Supabase client (created once at import time and shared by all requests).
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
# HTTPBearer security scheme used to extract the client's bearer token.
security = HTTPBearer()
# Enum that validates the protocol segment of the proxy route.
# Only the two URL schemes listed here are accepted; because the enum also
# subclasses `str`, each member compares equal to its plain string value.
class ProtocolType(str, Enum):
    HTTP = "http"
    HTTPS = "https"
# Dependency: validate the proxy's own API key.
def verify_proxy_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Check the client's bearer token against the configured proxy API key.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.

    Returns:
        The presented token, once it has been accepted.

    Raises:
        HTTPException: 401 when no proxy key is configured or the presented
            token does not match it.
    """
    presented = credentials.credentials
    # Compare in constant time: `!=` stops at the first differing character,
    # which leaks how much of the key was guessed correctly. Both sides are
    # encoded because compare_digest only accepts ASCII-only str arguments.
    key_matches = bool(proxy_api_key) and secrets.compare_digest(
        presented.encode("utf-8"), proxy_api_key.encode("utf-8")
    )
    if not key_matches:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or missing proxy API key",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return presented
# The FastAPI application object; the route decorators in this file register against it.
app = FastAPI(title="Gemini Proxy API")
# Root endpoint: replies with a fixed greeting object.
@app.get("/")
def greet_json():
    greeting = {"Hello": "World!"}
    return greeting
# Health-check endpoint (optional; used to confirm that the service is up).
@app.get("/health")
async def health_check():
    # The payload is constant: it reports liveness only and performs no
    # checks against any dependency.
    payload = {"status": "ok", "message": "Proxy service is running."}
    return payload
# Upstream response headers that must not be copied verbatim to the client.
# httpx hands back an already-decoded body in `response.content`, so the
# upstream content-encoding / content-length / transfer-encoding no longer
# describe the bytes we send; connection-level headers apply to the upstream
# hop only.
_UNFORWARDABLE_RESPONSE_HEADERS = frozenset({
    "connection",
    "content-encoding",
    "content-length",
    "keep-alive",
    "transfer-encoding",
})


def _forwardable_response_headers(upstream_headers):
    """Return the upstream headers that are safe to relay to the client."""
    return {
        name: value
        for name, value in upstream_headers.items()
        if name.lower() not in _UNFORWARDABLE_RESPONSE_HEADERS
    }


@app.api_route("/v1/{protocol}/{host}/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def proxy(request: Request, protocol: ProtocolType, host: str, path: str, proxy_api_key: str = Depends(verify_proxy_api_key)):
    """Forward the incoming request to ``{protocol}://{host}/{path}``.

    The upstream request is assembled by ``make_send_req``, which swaps the
    client's credentials for an upstream API key. After the upstream call the
    key's ``ran_at`` column in ``airs_api_keys`` is stamped with the current
    UTC+8 time.

    Args:
        request: The incoming client request.
        protocol: Upstream URL scheme.
        host: Upstream host.
        path: Upstream path (may contain slashes).
        proxy_api_key: Result of the proxy-authentication dependency; only
            present so that the route requires a valid proxy key.

    Returns:
        A ``Response`` carrying the upstream status code, body and the
        relayable subset of the upstream headers.

    Raises:
        HTTPException: 400 when no upstream request could be built, 504 when
            the upstream call times out, 502 on any other transport failure,
            500 when updating the key's ``ran_at`` fails.
    """
    send_req = await make_send_req(request, protocol, host, path)
    if send_req is None:
        # make_send_req signals an unusable request body with None; answer
        # with a client error instead of failing on the attribute access below.
        raise HTTPException(status_code=400, detail="请求体必须是JSON对象!")

    # Log only non-sensitive fields: the prepared headers carry the upstream
    # API key and must never be written to the logs.
    print(f"{send_req.get('method')} {protocol.value}://{host}/{path} model={send_req.get('model')}")

    # Issue the upstream request with httpx's async client.
    try:
        async with httpx.AsyncClient() as client:
            response = await client.request(
                method=send_req.get('method'),
                url=send_req.get('url'),
                headers=send_req.get('headers'),
                content=send_req.get('content'),  # httpx takes the raw body via `content`
                timeout=30,  # seconds; adjust as needed
            )
    except httpx.TimeoutException as exc:
        raise HTTPException(status_code=504, detail="上游服务响应超时!") from exc
    except httpx.RequestError as exc:
        raise HTTPException(status_code=502, detail="请求上游服务失败!") from exc

    try:
        # Timezone-aware "now" in Beijing time (UTC+8), serialised as ISO 8601.
        beijing_tz = timezone(timedelta(hours=8))
        ran_at = datetime.now(beijing_tz).isoformat()
        key_id = send_req.get('api_key_info').get('api_key_id')
        supabase.table("airs_api_keys").update({"ran_at": ran_at}).eq("id", key_id).execute()
        print('更新成功')
    except Exception as e:
        # Keep the original contract (a failed bookkeeping update is a 500),
        # but record the cause instead of discarding it.
        print(f"更新API密钥运行时间失败: {e}")
        raise HTTPException(status_code=500, detail="更新API密钥运行时间失败!") from e

    return Response(
        content=response.content,
        status_code=response.status_code,
        headers=_forwardable_response_headers(response.headers),
    )
async def get_api_key_info(model: str = None):
    """Look up an upstream API key for ``model``.

    Queries the ``airs_model_api_keys_view`` view for rows whose
    ``model_name`` equals ``model``, ordered by ``api_key_ran_at`` ascending,
    and takes the first row.

    Args:
        model: Model name to look up. Required despite the ``None`` default.

    Returns:
        A dict with the ``api_key`` and ``api_key_id`` columns of the selected
        row, or ``None`` when no row matches or the query fails.

    Raises:
        HTTPException: 400 when ``model`` is missing or empty.
    """
    # Validate before entering the try block: raised inside it, this
    # HTTPException was caught by the broad handler below and turned into a
    # None return, so the client never saw the intended message.
    if not model:
        raise HTTPException(status_code=400, detail="请提供模型名称!")

    try:
        response = (
            supabase.from_('airs_model_api_keys_view')
            .select('api_key', 'api_key_id')
            .eq('model_name', model)
            .order('api_key_ran_at', desc=False)
            .limit(1)
            .execute()
        )
        return response.data[0] if response.data else None
    except Exception as e:
        # Best effort: a failed lookup is reported to the caller as "no key".
        print(f"从Supabase获取API密钥失败: {e}")
        return None
async def make_send_req(request: Request, protocol: ProtocolType, host: str, path: str):
    """Build the description of the upstream request for a proxied call.

    Args:
        request: The incoming client request.
        protocol: Upstream URL scheme.
        host: Upstream host.
        path: Upstream path.

    Returns:
        A dict with the keys ``method``, ``url``, ``headers``, ``content``
        (the raw request body), ``model`` (read from the JSON body) and
        ``api_key_info`` (the row returned by ``get_api_key_info``).

    Raises:
        HTTPException: 400 when the body is not a JSON object or when no API
            key is available for the requested model.
    """
    method = request.method
    url = f"{protocol.value}://{host}/{path}"
    # Carry the client's query string over to the upstream URL; without this
    # every query parameter was silently dropped.
    query = request.url.query
    if query:
        url = f"{url}?{query}"

    headers = dict(request.headers)
    # Drop headers that describe the client-to-proxy hop: the upstream must
    # see its own Host, the client's Authorization is the proxy key (replaced
    # below), and Content-Length is recomputed by httpx.
    for hop_header in ("host", "authorization", "user-agent", "content-length"):
        headers.pop(hop_header, None)
    # Present the request as coming from curl.
    headers["User-Agent"] = "curl/8.7.1"

    # Read the client body and extract the model name from it.
    body = await request.body()
    try:
        payload = json.loads(body)
    except ValueError as e:
        # Covers invalid JSON, an empty body and undecodable bytes. Previously
        # this returned None, which the caller then failed on.
        print(f"读取客户端请求体失败: {e}")
        raise HTTPException(status_code=400, detail="请求体不是合法的JSON!") from e
    if not isinstance(payload, dict):
        print(f"获取模型名称失败: 请求体类型为 {type(payload).__name__}")
        raise HTTPException(status_code=400, detail="请求体必须是JSON对象!")
    model = payload.get("model")

    api_key_info = await get_api_key_info(model)
    if not api_key_info:
        raise HTTPException(status_code=400, detail="未找到匹配的API密钥!")
    headers["Authorization"] = f"Bearer {api_key_info['api_key']}"

    return {
        "method": method,
        "url": url,
        "headers": headers,
        "content": body,
        "model": model,
        "api_key_info": api_key_info,
    }