superproxy-1 / proxy.py
tanbushi's picture
update
faaab61
import httpx
import asyncio
from starlette.responses import Response
from api_key_sb import get_api_key_info, update_api_key_ran_at, supabase as sb_client # 移除 get_supabase_client
# from supabase import Client
from dotenv import load_dotenv
import os
from datetime import timezone, timedelta, datetime
from fastapi import FastAPI, Request, HTTPException, Depends, status
import httpx
import asyncio
from starlette.responses import Response
from api_key_sb import get_api_key_info, update_api_key_ran_at, supabase as sb_client
from dotenv import load_dotenv
import os
from datetime import timezone, timedelta, datetime
from fastapi import FastAPI, Request, HTTPException, status
import logging
import json
# Runs at import time; python-dotenv's load_dotenv() reads a .env file into
# the process environment (no path is passed, so its default lookup applies).
load_dotenv()
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def _create_error_response(status_code: int, detail: str, error_type: str = "ProxyError"):
    """Build a JSON error ``Response``.

    The body is a JSON object with two keys, ``detail`` and ``error_type``;
    the response carries ``status_code`` and the ``application/json`` media
    type.
    """
    payload = {"detail": detail, "error_type": error_type}
    body = json.dumps(payload)
    return Response(content=body, status_code=status_code, media_type="application/json")
# Upstream headers that must not be copied onto the proxied response: httpx
# has already decoded the body, so the upstream framing/encoding headers no
# longer describe ``response.content``.
_UNFORWARDABLE_RESPONSE_HEADERS = frozenset(
    {"content-encoding", "content-length", "transfer-encoding", "connection"}
)


def _extract_model_name(content):
    """Return the ``model`` value of a JSON request body, or ``None`` if unavailable."""
    try:
        content_json = json.loads(content)
        model_name = content_json.get('model')
        if model_name:
            logger.info(f"从内容中提取的模型名称: {model_name}")
        else:
            logger.warning("内容中未找到 'model' 键。")
        return model_name
    except json.JSONDecodeError:
        logger.warning("内容不是有效的 JSON 格式,无法提取 'model' 键。")
    except Exception as e:
        # Best effort only: e.g. a JSON array body has no ``.get``.
        logger.error(f"提取模型名称时发生错误: {str(e)}")
    return None


def _parse_retry_after(value, default: int = 5) -> int:
    """Return ``Retry-After`` as whole seconds, or ``default`` if it is not an integer."""
    try:
        return max(0, int(value))
    except (TypeError, ValueError):
        # Retry-After may also be an HTTP-date; fall back instead of failing.
        return default


async def do_proxy(url: str, method: str, headers: dict, content: str, max_retries: int = 3):
    """Forward a request upstream with a managed API key, retrying on HTTP 429.

    For every attempt a key is fetched via ``get_api_key_info`` (using the
    ``model`` field of the JSON body, when present) and sent as a Bearer
    token. On a 429 the coroutine sleeps for ``Retry-After`` plus a
    per-attempt back-off, moves the key's ``ran_at`` one day ahead (UTC+8)
    through ``update_api_key_ran_at`` and tries again.

    Args:
        url: Upstream URL to call.
        method: HTTP method to use.
        headers: Request headers to forward; the mapping is not modified.
        content: Raw request body, forwarded unchanged.
        max_retries: Maximum number of upstream attempts.

    Returns:
        A ``Response`` mirroring the upstream reply on success, otherwise a
        JSON error response built by ``_create_error_response``. No exception
        raised while talking to upstream is propagated to the caller.
    """
    logger.info(f"Proxy service started for URL: {url}")
    # The body is identical for every attempt, so parse it only once.
    model_name = _extract_model_name(content)
    try:
        # Copy so the caller's mapping never receives the injected API key.
        request_headers = dict(headers or {})
        async with httpx.AsyncClient() as client:
            for attempt in range(max_retries):
                api_key_info = await get_api_key_info(model_name)
                if not api_key_info:
                    return _create_error_response(status.HTTP_500_INTERNAL_SERVER_ERROR, "无法获取API密钥信息", "APIKeyError")
                api_key = api_key_info.get('api_key')
                api_key_id = api_key_info.get('api_key_id')
                if not api_key:
                    return _create_error_response(status.HTTP_500_INTERNAL_SERVER_ERROR, "API密钥为空", "APIKeyError")
                api_key_show = api_key[:5]+'*****'+api_key[-5:]
                logger.info(f"使用API密钥:{api_key_show}")
                request_headers["Authorization"] = f"Bearer {api_key}"
                response = await client.request(
                    method=method,
                    url=url,
                    headers=request_headers,
                    content=content,
                    timeout=30
                )
                if response.status_code == 429:
                    wait_time = _parse_retry_after(response.headers.get("Retry-After", "5")) + attempt * 2
                    logger.warning(f"⚠️ 429错误!{wait_time}秒后重试 (尝试:{attempt+1})")
                    await asyncio.sleep(wait_time)
                    try:
                        beijing_tz = timezone(timedelta(hours=8))
                        now_beijing = datetime.now(beijing_tz)
                        future_time = now_beijing + timedelta(days=1)
                        future_timestamp = future_time.isoformat()
                        logger.info(f'future_timestamp: {future_timestamp}')
                        await update_api_key_ran_at(api_key_id, future_time)
                        logger.info('API密钥运行时间更新成功')
                    except HTTPException as e:
                        logger.error(f"更新API密钥运行时间失败!错误信息:{e.detail}")
                        return _create_error_response(e.status_code, e.detail, "APIKeyUpdateError")
                    except Exception as e:
                        logger.error(f"更新API密钥运行时间失败!错误信息:{str(e)}")
                        return _create_error_response(status.HTTP_500_INTERNAL_SERVER_ERROR, f"更新API密钥运行时间失败!错误信息:{str(e)}", "APIKeyUpdateError")
                    continue
                response.raise_for_status()
                # Drop headers describing the upstream wire format; the
                # outgoing Response computes its own content-length.
                forwarded_headers = {
                    name: value
                    for name, value in response.headers.items()
                    if name.lower() not in _UNFORWARDABLE_RESPONSE_HEADERS
                }
                return Response(
                    content=response.content,
                    status_code=response.status_code,
                    headers=forwarded_headers
                )
        # Every attempt was rate limited (or max_retries <= 0): report it
        # explicitly instead of implicitly returning None.
        logger.error(f"Upstream still rate limited after {max_retries} attempts: {url}")
        return _create_error_response(
            status.HTTP_429_TOO_MANY_REQUESTS,
            f"Upstream rate limit persisted after {max_retries} attempts",
            "RateLimitError",
        )
    except httpx.HTTPStatusError as e:
        logger.error(f"🚨 服务器错误 {e.response.status_code}: {e.request.url}")
        return _create_error_response(e.response.status_code, str(e), "HTTPStatusError")
    except HTTPException as e:
        logger.error(f"🔥 HTTPException: {e.detail}")
        return _create_error_response(e.status_code, e.detail, "HTTPException")
    except Exception as e:
        print('请检查当前是否运行在本机开发环境')
        logger.error(f"🔥 致命错误: {type(e).__name__}: {str(e)}")
        return _create_error_response(status.HTTP_500_INTERNAL_SERVER_ERROR, f"内部服务器错误: {str(e)}", "InternalServerError")