kiroproxy / kiro_proxy /core /admin_auth.py
KiroProxy User
Fix admin auth crash in async context
320ef8e
"""管理员认证系统"""
import os
import secrets
import hashlib
import hmac
import time
import logging
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class AdminAuth:
    """Admin authentication manager.

    Checks the admin password supplied through the ``ADMIN_PASSWORD``
    environment variable and tracks admin sessions in an in-memory dict
    (sessions are therefore lost when the process exits).
    """

    # PBKDF2-HMAC-SHA256 parameters shared by hash_password / verify_password.
    _PBKDF2_ITERATIONS = 100000
    _SALT_BYTES = 32

    def __init__(self):
        self.secret_key = self._get_or_create_secret_key()
        self.session_timeout = 24 * 60 * 60  # 24 hours, in seconds
        # session_id -> {"user_id", "created_at", "expires_at", "last_activity"}
        self._sessions: Dict[str, Dict[str, Any]] = {}

    def _get_or_create_secret_key(self) -> str:
        """Return the secret key from the environment, or generate a new one.

        Note: the key is currently only reserved for future extension
        (sessions live in memory and do not depend on a persisted key).
        To avoid calling ``asyncio.run`` inside the ASGI event loop (which
        crashes), the key is no longer read from or written to the
        persistence layer. Set the ``ADMIN_SECRET_KEY`` environment variable
        to pin the key.
        """
        env_key = os.getenv("ADMIN_SECRET_KEY")
        if env_key:
            return env_key
        logger.info("生成新的管理员密钥")
        return secrets.token_urlsafe(32)

    def is_auth_required(self) -> bool:
        """Return True only when ``ADMIN_PASSWORD`` is set to a non-empty value."""
        return bool(os.getenv("ADMIN_PASSWORD"))

    def get_admin_password(self) -> Optional[str]:
        """Return the admin password, or None when ``ADMIN_PASSWORD`` is unset or empty."""
        return os.getenv("ADMIN_PASSWORD") or None

    def hash_password(self, password: str) -> str:
        """Hash *password* with PBKDF2-HMAC-SHA256 and a fresh random salt.

        Returns:
            The hex-encoded salt immediately followed by the hex-encoded
            derived key.
        """
        salt = secrets.token_bytes(self._SALT_BYTES)
        pwdhash = hashlib.pbkdf2_hmac(
            'sha256', password.encode('utf-8'), salt, self._PBKDF2_ITERATIONS
        )
        return salt.hex() + pwdhash.hex()

    def verify_password(self, password: str, hashed: str) -> bool:
        """Check *password* against a value produced by ``hash_password``.

        Returns:
            True on a match; False on a mismatch or when *hashed* (or
            *password*) is malformed.
        """
        salt_hex_len = self._SALT_BYTES * 2  # two hex characters per salt byte
        try:
            salt = bytes.fromhex(hashed[:salt_hex_len])
            stored_hash = bytes.fromhex(hashed[salt_hex_len:])
            pwdhash = hashlib.pbkdf2_hmac(
                'sha256', password.encode('utf-8'), salt, self._PBKDF2_ITERATIONS
            )
        except (AttributeError, TypeError, ValueError):
            # Non-string inputs or non-hex content: treat as a failed check.
            return False
        return hmac.compare_digest(stored_hash, pwdhash)

    def authenticate(self, password: str) -> bool:
        """Check *password* against the configured admin password.

        Returns:
            True when no admin password is configured (authentication is
            disabled) or when *password* matches; False otherwise,
            including when *password* is not a string.
        """
        admin_password = self.get_admin_password()
        if admin_password is None:
            return True
        if not isinstance(password, str):
            return False
        # hmac.compare_digest raises TypeError for str arguments containing
        # non-ASCII characters, so compare the encoded bytes instead.
        # "surrogatepass" keeps encoding total even for surrogate-escaped
        # environment values.
        return hmac.compare_digest(
            password.encode("utf-8", "surrogatepass"),
            admin_password.encode("utf-8", "surrogatepass"),
        )

    def create_session(self, user_id: str = "admin") -> str:
        """Create a new session for *user_id* and return its random id."""
        session_id = secrets.token_urlsafe(32)
        # Read the clock once so all three timestamps are consistent.
        now = time.time()
        self._sessions[session_id] = {
            "user_id": user_id,
            "created_at": now,
            "expires_at": now + self.session_timeout,
            "last_activity": now,
        }
        # Opportunistically drop sessions that have already expired.
        self._cleanup_expired_sessions()
        logger.info(f"创建管理员会话: {session_id[:8]}...")
        return session_id

    def validate_session(self, session_id: str) -> bool:
        """Return True when *session_id* names a known, unexpired session.

        An expired session is removed; a valid one has its
        ``last_activity`` timestamp refreshed.
        """
        if not session_id or session_id not in self._sessions:
            return False
        session = self._sessions[session_id]
        current_time = time.time()
        if current_time > session["expires_at"]:
            del self._sessions[session_id]
            return False
        session["last_activity"] = current_time
        return True

    def revoke_session(self, session_id: str) -> bool:
        """Delete *session_id*; return True if it existed, False otherwise."""
        if session_id not in self._sessions:
            return False
        del self._sessions[session_id]
        logger.info(f"撤销管理员会话: {session_id[:8]}...")
        return True

    def get_session_info(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Return a copy of the session record with ISO-formatted timestamps.

        Returns None when *session_id* is unknown. Expiry is not checked
        here.
        """
        if session_id not in self._sessions:
            return None
        session = self._sessions[session_id].copy()
        for field in ("created_at", "expires_at", "last_activity"):
            session[field] = datetime.fromtimestamp(session[field]).isoformat()
        return session

    def _cleanup_expired_sessions(self):
        """Remove every session whose ``expires_at`` is in the past."""
        current_time = time.time()
        expired_sessions = [
            sid for sid, session in self._sessions.items()
            if current_time > session["expires_at"]
        ]
        for sid in expired_sessions:
            del self._sessions[sid]
        if expired_sessions:
            logger.info(f"清理了 {len(expired_sessions)} 个过期会话")

    def get_active_sessions(self) -> Dict[str, Dict[str, Any]]:
        """Return info for all unexpired sessions, keyed by session id."""
        self._cleanup_expired_sessions()
        return {sid: self.get_session_info(sid) for sid in self._sessions}
# Module-wide AdminAuth instance, created lazily by get_admin_auth()
_auth_instance: Optional[AdminAuth] = None


def get_admin_auth() -> AdminAuth:
    """Return the shared AdminAuth instance, creating it on first call."""
    global _auth_instance
    if _auth_instance is not None:
        return _auth_instance
    _auth_instance = AdminAuth()
    return _auth_instance
# Convenience functions
def authenticate_admin(password: str) -> bool:
    """Check *password* by delegating to ``authenticate`` on the shared instance."""
    auth = get_admin_auth()
    return auth.authenticate(password)
def create_admin_session() -> str:
    """Create a session on the shared instance and return its session id."""
    auth = get_admin_auth()
    return auth.create_session()
def validate_admin_session(session_id: str) -> bool:
    """Validate *session_id* by delegating to ``validate_session`` on the shared instance."""
    auth = get_admin_auth()
    return auth.validate_session(session_id)
def revoke_admin_session(session_id: str) -> bool:
    """Revoke *session_id* by delegating to ``revoke_session`` on the shared instance."""
    auth = get_admin_auth()
    return auth.revoke_session(session_id)
def get_admin_password() -> Optional[str]:
    """Return the result of ``get_admin_password`` on the shared instance."""
    auth = get_admin_auth()
    return auth.get_admin_password()
def is_auth_required() -> bool:
    """Return the result of ``is_auth_required`` on the shared instance."""
    auth = get_admin_auth()
    return auth.is_auth_required()