File size: 7,605 Bytes
77169b4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
"""
账号池:从配置加载代理组与账号,按 type 轮询 acquire。

除基础的全局轮询外,还支持:

- 按 proxy_key 反查代理组
- 在指定代理组内选择某个 type 的可用账号
- 排除当前账号后为 tab 切号选择备选账号
- 在未打开浏览器的代理组中选择某个 type 的候选账号
"""

from dataclasses import replace
from typing import Iterator

from core.config.schema import AccountConfig, ProxyGroupConfig
from core.constants import TIMEZONE
from core.runtime.keys import ProxyKey


class AccountPool:
    """
    多 IP / 多账号池,按 type 过滤后轮询。
    acquire(type) 返回 (ProxyGroupConfig, AccountConfig)。
    get_group_by_proxy_key / acquire_from_group 供现役浏览器复用时使用。
    """

    def __init__(self, groups: list[ProxyGroupConfig]) -> None:
        self._groups = list(groups)
        self._indices: dict[str, int] = {}  # type -> 全局轮询下标
        self._group_type_indices: dict[
            tuple[str, str], int
        ] = {}  # (fingerprint_id, type) -> 组内轮询下标

    @classmethod
    def from_groups(cls, groups: list[ProxyGroupConfig]) -> "AccountPool":
        return cls(groups)

    def reload(self, groups: list[ProxyGroupConfig]) -> None:
        """用新加载的配置替换当前组(如更新解冻时间后从 repository 重新 load_groups)。"""
        self._groups = list(groups)

    def groups(self) -> list[ProxyGroupConfig]:
        """返回当前全部代理组。"""
        return list(self._groups)

    def _accounts_by_type(
        self, type_name: str
    ) -> Iterator[tuple[ProxyGroupConfig, AccountConfig]]:
        """按 type 遍历所有 (group, account),仅包含当前可用的账号(解冻时间已过或未设置)。"""
        for g in self._groups:
            for a in g.accounts:
                if a.type == type_name and a.is_available():
                    yield g, a

    def acquire(self, type_name: str) -> tuple[ProxyGroupConfig, AccountConfig]:
        """
        按 type 轮询获取一组 (ProxyGroupConfig, AccountConfig)。
        若该 type 无账号则抛出 ValueError。
        """
        pairs = list(self._accounts_by_type(type_name))
        if not pairs:
            raise ValueError(f"没有类别为 {type_name!r} 的账号,请先在配置中添加")
        n = len(pairs)
        idx = self._indices.get(type_name, 0) % n
        self._indices[type_name] = (idx + 1) % n
        return pairs[idx]

    def account_id(self, group: ProxyGroupConfig, account: AccountConfig) -> str:
        """生成账号唯一标识,用于会话缓存等。"""
        return f"{group.fingerprint_id}:{account.name}"

    def get_account_by_id(
        self, account_id: str
    ) -> tuple[ProxyGroupConfig, AccountConfig] | None:
        """根据 account_id(fingerprint_id:name)反查 (group, account),用于复用会话时取 auth。"""
        for g in self._groups:
            for a in g.accounts:
                if self.account_id(g, a) == account_id:
                    return g, a
        return None

    def get_group_by_proxy_key(self, proxy_key: ProxyKey) -> ProxyGroupConfig | None:
        """根据 proxy_key(proxy_host, proxy_user, fingerprint_id, use_proxy, timezone)反查对应代理组。"""
        pk_tz = getattr(proxy_key, "timezone", None) or TIMEZONE
        for g in self._groups:
            g_tz = g.timezone or TIMEZONE
            if (
                g.proxy_host == proxy_key.proxy_host
                and g.proxy_user == proxy_key.proxy_user
                and g.fingerprint_id == proxy_key.fingerprint_id
                and g.use_proxy == getattr(proxy_key, "use_proxy", True)
                and g_tz == pk_tz
            ):
                return g
        return None

    def acquire_from_group(
        self,
        group: ProxyGroupConfig,
        type_name: str,
    ) -> tuple[ProxyGroupConfig, AccountConfig] | None:
        """
        从指定 group 内按 type 轮询取一个账号;若无该 type 则返回 None。
        供「现役浏览器对应 IP 组是否还有该 type 可用」时使用。
        """
        pairs = [(g, a) for g, a in self._accounts_by_type(type_name) if g is group]
        if not pairs:
            return None
        n = len(pairs)
        key = (group.fingerprint_id, type_name)
        idx = self._group_type_indices.get(key, 0) % n
        self._group_type_indices[key] = (idx + 1) % n
        return pairs[idx]

    def available_accounts_in_group(
        self,
        group: ProxyGroupConfig,
        type_name: str,
        *,
        exclude_account_ids: set[str] | None = None,
    ) -> list[AccountConfig]:
        """返回某代理组下指定 type 的全部可用账号,可排除若干 account_id。"""
        exclude = exclude_account_ids or set()
        return [
            a
            for g, a in self._accounts_by_type(type_name)
            if g is group and self.account_id(group, a) not in exclude
        ]

    def has_available_account_in_group(
        self,
        group: ProxyGroupConfig,
        type_name: str,
        *,
        exclude_account_ids: set[str] | None = None,
    ) -> bool:
        """判断某代理组下是否仍有指定 type 的可用账号。"""
        return bool(
            self.available_accounts_in_group(
                group,
                type_name,
                exclude_account_ids=exclude_account_ids,
            )
        )

    def next_available_account_in_group(
        self,
        group: ProxyGroupConfig,
        type_name: str,
        *,
        exclude_account_ids: set[str] | None = None,
    ) -> AccountConfig | None:
        """
        在指定代理组内按轮询选择一个可用账号。
        支持排除当前已绑定账号,用于 drained 后切号。
        """
        accounts = self.available_accounts_in_group(
            group,
            type_name,
            exclude_account_ids=exclude_account_ids,
        )
        if not accounts:
            return None
        n = len(accounts)
        key = (group.fingerprint_id, type_name)
        idx = self._group_type_indices.get(key, 0) % n
        self._group_type_indices[key] = (idx + 1) % n
        return accounts[idx]

    def next_available_pair(
        self,
        type_name: str,
        *,
        exclude_fingerprint_ids: set[str] | None = None,
    ) -> tuple[ProxyGroupConfig, AccountConfig] | None:
        """
        全局按 type 轮询选择一个可用账号,可排除若干代理组。
        用于“未打开浏览器的组里挑一个候选账号”。
        """
        exclude = exclude_fingerprint_ids or set()
        pairs = [
            (g, a)
            for g, a in self._accounts_by_type(type_name)
            if g.fingerprint_id not in exclude
        ]
        if not pairs:
            return None
        n = len(pairs)
        idx = self._indices.get(type_name, 0) % n
        self._indices[type_name] = (idx + 1) % n
        return pairs[idx]

    def update_account_unfreeze_at(
        self,
        fingerprint_id: str,
        account_name: str,
        unfreeze_at: int | None,
    ) -> bool:
        for group in self._groups:
            if group.fingerprint_id != fingerprint_id:
                continue
            for index, account in enumerate(group.accounts):
                if account.name != account_name:
                    continue
                group.accounts[index] = replace(account, unfreeze_at=unfreeze_at)
                return True
        return False