|
|
"""管理员认证系统""" |
|
|
import os |
|
|
import secrets |
|
|
import hashlib |
|
|
import hmac |
|
|
import time |
|
|
import logging |
|
|
from typing import Optional, Dict, Any |
|
|
from datetime import datetime, timedelta |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class AdminAuth: |
|
|
"""管理员认证管理器""" |
|
|
|
|
|
def __init__(self): |
|
|
self.secret_key = self._get_or_create_secret_key() |
|
|
self.session_timeout = 24 * 60 * 60 |
|
|
self._sessions: Dict[str, Dict[str, Any]] = {} |
|
|
|
|
|
def _get_or_create_secret_key(self) -> str: |
|
|
"""获取或创建密钥 |
|
|
|
|
|
注意:此密钥目前仅用于未来扩展(当前会话存储在内存中,不依赖密钥持久化)。 |
|
|
为避免在 ASGI 事件循环中调用 asyncio.run(会导致崩溃),这里不再从持久化层读取/写入。 |
|
|
如需固定密钥,请设置环境变量 ADMIN_SECRET_KEY。 |
|
|
""" |
|
|
env_key = os.getenv("ADMIN_SECRET_KEY") |
|
|
if env_key: |
|
|
return env_key |
|
|
|
|
|
logger.info("生成新的管理员密钥") |
|
|
return secrets.token_urlsafe(32) |
|
|
|
|
|
def is_auth_required(self) -> bool: |
|
|
"""检查是否需要认证(仅当设置了 ADMIN_PASSWORD 环境变量时才需要)""" |
|
|
return bool(os.getenv("ADMIN_PASSWORD")) |
|
|
|
|
|
def get_admin_password(self) -> Optional[str]: |
|
|
"""获取管理员密码,未设置环境变量时返回 None""" |
|
|
env_password = os.getenv("ADMIN_PASSWORD") |
|
|
if env_password: |
|
|
return env_password |
|
|
return None |
|
|
|
|
|
def hash_password(self, password: str) -> str: |
|
|
"""哈希密码""" |
|
|
salt = secrets.token_bytes(32) |
|
|
pwdhash = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000) |
|
|
return salt.hex() + pwdhash.hex() |
|
|
|
|
|
def verify_password(self, password: str, hashed: str) -> bool: |
|
|
"""验证密码""" |
|
|
try: |
|
|
salt = bytes.fromhex(hashed[:64]) |
|
|
stored_hash = bytes.fromhex(hashed[64:]) |
|
|
pwdhash = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000) |
|
|
return hmac.compare_digest(stored_hash, pwdhash) |
|
|
except Exception: |
|
|
return False |
|
|
|
|
|
def authenticate(self, password: str) -> bool: |
|
|
"""验证管理员密码""" |
|
|
admin_password = self.get_admin_password() |
|
|
if admin_password is None: |
|
|
return True |
|
|
return hmac.compare_digest(password, admin_password) |
|
|
|
|
|
def create_session(self, user_id: str = "admin") -> str: |
|
|
"""创建会话""" |
|
|
session_id = secrets.token_urlsafe(32) |
|
|
expires_at = time.time() + self.session_timeout |
|
|
|
|
|
self._sessions[session_id] = { |
|
|
"user_id": user_id, |
|
|
"created_at": time.time(), |
|
|
"expires_at": expires_at, |
|
|
"last_activity": time.time() |
|
|
} |
|
|
|
|
|
|
|
|
self._cleanup_expired_sessions() |
|
|
|
|
|
logger.info(f"创建管理员会话: {session_id[:8]}...") |
|
|
return session_id |
|
|
|
|
|
def validate_session(self, session_id: str) -> bool: |
|
|
"""验证会话""" |
|
|
if not session_id or session_id not in self._sessions: |
|
|
return False |
|
|
|
|
|
session = self._sessions[session_id] |
|
|
current_time = time.time() |
|
|
|
|
|
|
|
|
if current_time > session["expires_at"]: |
|
|
del self._sessions[session_id] |
|
|
return False |
|
|
|
|
|
|
|
|
session["last_activity"] = current_time |
|
|
return True |
|
|
|
|
|
def revoke_session(self, session_id: str) -> bool: |
|
|
"""撤销会话""" |
|
|
if session_id in self._sessions: |
|
|
del self._sessions[session_id] |
|
|
logger.info(f"撤销管理员会话: {session_id[:8]}...") |
|
|
return True |
|
|
return False |
|
|
|
|
|
def get_session_info(self, session_id: str) -> Optional[Dict[str, Any]]: |
|
|
"""获取会话信息""" |
|
|
if session_id in self._sessions: |
|
|
session = self._sessions[session_id].copy() |
|
|
session["created_at"] = datetime.fromtimestamp(session["created_at"]).isoformat() |
|
|
session["expires_at"] = datetime.fromtimestamp(session["expires_at"]).isoformat() |
|
|
session["last_activity"] = datetime.fromtimestamp(session["last_activity"]).isoformat() |
|
|
return session |
|
|
return None |
|
|
|
|
|
def _cleanup_expired_sessions(self): |
|
|
"""清理过期会话""" |
|
|
current_time = time.time() |
|
|
expired_sessions = [ |
|
|
sid for sid, session in self._sessions.items() |
|
|
if current_time > session["expires_at"] |
|
|
] |
|
|
|
|
|
for sid in expired_sessions: |
|
|
del self._sessions[sid] |
|
|
|
|
|
if expired_sessions: |
|
|
logger.info(f"清理了 {len(expired_sessions)} 个过期会话") |
|
|
|
|
|
def get_active_sessions(self) -> Dict[str, Dict[str, Any]]: |
|
|
"""获取所有活跃会话""" |
|
|
self._cleanup_expired_sessions() |
|
|
return { |
|
|
sid: self.get_session_info(sid) |
|
|
for sid in self._sessions.keys() |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
_auth_instance: Optional[AdminAuth] = None |
|
|
|
|
|
def get_admin_auth() -> AdminAuth: |
|
|
"""获取管理员认证实例(单例模式)""" |
|
|
global _auth_instance |
|
|
if _auth_instance is None: |
|
|
_auth_instance = AdminAuth() |
|
|
return _auth_instance |
|
|
|
|
|
|
|
|
|
|
|
def authenticate_admin(password: str) -> bool: |
|
|
"""验证管理员密码""" |
|
|
return get_admin_auth().authenticate(password) |
|
|
|
|
|
def create_admin_session() -> str: |
|
|
"""创建管理员会话""" |
|
|
return get_admin_auth().create_session() |
|
|
|
|
|
def validate_admin_session(session_id: str) -> bool: |
|
|
"""验证管理员会话""" |
|
|
return get_admin_auth().validate_session(session_id) |
|
|
|
|
|
def revoke_admin_session(session_id: str) -> bool: |
|
|
"""撤销管理员会话""" |
|
|
return get_admin_auth().revoke_session(session_id) |
|
|
|
|
|
def get_admin_password() -> Optional[str]: |
|
|
"""获取管理员密码""" |
|
|
return get_admin_auth().get_admin_password() |
|
|
|
|
|
|
|
|
def is_auth_required() -> bool: |
|
|
"""检查是否需要认证""" |
|
|
return get_admin_auth().is_auth_required() |
|
|
|