# uvicorn app:app --host 0.0.0.0 --port 7860 --reload from fastapi import FastAPI, Request, HTTPException, Depends, status from fastapi.responses import Response # 导入Response from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials # 导入HTTPBearer和HTTPAuthorizationCredentials import httpx # 使用httpx替代requests,因为requests是同步的,而FastAPI是异步的 import os, json, pprint from dotenv import load_dotenv from enum import Enum from supabase import create_client, Client from datetime import datetime, timezone, timedelta, time # 导入 datetime 和 timezone # 加载环境变量 load_dotenv() # 从环境变量获取代理自身的API密钥 proxy_api_key = os.getenv("PROXY_API_KEY") # Supabase 配置 SUPABASE_URL = os.getenv("SUPABASE_URL") SUPABASE_KEY = os.getenv("SUPABASE_KEY") # 初始化 Supabase 客户端 supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) # 定义HTTPBearer安全方案 security = HTTPBearer() # 枚举校验协议类型 class ProtocolType(str, Enum): HTTP = "http" HTTPS = "https" # 依赖函数:验证代理的API密钥 def verify_proxy_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)): if not proxy_api_key or credentials.credentials != proxy_api_key: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or missing proxy API key", headers={"WWW-Authenticate": "Bearer"}, ) return credentials.credentials app = FastAPI(title="Gemini Proxy API") @app.get("/") def greet_json(): return {"Hello": "World!"} # 3. 健康检查接口(可选,用于验证服务是否正常) @app.get("/health") async def health_check(): return {"status": "ok", "message": "Proxy service is running."} @app.api_route("/v1/{protocol}/{host}/{path:path}", methods=["GET", "POST", "PUT", "DELETE"]) # @app.api_route("/v1/{protocol}/{host}/{path:path}", methods=["POST"]) async def proxy(request: Request, protocol: ProtocolType, host:str, path: str, proxy_api_key: str = Depends(verify_proxy_api_key)): # 添加代理认证依赖 send_req = await make_send_req(request, protocol, host, path) print(send_req) print(send_req.get('headers')) # 使用httpx异步客户端发起请求 async with httpx.AsyncClient() as client: response = await client.request( method=send_req.get('method'), url=send_req.get('url'), headers=send_req.get('headers'), content=send_req.get('content'), # httpx使用content参数传递请求体 timeout=30 # 超时时间,可调整 ) try: # 创建北京时间时区对象(UTC+8) beijing_tz = timezone(timedelta(hours=8)) now_beijing = datetime.now(beijing_tz) # 带时区的datetime对象 current_local_time = now_beijing.isoformat() # current_local_time = int(now_beijing.timestamp()) # 正确的时间戳 # current_local_time = datetime.now().isoformat() supabase.table("airs_api_keys").update({"ran_at": current_local_time}).eq("id", send_req.get('api_key_info').get('api_key_id')).execute() print('更新成功') except Exception as e: raise HTTPException(status_code=500, detail="更新API密钥运行时间失败!") return Response( content=response.content, status_code=response.status_code, headers=dict(response.headers) ) async def get_api_key_info(model: str = None): try: # 根据用户提供的SQL语句修改,从 'airs_model_api_keys_view' 视图中获取 'api_key' if model: response = supabase.from_('airs_model_api_keys_view').select('api_key', 'api_key_id').eq('model_name', model).order('api_key_ran_at', desc=False).limit(1).execute() else: # 如果没有提供model,则获取所有模型的api_key raise HTTPException(status_code=400, detail="请提供模型名称!") if response.data: api_key_info = response.data[0] return api_key_info else: return None except Exception as e: print(f"从Supabase获取API密钥失败: {e}") return None async def make_send_req(request: Request, protocol: ProtocolType, host:str, path: str): _method = request.method _url = f"{protocol.value}://{host}/{path}" _headers = dict(request.headers) # 移除FastAPI自带的Host头,避免Gemini校验报错 _headers.pop("host", None) _headers.pop("authorization", None) _headers.pop("user-agent", None) _headers.pop("content-length", None) # 将User-Agent改为curl/8.7.1,以模拟curl请求 _headers["User-Agent"] = "curl/8.7.1" try: # 读取客户端请求体 _content = await request.body() content_json = json.loads(_content) try: _model = content_json.get("model") except Exception as e: print(f"获取模型名称失败: {e}") return None except Exception as e: print(f"读取客户端请求体失败: {e}") return None _api_key_info = await get_api_key_info(_model) if not _api_key_info: raise HTTPException(status_code=400, detail="未找到匹配的API密钥!") _headers["Authorization"] = f"Bearer {_api_key_info['api_key']}" send_req={ "method": _method, "url": _url, "headers": _headers, "content": _content, "model": _model, "api_key_info": _api_key_info } return send_req