# app/api/auth.py import os import time import logging from fastapi import Request, HTTPException, Depends, status from typing import Dict, List # 导入 List # 配置日志 logger = logging.getLogger(__name__) # 从环境变量获取 API Keys ADMIN_API_KEYS = [key.strip() for key in os.getenv("ADMIN_API_KEYS", "").split(",") if key.strip()] # 如果未配置管理员 API Keys,则发出警告 if not ADMIN_API_KEYS: logger.warning("未配置 ADMIN_API_KEYS。某些管理功能可能无法访问。") USER_API_KEYS = os.getenv("USER_API_KEYS", "").split(",") USER_API_KEYS = [key.strip() for key in USER_API_KEYS if key.strip()] ALL_API_KEYS = list(set(ADMIN_API_KEYS + USER_API_KEYS)) # 合并并去重所有 keys # 简单的内存存储,用于跟踪活跃 key (与 metrics 模块共享或在此处独立管理) # 为了简化,这里先独立管理,后续可以考虑统一 active_keys_status: Dict[str, float] = {} # 速率限制配置 # 从环境变量获取速率限制,如果未设置则使用默认值 RATE_LIMIT_PER_MINUTE = int(os.getenv("RATE_LIMIT_PER_MINUTE", "60")) # 每分钟请求数 RATE_LIMIT_WINDOW_SECONDS = 60 # 速率限制时间窗口(秒) # 存储每个认证令牌的请求时间戳 request_timestamps: Dict[str, List[float]] = {} # 认证令牌认证依赖 async def get_auth_token(request: Request) -> str: # 重命名函数为 get_auth_token """从请求头中获取认证令牌并进行验证,返回认证令牌。""" auth_token = None # 更改变量名为 auth_token # 优先从 Authorization: Bearer 头中获取认证令牌 (OpenAI 兼容) auth_header = request.headers.get("Authorization") if auth_header and auth_header.startswith("Bearer "): auth_token = auth_header.split(" ")[1] # 使用 auth_token logger.debug(f"从 Authorization 头获取到认证令牌 (Bearer Token): {auth_token[:4]}... 来自 IP: {request.client.host}") # 仅记录前4位,保护隐私 else: # 如果 Authorization 头不存在或格式不正确,尝试从 X-Auth-Token 头获取 auth_token = request.headers.get("X-Auth-Token") # 更改为 X-Auth-Token logger.debug(f"从 X-Auth-Token 头获取到认证令牌: {auth_token[:4] if auth_token else 'None'}... 来自 IP: {request.client.host}") # 仅记录前4位,保护隐私 if not auth_token or auth_token not in ALL_API_KEYS: # 使用 auth_token # 记录尝试的令牌,如果令牌为 None 或空字符串,则直接显示 'None' 或 'Empty' log_token_display = auth_token if auth_token else 'None' # 使用 auth_token logger.warning(f"认证失败: 无效或缺失的认证令牌 '{log_token_display}' 来自 IP: {request.client.host}") # 更改提示 raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无效或缺失的认证令牌" # 更改提示 ) # 更新活跃令牌状态 active_keys_status[auth_token] = time.time() # 使用 auth_token # --- 速率限制逻辑 --- current_time = time.time() # 清理过期的时间戳 request_timestamps[auth_token] = [ t for t in request_timestamps.get(auth_token, []) if current_time - t < RATE_LIMIT_WINDOW_SECONDS ] # 检查是否超过速率限制 if len(request_timestamps[auth_token]) >= RATE_LIMIT_PER_MINUTE: logger.warning(f"认证令牌 '{auth_token}' 达到速率限制。来自 IP: {request.client.host}") raise HTTPException( status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=f"请求过于频繁,请稍后再试。当前限制为每分钟 {RATE_LIMIT_PER_MINUTE} 次。", ) # 添加当前请求的时间戳 request_timestamps[auth_token].append(current_time) # --- 速率限制逻辑结束 --- return auth_token # 返回 auth_token async def get_admin_api_key(auth_token: str = Depends(get_auth_token)) -> str: # 更改参数名为 auth_token,依赖 get_auth_token """验证认证令牌是否为管理员令牌。""" if auth_token not in ADMIN_API_KEYS: # 使用 auth_token logger.warning(f"权限不足: 认证令牌 '{auth_token}' 不是管理员令牌,尝试访问受限资源。") # 更改提示 raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="操作需要管理员权限" ) return auth_token # 返回 auth_token async def get_user_api_key(auth_token: str = Depends(get_auth_token)) -> str: # 更改参数名为 auth_token,依赖 get_auth_token """验证认证令牌是否为有效用户令牌 (管理员或普通用户均可)。""" # 任何在 ALL_API_KEYS 中的令牌都应该被认为是有效的用户令牌 # get_auth_token 已经做了这个检查 return auth_token # 返回 auth_token