"""
API authentication module.
"""
|
|
import secrets
from typing import Optional

from fastapi import HTTPException, status, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from app.core.config import get_config
|
|
# Fallback values handed to get_config() when the corresponding "app.*"
# entry is absent from the configuration.

# Admin API key; empty means the admin API is not protected by a token.
DEFAULT_API_KEY: str = ""
# App key, i.e. the password for the admin console login.
DEFAULT_APP_KEY: str = "grok2api"
# Public API key; empty means no key is configured for public endpoints.
DEFAULT_PUBLIC_KEY: str = ""
# Whether the public entry point is switched on.
DEFAULT_PUBLIC_ENABLED: bool = False
|
|
| |
# Bearer-token scheme shared by every verifier in this module.
# auto_error is disabled so that a request without credentials reaches the
# verifier as a falsy value instead of being rejected by the scheme itself;
# each verifier then decides whether a missing token is acceptable.
security = HTTPBearer(
    scheme_name="API Key",
    description="Enter your API Key in the format: Bearer <key>",
    auto_error=False,
)
|
|
|
|
def get_admin_api_key() -> str:
    """
    Return the admin API key.

    The value is looked up under ``app.api_key`` with ``DEFAULT_API_KEY``
    passed as the fallback. A falsy value is normalised to ``""``, which
    means admin API authentication is not enabled.
    """
    configured = get_config("app.api_key", DEFAULT_API_KEY)
    if not configured:
        return ""
    return configured
|
|
def get_app_key() -> str:
    """
    Return the app key (the admin console password).

    The value is looked up under ``app.app_key`` with ``DEFAULT_APP_KEY``
    passed as the fallback. A falsy value is normalised to ``""``.
    """
    configured = get_config("app.app_key", DEFAULT_APP_KEY)
    if not configured:
        return ""
    return configured
|
|
def get_public_api_key() -> str:
    """
    Return the public API key.

    The value is looked up under ``app.public_key`` with
    ``DEFAULT_PUBLIC_KEY`` passed as the fallback. A falsy value is
    normalised to ``""``, which means no key is configured for the public
    endpoints.
    """
    configured = get_config("app.public_key", DEFAULT_PUBLIC_KEY)
    if not configured:
        return ""
    return configured
|
|
def is_public_enabled() -> bool:
    """
    Report whether the public entry point is switched on.

    The flag is looked up under ``app.public_enabled`` with
    ``DEFAULT_PUBLIC_ENABLED`` passed as the fallback.

    Returns:
        ``True`` when the configured value is truthy. String values are
        interpreted textually, so ``"false"``, ``"0"``, ``"no"``, ``"off"``
        and blank strings (any case, surrounding whitespace ignored) count
        as disabled.
    """
    flag = get_config("app.public_enabled", DEFAULT_PUBLIC_ENABLED)
    if isinstance(flag, str):
        # bool("false") is True. If the config layer ever hands back a
        # string, a plain bool() would silently open public access, so
        # treat the usual "off" spellings as disabled (fail closed).
        return flag.strip().lower() not in {"", "0", "false", "no", "off"}
    return bool(flag)
|
|
|
|
async def verify_api_key(
    auth: Optional[HTTPAuthorizationCredentials] = Security(security),
) -> Optional[str]:
    """
    Validate the Bearer token against the admin API key.

    Authentication is skipped when no admin API key is configured.

    Args:
        auth: Credentials produced by the shared ``security`` bearer scheme;
            falsy when the request carried none.

    Returns:
        ``None`` when authentication is disabled, otherwise the accepted
        token.

    Raises:
        HTTPException: 401 when the token is missing or does not match.
    """
    api_key = get_admin_api_key()
    if not api_key:
        return None

    if not auth:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing authentication token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Compare in constant time so response latency cannot reveal how much of
    # a guessed key was correct. Both sides are encoded to bytes because
    # compare_digest() rejects str arguments containing non-ASCII characters.
    # The isinstance guard keeps the old behaviour for a non-string
    # configured value: it can never equal the header string.
    token_ok = isinstance(api_key, str) and secrets.compare_digest(
        auth.credentials.encode("utf-8"),
        api_key.encode("utf-8"),
    )
    if not token_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return auth.credentials
|
|
|
|
async def verify_app_key(
    auth: Optional[HTTPAuthorizationCredentials] = Security(security),
) -> Optional[str]:
    """
    Validate the admin console login key (app_key).

    The app key must be configured; when it is empty every login attempt is
    rejected.

    Args:
        auth: Credentials produced by the shared ``security`` bearer scheme;
            falsy when the request carried none.

    Returns:
        The accepted token.

    Raises:
        HTTPException: 401 when the app key is not configured, or when the
            token is missing or does not match.
    """
    app_key = get_app_key()

    if not app_key:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="App key is not configured",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not auth:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing authentication token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Compare in constant time so response latency cannot reveal how much of
    # a guessed key was correct. Both sides are encoded to bytes because
    # compare_digest() rejects str arguments containing non-ASCII characters.
    # The isinstance guard keeps the old behaviour for a non-string
    # configured value: it can never equal the header string.
    token_ok = isinstance(app_key, str) and secrets.compare_digest(
        auth.credentials.encode("utf-8"),
        app_key.encode("utf-8"),
    )
    if not token_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return auth.credentials
|
|
|
|
async def verify_public_key(
    auth: Optional[HTTPAuthorizationCredentials] = Security(security),
) -> Optional[str]:
    """
    Validate the public key used by the public endpoints.

    Public endpoints are closed by default and normally require a configured
    public_key. When no public_key is configured, access is open only if
    public_enabled is switched on.

    Args:
        auth: Credentials produced by the shared ``security`` bearer scheme;
            falsy when the request carried none.

    Returns:
        ``None`` when access is open without a key, otherwise the accepted
        token.

    Raises:
        HTTPException: 401 when public access is disabled, or when the token
            is missing or does not match.
    """
    public_key = get_public_api_key()

    if not public_key:
        # The enabled flag only matters when no key is configured, so it is
        # read on this branch only.
        if is_public_enabled():
            return None
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Public access is disabled",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not auth:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing authentication token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Compare in constant time so response latency cannot reveal how much of
    # a guessed key was correct. Both sides are encoded to bytes because
    # compare_digest() rejects str arguments containing non-ASCII characters.
    # The isinstance guard keeps the old behaviour for a non-string
    # configured value: it can never equal the header string.
    token_ok = isinstance(public_key, str) and secrets.compare_digest(
        auth.credentials.encode("utf-8"),
        public_key.encode("utf-8"),
    )
    if not token_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return auth.credentials
|
|