Gem2a / app /api /auth.py
misonL's picture
Upgrade Pydantic and import List type
88a29ea
Raw
History Blame Contribute Delete
4.82 kB
# app/api/auth.py
import os
import time
import logging
from fastapi import Request, HTTPException, Depends, status
from typing import Dict, List # 导入 List
# 配置日志
logger = logging.getLogger(__name__)
# 从环境变量获取 API Keys
ADMIN_API_KEYS = [key.strip() for key in os.getenv("ADMIN_API_KEYS", "").split(",") if key.strip()]
# 如果未配置管理员 API Keys,则发出警告
if not ADMIN_API_KEYS:
logger.warning("未配置 ADMIN_API_KEYS。某些管理功能可能无法访问。")
USER_API_KEYS = os.getenv("USER_API_KEYS", "").split(",")
USER_API_KEYS = [key.strip() for key in USER_API_KEYS if key.strip()]
ALL_API_KEYS = list(set(ADMIN_API_KEYS + USER_API_KEYS)) # 合并并去重所有 keys
# 简单的内存存储,用于跟踪活跃 key (与 metrics 模块共享或在此处独立管理)
# 为了简化,这里先独立管理,后续可以考虑统一
active_keys_status: Dict[str, float] = {}
# 速率限制配置
# 从环境变量获取速率限制,如果未设置则使用默认值
RATE_LIMIT_PER_MINUTE = int(os.getenv("RATE_LIMIT_PER_MINUTE", "60")) # 每分钟请求数
RATE_LIMIT_WINDOW_SECONDS = 60 # 速率限制时间窗口(秒)
# 存储每个认证令牌的请求时间戳
request_timestamps: Dict[str, List[float]] = {}
# 认证令牌认证依赖
async def get_auth_token(request: Request) -> str: # 重命名函数为 get_auth_token
"""从请求头中获取认证令牌并进行验证,返回认证令牌。"""
auth_token = None # 更改变量名为 auth_token
# 优先从 Authorization: Bearer 头中获取认证令牌 (OpenAI 兼容)
auth_header = request.headers.get("Authorization")
if auth_header and auth_header.startswith("Bearer "):
auth_token = auth_header.split(" ")[1] # 使用 auth_token
logger.debug(f"从 Authorization 头获取到认证令牌 (Bearer Token): {auth_token[:4]}... 来自 IP: {request.client.host}") # 仅记录前4位,保护隐私
else:
# 如果 Authorization 头不存在或格式不正确,尝试从 X-Auth-Token 头获取
auth_token = request.headers.get("X-Auth-Token") # 更改为 X-Auth-Token
logger.debug(f"从 X-Auth-Token 头获取到认证令牌: {auth_token[:4] if auth_token else 'None'}... 来自 IP: {request.client.host}") # 仅记录前4位,保护隐私
if not auth_token or auth_token not in ALL_API_KEYS: # 使用 auth_token
# 记录尝试的令牌,如果令牌为 None 或空字符串,则直接显示 'None' 或 'Empty'
log_token_display = auth_token if auth_token else 'None' # 使用 auth_token
logger.warning(f"认证失败: 无效或缺失的认证令牌 '{log_token_display}' 来自 IP: {request.client.host}") # 更改提示
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="无效或缺失的认证令牌" # 更改提示
)
# 更新活跃令牌状态
active_keys_status[auth_token] = time.time() # 使用 auth_token
# --- 速率限制逻辑 ---
current_time = time.time()
# 清理过期的时间戳
request_timestamps[auth_token] = [
t for t in request_timestamps.get(auth_token, []) if current_time - t < RATE_LIMIT_WINDOW_SECONDS
]
# 检查是否超过速率限制
if len(request_timestamps[auth_token]) >= RATE_LIMIT_PER_MINUTE:
logger.warning(f"认证令牌 '{auth_token}' 达到速率限制。来自 IP: {request.client.host}")
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail=f"请求过于频繁,请稍后再试。当前限制为每分钟 {RATE_LIMIT_PER_MINUTE} 次。",
)
# 添加当前请求的时间戳
request_timestamps[auth_token].append(current_time)
# --- 速率限制逻辑结束 ---
return auth_token # 返回 auth_token
async def get_admin_api_key(auth_token: str = Depends(get_auth_token)) -> str: # 更改参数名为 auth_token,依赖 get_auth_token
"""验证认证令牌是否为管理员令牌。"""
if auth_token not in ADMIN_API_KEYS: # 使用 auth_token
logger.warning(f"权限不足: 认证令牌 '{auth_token}' 不是管理员令牌,尝试访问受限资源。") # 更改提示
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="操作需要管理员权限"
)
return auth_token # 返回 auth_token
async def get_user_api_key(auth_token: str = Depends(get_auth_token)) -> str: # 更改参数名为 auth_token,依赖 get_auth_token
"""验证认证令牌是否为有效用户令牌 (管理员或普通用户均可)。"""
# 任何在 ALL_API_KEYS 中的令牌都应该被认为是有效的用户令牌
# get_auth_token 已经做了这个检查
return auth_token # 返回 auth_token