| """ |
| 账号池:从配置加载代理组与账号,按 type 轮询 acquire。 |
| |
| 除基础的全局轮询外,还支持: |
| |
| - 按 proxy_key 反查代理组 |
| - 在指定代理组内选择某个 type 的可用账号 |
| - 排除当前账号后为 tab 切号选择备选账号 |
| - 在未打开浏览器的代理组中选择某个 type 的候选账号 |
| """ |
|
|
| from dataclasses import replace |
| from typing import Iterator |
|
|
| from core.config.schema import AccountConfig, ProxyGroupConfig |
| from core.constants import TIMEZONE |
| from core.runtime.keys import ProxyKey |
|
|
|
|
| class AccountPool: |
| """ |
| 多 IP / 多账号池,按 type 过滤后轮询。 |
| acquire(type) 返回 (ProxyGroupConfig, AccountConfig)。 |
| get_group_by_proxy_key / acquire_from_group 供现役浏览器复用时使用。 |
| """ |
|
|
| def __init__(self, groups: list[ProxyGroupConfig]) -> None: |
| self._groups = list(groups) |
| self._indices: dict[str, int] = {} |
| self._group_type_indices: dict[ |
| tuple[str, str], int |
| ] = {} |
|
|
| @classmethod |
| def from_groups(cls, groups: list[ProxyGroupConfig]) -> "AccountPool": |
| return cls(groups) |
|
|
| def reload(self, groups: list[ProxyGroupConfig]) -> None: |
| """用新加载的配置替换当前组(如更新解冻时间后从 repository 重新 load_groups)。""" |
| self._groups = list(groups) |
|
|
| def groups(self) -> list[ProxyGroupConfig]: |
| """返回当前全部代理组。""" |
| return list(self._groups) |
|
|
| def _accounts_by_type( |
| self, type_name: str |
| ) -> Iterator[tuple[ProxyGroupConfig, AccountConfig]]: |
| """按 type 遍历所有 (group, account),仅包含当前可用的账号(解冻时间已过或未设置)。""" |
| for g in self._groups: |
| for a in g.accounts: |
| if a.type == type_name and a.is_available(): |
| yield g, a |
|
|
| def acquire(self, type_name: str) -> tuple[ProxyGroupConfig, AccountConfig]: |
| """ |
| 按 type 轮询获取一组 (ProxyGroupConfig, AccountConfig)。 |
| 若该 type 无账号则抛出 ValueError。 |
| """ |
| pairs = list(self._accounts_by_type(type_name)) |
| if not pairs: |
| raise ValueError(f"没有类别为 {type_name!r} 的账号,请先在配置中添加") |
| n = len(pairs) |
| idx = self._indices.get(type_name, 0) % n |
| self._indices[type_name] = (idx + 1) % n |
| return pairs[idx] |
|
|
| def account_id(self, group: ProxyGroupConfig, account: AccountConfig) -> str: |
| """生成账号唯一标识,用于会话缓存等。""" |
| return f"{group.fingerprint_id}:{account.name}" |
|
|
| def get_account_by_id( |
| self, account_id: str |
| ) -> tuple[ProxyGroupConfig, AccountConfig] | None: |
| """根据 account_id(fingerprint_id:name)反查 (group, account),用于复用会话时取 auth。""" |
| for g in self._groups: |
| for a in g.accounts: |
| if self.account_id(g, a) == account_id: |
| return g, a |
| return None |
|
|
| def get_group_by_proxy_key(self, proxy_key: ProxyKey) -> ProxyGroupConfig | None: |
| """根据 proxy_key(proxy_host, proxy_user, fingerprint_id, use_proxy, timezone)反查对应代理组。""" |
| pk_tz = getattr(proxy_key, "timezone", None) or TIMEZONE |
| for g in self._groups: |
| g_tz = g.timezone or TIMEZONE |
| if ( |
| g.proxy_host == proxy_key.proxy_host |
| and g.proxy_user == proxy_key.proxy_user |
| and g.fingerprint_id == proxy_key.fingerprint_id |
| and g.use_proxy == getattr(proxy_key, "use_proxy", True) |
| and g_tz == pk_tz |
| ): |
| return g |
| return None |
|
|
| def acquire_from_group( |
| self, |
| group: ProxyGroupConfig, |
| type_name: str, |
| ) -> tuple[ProxyGroupConfig, AccountConfig] | None: |
| """ |
| 从指定 group 内按 type 轮询取一个账号;若无该 type 则返回 None。 |
| 供「现役浏览器对应 IP 组是否还有该 type 可用」时使用。 |
| """ |
| pairs = [(g, a) for g, a in self._accounts_by_type(type_name) if g is group] |
| if not pairs: |
| return None |
| n = len(pairs) |
| key = (group.fingerprint_id, type_name) |
| idx = self._group_type_indices.get(key, 0) % n |
| self._group_type_indices[key] = (idx + 1) % n |
| return pairs[idx] |
|
|
| def available_accounts_in_group( |
| self, |
| group: ProxyGroupConfig, |
| type_name: str, |
| *, |
| exclude_account_ids: set[str] | None = None, |
| ) -> list[AccountConfig]: |
| """返回某代理组下指定 type 的全部可用账号,可排除若干 account_id。""" |
| exclude = exclude_account_ids or set() |
| return [ |
| a |
| for g, a in self._accounts_by_type(type_name) |
| if g is group and self.account_id(group, a) not in exclude |
| ] |
|
|
| def has_available_account_in_group( |
| self, |
| group: ProxyGroupConfig, |
| type_name: str, |
| *, |
| exclude_account_ids: set[str] | None = None, |
| ) -> bool: |
| """判断某代理组下是否仍有指定 type 的可用账号。""" |
| return bool( |
| self.available_accounts_in_group( |
| group, |
| type_name, |
| exclude_account_ids=exclude_account_ids, |
| ) |
| ) |
|
|
| def next_available_account_in_group( |
| self, |
| group: ProxyGroupConfig, |
| type_name: str, |
| *, |
| exclude_account_ids: set[str] | None = None, |
| ) -> AccountConfig | None: |
| """ |
| 在指定代理组内按轮询选择一个可用账号。 |
| 支持排除当前已绑定账号,用于 drained 后切号。 |
| """ |
| accounts = self.available_accounts_in_group( |
| group, |
| type_name, |
| exclude_account_ids=exclude_account_ids, |
| ) |
| if not accounts: |
| return None |
| n = len(accounts) |
| key = (group.fingerprint_id, type_name) |
| idx = self._group_type_indices.get(key, 0) % n |
| self._group_type_indices[key] = (idx + 1) % n |
| return accounts[idx] |
|
|
| def next_available_pair( |
| self, |
| type_name: str, |
| *, |
| exclude_fingerprint_ids: set[str] | None = None, |
| ) -> tuple[ProxyGroupConfig, AccountConfig] | None: |
| """ |
| 全局按 type 轮询选择一个可用账号,可排除若干代理组。 |
| 用于“未打开浏览器的组里挑一个候选账号”。 |
| """ |
| exclude = exclude_fingerprint_ids or set() |
| pairs = [ |
| (g, a) |
| for g, a in self._accounts_by_type(type_name) |
| if g.fingerprint_id not in exclude |
| ] |
| if not pairs: |
| return None |
| n = len(pairs) |
| idx = self._indices.get(type_name, 0) % n |
| self._indices[type_name] = (idx + 1) % n |
| return pairs[idx] |
|
|
| def update_account_unfreeze_at( |
| self, |
| fingerprint_id: str, |
| account_name: str, |
| unfreeze_at: int | None, |
| ) -> bool: |
| for group in self._groups: |
| if group.fingerprint_id != fingerprint_id: |
| continue |
| for index, account in enumerate(group.accounts): |
| if account.name != account_name: |
| continue |
| group.accounts[index] = replace(account, unfreeze_at=unfreeze_at) |
| return True |
| return False |
|
|