GAP / app /api /endpoints.py
misonL's picture
Upload 52 files
e82bac2 verified
Raw
History Blame Contribute Delete
11.1 kB
import asyncio # 导入 asyncio 模块
import json # 导入 json 模块
import logging # 导入 logging 模块
import time # 导入 time 模块,用于 /v1/models 端点生成时间戳
from typing import List, Dict, Any, Optional # 导入类型提示
from fastapi import APIRouter, HTTPException, Request, Depends, status # 导入 FastAPI 相关组件:路由、HTTP异常、请求对象、依赖注入、状态码
from fastapi.responses import StreamingResponse # 导入流式响应对象
from app import config # 导入应用配置模块
# 从其他模块导入必要的组件
from app.api.models import ChatCompletionRequest, ChatCompletionResponse, ModelList # 导入 API 请求和响应模型
from app.core.services.gemini import GeminiClient # 导入 Gemini 客户端类 (新路径)
from app.core.keys.manager import APIKeyManager # 导入类型 (新路径)
import httpx # 导入 httpx 用于类型提示
from app.api.middleware import verify_proxy_key # 导入代理密钥验证中间件/依赖项
# 导入处理器函数
from app.core.processing.main_handler import process_request # 导入核心请求处理函数 (新路径)
# 导入依赖注入函数
from app.core.dependencies import get_key_manager, get_http_client # 导入获取 Key Manager 和 HTTP Client 的依赖函数
# --- 此模块内需要的全局变量 ---
logger = logging.getLogger('my_logger') # 获取日志记录器实例
router = APIRouter() # 创建一个 FastAPI APIRouter 实例,用于定义 API 路由
# 导入缓存管理器和管理员令牌验证依赖
from app.core.cache.manager import CacheManager # 导入 CacheManager (新路径)
from app.core.dependencies import verify_admin_token
# 导入缓存相关的模型
from app.api.models import CachedContentEntry, CacheListResponse
@router.get("/v1/models", response_model=ModelList) # 定义 GET /v1/models 端点,响应模型为 ModelList
async def list_models(
key_manager: APIKeyManager = Depends(get_key_manager), # 注入 Key Manager
http_client: httpx.AsyncClient = Depends(get_http_client) # 注入 HTTP Client
):
"""
处理获取可用模型列表的 GET 请求。
"""
active_keys_count = key_manager.get_active_keys_count() # 获取当前有效的 API 密钥数量
# 如果 GeminiClient.AVAILABLE_MODELS 为空,则确保填充它
if not GeminiClient.AVAILABLE_MODELS and active_keys_count > 0: # 如果可用模型列表为空且有活跃 Key
logger.info("首次请求模型列表,尝试获取...") # 首次请求模型列表,尝试获取
try:
key_to_use = None # 初始化要使用的 Key
with key_manager.keys_lock: # 使用锁安全地访问密钥列表
if key_manager.api_keys: key_to_use = key_manager.api_keys[0] # 如果有有效密钥,选择第一个用于获取模型列表
if key_to_use: # 如果找到了要使用的 Key (If a key to use is found)
# 使用注入的 http_client 调用静态方法
all_models = await GeminiClient.list_available_models(key_to_use, http_client) # 调用 Gemini 客户端获取所有可用模型
# 确保 AVAILABLE_MODELS 被正确更新
GeminiClient.AVAILABLE_MODELS = [model.replace("models/", "") for model in all_models] # 清理模型名称(移除 "models/" 前缀)并存储到类变量
logger.info(f"成功获取可用模型: {GeminiClient.AVAILABLE_MODELS}") # 成功获取可用模型
else: logger.error("无法找到有效 Key 来获取模型列表。") # 无法找到有效 Key 来获取模型列表
except Exception as e:
logger.error(f"获取模型列表失败: {e}") # 记录获取模型列表失败错误
GeminiClient.AVAILABLE_MODELS = [] # 获取模型列表失败时,重置为空列表
# 使用标准日志记录
logger.info("接收到列出模型的请求", extra={'request_type': 'list_models', 'status_code': 200}) # 接收到列出模型的请求
# 返回列表,确保使用可能已更新的 AVAILABLE_MODELS
return ModelList(data=[{"id": model, "object": "model", "created": int(time.time()), "owned_by": "organization-owner"} for model in GeminiClient.AVAILABLE_MODELS]) # 构建并返回符合 OpenAI API 格式的模型列表响应
@router.post("/v1/chat/completions", response_model=ChatCompletionResponse, status_code=status.HTTP_200_OK) # 定义 POST /v1/chat/completions 端点,响应模型为 ChatCompletionResponse,状态码为 200
async def chat_completions(
request_data: ChatCompletionRequest, # 请求体数据,FastAPI 会自动解析 JSON 并验证其结构是否符合 ChatCompletionRequest 模型
request: Request, # FastAPI 的原始 Request 对象,包含请求头、客户端 IP 等信息
# 使用新的代理 Key 验证依赖,并将验证通过的 Key 和配置注入
auth_data: Dict[str, Any] = Depends(verify_proxy_key), # 依赖注入:调用 verify_proxy_key 函数进行验证,并将验证通过的代理密钥和配置注入到此参数
key_manager: APIKeyManager = Depends(get_key_manager), # 注入 Key Manager
http_client: httpx.AsyncClient = Depends(get_http_client) # 注入 HTTP Client
):
"""
处理聊天补全的 POST 请求(流式和非流式)。
"""
request_type = 'stream' if request_data.stream else 'non-stream' # 判断请求是流式还是非流式
# 调用 request_processor 中的核心处理逻辑
# 将验证通过的 auth_data 和注入的实例传递给处理器函数
response = await process_request( # 调用核心处理函数处理请求
chat_request=request_data,
http_request=request,
request_type=request_type,
auth_data=auth_data, # 传递认证数据
key_manager=key_manager, # 传递 Key Manager 实例
http_client=http_client # 传递 HTTP Client 实例
)
if response is None:
# process_request 理想情况下应该在无法返回时引发异常
# 有效响应(例如,客户端在响应开始前断开连接)。
# 如果它返回 None,则意味着这里需要处理一个问题。
logger.error("process_request 意外返回 None。") # process_request 意外返回 None
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="请求处理中断或失败") # 引发 500 异常
return response # 返回处理器生成的响应(可能是 StreamingResponse 或 JSONResponse)
@router.get("/debug/config", include_in_schema=False)
async def debug_config():
"""
调试接口:返回应用程序读取到的 WEB_UI_PASSWORDS 配置。
"""
return {"WEB_UI_PASSWORDS": config.WEB_UI_PASSWORDS}
# 缓存管理端点 (需要管理员令牌)
@router.get("/cache", response_model=CacheListResponse, dependencies=[Depends(verify_admin_token)])
async def list_caches():
"""
获取缓存列表。
注意: app/core/cache_manager.py 中的列表功能尚未实现,此处返回模拟数据。
"""
logger.info("接收到获取缓存列表的请求")
# TODO: 调用 cache_manager.list_caches() (如果实现的话)
# 目前返回模拟数据
mock_caches = [
CachedContentEntry(
id="mock-cache-id-1",
content_hash="mockhash123",
associated_key_id="mock-key-1",
created_at=datetime.utcnow(),
expires_at=datetime.utcnow() + timedelta(days=1)
),
CachedContentEntry(
id="mock-cache-id-2",
content_hash="mockhash456",
associated_key_id="mock-key-2",
created_at=datetime.utcnow() - timedelta(hours=1),
expires_at=datetime.utcnow() + timedelta(hours=23)
)
]
return CacheListResponse(total=len(mock_caches), caches=mock_caches)
@router.get("/cache/{cache_id}", response_model=CachedContentEntry, dependencies=[Depends(verify_admin_token)])
async def get_cache_details(cache_id: str):
"""
根据缓存 ID 获取特定缓存的详细信息。
注意: app/core/cache_manager.py 中的获取详细信息功能尚未实现,此处返回模拟数据。
"""
logger.info(f"接收到获取缓存详细信息的请求,ID: {cache_id}")
# TODO: 调用 cache_manager.get_cache_details(cache_id) (如果实现的话)
# 目前返回模拟数据或 404
if cache_id == "mock-cache-id-1":
return CachedContentEntry(
id="mock-cache-id-1",
content_hash="mockhash123",
associated_key_id="mock-key-1",
created_at=datetime.utcnow(),
expires_at=datetime.utcnow() + timedelta(days=1)
)
elif cache_id == "mock-cache-id-2":
return CachedContentEntry(
id="mock-cache-id-2",
content_hash="mockhash456",
associated_key_id="mock-key-2",
created_at=datetime.utcnow() - timedelta(hours=1),
expires_at=datetime.utcnow() + timedelta(hours=23)
)
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="缓存未找到")
@router.delete("/cache/{cache_id}", dependencies=[Depends(verify_admin_token)])
async def delete_single_cache(cache_id: str):
"""
根据缓存 ID 删除特定缓存。
"""
logger.info(f"接收到删除缓存的请求,ID: {cache_id}")
success = await cache_manager.delete_cache(cache_id)
if success:
return {"message": f"缓存 {cache_id} 删除成功"}
else:
# 即使 cache_manager.delete_cache 返回 False,也可能只是因为缓存不存在或内部错误
# 根据 cache_manager 的当前实现,它总是返回 False 并记录警告
# 在实际实现中,这里需要更精细的错误处理
logger.warning(f"尝试删除缓存 {cache_id} 失败或 cache_manager 未完全实现")
# 为了不暴露内部实现细节,即使失败也返回成功,或者根据实际错误类型返回 404/500
# 这里暂时返回成功,假设调用了 delete_cache 函数
return {"message": f"尝试删除缓存 {cache_id},请检查日志确认结果"}
@router.delete("/cache", dependencies=[Depends(verify_admin_token)])
async def clear_all_caches():
"""
清空所有缓存。
注意: app/core/cache_manager.py 中的清空功能尚未实现,此处调用 invalidate_expired_caches 作为占位。
"""
logger.info("接收到清空所有缓存的请求")
# TODO: 调用 cache_manager.clear_all_caches() (如果实现的话)
# 目前调用 invalidate_expired_caches 作为占位
await cache_manager.invalidate_expired_caches()
# 同样,根据 cache_manager 的当前实现,这里无法确认是否真的清空了所有缓存
logger.warning("清空所有缓存的功能未完全实现,调用了 invalidate_expired_caches 作为占位")
return {"message": "尝试清空所有缓存,请检查日志确认结果"}
# 导入 datetime 和 timedelta 用于模拟数据
from datetime import datetime, timedelta