File size: 7,605 Bytes
77169b4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 | """
账号池:从配置加载代理组与账号,按 type 轮询 acquire。
除基础的全局轮询外,还支持:
- 按 proxy_key 反查代理组
- 在指定代理组内选择某个 type 的可用账号
- 排除当前账号后为 tab 切号选择备选账号
- 在未打开浏览器的代理组中选择某个 type 的候选账号
"""
from dataclasses import replace
from typing import Iterator
from core.config.schema import AccountConfig, ProxyGroupConfig
from core.constants import TIMEZONE
from core.runtime.keys import ProxyKey
class AccountPool:
"""
多 IP / 多账号池,按 type 过滤后轮询。
acquire(type) 返回 (ProxyGroupConfig, AccountConfig)。
get_group_by_proxy_key / acquire_from_group 供现役浏览器复用时使用。
"""
def __init__(self, groups: list[ProxyGroupConfig]) -> None:
self._groups = list(groups)
self._indices: dict[str, int] = {} # type -> 全局轮询下标
self._group_type_indices: dict[
tuple[str, str], int
] = {} # (fingerprint_id, type) -> 组内轮询下标
@classmethod
def from_groups(cls, groups: list[ProxyGroupConfig]) -> "AccountPool":
return cls(groups)
def reload(self, groups: list[ProxyGroupConfig]) -> None:
"""用新加载的配置替换当前组(如更新解冻时间后从 repository 重新 load_groups)。"""
self._groups = list(groups)
def groups(self) -> list[ProxyGroupConfig]:
"""返回当前全部代理组。"""
return list(self._groups)
def _accounts_by_type(
self, type_name: str
) -> Iterator[tuple[ProxyGroupConfig, AccountConfig]]:
"""按 type 遍历所有 (group, account),仅包含当前可用的账号(解冻时间已过或未设置)。"""
for g in self._groups:
for a in g.accounts:
if a.type == type_name and a.is_available():
yield g, a
def acquire(self, type_name: str) -> tuple[ProxyGroupConfig, AccountConfig]:
"""
按 type 轮询获取一组 (ProxyGroupConfig, AccountConfig)。
若该 type 无账号则抛出 ValueError。
"""
pairs = list(self._accounts_by_type(type_name))
if not pairs:
raise ValueError(f"没有类别为 {type_name!r} 的账号,请先在配置中添加")
n = len(pairs)
idx = self._indices.get(type_name, 0) % n
self._indices[type_name] = (idx + 1) % n
return pairs[idx]
def account_id(self, group: ProxyGroupConfig, account: AccountConfig) -> str:
"""生成账号唯一标识,用于会话缓存等。"""
return f"{group.fingerprint_id}:{account.name}"
def get_account_by_id(
self, account_id: str
) -> tuple[ProxyGroupConfig, AccountConfig] | None:
"""根据 account_id(fingerprint_id:name)反查 (group, account),用于复用会话时取 auth。"""
for g in self._groups:
for a in g.accounts:
if self.account_id(g, a) == account_id:
return g, a
return None
def get_group_by_proxy_key(self, proxy_key: ProxyKey) -> ProxyGroupConfig | None:
"""根据 proxy_key(proxy_host, proxy_user, fingerprint_id, use_proxy, timezone)反查对应代理组。"""
pk_tz = getattr(proxy_key, "timezone", None) or TIMEZONE
for g in self._groups:
g_tz = g.timezone or TIMEZONE
if (
g.proxy_host == proxy_key.proxy_host
and g.proxy_user == proxy_key.proxy_user
and g.fingerprint_id == proxy_key.fingerprint_id
and g.use_proxy == getattr(proxy_key, "use_proxy", True)
and g_tz == pk_tz
):
return g
return None
def acquire_from_group(
self,
group: ProxyGroupConfig,
type_name: str,
) -> tuple[ProxyGroupConfig, AccountConfig] | None:
"""
从指定 group 内按 type 轮询取一个账号;若无该 type 则返回 None。
供「现役浏览器对应 IP 组是否还有该 type 可用」时使用。
"""
pairs = [(g, a) for g, a in self._accounts_by_type(type_name) if g is group]
if not pairs:
return None
n = len(pairs)
key = (group.fingerprint_id, type_name)
idx = self._group_type_indices.get(key, 0) % n
self._group_type_indices[key] = (idx + 1) % n
return pairs[idx]
def available_accounts_in_group(
self,
group: ProxyGroupConfig,
type_name: str,
*,
exclude_account_ids: set[str] | None = None,
) -> list[AccountConfig]:
"""返回某代理组下指定 type 的全部可用账号,可排除若干 account_id。"""
exclude = exclude_account_ids or set()
return [
a
for g, a in self._accounts_by_type(type_name)
if g is group and self.account_id(group, a) not in exclude
]
def has_available_account_in_group(
self,
group: ProxyGroupConfig,
type_name: str,
*,
exclude_account_ids: set[str] | None = None,
) -> bool:
"""判断某代理组下是否仍有指定 type 的可用账号。"""
return bool(
self.available_accounts_in_group(
group,
type_name,
exclude_account_ids=exclude_account_ids,
)
)
def next_available_account_in_group(
self,
group: ProxyGroupConfig,
type_name: str,
*,
exclude_account_ids: set[str] | None = None,
) -> AccountConfig | None:
"""
在指定代理组内按轮询选择一个可用账号。
支持排除当前已绑定账号,用于 drained 后切号。
"""
accounts = self.available_accounts_in_group(
group,
type_name,
exclude_account_ids=exclude_account_ids,
)
if not accounts:
return None
n = len(accounts)
key = (group.fingerprint_id, type_name)
idx = self._group_type_indices.get(key, 0) % n
self._group_type_indices[key] = (idx + 1) % n
return accounts[idx]
def next_available_pair(
self,
type_name: str,
*,
exclude_fingerprint_ids: set[str] | None = None,
) -> tuple[ProxyGroupConfig, AccountConfig] | None:
"""
全局按 type 轮询选择一个可用账号,可排除若干代理组。
用于“未打开浏览器的组里挑一个候选账号”。
"""
exclude = exclude_fingerprint_ids or set()
pairs = [
(g, a)
for g, a in self._accounts_by_type(type_name)
if g.fingerprint_id not in exclude
]
if not pairs:
return None
n = len(pairs)
idx = self._indices.get(type_name, 0) % n
self._indices[type_name] = (idx + 1) % n
return pairs[idx]
def update_account_unfreeze_at(
self,
fingerprint_id: str,
account_name: str,
unfreeze_at: int | None,
) -> bool:
for group in self._groups:
if group.fingerprint_id != fingerprint_id:
continue
for index, account in enumerate(group.accounts):
if account.name != account_name:
continue
group.accounts[index] = replace(account, unfreeze_at=unfreeze_at)
return True
return False
|