""" API 认证模块 """ from typing import Optional from fastapi import HTTPException, status, Security from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from app.core.config import get_config DEFAULT_API_KEY = "" DEFAULT_APP_KEY = "grok2api" DEFAULT_PUBLIC_KEY = "" DEFAULT_PUBLIC_ENABLED = False # 定义 Bearer Scheme security = HTTPBearer( auto_error=False, scheme_name="API Key", description="Enter your API Key in the format: Bearer ", ) def get_admin_api_key() -> str: """ 获取后台 API Key。 为空时表示不启用后台接口认证。 """ api_key = get_config("app.api_key", DEFAULT_API_KEY) return api_key or "" def get_app_key() -> str: """ 获取 App Key(后台管理密码)。 """ app_key = get_config("app.app_key", DEFAULT_APP_KEY) return app_key or "" def get_public_api_key() -> str: """ 获取 Public API Key。 为空时表示不启用 public 接口认证。 """ public_key = get_config("app.public_key", DEFAULT_PUBLIC_KEY) return public_key or "" def is_public_enabled() -> bool: """ 是否开启 public 功能入口。 """ return bool(get_config("app.public_enabled", DEFAULT_PUBLIC_ENABLED)) async def verify_api_key( auth: Optional[HTTPAuthorizationCredentials] = Security(security), ) -> Optional[str]: """ 验证 Bearer Token 如果 config.toml 中未配置 api_key,则不启用认证。 """ api_key = get_admin_api_key() if not api_key: return None if not auth: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing authentication token", headers={"WWW-Authenticate": "Bearer"}, ) if auth.credentials != api_key: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication token", headers={"WWW-Authenticate": "Bearer"}, ) return auth.credentials async def verify_app_key( auth: Optional[HTTPAuthorizationCredentials] = Security(security), ) -> Optional[str]: """ 验证后台登录密钥(app_key)。 app_key 必须配置,否则拒绝登录。 """ app_key = get_app_key() if not app_key: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="App key is not configured", headers={"WWW-Authenticate": "Bearer"}, ) if not auth: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing authentication token", headers={"WWW-Authenticate": "Bearer"}, ) if auth.credentials != app_key: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication token", headers={"WWW-Authenticate": "Bearer"}, ) return auth.credentials async def verify_public_key( auth: Optional[HTTPAuthorizationCredentials] = Security(security), ) -> Optional[str]: """ 验证 Public Key(public 接口使用)。 默认不公开,需配置 public_key 才能访问;若开启 public_enabled 且未配置 public_key,则放开访问。 """ public_key = get_public_api_key() public_enabled = is_public_enabled() if not public_key: if public_enabled: return None raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Public access is disabled", headers={"WWW-Authenticate": "Bearer"}, ) if not auth: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing authentication token", headers={"WWW-Authenticate": "Bearer"}, ) if auth.credentials != public_key: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication token", headers={"WWW-Authenticate": "Bearer"}, ) return auth.credentials