Spaces:
Paused
Paused
| """ | |
| 聊天请求编排:解析 session_id、调度 browser/tab/session、调用插件流式补全, | |
| 并在响应末尾附加零宽字符编码的会话 ID。 | |
| 当前调度模型: | |
| - 一个浏览器对应一个代理组 | |
| - 一个浏览器内,一个 type 只有一个 tab | |
| - 一个 tab 绑定一个 account,只有 drained 后才能切号 | |
| - 一个 session 绑定到某个 tab/account;复用成功时不传完整历史 | |
| - 无法复用时,新建会话并回放完整历史 | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import logging | |
| import time | |
| import uuid | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Any, AsyncIterator, cast | |
| from playwright.async_api import BrowserContext, Page | |
| from core.account.pool import AccountPool | |
| from core.config.repository import ConfigRepository | |
| from core.config.schema import AccountConfig, ProxyGroupConfig | |
| from core.config.settings import get | |
| from core.constants import TIMEZONE | |
| from core.plugin.base import ( | |
| AccountFrozenError, | |
| BaseSitePlugin, | |
| BrowserResourceInvalidError, | |
| PluginRegistry, | |
| ) | |
| from core.plugin.helpers import clear_cookies_for_domain | |
| from core.runtime.browser_manager import BrowserManager, ClosedTabInfo, TabRuntime | |
| from core.runtime.keys import ProxyKey | |
| from core.runtime.local_proxy_forwarder import LocalProxyForwarder, UpstreamProxy, parse_proxy_server | |
| from core.runtime.session_cache import SessionCache, SessionEntry | |
| from core.api.conv_parser import parse_conv_uuid_from_messages, session_id_suffix | |
| from core.api.fingerprint import compute_conversation_fingerprint | |
| from core.api.react import format_react_prompt | |
| from core.api.schemas import OpenAIChatRequest, extract_user_content | |
| from core.hub.schemas import OpenAIStreamEvent | |
| from core.runtime.conversation_index import ConversationIndex | |
| logger = logging.getLogger(__name__) | |
| def _request_messages_as_dicts(req: OpenAIChatRequest) -> list[dict[str, Any]]: | |
| """转为 conv_parser 需要的 list[dict]。""" | |
| out: list[dict[str, Any]] = [] | |
| for m in req.messages: | |
| d: dict[str, Any] = {"role": m.role} | |
| if isinstance(m.content, list): | |
| d["content"] = [p.model_dump() for p in m.content] | |
| else: | |
| d["content"] = m.content | |
| out.append(d) | |
| return out | |
| def _proxy_key_for_group(group: ProxyGroupConfig) -> ProxyKey: | |
| return ProxyKey( | |
| group.proxy_host, | |
| group.proxy_user, | |
| group.fingerprint_id, | |
| group.use_proxy, | |
| group.timezone or TIMEZONE, | |
| ) | |
| class _RequestTarget: | |
| proxy_key: ProxyKey | |
| group: ProxyGroupConfig | |
| account: AccountConfig | |
| context: BrowserContext | |
| page: Page | |
| session_id: str | None | |
| full_history: bool | |
| proxy_url: str | None = None | |
| proxy_auth: tuple[str, str] | None = None | |
| proxy_forwarder: LocalProxyForwarder | None = None | |
| class ChatHandler: | |
| """编排一次 chat 请求:会话解析、tab 调度、插件调用。""" | |
| def __init__( | |
| self, | |
| pool: AccountPool, | |
| session_cache: SessionCache, | |
| browser_manager: BrowserManager, | |
| config_repo: ConfigRepository | None = None, | |
| ) -> None: | |
| self._pool = pool | |
| self._session_cache = session_cache | |
| self._browser_manager = browser_manager | |
| self._config_repo = config_repo | |
| self._conv_index = ConversationIndex() | |
| self._schedule_lock = asyncio.Lock() | |
| self._stop_event = asyncio.Event() | |
| self._busy_sessions: set[str] = set() | |
| self._tab_max_concurrent = int(get("scheduler", "tab_max_concurrent") or 5) | |
| self._gc_interval_seconds = float( | |
| get("scheduler", "browser_gc_interval_seconds") or 300 | |
| ) | |
| self._tab_idle_seconds = float(get("scheduler", "tab_idle_seconds") or 900) | |
| self._resident_browser_count = int( | |
| get("scheduler", "resident_browser_count", 1) | |
| ) | |
| def reload_pool( | |
| self, | |
| groups: list[ProxyGroupConfig], | |
| config_repo: ConfigRepository | None = None, | |
| ) -> None: | |
| """配置热更新后替换账号池与 repository。""" | |
| self._pool.reload(groups) | |
| if config_repo is not None: | |
| self._config_repo = config_repo | |
| async def refresh_configuration( | |
| self, | |
| groups: list[ProxyGroupConfig], | |
| config_repo: ConfigRepository | None = None, | |
| ) -> None: | |
| """配置热更新:替换账号池、清理失效资源,并重新预热常驻浏览器。""" | |
| async with self._schedule_lock: | |
| self.reload_pool(groups, config_repo) | |
| await self._prune_invalid_resources_locked() | |
| await self._reconcile_tabs_locked() | |
| await self.prewarm_resident_browsers() | |
| async def prewarm_resident_browsers(self) -> None: | |
| """启动时预热常驻浏览器,并为其下可用 type 建立 tab。""" | |
| async with self._schedule_lock: | |
| warmed = 0 | |
| for group in self._pool.groups(): | |
| if warmed >= self._resident_browser_count: | |
| break | |
| available_types = { | |
| a.type | |
| for a in group.accounts | |
| if a.is_available() and PluginRegistry.get(a.type) is not None | |
| } | |
| if not available_types: | |
| continue | |
| proxy_key = _proxy_key_for_group(group) | |
| await self._browser_manager.ensure_browser(proxy_key, group.proxy_pass) | |
| for type_name in sorted(available_types): | |
| if self._browser_manager.get_tab(proxy_key, type_name) is not None: | |
| continue | |
| account = self._pool.available_accounts_in_group(group, type_name) | |
| if not account: | |
| continue | |
| chosen = account[0] | |
| plugin = PluginRegistry.get(type_name) | |
| if plugin is None: | |
| continue | |
| await self._browser_manager.open_tab( | |
| proxy_key, | |
| group.proxy_pass, | |
| type_name, | |
| self._pool.account_id(group, chosen), | |
| plugin.create_page, | |
| self._make_apply_auth_fn(plugin, chosen), | |
| ) | |
| warmed += 1 | |
| async def run_maintenance_loop(self) -> None: | |
| """周期性回收空闲浏览器,并收尾 drained/frozen tab。""" | |
| while not self._stop_event.is_set(): | |
| try: | |
| await asyncio.wait_for( | |
| self._stop_event.wait(), | |
| timeout=self._gc_interval_seconds, | |
| ) | |
| break | |
| except asyncio.TimeoutError: | |
| pass | |
| try: | |
| async with self._schedule_lock: | |
| # Evict stale sessions to prevent unbounded accumulation. | |
| stale_ids = self._session_cache.evict_stale() | |
| # Evict stale fingerprint entries in sync. | |
| stale_fp = self._conv_index.evict_stale(ttl=1800.0) | |
| if stale_fp: | |
| logger.info( | |
| "[maintenance] evicted %d stale fingerprint entries", | |
| len(stale_fp), | |
| ) | |
| if stale_ids: | |
| for sid in stale_ids: | |
| plugin_type = None | |
| for pk, entry in self._browser_manager.list_browser_entries(): | |
| for tn, tab in entry.tabs.items(): | |
| if sid in tab.sessions: | |
| tab.sessions.discard(sid) | |
| plugin_type = tn | |
| break | |
| if plugin_type: | |
| plugin = PluginRegistry.get(plugin_type) | |
| if plugin is not None: | |
| plugin.drop_session(sid) | |
| logger.info( | |
| "[maintenance] evicted %d stale sessions, cache size=%d", | |
| len(stale_ids), | |
| len(self._session_cache), | |
| ) | |
| await self._reconcile_tabs_locked() | |
| closed = await self._browser_manager.collect_idle_browsers( | |
| idle_seconds=self._tab_idle_seconds, | |
| resident_browser_count=self._resident_browser_count, | |
| ) | |
| self._apply_closed_tabs_locked(closed) | |
| except Exception: | |
| logger.exception("维护循环执行失败") | |
| async def shutdown(self) -> None: | |
| """停止维护循环并关闭全部浏览器。""" | |
| self._stop_event.set() | |
| async with self._schedule_lock: | |
| closed = await self._browser_manager.close_all() | |
| self._apply_closed_tabs_locked(closed) | |
| def report_account_unfreeze( | |
| self, | |
| fingerprint_id: str, | |
| account_name: str, | |
| unfreeze_at: int, | |
| ) -> None: | |
| """记录账号解冻时间,并同步更新内存账号池。""" | |
| if self._config_repo is None: | |
| return | |
| self._config_repo.update_account_unfreeze_at( | |
| fingerprint_id, account_name, unfreeze_at | |
| ) | |
| self._pool.update_account_unfreeze_at( | |
| fingerprint_id, | |
| account_name, | |
| unfreeze_at, | |
| ) | |
| def get_account_runtime_status(self) -> dict[str, dict[str, Any]]: | |
| """返回当前账号运行时状态,供配置页展示角标。""" | |
| status: dict[str, dict[str, Any]] = {} | |
| for proxy_key, entry in self._browser_manager.list_browser_entries(): | |
| for type_name, tab in entry.tabs.items(): | |
| status[tab.account_id] = { | |
| "fingerprint_id": proxy_key.fingerprint_id, | |
| "type": type_name, | |
| "is_active": True, | |
| "tab_state": tab.state, | |
| "accepting_new": tab.accepting_new, | |
| "active_requests": tab.active_requests, | |
| "frozen_until": tab.frozen_until, | |
| } | |
| return status | |
| def _make_apply_auth_fn( | |
| self, | |
| plugin: Any, | |
| account: AccountConfig, | |
| ) -> Any: | |
| async def _apply_auth(context: BrowserContext, page: Page) -> None: | |
| await plugin.apply_auth(context, page, account.auth) | |
| return _apply_auth | |
| def _apply_closed_tabs_locked(self, closed_tabs: list[ClosedTabInfo]) -> None: | |
| for info in closed_tabs: | |
| self._session_cache.delete_many(info.session_ids) | |
| plugin = PluginRegistry.get(info.type_name) | |
| if plugin is not None: | |
| plugin.drop_sessions(info.session_ids) | |
| def _stream_proxy_settings( | |
| self, | |
| target: _RequestTarget, | |
| ) -> tuple[str | None, tuple[str, str] | None, LocalProxyForwarder | None]: | |
| if not target.proxy_key.use_proxy: | |
| return (None, None, None) | |
| upstream_host, upstream_port = parse_proxy_server(target.proxy_key.proxy_host) | |
| forwarder = LocalProxyForwarder( | |
| UpstreamProxy( | |
| host=upstream_host, | |
| port=upstream_port, | |
| username=target.proxy_key.proxy_user, | |
| password=target.group.proxy_pass, | |
| ), | |
| listen_host="127.0.0.1", | |
| listen_port=0, | |
| on_log=lambda msg: logger.debug("[stream-proxy] %s", msg), | |
| ) | |
| forwarder.start() | |
| return ( | |
| forwarder.proxy_url, | |
| None, | |
| forwarder, | |
| ) | |
| async def _clear_tab_domain_cookies_if_supported( | |
| self, proxy_key: ProxyKey, type_name: str | |
| ) -> None: | |
| """关 tab 前清该 type 对应域名的 cookie(仅支持带 site.cookie_domain 的插件)。""" | |
| entry = self._browser_manager.get_browser_entry(proxy_key) | |
| if entry is None: | |
| return | |
| plugin = PluginRegistry.get(type_name) | |
| if not isinstance(plugin, BaseSitePlugin) or not getattr(plugin, "site", None): | |
| return | |
| try: | |
| await clear_cookies_for_domain(entry.context, plugin.site.cookie_domain) | |
| except Exception as e: | |
| logger.debug("关 tab 前清 cookie 失败 type=%s: %s", type_name, e) | |
| async def _prune_invalid_resources_locked(self) -> None: | |
| """关闭配置中已不存在的浏览器/tab,避免热更新后继续使用失效资源。""" | |
| for proxy_key, entry in list(self._browser_manager.list_browser_entries()): | |
| group = self._pool.get_group_by_proxy_key(proxy_key) | |
| if group is None: | |
| self._apply_closed_tabs_locked( | |
| await self._browser_manager.close_browser(proxy_key) | |
| ) | |
| continue | |
| for type_name in list(entry.tabs.keys()): | |
| tab = entry.tabs[type_name] | |
| pair = self._pool.get_account_by_id(tab.account_id) | |
| if ( | |
| pair is None | |
| or pair[0] is not group | |
| or pair[1].type != type_name | |
| or not pair[1].enabled | |
| ): | |
| self._invalidate_tab_sessions_locked(proxy_key, type_name) | |
| if tab.active_requests == 0: | |
| # 与 reconcile 一致:优先同组同一页 re-auth,失败或无可用账号再关 tab | |
| switched = False | |
| group = self._pool.get_group_by_proxy_key(proxy_key) | |
| if group is not None: | |
| next_account = self._pool.next_available_account_in_group( | |
| group, | |
| type_name, | |
| exclude_account_ids={tab.account_id}, | |
| ) | |
| if next_account is not None: | |
| plugin = PluginRegistry.get(type_name) | |
| if plugin is not None: | |
| switched = ( | |
| await self._browser_manager.switch_tab_account( | |
| proxy_key, | |
| type_name, | |
| self._pool.account_id(group, next_account), | |
| self._make_apply_auth_fn( | |
| plugin, | |
| next_account, | |
| ), | |
| ) | |
| ) | |
| if not switched: | |
| await self._clear_tab_domain_cookies_if_supported( | |
| proxy_key, type_name | |
| ) | |
| closed = await self._browser_manager.close_tab( | |
| proxy_key, type_name | |
| ) | |
| if closed is not None: | |
| self._apply_closed_tabs_locked([closed]) | |
| else: | |
| self._browser_manager.mark_tab_draining(proxy_key, type_name) | |
| def _invalidate_session_locked( | |
| self, | |
| session_id: str, | |
| entry: SessionEntry | None = None, | |
| ) -> None: | |
| entry = entry or self._session_cache.get(session_id) | |
| if entry is None: | |
| return | |
| self._session_cache.delete(session_id) | |
| self._conv_index.remove_session(session_id) | |
| self._browser_manager.unregister_session( | |
| entry.proxy_key, | |
| entry.type_name, | |
| session_id, | |
| ) | |
| plugin = PluginRegistry.get(entry.type_name) | |
| if plugin is not None: | |
| plugin.drop_session(session_id) | |
| def _invalidate_tab_sessions_locked( | |
| self, | |
| proxy_key: ProxyKey, | |
| type_name: str, | |
| ) -> None: | |
| tab = self._browser_manager.get_tab(proxy_key, type_name) | |
| if tab is None or not tab.sessions: | |
| return | |
| session_ids = list(tab.sessions) | |
| self._session_cache.delete_many(session_ids) | |
| plugin = PluginRegistry.get(type_name) | |
| if plugin is not None: | |
| plugin.drop_sessions(session_ids) | |
| tab.sessions.clear() | |
| async def _recover_browser_resource_invalid_locked( | |
| self, | |
| type_name: str, | |
| target: _RequestTarget, | |
| request_id: str, | |
| active_session_id: str | None, | |
| error: BrowserResourceInvalidError, | |
| attempt: int, | |
| max_retries: int, | |
| ) -> None: | |
| account_id = self._pool.account_id(target.group, target.account) | |
| diagnostics = self._browser_manager.browser_diagnostics(target.proxy_key) | |
| logger.warning( | |
| "[chat] browser resource invalid attempt=%s/%s type=%s proxy=%s account=%s session_id=%s request_id=%s resource=%s helper=%s stage=%s stream_phase=%s browser_present=%s proc_alive=%s cdp_listening=%s tab_count=%s active_requests=%s err=%s", | |
| attempt + 1, | |
| max_retries, | |
| type_name, | |
| target.proxy_key.fingerprint_id, | |
| account_id, | |
| active_session_id, | |
| request_id, | |
| error.resource_hint, | |
| error.helper_name, | |
| error.stage, | |
| error.stream_phase, | |
| diagnostics.get("browser_present"), | |
| diagnostics.get("proc_alive"), | |
| diagnostics.get("cdp_listening"), | |
| diagnostics.get("tab_count"), | |
| diagnostics.get("active_requests"), | |
| error, | |
| ) | |
| stderr_tail = str(diagnostics.get("stderr_tail") or "").strip() | |
| if stderr_tail: | |
| logger.warning( | |
| "[chat] browser resource invalid stderr tail proxy=%s request_id=%s:\n%s", | |
| target.proxy_key.fingerprint_id, | |
| request_id, | |
| stderr_tail, | |
| ) | |
| if active_session_id is not None: | |
| self._invalidate_session_locked(active_session_id) | |
| if error.resource_hint == "transport": | |
| logger.warning( | |
| "[chat] transport-level stream failure, keep tab/browser and retry proxy=%s request_id=%s", | |
| target.proxy_key.fingerprint_id, | |
| request_id, | |
| ) | |
| return | |
| self._browser_manager.mark_tab_draining(target.proxy_key, type_name) | |
| browser_restart_reason: str | None = None | |
| if error.resource_hint == "browser": | |
| browser_restart_reason = "resource_hint" | |
| # Legacy: page_fetch transport is no longer used by Claude (context_request since v0.x). | |
| # Kept for potential future plugins that still use page_fetch transport. | |
| elif ( | |
| error.helper_name == "stream_raw_via_page_fetch" | |
| and error.stage in {"read_timeout", "evaluate_timeout"} | |
| ): | |
| browser_restart_reason = f"{error.helper_name}:{error.stage}" | |
| if browser_restart_reason is not None: | |
| logger.warning( | |
| "[chat] escalating browser recovery to full restart proxy=%s request_id=%s reason=%s", | |
| target.proxy_key.fingerprint_id, | |
| request_id, | |
| browser_restart_reason, | |
| ) | |
| closed = await self._browser_manager.close_browser(target.proxy_key) | |
| self._apply_closed_tabs_locked(closed) | |
| return | |
| self._invalidate_tab_sessions_locked(target.proxy_key, type_name) | |
| closed = await self._browser_manager.close_tab(target.proxy_key, type_name) | |
| if closed is not None: | |
| self._apply_closed_tabs_locked([closed]) | |
| def _revive_tab_if_possible_locked( | |
| self, | |
| proxy_key: ProxyKey, | |
| type_name: str, | |
| ) -> bool: | |
| tab = self._browser_manager.get_tab(proxy_key, type_name) | |
| if tab is None or tab.active_requests != 0: | |
| return False | |
| if tab.accepting_new: | |
| return True | |
| pair = self._pool.get_account_by_id(tab.account_id) | |
| if pair is None: | |
| return False | |
| _, account = pair | |
| if not account.is_available(): | |
| return False | |
| tab.accepting_new = True | |
| tab.state = "ready" | |
| tab.frozen_until = None | |
| tab.last_used_at = time.time() | |
| return True | |
| async def _reconcile_tabs_locked(self) -> None: | |
| """ | |
| 收尾所有 non-ready tab: | |
| - 若原账号已恢复可用,则恢复 tab | |
| - 否则若同组有其他可用账号,则在 drained 后切号 | |
| - 否则关闭 tab | |
| """ | |
| for proxy_key, entry in list(self._browser_manager.list_browser_entries()): | |
| for type_name in list(entry.tabs.keys()): | |
| tab = entry.tabs[type_name] | |
| if tab.accepting_new: | |
| continue | |
| if tab.active_requests != 0: | |
| continue | |
| if self._revive_tab_if_possible_locked(proxy_key, type_name): | |
| continue | |
| group = self._pool.get_group_by_proxy_key(proxy_key) | |
| if group is None: | |
| await self._clear_tab_domain_cookies_if_supported( | |
| proxy_key, type_name | |
| ) | |
| closed = await self._browser_manager.close_tab(proxy_key, type_name) | |
| if closed is not None: | |
| self._apply_closed_tabs_locked([closed]) | |
| continue | |
| next_account = self._pool.next_available_account_in_group( | |
| group, | |
| type_name, | |
| exclude_account_ids={tab.account_id}, | |
| ) | |
| if next_account is not None: | |
| plugin = PluginRegistry.get(type_name) | |
| if plugin is None: | |
| continue | |
| self._invalidate_tab_sessions_locked(proxy_key, type_name) | |
| switched = await self._browser_manager.switch_tab_account( | |
| proxy_key, | |
| type_name, | |
| self._pool.account_id(group, next_account), | |
| self._make_apply_auth_fn(plugin, next_account), | |
| ) | |
| if switched: | |
| continue | |
| await self._clear_tab_domain_cookies_if_supported(proxy_key, type_name) | |
| closed = await self._browser_manager.close_tab(proxy_key, type_name) | |
| if closed is not None: | |
| self._apply_closed_tabs_locked([closed]) | |
| async def _reuse_session_target_locked( | |
| self, | |
| plugin: Any, | |
| type_name: str, | |
| session_id: str, | |
| ) -> _RequestTarget | None: | |
| entry = self._session_cache.get(session_id) | |
| if entry is None or entry.type_name != type_name: | |
| return None | |
| pair = self._pool.get_account_by_id(entry.account_id) | |
| if pair is None: | |
| self._invalidate_session_locked(session_id, entry) | |
| return None | |
| group, account = pair | |
| tab = self._browser_manager.get_tab(entry.proxy_key, type_name) | |
| if ( | |
| tab is None | |
| or tab.account_id != entry.account_id | |
| or not plugin.has_session(session_id) | |
| ): | |
| self._invalidate_session_locked(session_id, entry) | |
| return None | |
| if not tab.accepting_new: | |
| self._invalidate_session_locked(session_id, entry) | |
| return None | |
| if session_id in self._busy_sessions: | |
| raise RuntimeError("当前会话正在处理中,请稍后再试") | |
| if tab.active_requests >= self._tab_max_concurrent: | |
| raise RuntimeError("当前会话所在 tab 繁忙,请稍后再试") | |
| page = self._browser_manager.acquire_tab( | |
| entry.proxy_key, | |
| type_name, | |
| self._tab_max_concurrent, | |
| ) | |
| if page is None: | |
| raise RuntimeError("当前会话暂不可复用,请稍后再试") | |
| self._session_cache.touch(session_id) | |
| self._busy_sessions.add(session_id) | |
| context = await self._browser_manager.ensure_browser( | |
| entry.proxy_key, | |
| group.proxy_pass, | |
| ) | |
| return _RequestTarget( | |
| proxy_key=entry.proxy_key, | |
| group=group, | |
| account=account, | |
| context=context, | |
| page=page, | |
| session_id=session_id, | |
| full_history=False, | |
| ) | |
| async def _allocate_new_target_locked( | |
| self, | |
| type_name: str, | |
| ) -> _RequestTarget: | |
| # 1. 已打开浏览器里已有该 type 的可服务 tab,直接复用。 | |
| existing_tabs: list[tuple[int, float, ProxyKey, TabRuntime]] = [] | |
| for proxy_key, entry in self._browser_manager.list_browser_entries(): | |
| tab = entry.tabs.get(type_name) | |
| if ( | |
| tab is not None | |
| and tab.accepting_new | |
| and tab.active_requests < self._tab_max_concurrent | |
| ): | |
| existing_tabs.append( | |
| (tab.active_requests, tab.last_used_at, proxy_key, tab) | |
| ) | |
| if existing_tabs: | |
| _, _, proxy_key, tab = min(existing_tabs, key=lambda item: item[:2]) | |
| pair = self._pool.get_account_by_id(tab.account_id) | |
| if pair is None: | |
| self._invalidate_tab_sessions_locked(proxy_key, type_name) | |
| closed = await self._browser_manager.close_tab(proxy_key, type_name) | |
| if closed is not None: | |
| self._apply_closed_tabs_locked([closed]) | |
| else: | |
| group, account = pair | |
| page = self._browser_manager.acquire_tab( | |
| proxy_key, | |
| type_name, | |
| self._tab_max_concurrent, | |
| ) | |
| if page is not None: | |
| context = await self._browser_manager.ensure_browser( | |
| proxy_key, | |
| group.proxy_pass, | |
| ) | |
| return _RequestTarget( | |
| proxy_key=proxy_key, | |
| group=group, | |
| account=account, | |
| context=context, | |
| page=page, | |
| session_id=None, | |
| full_history=True, | |
| ) | |
| # 2. 已打开浏览器里还没有该 type tab,但该组有可用账号,直接建新 tab。 | |
| open_browser_candidates: list[ | |
| tuple[int, float, ProxyKey, ProxyGroupConfig] | |
| ] = [] | |
| for proxy_key, entry in self._browser_manager.list_browser_entries(): | |
| if type_name in entry.tabs: | |
| continue | |
| group = self._pool.get_group_by_proxy_key(proxy_key) | |
| if group is None: | |
| continue | |
| if not self._pool.has_available_account_in_group(group, type_name): | |
| continue | |
| open_browser_candidates.append( | |
| ( | |
| self._browser_manager.browser_load(proxy_key), | |
| entry.last_used_at, | |
| proxy_key, | |
| group, | |
| ) | |
| ) | |
| if open_browser_candidates: | |
| _, _, proxy_key, group = min( | |
| open_browser_candidates, key=lambda item: item[:2] | |
| ) | |
| account = self._pool.next_available_account_in_group(group, type_name) | |
| if account is not None: | |
| plugin = PluginRegistry.get(type_name) | |
| if plugin is None: | |
| raise ValueError(f"未注册的 type: {type_name}") | |
| await self._browser_manager.open_tab( | |
| proxy_key, | |
| group.proxy_pass, | |
| type_name, | |
| self._pool.account_id(group, account), | |
| plugin.create_page, | |
| self._make_apply_auth_fn(plugin, account), | |
| ) | |
| page = self._browser_manager.acquire_tab( | |
| proxy_key, | |
| type_name, | |
| self._tab_max_concurrent, | |
| ) | |
| if page is None: | |
| raise RuntimeError("新建 tab 后仍无法占用请求槽位") | |
| context = await self._browser_manager.ensure_browser( | |
| proxy_key, | |
| group.proxy_pass, | |
| ) | |
| return _RequestTarget( | |
| proxy_key=proxy_key, | |
| group=group, | |
| account=account, | |
| context=context, | |
| page=page, | |
| session_id=None, | |
| full_history=True, | |
| ) | |
| # 3. 已打开浏览器里该 type tab 已 drained,且同组有备用账号,可在当前 tab 切号。 | |
| switch_candidates: list[tuple[float, ProxyKey, ProxyGroupConfig]] = [] | |
| for proxy_key, entry in self._browser_manager.list_browser_entries(): | |
| tab = entry.tabs.get(type_name) | |
| if tab is None or tab.active_requests != 0: | |
| continue | |
| group = self._pool.get_group_by_proxy_key(proxy_key) | |
| if group is None: | |
| continue | |
| if not self._pool.has_available_account_in_group( | |
| group, | |
| type_name, | |
| exclude_account_ids={tab.account_id}, | |
| ): | |
| continue | |
| switch_candidates.append((tab.last_used_at, proxy_key, group)) | |
| if switch_candidates: | |
| _, proxy_key, group = min(switch_candidates, key=lambda item: item[0]) | |
| tab = self._browser_manager.get_tab(proxy_key, type_name) | |
| plugin = PluginRegistry.get(type_name) | |
| if tab is not None and plugin is not None: | |
| next_account = self._pool.next_available_account_in_group( | |
| group, | |
| type_name, | |
| exclude_account_ids={tab.account_id}, | |
| ) | |
| if next_account is not None: | |
| self._invalidate_tab_sessions_locked(proxy_key, type_name) | |
| switched = await self._browser_manager.switch_tab_account( | |
| proxy_key, | |
| type_name, | |
| self._pool.account_id(group, next_account), | |
| self._make_apply_auth_fn(plugin, next_account), | |
| ) | |
| if switched: | |
| page = self._browser_manager.acquire_tab( | |
| proxy_key, | |
| type_name, | |
| self._tab_max_concurrent, | |
| ) | |
| if page is None: | |
| raise RuntimeError("切号后仍无法占用请求槽位") | |
| context = await self._browser_manager.ensure_browser( | |
| proxy_key, | |
| group.proxy_pass, | |
| ) | |
| return _RequestTarget( | |
| proxy_key=proxy_key, | |
| group=group, | |
| account=next_account, | |
| context=context, | |
| page=page, | |
| session_id=None, | |
| full_history=True, | |
| ) | |
| # 4. 开新浏览器。 | |
| open_groups = { | |
| proxy_key.fingerprint_id | |
| for proxy_key in self._browser_manager.current_proxy_keys() | |
| } | |
| pair = self._pool.next_available_pair( | |
| type_name, | |
| exclude_fingerprint_ids=open_groups, | |
| ) | |
| if pair is None: | |
| raise ValueError(f"没有类别为 {type_name!r} 的可用账号,请稍后再试") | |
| group, account = pair | |
| proxy_key = _proxy_key_for_group(group) | |
| plugin = PluginRegistry.get(type_name) | |
| if plugin is None: | |
| raise ValueError(f"未注册的 type: {type_name}") | |
| await self._browser_manager.open_tab( | |
| proxy_key, | |
| group.proxy_pass, | |
| type_name, | |
| self._pool.account_id(group, account), | |
| plugin.create_page, | |
| self._make_apply_auth_fn(plugin, account), | |
| ) | |
| page = self._browser_manager.acquire_tab( | |
| proxy_key, | |
| type_name, | |
| self._tab_max_concurrent, | |
| ) | |
| if page is None: | |
| raise RuntimeError("新浏览器建 tab 后仍无法占用请求槽位") | |
| context = await self._browser_manager.ensure_browser( | |
| proxy_key, group.proxy_pass | |
| ) | |
| return _RequestTarget( | |
| proxy_key=proxy_key, | |
| group=group, | |
| account=account, | |
| context=context, | |
| page=page, | |
| session_id=None, | |
| full_history=True, | |
| ) | |
| async def _stream_completion( | |
| self, | |
| type_name: str, | |
| req: OpenAIChatRequest, | |
| ) -> AsyncIterator[str]: | |
| """ | |
| 内部实现:调度 + 插件 stream_completion 字符串流,末尾附加 session_id 零宽编码。 | |
| 对外仅通过 stream_openai_events() 暴露事件流。 | |
| """ | |
| plugin = PluginRegistry.get(type_name) | |
| if plugin is None: | |
| raise ValueError(f"未注册的 type: {type_name}") | |
| raw_messages = _request_messages_as_dicts(req) | |
| conv_uuid = req.resume_session_id or parse_conv_uuid_from_messages(raw_messages) | |
| # Fingerprint matching: when the client doesn't preserve the zero-width | |
| # session marker (conv_uuid is None), compute a fingerprint from | |
| # system prompt + first user message and look up the matching session. | |
| # This replaces sticky session and prevents context pollution. | |
| fingerprint = "" | |
| if not conv_uuid: | |
| fingerprint = compute_conversation_fingerprint(req.messages) | |
| if fingerprint: | |
| entry = self._conv_index.lookup(fingerprint) | |
| if entry is not None: | |
| conv_uuid = entry.session_id | |
| logger.info("[chat] type=%s parsed conv_uuid=%s fingerprint=%s", type_name, conv_uuid, fingerprint or "n/a") | |
| has_tools = bool(req.tools) | |
| react_prompt_prefix = format_react_prompt(req.tools or []) if has_tools else "" | |
| debug_path = ( | |
| Path(__file__).resolve().parent.parent.parent | |
| / "debug" | |
| / "chat_prompt_debug.json" | |
| ) | |
| max_retries = 3 | |
| for attempt in range(max_retries): | |
| target: _RequestTarget | None = None | |
| active_session_id: str | None = None | |
| request_id = uuid.uuid4().hex | |
| try: | |
| async with self._schedule_lock: | |
| if conv_uuid: | |
| target = await self._reuse_session_target_locked( | |
| plugin, | |
| type_name, | |
| conv_uuid, | |
| ) | |
| if target is None: | |
| target = await self._allocate_new_target_locked(type_name) | |
| if target.session_id is not None: | |
| active_session_id = target.session_id | |
| content = extract_user_content( | |
| req.messages, | |
| has_tools=has_tools, | |
| react_prompt_prefix=react_prompt_prefix, | |
| full_history=target.full_history, | |
| ) | |
| if not content.strip() and req.attachment_files: | |
| content = "Please analyze the attached image." | |
| if not content.strip(): | |
| raise ValueError("messages 中需至少有一条带 content 的 user 消息") | |
| debug_path.parent.mkdir(parents=True, exist_ok=True) | |
| debug_path.write_text( | |
| json.dumps( | |
| { | |
| "prompt": content, | |
| "full_history": target.full_history, | |
| "type": type_name, | |
| }, | |
| ensure_ascii=False, | |
| indent=2, | |
| ), | |
| encoding="utf-8", | |
| ) | |
| account_id = self._pool.account_id(target.group, target.account) | |
| session_id = target.session_id | |
| if session_id is None: | |
| logger.info( | |
| "[chat] create_conversation type=%s proxy=%s account=%s", | |
| type_name, | |
| target.proxy_key.fingerprint_id, | |
| account_id, | |
| ) | |
| session_id = await plugin.create_conversation( | |
| target.context, | |
| target.page, | |
| timezone=target.group.timezone | |
| or getattr(target.proxy_key, "timezone", None) | |
| or TIMEZONE, | |
| public_model=str(getattr(req, "model", "") or ""), | |
| upstream_model=str(getattr(req, "upstream_model", "") or ""), | |
| request_id=request_id, | |
| ) | |
| if not session_id: | |
| raise RuntimeError("插件创建会话失败") | |
| async with self._schedule_lock: | |
| self._session_cache.put( | |
| session_id, | |
| target.proxy_key, | |
| type_name, | |
| account_id, | |
| ) | |
| self._browser_manager.register_session( | |
| target.proxy_key, | |
| type_name, | |
| session_id, | |
| ) | |
| self._busy_sessions.add(session_id) | |
| # Register fingerprint for future matching | |
| if fingerprint: | |
| self._conv_index.register( | |
| fingerprint, | |
| session_id, | |
| len(req.messages), | |
| account_id, | |
| ) | |
| active_session_id = session_id | |
| # Skip pre-stream probe for newly created sessions: | |
| # create_conversation already validated page health. | |
| if target.session_id is not None: | |
| await plugin.ensure_request_ready( | |
| target.context, | |
| target.page, | |
| request_id=request_id, | |
| session_id=session_id, | |
| phase="stream_completion", | |
| account_id=account_id, | |
| ) | |
| logger.info( | |
| "[chat] stream_completion type=%s session_id=%s proxy=%s account=%s full_history=%s", | |
| type_name, | |
| session_id, | |
| target.proxy_key.fingerprint_id, | |
| account_id, | |
| target.full_history, | |
| ) | |
| # 根据是否 full_history 选择附件来源: | |
| # - 复用会话(full_history=False):仅最后一条 user 的图片(可能为空,则本轮不带图) | |
| # - 新建/重建会话(full_history=True):所有历史 user 的图片 | |
| attachments = ( | |
| req.attachment_files_all_users | |
| if target.full_history | |
| else req.attachment_files_last_user | |
| ) | |
| proxy_url = None | |
| proxy_auth = None | |
| proxy_forwarder = None | |
| if plugin.stream_transport() == "context_request": | |
| proxy_url, proxy_auth, proxy_forwarder = self._stream_proxy_settings(target) | |
| target.proxy_url = proxy_url | |
| target.proxy_auth = proxy_auth | |
| target.proxy_forwarder = proxy_forwarder | |
| try: | |
| stream = cast( | |
| AsyncIterator[str], | |
| plugin.stream_completion( | |
| target.context, | |
| target.page, | |
| session_id, | |
| content, | |
| request_id=request_id, | |
| attachments=attachments, | |
| proxy_url=proxy_url, | |
| proxy_auth=proxy_auth, | |
| ), | |
| ) | |
| async for chunk in stream: | |
| yield chunk | |
| finally: | |
| if proxy_forwarder is not None: | |
| try: | |
| proxy_forwarder.stop() | |
| except Exception: | |
| pass | |
| target.proxy_forwarder = None | |
| yield session_id_suffix(session_id) | |
| return | |
| except AccountFrozenError as e: | |
| logger.warning( | |
| "账号限流/额度用尽(插件上报),切换资源重试: type=%s proxy=%s err=%s", | |
| type_name, | |
| target.proxy_key.fingerprint_id if target else None, | |
| e, | |
| ) | |
| async with self._schedule_lock: | |
| if target is not None: | |
| self.report_account_unfreeze( | |
| target.group.fingerprint_id, | |
| target.account.name, | |
| e.unfreeze_at, | |
| ) | |
| self._browser_manager.mark_tab_draining( | |
| target.proxy_key, | |
| type_name, | |
| frozen_until=e.unfreeze_at, | |
| ) | |
| self._invalidate_tab_sessions_locked( | |
| target.proxy_key, type_name | |
| ) | |
| if attempt == max_retries - 1: | |
| raise RuntimeError( | |
| f"已重试 {max_retries} 次仍限流/过载,请稍后再试: {e}" | |
| ) from e | |
| continue | |
| except BrowserResourceInvalidError as e: | |
| proxy_for_log = ( | |
| target.proxy_key.fingerprint_id | |
| if target is not None | |
| else getattr(getattr(e, "proxy_key", None), "fingerprint_id", None) | |
| ) | |
| logger.warning( | |
| "[chat] browser resource invalid bubbled type=%s request_id=%s proxy=%s session_id=%s helper=%s stage=%s resource=%s err=%s", | |
| type_name, | |
| request_id, | |
| proxy_for_log, | |
| active_session_id, | |
| e.helper_name, | |
| e.stage, | |
| e.resource_hint, | |
| e, | |
| ) | |
| async with self._schedule_lock: | |
| if target is not None: | |
| await self._recover_browser_resource_invalid_locked( | |
| type_name, | |
| target, | |
| request_id, | |
| active_session_id, | |
| e, | |
| attempt, | |
| max_retries, | |
| ) | |
| elif getattr(e, "proxy_key", None) is not None: | |
| closed = await self._browser_manager.close_browser(e.proxy_key) | |
| self._apply_closed_tabs_locked(closed) | |
| if attempt == max_retries - 1: | |
| raise RuntimeError( | |
| f"浏览器资源已失效,重试 {max_retries} 次后仍失败: {e}" | |
| ) from e | |
| continue | |
| finally: | |
| if target is not None: | |
| async with self._schedule_lock: | |
| if active_session_id is not None: | |
| self._busy_sessions.discard(active_session_id) | |
| self._browser_manager.release_tab(target.proxy_key, type_name) | |
| async def stream_openai_events( | |
| self, | |
| type_name: str, | |
| req: OpenAIChatRequest, | |
| ) -> AsyncIterator[OpenAIStreamEvent]: | |
| """ | |
| 唯一流式出口:以 OpenAIStreamEvent 为中间态。插件产出字符串流, | |
| 在此包装为 content_delta + finish,供协议适配层编码为各协议 SSE。 | |
| """ | |
| async for chunk in self._stream_completion(type_name, req): | |
| # session marker 也作为 content_delta 透传(对事件消费者而言是普通文本片段) | |
| yield OpenAIStreamEvent(type="content_delta", content=chunk) | |
| yield OpenAIStreamEvent(type="finish", finish_reason="stop") | |