| | """认证模块 - API令牌验证""" |
| |
|
| | from typing import Optional, Dict |
| | from fastapi import Depends, HTTPException |
| | from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
| |
|
| | from app.core.config import setting |
| | from app.core.logger import logger |
| | from app.services.api_keys import api_key_manager |
| |
|
| |
|
| | |
# Bearer-token extractor used as a FastAPI dependency. auto_error=False makes
# it yield None instead of raising when the Authorization header is missing,
# so AuthManager.verify decides how to respond in that case.
security = HTTPBearer(auto_error=False)
| |
|
| |
|
| | def _build_error(message: str, code: str = "invalid_token") -> dict: |
| | """构建认证错误""" |
| | return { |
| | "error": { |
| | "message": message, |
| | "type": "authentication_error", |
| | "code": code |
| | } |
| | } |
| |
|
| |
|
class AuthManager:
    """Authentication manager that validates API bearer tokens."""

    @staticmethod
    def _unauthorized(message: str, code: str) -> HTTPException:
        """Build the 401 exception shared by every rejection path."""
        return HTTPException(
            status_code=401,
            detail=_build_error(message, code),
            # A 401 response must carry a challenge telling the client which
            # authentication scheme to use (RFC 7235, section 3.1).
            headers={"WWW-Authenticate": "Bearer"},
        )

    @staticmethod
    async def verify(
        credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    ) -> Dict:
        """Validate the request's bearer token and return its key info.

        Args:
            credentials: Bearer credentials resolved by the ``security``
                dependency, or ``None`` when the request carried none
                (``security`` is created with ``auto_error=False``).

        Returns:
            The truthy value returned by ``api_key_manager.validate_key`` for
            an accepted token, or ``{"key": None, "name": "Anonymous"}`` when
            no token was sent and neither a global ``api_key`` nor any managed
            key is configured.

        Raises:
            HTTPException: 401 with code ``missing_token`` when a token is
                required but absent, or ``invalid_token`` when the supplied
                token is rejected.
        """
        # NOTE(review): readiness is detected by probing the private `_keys`
        # attribute; this presumably only appears after init() has run.
        # Confirm against the api_key_manager implementation.
        if not hasattr(api_key_manager, '_keys'):
            await api_key_manager.init()

        if not credentials:
            # The global key is only needed to decide whether auth is enabled.
            api_key = setting.grok_config.get("api_key")
            if not api_key and not api_key_manager.get_all_keys():
                # Nothing is configured, so authentication is skipped.
                logger.debug("[Auth] 未设置API_KEY,跳过验证")
                return {"key": None, "name": "Anonymous"}
            raise AuthManager._unauthorized("缺少认证令牌", "missing_token")

        token = credentials.credentials

        # NOTE(review): a token sent while no keys are configured is still
        # passed to validate_key rather than treated as anonymous; confirm
        # that this asymmetry is intended.
        key_info = api_key_manager.validate_key(token)
        if key_info:
            return key_info

        raise AuthManager._unauthorized(
            f"令牌无效,长度: {len(token)}", "invalid_token"
        )
| |
|
| |
|
| | |
# Module-level AuthManager instance. verify() is a staticmethod, so it can be
# referenced through this instance or through the class itself.
auth_manager = AuthManager()