File size: 3,811 Bytes
595e7f0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""
认证相关装饰器和工具函数
"""
from functools import wraps
from fastapi import HTTPException, Header
from typing import Optional


def extract_admin_key(key: Optional[str] = None, authorization: Optional[str] = None) -> Optional[str]:
    """
    统一提取管理员密钥

    优先级:
    1. URL 参数 ?key=xxx
    2. Authorization Header (支持 Bearer token 格式)

    Args:
        key: URL 查询参数中的密钥
        authorization: Authorization Header 中的密钥

    Returns:
        提取到的密钥,如果都为空则返回 None
    """
    if key:
        return key
    if authorization:
        # 支持 Bearer token 格式
        return authorization.replace("Bearer ", "") if authorization.startswith("Bearer ") else authorization
    return None


def require_path_prefix(path_prefix_value: str):
    """
    验证路径前缀的装饰器

    Args:
        path_prefix_value: 正确的路径前缀值

    Returns:
        装饰器函数

    Example:
        @app.get("/{path_prefix}/admin")
        @require_path_prefix(PATH_PREFIX)
        async def admin_home(path_prefix: str, ...):
            ...
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, path_prefix: str, **kwargs):
            if path_prefix != path_prefix_value:
                # 返回 404 而不是 401,假装端点不存在(安全性考虑)
                raise HTTPException(404, "Not Found")
            return await func(*args, path_prefix=path_prefix, **kwargs)
        return wrapper
    return decorator


def require_admin_auth(admin_key_value: str):
    """
    验证管理员权限的装饰器

    支持两种认证方式:
    1. URL 参数:?key=xxx
    2. Authorization Header:Bearer xxx 或直接传密钥

    Args:
        admin_key_value: 正确的管理员密钥

    Returns:
        装饰器函数

    Example:
        @app.get("/{path_prefix}/admin")
        @require_admin_auth(ADMIN_KEY)
        async def admin_home(key: str = None, authorization: str = Header(None), ...):
            ...
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, key: str = None, authorization: str = Header(None), **kwargs):
            admin_key = extract_admin_key(key, authorization)
            if admin_key != admin_key_value:
                # 返回 404 而不是 401,假装端点不存在(安全性考虑)
                raise HTTPException(404, "Not Found")
            return await func(*args, key=key, authorization=authorization, **kwargs)
        return wrapper
    return decorator


def require_path_and_admin(path_prefix_value: str, admin_key_value: str):
    """
    同时验证路径前缀和管理员权限的组合装饰器

    Args:
        path_prefix_value: 正确的路径前缀值
        admin_key_value: 正确的管理员密钥

    Returns:
        装饰器函数

    Example:
        @app.get("/{path_prefix}/admin")
        @require_path_and_admin(PATH_PREFIX, ADMIN_KEY)
        async def admin_home(path_prefix: str, key: str = None, authorization: str = Header(None), ...):
            ...
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, path_prefix: str, key: str = None, authorization: str = Header(None), **kwargs):
            # 验证路径前缀
            if path_prefix != path_prefix_value:
                raise HTTPException(404, "Not Found")

            # 验证管理员密钥
            admin_key = extract_admin_key(key, authorization)
            if admin_key != admin_key_value:
                raise HTTPException(404, "Not Found")

            return await func(*args, path_prefix=path_prefix, key=key, authorization=authorization, **kwargs)
        return wrapper
    return decorator