File size: 4,820 Bytes
31ebcce
 
 
 
 
 
88a29ea
31ebcce
 
 
 
 
70d5596
 
 
 
31ebcce
 
 
 
 
 
 
 
 
 
1ce5f62
 
 
 
 
 
 
31ebcce
059b2ab
 
 
 
 
5fb2a2c
 
059b2ab
 
5fb2a2c
059b2ab
 
 
 
 
 
 
 
31ebcce
059b2ab
31ebcce
 
059b2ab
 
1ce5f62
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
059b2ab
31ebcce
 
059b2ab
 
 
 
31ebcce
 
 
059b2ab
31ebcce
 
059b2ab
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# app/api/auth.py

import os
import time
import logging
from fastapi import Request, HTTPException, Depends, status
from typing import Dict, List # 导入 List

# 配置日志
logger = logging.getLogger(__name__)

# 从环境变量获取 API Keys
ADMIN_API_KEYS = [key.strip() for key in os.getenv("ADMIN_API_KEYS", "").split(",") if key.strip()]
# 如果未配置管理员 API Keys,则发出警告
if not ADMIN_API_KEYS:
    logger.warning("未配置 ADMIN_API_KEYS。某些管理功能可能无法访问。")

USER_API_KEYS = os.getenv("USER_API_KEYS", "").split(",")
USER_API_KEYS = [key.strip() for key in USER_API_KEYS if key.strip()]

ALL_API_KEYS = list(set(ADMIN_API_KEYS + USER_API_KEYS)) # 合并并去重所有 keys

# 简单的内存存储,用于跟踪活跃 key (与 metrics 模块共享或在此处独立管理)
# 为了简化,这里先独立管理,后续可以考虑统一
active_keys_status: Dict[str, float] = {}

# 速率限制配置
# 从环境变量获取速率限制,如果未设置则使用默认值
RATE_LIMIT_PER_MINUTE = int(os.getenv("RATE_LIMIT_PER_MINUTE", "60")) # 每分钟请求数
RATE_LIMIT_WINDOW_SECONDS = 60 # 速率限制时间窗口(秒)

# 存储每个认证令牌的请求时间戳
request_timestamps: Dict[str, List[float]] = {}

# 认证令牌认证依赖
async def get_auth_token(request: Request) -> str: # 重命名函数为 get_auth_token
    """从请求头中获取认证令牌并进行验证,返回认证令牌。"""
    auth_token = None # 更改变量名为 auth_token
    # 优先从 Authorization: Bearer 头中获取认证令牌 (OpenAI 兼容)
    auth_header = request.headers.get("Authorization")
    if auth_header and auth_header.startswith("Bearer "):
        auth_token = auth_header.split(" ")[1] # 使用 auth_token
        logger.debug(f"从 Authorization 头获取到认证令牌 (Bearer Token): {auth_token[:4]}... 来自 IP: {request.client.host}") # 仅记录前4位,保护隐私
    else:
        # 如果 Authorization 头不存在或格式不正确,尝试从 X-Auth-Token 头获取
        auth_token = request.headers.get("X-Auth-Token") # 更改为 X-Auth-Token
        logger.debug(f"从 X-Auth-Token 头获取到认证令牌: {auth_token[:4] if auth_token else 'None'}... 来自 IP: {request.client.host}") # 仅记录前4位,保护隐私

    if not auth_token or auth_token not in ALL_API_KEYS: # 使用 auth_token
        # 记录尝试的令牌,如果令牌为 None 或空字符串,则直接显示 'None' 或 'Empty'
        log_token_display = auth_token if auth_token else 'None' # 使用 auth_token
        logger.warning(f"认证失败: 无效或缺失的认证令牌 '{log_token_display}' 来自 IP: {request.client.host}") # 更改提示
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="无效或缺失的认证令牌" # 更改提示
        )

    # 更新活跃令牌状态
    active_keys_status[auth_token] = time.time() # 使用 auth_token

    # --- 速率限制逻辑 ---
    current_time = time.time()
    # 清理过期的时间戳
    request_timestamps[auth_token] = [
        t for t in request_timestamps.get(auth_token, []) if current_time - t < RATE_LIMIT_WINDOW_SECONDS
    ]
    
    # 检查是否超过速率限制
    if len(request_timestamps[auth_token]) >= RATE_LIMIT_PER_MINUTE:
        logger.warning(f"认证令牌 '{auth_token}' 达到速率限制。来自 IP: {request.client.host}")
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=f"请求过于频繁,请稍后再试。当前限制为每分钟 {RATE_LIMIT_PER_MINUTE} 次。",
        )
    
    # 添加当前请求的时间戳
    request_timestamps[auth_token].append(current_time)
    # --- 速率限制逻辑结束 ---

    return auth_token # 返回 auth_token


async def get_admin_api_key(auth_token: str = Depends(get_auth_token)) -> str: # 更改参数名为 auth_token,依赖 get_auth_token
    """验证认证令牌是否为管理员令牌。"""
    if auth_token not in ADMIN_API_KEYS: # 使用 auth_token
        logger.warning(f"权限不足: 认证令牌 '{auth_token}' 不是管理员令牌,尝试访问受限资源。") # 更改提示
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="操作需要管理员权限"
        )
    return auth_token # 返回 auth_token


async def get_user_api_key(auth_token: str = Depends(get_auth_token)) -> str: # 更改参数名为 auth_token,依赖 get_auth_token
    """验证认证令牌是否为有效用户令牌 (管理员或普通用户均可)。"""
    # 任何在 ALL_API_KEYS 中的令牌都应该被认为是有效的用户令牌
    # get_auth_token 已经做了这个检查
    return auth_token # 返回 auth_token