| """账户管理模块
|
|
|
| 负责账户配置、多账户协调和会话缓存管理
|
| """
|
| import asyncio
|
| import json
|
| import logging
|
| import os
|
| import time
|
| from dataclasses import dataclass
|
| from datetime import datetime, timedelta, timezone
|
| from typing import Dict, List, Optional, TYPE_CHECKING
|
|
|
| from fastapi import HTTPException
|
|
|
| if TYPE_CHECKING:
|
| from core.jwt import JWTManager
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
| if os.path.exists("/data"):
|
| ACCOUNTS_FILE = "/data/accounts.json"
|
| else:
|
| ACCOUNTS_FILE = "data/accounts.json"
|
|
|
|
|
| @dataclass
|
| class AccountConfig:
|
| """单个账户配置"""
|
| account_id: str
|
| secure_c_ses: str
|
| host_c_oses: Optional[str]
|
| csesidx: str
|
| config_id: str
|
| expires_at: Optional[str] = None
|
| disabled: bool = False
|
|
|
| def get_remaining_hours(self) -> Optional[float]:
|
| """计算账户剩余小时数"""
|
| if not self.expires_at:
|
| return None
|
| try:
|
|
|
| beijing_tz = timezone(timedelta(hours=8))
|
| expire_time = datetime.strptime(self.expires_at, "%Y-%m-%d %H:%M:%S")
|
| expire_time = expire_time.replace(tzinfo=beijing_tz)
|
|
|
|
|
| now = datetime.now(beijing_tz)
|
|
|
|
|
| remaining = (expire_time - now).total_seconds() / 3600
|
| return remaining
|
| except Exception:
|
| return None
|
|
|
| def is_expired(self) -> bool:
|
| """检查账户是否已过期"""
|
| remaining = self.get_remaining_hours()
|
| if remaining is None:
|
| return False
|
| return remaining <= 0
|
|
|
|
|
| def format_account_expiration(remaining_hours: Optional[float]) -> tuple:
|
| """
|
| 格式化账户过期时间显示(基于12小时过期周期)
|
|
|
| Args:
|
| remaining_hours: 剩余小时数(None表示未设置过期时间)
|
|
|
| Returns:
|
| (status, status_color, expire_display) 元组
|
| """
|
| if remaining_hours is None:
|
|
|
| return ("未设置", "#9e9e9e", "未设置")
|
| elif remaining_hours <= 0:
|
| return ("已过期", "#f44336", "已过期")
|
| elif remaining_hours < 3:
|
| return ("即将过期", "#ff9800", f"{remaining_hours:.1f} 小时")
|
| else:
|
| return ("正常", "#4caf50", f"{remaining_hours:.1f} 小时")
|
|
|
|
|
| class AccountManager:
|
| """单个账户管理器"""
|
| def __init__(self, config: AccountConfig, http_client, user_agent: str, account_failure_threshold: int, rate_limit_cooldown_seconds: int):
|
| self.config = config
|
| self.http_client = http_client
|
| self.user_agent = user_agent
|
| self.account_failure_threshold = account_failure_threshold
|
| self.rate_limit_cooldown_seconds = rate_limit_cooldown_seconds
|
| self.jwt_manager: Optional['JWTManager'] = None
|
| self.is_available = True
|
| self.last_error_time = 0.0
|
| self.last_429_time = 0.0
|
| self.error_count = 0
|
| self.conversation_count = 0
|
|
|
| async def get_jwt(self, request_id: str = "") -> str:
|
| """获取 JWT token (带错误处理)"""
|
|
|
| if self.config.is_expired():
|
| self.is_available = False
|
| logger.warning(f"[ACCOUNT] [{self.config.account_id}] 账户已过期,已自动禁用")
|
| raise HTTPException(403, f"Account {self.config.account_id} has expired")
|
|
|
| try:
|
| if self.jwt_manager is None:
|
|
|
| from core.jwt import JWTManager
|
| self.jwt_manager = JWTManager(self.config, self.http_client, self.user_agent)
|
| jwt = await self.jwt_manager.get(request_id)
|
| self.is_available = True
|
| self.error_count = 0
|
| return jwt
|
| except Exception as e:
|
| self.last_error_time = time.time()
|
| self.error_count += 1
|
|
|
| if self.error_count >= self.account_failure_threshold:
|
| self.is_available = False
|
| logger.error(f"[ACCOUNT] [{self.config.account_id}] JWT获取连续失败{self.error_count}次,账户已永久禁用")
|
| else:
|
|
|
| logger.warning(f"[ACCOUNT] [{self.config.account_id}] JWT获取失败({self.error_count}/{self.account_failure_threshold}): {type(e).__name__}")
|
| raise
|
|
|
| def should_retry(self) -> bool:
|
| """检查账户是否可重试(429错误10分钟后恢复,普通错误永久禁用)"""
|
| if self.is_available:
|
| return True
|
|
|
| current_time = time.time()
|
|
|
|
|
| if self.last_429_time > 0:
|
| if current_time - self.last_429_time > self.rate_limit_cooldown_seconds:
|
| return True
|
| return False
|
|
|
|
|
| return False
|
|
|
| def get_cooldown_info(self) -> tuple[int, str | None]:
|
| """
|
| 获取账户冷却信息
|
|
|
| Returns:
|
| (cooldown_seconds, cooldown_reason) 元组
|
| - cooldown_seconds: 剩余冷却秒数,0表示无冷却,-1表示永久禁用
|
| - cooldown_reason: 冷却原因,None表示无冷却
|
| """
|
| current_time = time.time()
|
|
|
|
|
| if self.last_429_time > 0:
|
| remaining_429 = self.rate_limit_cooldown_seconds - (current_time - self.last_429_time)
|
| if remaining_429 > 0:
|
| return (int(remaining_429), "429限流")
|
|
|
|
|
|
|
| if self.is_available:
|
| return (0, None)
|
|
|
|
|
| return (-1, "错误禁用")
|
|
|
|
|
| class MultiAccountManager:
|
| """多账户协调器"""
|
| def __init__(self, session_cache_ttl_seconds: int):
|
| self.accounts: Dict[str, AccountManager] = {}
|
| self.account_list: List[str] = []
|
| self.current_index = 0
|
| self._cache_lock = asyncio.Lock()
|
| self._index_lock = asyncio.Lock()
|
|
|
| self.global_session_cache: Dict[str, dict] = {}
|
| self.cache_max_size = 1000
|
| self.cache_ttl = session_cache_ttl_seconds
|
|
|
| self._session_locks: Dict[str, asyncio.Lock] = {}
|
| self._session_locks_lock = asyncio.Lock()
|
| self._session_locks_max_size = 2000
|
|
|
| def _clean_expired_cache(self):
|
| """清理过期的缓存条目"""
|
| current_time = time.time()
|
| expired_keys = [
|
| key for key, value in self.global_session_cache.items()
|
| if current_time - value["updated_at"] > self.cache_ttl
|
| ]
|
| for key in expired_keys:
|
| del self.global_session_cache[key]
|
| if expired_keys:
|
| logger.info(f"[CACHE] 清理 {len(expired_keys)} 个过期会话缓存")
|
|
|
| def _ensure_cache_size(self):
|
| """确保缓存不超过最大大小(LRU策略)"""
|
| if len(self.global_session_cache) > self.cache_max_size:
|
|
|
| sorted_items = sorted(
|
| self.global_session_cache.items(),
|
| key=lambda x: x[1]["updated_at"]
|
| )
|
| remove_count = len(sorted_items) - int(self.cache_max_size * 0.8)
|
| for key, _ in sorted_items[:remove_count]:
|
| del self.global_session_cache[key]
|
| logger.info(f"[CACHE] LRU清理 {remove_count} 个最旧会话缓存")
|
|
|
| async def start_background_cleanup(self):
|
| """启动后台缓存清理任务(每5分钟执行一次)"""
|
| try:
|
| while True:
|
| await asyncio.sleep(300)
|
| async with self._cache_lock:
|
| self._clean_expired_cache()
|
| self._ensure_cache_size()
|
| except asyncio.CancelledError:
|
| logger.info("[CACHE] 后台清理任务已停止")
|
| except Exception as e:
|
| logger.error(f"[CACHE] 后台清理任务异常: {e}")
|
|
|
| async def set_session_cache(self, conv_key: str, account_id: str, session_id: str):
|
| """线程安全地设置会话缓存"""
|
| async with self._cache_lock:
|
| self.global_session_cache[conv_key] = {
|
| "account_id": account_id,
|
| "session_id": session_id,
|
| "updated_at": time.time()
|
| }
|
|
|
| self._ensure_cache_size()
|
|
|
| async def update_session_time(self, conv_key: str):
|
| """线程安全地更新会话时间戳"""
|
| async with self._cache_lock:
|
| if conv_key in self.global_session_cache:
|
| self.global_session_cache[conv_key]["updated_at"] = time.time()
|
|
|
| async def acquire_session_lock(self, conv_key: str) -> asyncio.Lock:
|
| """获取指定对话的锁(用于防止同一对话的并发请求冲突)"""
|
| async with self._session_locks_lock:
|
|
|
| if len(self._session_locks) > self._session_locks_max_size:
|
|
|
| valid_keys = set(self.global_session_cache.keys())
|
| keys_to_remove = [k for k in self._session_locks if k not in valid_keys]
|
| for k in keys_to_remove[:len(keys_to_remove)//2]:
|
| del self._session_locks[k]
|
|
|
| if conv_key not in self._session_locks:
|
| self._session_locks[conv_key] = asyncio.Lock()
|
| return self._session_locks[conv_key]
|
|
|
| def update_http_client(self, http_client):
|
| """更新所有账户使用的 http_client(用于代理变更后重建客户端)"""
|
| for account_mgr in self.accounts.values():
|
| account_mgr.http_client = http_client
|
| if account_mgr.jwt_manager is not None:
|
| account_mgr.jwt_manager.http_client = http_client
|
|
|
| def add_account(self, config: AccountConfig, http_client, user_agent: str, account_failure_threshold: int, rate_limit_cooldown_seconds: int, global_stats: dict):
|
| """添加账户"""
|
| manager = AccountManager(config, http_client, user_agent, account_failure_threshold, rate_limit_cooldown_seconds)
|
|
|
| if "account_conversations" in global_stats:
|
| manager.conversation_count = global_stats["account_conversations"].get(config.account_id, 0)
|
| self.accounts[config.account_id] = manager
|
| self.account_list.append(config.account_id)
|
| logger.info(f"[MULTI] [ACCOUNT] 添加账户: {config.account_id}")
|
|
|
| async def get_account(self, account_id: Optional[str] = None, request_id: str = "") -> AccountManager:
|
| """获取账户 (轮询或指定) - 优化锁粒度,减少竞争"""
|
| req_tag = f"[req_{request_id}] " if request_id else ""
|
|
|
|
|
| if account_id:
|
| if account_id not in self.accounts:
|
| raise HTTPException(404, f"Account {account_id} not found")
|
| account = self.accounts[account_id]
|
| if not account.should_retry():
|
| raise HTTPException(503, f"Account {account_id} temporarily unavailable")
|
| return account
|
|
|
|
|
| available_accounts = [
|
| acc_id for acc_id in self.account_list
|
| if self.accounts[acc_id].should_retry()
|
| and not self.accounts[acc_id].config.is_expired()
|
| and not self.accounts[acc_id].config.disabled
|
| ]
|
|
|
| if not available_accounts:
|
| raise HTTPException(503, "No available accounts")
|
|
|
|
|
| async with self._index_lock:
|
| if not hasattr(self, '_available_index'):
|
| self._available_index = 0
|
|
|
| account_id = available_accounts[self._available_index % len(available_accounts)]
|
| self._available_index = (self._available_index + 1) % len(available_accounts)
|
|
|
| account = self.accounts[account_id]
|
| logger.info(f"[MULTI] [ACCOUNT] {req_tag}选择账户: {account_id}")
|
| return account
|
|
|
|
|
|
|
|
|
| def save_accounts_to_file(accounts_data: list):
|
| """保存账户配置到文件"""
|
| with open(ACCOUNTS_FILE, 'w', encoding='utf-8') as f:
|
| json.dump(accounts_data, f, ensure_ascii=False, indent=2)
|
| logger.info(f"[CONFIG] 配置已保存到 {ACCOUNTS_FILE}")
|
|
|
|
|
| def load_accounts_from_source() -> list:
|
| """从环境变量或文件加载账户配置,优先使用环境变量"""
|
|
|
| env_accounts = os.environ.get('ACCOUNTS_CONFIG')
|
| if env_accounts:
|
| try:
|
| accounts_data = json.loads(env_accounts)
|
| if accounts_data:
|
| logger.info(f"[CONFIG] 从环境变量加载配置,共 {len(accounts_data)} 个账户")
|
| else:
|
| logger.warning(f"[CONFIG] 环境变量 ACCOUNTS_CONFIG 为空")
|
| return accounts_data
|
| except Exception as e:
|
| logger.error(f"[CONFIG] 环境变量加载失败: {str(e)},尝试从文件加载")
|
|
|
|
|
| if os.path.exists(ACCOUNTS_FILE):
|
| try:
|
| with open(ACCOUNTS_FILE, 'r', encoding='utf-8') as f:
|
| accounts_data = json.load(f)
|
| if accounts_data:
|
| logger.info(f"[CONFIG] 从文件加载配置: {ACCOUNTS_FILE},共 {len(accounts_data)} 个账户")
|
| else:
|
| logger.warning(f"[CONFIG] 账户配置为空,请在管理面板添加账户或编辑 {ACCOUNTS_FILE}")
|
| return accounts_data
|
| except Exception as e:
|
| logger.warning(f"[CONFIG] 文件加载失败: {str(e)},创建空配置")
|
|
|
|
|
| logger.warning(f"[CONFIG] 未找到 {ACCOUNTS_FILE},已创建空配置文件")
|
| logger.info(f"[CONFIG] 💡 请在管理面板添加账户,或直接编辑 {ACCOUNTS_FILE},或使用批量上传功能,或设置环境变量 ACCOUNTS_CONFIG")
|
| save_accounts_to_file([])
|
| return []
|
|
|
|
|
| def get_account_id(acc: dict, index: int) -> str:
|
| """获取账户ID(有显式ID则使用,否则生成默认ID)"""
|
| return acc.get("id", f"account_{index}")
|
|
|
|
|
| def load_multi_account_config(
|
| http_client,
|
| user_agent: str,
|
| account_failure_threshold: int,
|
| rate_limit_cooldown_seconds: int,
|
| session_cache_ttl_seconds: int,
|
| global_stats: dict
|
| ) -> MultiAccountManager:
|
| """从文件或环境变量加载多账户配置"""
|
| manager = MultiAccountManager(session_cache_ttl_seconds)
|
|
|
| accounts_data = load_accounts_from_source()
|
|
|
| for i, acc in enumerate(accounts_data, 1):
|
|
|
| required_fields = ["secure_c_ses", "csesidx", "config_id"]
|
| missing_fields = [f for f in required_fields if f not in acc]
|
| if missing_fields:
|
| raise ValueError(f"账户 {i} 缺少必需字段: {', '.join(missing_fields)}")
|
|
|
| config = AccountConfig(
|
| account_id=get_account_id(acc, i),
|
| secure_c_ses=acc["secure_c_ses"],
|
| host_c_oses=acc.get("host_c_oses"),
|
| csesidx=acc["csesidx"],
|
| config_id=acc["config_id"],
|
| expires_at=acc.get("expires_at"),
|
| disabled=acc.get("disabled", False)
|
| )
|
|
|
|
|
| if config.is_expired():
|
| logger.warning(f"[CONFIG] 账户 {config.account_id} 已过期,跳过加载")
|
| continue
|
|
|
| manager.add_account(config, http_client, user_agent, account_failure_threshold, rate_limit_cooldown_seconds, global_stats)
|
|
|
| if not manager.accounts:
|
| logger.warning(f"[CONFIG] 没有有效的账户配置,服务将启动但无法处理请求,请在管理面板添加账户")
|
| else:
|
| logger.info(f"[CONFIG] 成功加载 {len(manager.accounts)} 个账户")
|
| return manager
|
|
|
|
|
| def reload_accounts(
|
| multi_account_mgr: MultiAccountManager,
|
| http_client,
|
| user_agent: str,
|
| account_failure_threshold: int,
|
| rate_limit_cooldown_seconds: int,
|
| session_cache_ttl_seconds: int,
|
| global_stats: dict
|
| ) -> MultiAccountManager:
|
| """重新加载账户配置(保留现有账户的运行时状态)"""
|
|
|
| old_states = {}
|
| for account_id, account_mgr in multi_account_mgr.accounts.items():
|
| old_states[account_id] = {
|
| "is_available": account_mgr.is_available,
|
| "last_error_time": account_mgr.last_error_time,
|
| "last_429_time": account_mgr.last_429_time,
|
| "error_count": account_mgr.error_count,
|
| "conversation_count": account_mgr.conversation_count
|
| }
|
|
|
|
|
| multi_account_mgr.global_session_cache.clear()
|
| new_mgr = load_multi_account_config(
|
| http_client,
|
| user_agent,
|
| account_failure_threshold,
|
| rate_limit_cooldown_seconds,
|
| session_cache_ttl_seconds,
|
| global_stats
|
| )
|
|
|
|
|
| for account_id, state in old_states.items():
|
| if account_id in new_mgr.accounts:
|
| account_mgr = new_mgr.accounts[account_id]
|
| account_mgr.is_available = state["is_available"]
|
| account_mgr.last_error_time = state["last_error_time"]
|
| account_mgr.last_429_time = state["last_429_time"]
|
| account_mgr.error_count = state["error_count"]
|
| account_mgr.conversation_count = state["conversation_count"]
|
| logger.debug(f"[CONFIG] 账户 {account_id} 运行时状态已恢复")
|
|
|
| logger.info(f"[CONFIG] 配置已重载,当前账户数: {len(new_mgr.accounts)}")
|
| return new_mgr
|
|
|
|
|
| def update_accounts_config(
|
| accounts_data: list,
|
| multi_account_mgr: MultiAccountManager,
|
| http_client,
|
| user_agent: str,
|
| account_failure_threshold: int,
|
| rate_limit_cooldown_seconds: int,
|
| session_cache_ttl_seconds: int,
|
| global_stats: dict
|
| ) -> MultiAccountManager:
|
| """更新账户配置(保存到文件并重新加载)"""
|
| save_accounts_to_file(accounts_data)
|
| return reload_accounts(
|
| multi_account_mgr,
|
| http_client,
|
| user_agent,
|
| account_failure_threshold,
|
| rate_limit_cooldown_seconds,
|
| session_cache_ttl_seconds,
|
| global_stats
|
| )
|
|
|
|
|
| def delete_account(
|
| account_id: str,
|
| multi_account_mgr: MultiAccountManager,
|
| http_client,
|
| user_agent: str,
|
| account_failure_threshold: int,
|
| rate_limit_cooldown_seconds: int,
|
| session_cache_ttl_seconds: int,
|
| global_stats: dict
|
| ) -> MultiAccountManager:
|
| """删除单个账户"""
|
| accounts_data = load_accounts_from_source()
|
|
|
|
|
| filtered = [
|
| acc for i, acc in enumerate(accounts_data, 1)
|
| if get_account_id(acc, i) != account_id
|
| ]
|
|
|
| if len(filtered) == len(accounts_data):
|
| raise ValueError(f"账户 {account_id} 不存在")
|
|
|
| save_accounts_to_file(filtered)
|
| return reload_accounts(
|
| multi_account_mgr,
|
| http_client,
|
| user_agent,
|
| account_failure_threshold,
|
| rate_limit_cooldown_seconds,
|
| session_cache_ttl_seconds,
|
| global_stats
|
| )
|
|
|
|
|
| def update_account_disabled_status(
|
| account_id: str,
|
| disabled: bool,
|
| multi_account_mgr: MultiAccountManager,
|
| http_client,
|
| user_agent: str,
|
| account_failure_threshold: int,
|
| rate_limit_cooldown_seconds: int,
|
| session_cache_ttl_seconds: int,
|
| global_stats: dict
|
| ) -> MultiAccountManager:
|
| """更新账户的禁用状态"""
|
| accounts_data = load_accounts_from_source()
|
|
|
|
|
| found = False
|
| for i, acc in enumerate(accounts_data, 1):
|
| if get_account_id(acc, i) == account_id:
|
| acc["disabled"] = disabled
|
| found = True
|
| break
|
|
|
| if not found:
|
| raise ValueError(f"账户 {account_id} 不存在")
|
|
|
| save_accounts_to_file(accounts_data)
|
| new_mgr = reload_accounts(
|
| multi_account_mgr,
|
| http_client,
|
| user_agent,
|
| account_failure_threshold,
|
| rate_limit_cooldown_seconds,
|
| session_cache_ttl_seconds,
|
| global_stats
|
| )
|
|
|
| status_text = "已禁用" if disabled else "已启用"
|
| logger.info(f"[CONFIG] 账户 {account_id} {status_text}")
|
| return new_mgr
|
|
|