Spaces:
Paused
Paused
| """认证模块 - API令牌验证""" | |
| from typing import Optional, Dict | |
| from fastapi import Depends, HTTPException | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from app.core.config import setting | |
| from app.core.logger import logger | |
| from app.services.api_keys import api_key_manager | |
| # Bearer security scheme; auto_error=False lets a request without an Authorization header reach verify() with no credentials instead of being rejected automatically | |
| security = HTTPBearer(auto_error=False) | |
| def _build_error(message: str, code: str = "invalid_token") -> dict: | |
| """构建认证错误""" | |
| return { | |
| "error": { | |
| "message": message, | |
| "type": "authentication_error", | |
| "code": code | |
| } | |
| } | |
class AuthManager:
    """Authentication manager that validates API bearer tokens."""

    @staticmethod
    async def verify(
        credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    ) -> Dict:
        """Validate the bearer token and return the matching key info.

        Declared as a static method because it takes no ``self``. Without
        the decorator, accessing it through an instance (for example
        ``auth_manager.verify``) would bind the instance to ``credentials``
        and the token lookup below would fail.

        Args:
            credentials: Bearer credentials resolved by the ``security``
                dependency, or a falsy value when the request carried none.

        Returns:
            The value returned by ``api_key_manager.validate_key`` for an
            accepted token, or ``{"key": None, "name": "Anonymous"}`` when
            no token was sent and neither a global key nor any managed key
            is configured.

        Raises:
            HTTPException: 401 with code ``missing_token`` when no token was
                sent but a key is configured; 401 with code
                ``invalid_token`` when ``validate_key`` rejects the token.
        """
        api_key = setting.grok_config.get("api_key")

        # Initialise the key manager on first use; a missing ``_keys``
        # attribute is treated as "not initialised yet".
        if not hasattr(api_key_manager, '_keys'):
            await api_key_manager.init()

        if not credentials:
            # With no global key and no managed keys, authentication is
            # skipped entirely (development mode).
            if not api_key and not api_key_manager.get_all_keys():
                logger.debug("[Auth] 未设置API_KEY,跳过验证")
                return {"key": None, "name": "Anonymous"}

            raise HTTPException(
                status_code=401,
                detail=_build_error("缺少认证令牌", "missing_token")
            )

        token = credentials.credentials

        # Validate the token against the key manager (multi-key support).
        key_info = api_key_manager.validate_key(token)
        if key_info:
            return key_info

        raise HTTPException(
            status_code=401,
            detail=_build_error(f"令牌无效,长度: {len(token)}", "invalid_token")
        )
| # Global (module-level) AuthManager instance | |
| auth_manager = AuthManager() | |