File size: 6,088 Bytes
2286d62
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
320ef8e
2286d62
320ef8e
 
 
 
 
 
 
2286d62
320ef8e
 
2286d62
0e30efb
 
 
 
 
 
2286d62
 
 
0e30efb
2286d62
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0e30efb
 
2286d62
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0e30efb
2286d62
0e30efb
 
 
 
 
320ef8e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""管理员认证系统"""
import os
import secrets
import hashlib
import hmac
import time
import logging
from typing import Optional, Dict, Any
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)

class AdminAuth:
    """管理员认证管理器"""

    def __init__(self):
        self.secret_key = self._get_or_create_secret_key()
        self.session_timeout = 24 * 60 * 60  # 24小时
        self._sessions: Dict[str, Dict[str, Any]] = {}

    def _get_or_create_secret_key(self) -> str:
        """获取或创建密钥

        注意:此密钥目前仅用于未来扩展(当前会话存储在内存中,不依赖密钥持久化)。
        为避免在 ASGI 事件循环中调用 asyncio.run(会导致崩溃),这里不再从持久化层读取/写入。
        如需固定密钥,请设置环境变量 ADMIN_SECRET_KEY。
        """
        env_key = os.getenv("ADMIN_SECRET_KEY")
        if env_key:
            return env_key

        logger.info("生成新的管理员密钥")
        return secrets.token_urlsafe(32)

    def is_auth_required(self) -> bool:
        """检查是否需要认证(仅当设置了 ADMIN_PASSWORD 环境变量时才需要)"""
        return bool(os.getenv("ADMIN_PASSWORD"))

    def get_admin_password(self) -> Optional[str]:
        """获取管理员密码,未设置环境变量时返回 None"""
        env_password = os.getenv("ADMIN_PASSWORD")
        if env_password:
            return env_password
        return None

    def hash_password(self, password: str) -> str:
        """哈希密码"""
        salt = secrets.token_bytes(32)
        pwdhash = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
        return salt.hex() + pwdhash.hex()

    def verify_password(self, password: str, hashed: str) -> bool:
        """验证密码"""
        try:
            salt = bytes.fromhex(hashed[:64])
            stored_hash = bytes.fromhex(hashed[64:])
            pwdhash = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
            return hmac.compare_digest(stored_hash, pwdhash)
        except Exception:
            return False

    def authenticate(self, password: str) -> bool:
        """验证管理员密码"""
        admin_password = self.get_admin_password()
        if admin_password is None:
            return True
        return hmac.compare_digest(password, admin_password)

    def create_session(self, user_id: str = "admin") -> str:
        """创建会话"""
        session_id = secrets.token_urlsafe(32)
        expires_at = time.time() + self.session_timeout

        self._sessions[session_id] = {
            "user_id": user_id,
            "created_at": time.time(),
            "expires_at": expires_at,
            "last_activity": time.time()
        }

        # 清理过期会话
        self._cleanup_expired_sessions()

        logger.info(f"创建管理员会话: {session_id[:8]}...")
        return session_id

    def validate_session(self, session_id: str) -> bool:
        """验证会话"""
        if not session_id or session_id not in self._sessions:
            return False

        session = self._sessions[session_id]
        current_time = time.time()

        # 检查是否过期
        if current_time > session["expires_at"]:
            del self._sessions[session_id]
            return False

        # 更新最后活动时间
        session["last_activity"] = current_time
        return True

    def revoke_session(self, session_id: str) -> bool:
        """撤销会话"""
        if session_id in self._sessions:
            del self._sessions[session_id]
            logger.info(f"撤销管理员会话: {session_id[:8]}...")
            return True
        return False

    def get_session_info(self, session_id: str) -> Optional[Dict[str, Any]]:
        """获取会话信息"""
        if session_id in self._sessions:
            session = self._sessions[session_id].copy()
            session["created_at"] = datetime.fromtimestamp(session["created_at"]).isoformat()
            session["expires_at"] = datetime.fromtimestamp(session["expires_at"]).isoformat()
            session["last_activity"] = datetime.fromtimestamp(session["last_activity"]).isoformat()
            return session
        return None

    def _cleanup_expired_sessions(self):
        """清理过期会话"""
        current_time = time.time()
        expired_sessions = [
            sid for sid, session in self._sessions.items()
            if current_time > session["expires_at"]
        ]

        for sid in expired_sessions:
            del self._sessions[sid]

        if expired_sessions:
            logger.info(f"清理了 {len(expired_sessions)} 个过期会话")

    def get_active_sessions(self) -> Dict[str, Dict[str, Any]]:
        """获取所有活跃会话"""
        self._cleanup_expired_sessions()
        return {
            sid: self.get_session_info(sid)
            for sid in self._sessions.keys()
        }


# 全局认证实例
_auth_instance: Optional[AdminAuth] = None

def get_admin_auth() -> AdminAuth:
    """获取管理员认证实例(单例模式)"""
    global _auth_instance
    if _auth_instance is None:
        _auth_instance = AdminAuth()
    return _auth_instance


# 便捷函数
def authenticate_admin(password: str) -> bool:
    """验证管理员密码"""
    return get_admin_auth().authenticate(password)

def create_admin_session() -> str:
    """创建管理员会话"""
    return get_admin_auth().create_session()

def validate_admin_session(session_id: str) -> bool:
    """验证管理员会话"""
    return get_admin_auth().validate_session(session_id)

def revoke_admin_session(session_id: str) -> bool:
    """撤销管理员会话"""
    return get_admin_auth().revoke_session(session_id)

def get_admin_password() -> Optional[str]:
    """获取管理员密码"""
    return get_admin_auth().get_admin_password()


def is_auth_required() -> bool:
    """检查是否需要认证"""
    return get_admin_auth().is_auth_required()