File size: 6,088 Bytes
2286d62 320ef8e 2286d62 320ef8e 2286d62 320ef8e 2286d62 0e30efb 2286d62 0e30efb 2286d62 0e30efb 2286d62 0e30efb 2286d62 0e30efb 320ef8e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
"""管理员认证系统"""
import os
import secrets
import hashlib
import hmac
import time
import logging
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
class AdminAuth:
    """Administrator authentication manager.

    Checks a candidate password against the ``ADMIN_PASSWORD`` environment
    variable and tracks login sessions in an in-memory dictionary keyed by
    session id.
    """

    # PBKDF2-HMAC-SHA256 parameters shared by hash_password / verify_password.
    PBKDF2_ITERATIONS = 100000
    SALT_BYTES = 32

    def __init__(self):
        """Set up the secret key, the session lifetime and an empty session store."""
        self.secret_key = self._get_or_create_secret_key()
        self.session_timeout = 24 * 60 * 60  # 24 hours, in seconds
        self._sessions: Dict[str, Dict[str, Any]] = {}

    def _get_or_create_secret_key(self) -> str:
        """Return the secret key from the environment, or generate a new one.

        Note: the key is currently only reserved for future extension
        (sessions live in memory and do not depend on a persisted key).
        To avoid calling ``asyncio.run`` inside the ASGI event loop (which
        would crash), nothing is read from or written to a persistence layer
        here. Set the ``ADMIN_SECRET_KEY`` environment variable to pin the key.
        """
        env_key = os.getenv("ADMIN_SECRET_KEY")
        if env_key:
            return env_key
        logger.info("生成新的管理员密钥")
        return secrets.token_urlsafe(32)

    def is_auth_required(self) -> bool:
        """Return True only when ``ADMIN_PASSWORD`` is set to a non-empty value."""
        return bool(os.getenv("ADMIN_PASSWORD"))

    def get_admin_password(self) -> Optional[str]:
        """Return the admin password, or None when ``ADMIN_PASSWORD`` is unset or empty."""
        return os.getenv("ADMIN_PASSWORD") or None

    def hash_password(self, password: str) -> str:
        """Hash *password* with PBKDF2-HMAC-SHA256 and a fresh random salt.

        Returns:
            The hex-encoded salt immediately followed by the hex-encoded digest.
        """
        salt = secrets.token_bytes(self.SALT_BYTES)
        digest = hashlib.pbkdf2_hmac(
            "sha256", password.encode("utf-8"), salt, self.PBKDF2_ITERATIONS
        )
        return salt.hex() + digest.hex()

    def verify_password(self, password: str, hashed: str) -> bool:
        """Check *password* against a value produced by :meth:`hash_password`.

        Returns:
            True when the password matches; False when it does not or when
            either argument is malformed (wrong type, invalid hex).
        """
        salt_hex_len = 2 * self.SALT_BYTES  # each salt byte is two hex characters
        try:
            salt = bytes.fromhex(hashed[:salt_hex_len])
            stored_digest = bytes.fromhex(hashed[salt_hex_len:])
            candidate = hashlib.pbkdf2_hmac(
                "sha256", password.encode("utf-8"), salt, self.PBKDF2_ITERATIONS
            )
        except (ValueError, TypeError, AttributeError):
            # Malformed input is treated as a failed verification, not an error.
            return False
        return hmac.compare_digest(stored_digest, candidate)

    def authenticate(self, password: str) -> bool:
        """Check *password* against the configured admin password.

        Returns:
            True when no admin password is configured (authentication is
            disabled) or when *password* matches; False otherwise, including
            when *password* is not a string.
        """
        admin_password = self.get_admin_password()
        if admin_password is None:
            return True
        if not isinstance(password, str):
            return False
        # hmac.compare_digest only accepts ASCII-only str arguments and raises
        # TypeError otherwise, so compare the UTF-8 bytes instead.
        # "surrogatepass" keeps lone surrogates from raising on encode.
        return hmac.compare_digest(
            password.encode("utf-8", "surrogatepass"),
            admin_password.encode("utf-8", "surrogatepass"),
        )

    def create_session(self, user_id: str = "admin") -> str:
        """Create a session for *user_id* and return its random session id.

        Expired sessions are purged as a side effect.
        """
        session_id = secrets.token_urlsafe(32)
        # One timestamp so created_at, last_activity and expires_at agree.
        now = time.time()
        self._sessions[session_id] = {
            "user_id": user_id,
            "created_at": now,
            "expires_at": now + self.session_timeout,
            "last_activity": now,
        }

        self._cleanup_expired_sessions()

        logger.info(f"创建管理员会话: {session_id[:8]}...")
        return session_id

    def validate_session(self, session_id: str) -> bool:
        """Return True when *session_id* names a known, unexpired session.

        A valid session gets its ``last_activity`` refreshed; an expired one
        is removed from the store.
        """
        if not session_id:
            return False
        session = self._sessions.get(session_id)
        if session is None:
            return False

        now = time.time()
        if now > session["expires_at"]:
            del self._sessions[session_id]
            return False

        session["last_activity"] = now
        return True

    def revoke_session(self, session_id: str) -> bool:
        """Delete *session_id*; return True if it existed, False otherwise."""
        if session_id not in self._sessions:
            return False
        del self._sessions[session_id]
        logger.info(f"撤销管理员会话: {session_id[:8]}...")
        return True

    def get_session_info(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Return a copy of the session record with ISO-8601 timestamps.

        Timestamps are rendered with ``datetime.fromtimestamp`` (naive local
        time). Returns None when the session id is unknown. Expiry is not
        checked here.
        """
        session = self._sessions.get(session_id)
        if session is None:
            return None
        info = dict(session)  # shallow copy so the stored record keeps floats
        for field in ("created_at", "expires_at", "last_activity"):
            info[field] = datetime.fromtimestamp(info[field]).isoformat()
        return info

    def _cleanup_expired_sessions(self):
        """Remove every session whose ``expires_at`` is in the past."""
        now = time.time()
        # Collect first: a dict must not be mutated while it is being iterated.
        expired_sessions = [
            sid for sid, session in self._sessions.items()
            if now > session["expires_at"]
        ]
        for sid in expired_sessions:
            del self._sessions[sid]

        if expired_sessions:
            logger.info(f"清理了 {len(expired_sessions)} 个过期会话")

    def get_active_sessions(self) -> Dict[str, Dict[str, Any]]:
        """Purge expired sessions, then return info for all remaining ones."""
        self._cleanup_expired_sessions()
        return {sid: self.get_session_info(sid) for sid in self._sessions}
# Module-wide AdminAuth instance; stays None until get_admin_auth() builds it.
_auth_instance: Optional[AdminAuth] = None


def get_admin_auth() -> AdminAuth:
    """Return the shared AdminAuth instance, creating it on first use."""
    global _auth_instance
    if _auth_instance is not None:
        return _auth_instance
    _auth_instance = AdminAuth()
    return _auth_instance
# 便捷函数
def authenticate_admin(password: str) -> bool:
    """Check *password* using the shared AdminAuth instance."""
    auth = get_admin_auth()
    return auth.authenticate(password)
def create_admin_session() -> str:
    """Create a session on the shared AdminAuth instance and return its id."""
    auth = get_admin_auth()
    return auth.create_session()
def validate_admin_session(session_id: str) -> bool:
    """Validate *session_id* using the shared AdminAuth instance."""
    auth = get_admin_auth()
    return auth.validate_session(session_id)
def revoke_admin_session(session_id: str) -> bool:
    """Revoke *session_id* on the shared AdminAuth instance."""
    auth = get_admin_auth()
    return auth.revoke_session(session_id)
def get_admin_password() -> Optional[str]:
    """Return the admin password as reported by the shared AdminAuth instance."""
    auth = get_admin_auth()
    return auth.get_admin_password()
def is_auth_required() -> bool:
    """Report whether the shared AdminAuth instance requires authentication."""
    auth = get_admin_auth()
    return auth.is_auth_required()
|