""" 聊天请求编排:解析 session_id、调度 browser/tab/session、调用插件流式补全, 并在响应末尾附加零宽字符编码的会话 ID。 当前调度模型: - 一个浏览器对应一个代理组 - 一个浏览器内,一个 type 只有一个 tab - 一个 tab 绑定一个 account,只有 drained 后才能切号 - 一个 session 绑定到某个 tab/account;复用成功时不传完整历史 - 无法复用时,新建会话并回放完整历史 """ from __future__ import annotations import asyncio import json import logging import time import uuid from dataclasses import dataclass from pathlib import Path from typing import Any, AsyncIterator, cast from playwright.async_api import BrowserContext, Page from core.account.pool import AccountPool from core.config.repository import ConfigRepository from core.config.schema import AccountConfig, ProxyGroupConfig from core.config.settings import get from core.constants import TIMEZONE from core.plugin.base import ( AccountFrozenError, BaseSitePlugin, BrowserResourceInvalidError, PluginRegistry, ) from core.plugin.helpers import clear_cookies_for_domain from core.runtime.browser_manager import BrowserManager, ClosedTabInfo, TabRuntime from core.runtime.keys import ProxyKey from core.runtime.local_proxy_forwarder import LocalProxyForwarder, UpstreamProxy, parse_proxy_server from core.runtime.session_cache import SessionCache, SessionEntry from core.api.conv_parser import parse_conv_uuid_from_messages, session_id_suffix from core.api.fingerprint import compute_conversation_fingerprint from core.api.react import format_react_prompt from core.api.schemas import OpenAIChatRequest, extract_user_content from core.hub.schemas import OpenAIStreamEvent from core.runtime.conversation_index import ConversationIndex logger = logging.getLogger(__name__) def _request_messages_as_dicts(req: OpenAIChatRequest) -> list[dict[str, Any]]: """转为 conv_parser 需要的 list[dict]。""" out: list[dict[str, Any]] = [] for m in req.messages: d: dict[str, Any] = {"role": m.role} if isinstance(m.content, list): d["content"] = [p.model_dump() for p in m.content] else: d["content"] = m.content out.append(d) return out def _proxy_key_for_group(group: ProxyGroupConfig) -> ProxyKey: return ProxyKey( group.proxy_host, group.proxy_user, group.fingerprint_id, group.use_proxy, group.timezone or TIMEZONE, ) @dataclass class _RequestTarget: proxy_key: ProxyKey group: ProxyGroupConfig account: AccountConfig context: BrowserContext page: Page session_id: str | None full_history: bool proxy_url: str | None = None proxy_auth: tuple[str, str] | None = None proxy_forwarder: LocalProxyForwarder | None = None class ChatHandler: """编排一次 chat 请求:会话解析、tab 调度、插件调用。""" def __init__( self, pool: AccountPool, session_cache: SessionCache, browser_manager: BrowserManager, config_repo: ConfigRepository | None = None, ) -> None: self._pool = pool self._session_cache = session_cache self._browser_manager = browser_manager self._config_repo = config_repo self._conv_index = ConversationIndex() self._schedule_lock = asyncio.Lock() self._stop_event = asyncio.Event() self._busy_sessions: set[str] = set() self._tab_max_concurrent = int(get("scheduler", "tab_max_concurrent") or 5) self._gc_interval_seconds = float( get("scheduler", "browser_gc_interval_seconds") or 300 ) self._tab_idle_seconds = float(get("scheduler", "tab_idle_seconds") or 900) self._resident_browser_count = int( get("scheduler", "resident_browser_count", 1) ) def reload_pool( self, groups: list[ProxyGroupConfig], config_repo: ConfigRepository | None = None, ) -> None: """配置热更新后替换账号池与 repository。""" self._pool.reload(groups) if config_repo is not None: self._config_repo = config_repo async def refresh_configuration( self, groups: list[ProxyGroupConfig], config_repo: ConfigRepository | None = None, ) -> None: """配置热更新:替换账号池、清理失效资源,并重新预热常驻浏览器。""" async with self._schedule_lock: self.reload_pool(groups, config_repo) await self._prune_invalid_resources_locked() await self._reconcile_tabs_locked() await self.prewarm_resident_browsers() async def prewarm_resident_browsers(self) -> None: """启动时预热常驻浏览器,并为其下可用 type 建立 tab。""" async with self._schedule_lock: warmed = 0 for group in self._pool.groups(): if warmed >= self._resident_browser_count: break available_types = { a.type for a in group.accounts if a.is_available() and PluginRegistry.get(a.type) is not None } if not available_types: continue proxy_key = _proxy_key_for_group(group) await self._browser_manager.ensure_browser(proxy_key, group.proxy_pass) for type_name in sorted(available_types): if self._browser_manager.get_tab(proxy_key, type_name) is not None: continue account = self._pool.available_accounts_in_group(group, type_name) if not account: continue chosen = account[0] plugin = PluginRegistry.get(type_name) if plugin is None: continue await self._browser_manager.open_tab( proxy_key, group.proxy_pass, type_name, self._pool.account_id(group, chosen), plugin.create_page, self._make_apply_auth_fn(plugin, chosen), ) warmed += 1 async def run_maintenance_loop(self) -> None: """周期性回收空闲浏览器,并收尾 drained/frozen tab。""" while not self._stop_event.is_set(): try: await asyncio.wait_for( self._stop_event.wait(), timeout=self._gc_interval_seconds, ) break except asyncio.TimeoutError: pass try: async with self._schedule_lock: # Evict stale sessions to prevent unbounded accumulation. stale_ids = self._session_cache.evict_stale() # Evict stale fingerprint entries in sync. stale_fp = self._conv_index.evict_stale(ttl=1800.0) if stale_fp: logger.info( "[maintenance] evicted %d stale fingerprint entries", len(stale_fp), ) if stale_ids: for sid in stale_ids: plugin_type = None for pk, entry in self._browser_manager.list_browser_entries(): for tn, tab in entry.tabs.items(): if sid in tab.sessions: tab.sessions.discard(sid) plugin_type = tn break if plugin_type: plugin = PluginRegistry.get(plugin_type) if plugin is not None: plugin.drop_session(sid) logger.info( "[maintenance] evicted %d stale sessions, cache size=%d", len(stale_ids), len(self._session_cache), ) await self._reconcile_tabs_locked() closed = await self._browser_manager.collect_idle_browsers( idle_seconds=self._tab_idle_seconds, resident_browser_count=self._resident_browser_count, ) self._apply_closed_tabs_locked(closed) except Exception: logger.exception("维护循环执行失败") async def shutdown(self) -> None: """停止维护循环并关闭全部浏览器。""" self._stop_event.set() async with self._schedule_lock: closed = await self._browser_manager.close_all() self._apply_closed_tabs_locked(closed) def report_account_unfreeze( self, fingerprint_id: str, account_name: str, unfreeze_at: int, ) -> None: """记录账号解冻时间,并同步更新内存账号池。""" if self._config_repo is None: return self._config_repo.update_account_unfreeze_at( fingerprint_id, account_name, unfreeze_at ) self._pool.update_account_unfreeze_at( fingerprint_id, account_name, unfreeze_at, ) def get_account_runtime_status(self) -> dict[str, dict[str, Any]]: """返回当前账号运行时状态,供配置页展示角标。""" status: dict[str, dict[str, Any]] = {} for proxy_key, entry in self._browser_manager.list_browser_entries(): for type_name, tab in entry.tabs.items(): status[tab.account_id] = { "fingerprint_id": proxy_key.fingerprint_id, "type": type_name, "is_active": True, "tab_state": tab.state, "accepting_new": tab.accepting_new, "active_requests": tab.active_requests, "frozen_until": tab.frozen_until, } return status def _make_apply_auth_fn( self, plugin: Any, account: AccountConfig, ) -> Any: async def _apply_auth(context: BrowserContext, page: Page) -> None: await plugin.apply_auth(context, page, account.auth) return _apply_auth def _apply_closed_tabs_locked(self, closed_tabs: list[ClosedTabInfo]) -> None: for info in closed_tabs: self._session_cache.delete_many(info.session_ids) plugin = PluginRegistry.get(info.type_name) if plugin is not None: plugin.drop_sessions(info.session_ids) def _stream_proxy_settings( self, target: _RequestTarget, ) -> tuple[str | None, tuple[str, str] | None, LocalProxyForwarder | None]: if not target.proxy_key.use_proxy: return (None, None, None) upstream_host, upstream_port = parse_proxy_server(target.proxy_key.proxy_host) forwarder = LocalProxyForwarder( UpstreamProxy( host=upstream_host, port=upstream_port, username=target.proxy_key.proxy_user, password=target.group.proxy_pass, ), listen_host="127.0.0.1", listen_port=0, on_log=lambda msg: logger.debug("[stream-proxy] %s", msg), ) forwarder.start() return ( forwarder.proxy_url, None, forwarder, ) async def _clear_tab_domain_cookies_if_supported( self, proxy_key: ProxyKey, type_name: str ) -> None: """关 tab 前清该 type 对应域名的 cookie(仅支持带 site.cookie_domain 的插件)。""" entry = self._browser_manager.get_browser_entry(proxy_key) if entry is None: return plugin = PluginRegistry.get(type_name) if not isinstance(plugin, BaseSitePlugin) or not getattr(plugin, "site", None): return try: await clear_cookies_for_domain(entry.context, plugin.site.cookie_domain) except Exception as e: logger.debug("关 tab 前清 cookie 失败 type=%s: %s", type_name, e) async def _prune_invalid_resources_locked(self) -> None: """关闭配置中已不存在的浏览器/tab,避免热更新后继续使用失效资源。""" for proxy_key, entry in list(self._browser_manager.list_browser_entries()): group = self._pool.get_group_by_proxy_key(proxy_key) if group is None: self._apply_closed_tabs_locked( await self._browser_manager.close_browser(proxy_key) ) continue for type_name in list(entry.tabs.keys()): tab = entry.tabs[type_name] pair = self._pool.get_account_by_id(tab.account_id) if ( pair is None or pair[0] is not group or pair[1].type != type_name or not pair[1].enabled ): self._invalidate_tab_sessions_locked(proxy_key, type_name) if tab.active_requests == 0: # 与 reconcile 一致:优先同组同一页 re-auth,失败或无可用账号再关 tab switched = False group = self._pool.get_group_by_proxy_key(proxy_key) if group is not None: next_account = self._pool.next_available_account_in_group( group, type_name, exclude_account_ids={tab.account_id}, ) if next_account is not None: plugin = PluginRegistry.get(type_name) if plugin is not None: switched = ( await self._browser_manager.switch_tab_account( proxy_key, type_name, self._pool.account_id(group, next_account), self._make_apply_auth_fn( plugin, next_account, ), ) ) if not switched: await self._clear_tab_domain_cookies_if_supported( proxy_key, type_name ) closed = await self._browser_manager.close_tab( proxy_key, type_name ) if closed is not None: self._apply_closed_tabs_locked([closed]) else: self._browser_manager.mark_tab_draining(proxy_key, type_name) def _invalidate_session_locked( self, session_id: str, entry: SessionEntry | None = None, ) -> None: entry = entry or self._session_cache.get(session_id) if entry is None: return self._session_cache.delete(session_id) self._conv_index.remove_session(session_id) self._browser_manager.unregister_session( entry.proxy_key, entry.type_name, session_id, ) plugin = PluginRegistry.get(entry.type_name) if plugin is not None: plugin.drop_session(session_id) def _invalidate_tab_sessions_locked( self, proxy_key: ProxyKey, type_name: str, ) -> None: tab = self._browser_manager.get_tab(proxy_key, type_name) if tab is None or not tab.sessions: return session_ids = list(tab.sessions) self._session_cache.delete_many(session_ids) plugin = PluginRegistry.get(type_name) if plugin is not None: plugin.drop_sessions(session_ids) tab.sessions.clear() async def _recover_browser_resource_invalid_locked( self, type_name: str, target: _RequestTarget, request_id: str, active_session_id: str | None, error: BrowserResourceInvalidError, attempt: int, max_retries: int, ) -> None: account_id = self._pool.account_id(target.group, target.account) diagnostics = self._browser_manager.browser_diagnostics(target.proxy_key) logger.warning( "[chat] browser resource invalid attempt=%s/%s type=%s proxy=%s account=%s session_id=%s request_id=%s resource=%s helper=%s stage=%s stream_phase=%s browser_present=%s proc_alive=%s cdp_listening=%s tab_count=%s active_requests=%s err=%s", attempt + 1, max_retries, type_name, target.proxy_key.fingerprint_id, account_id, active_session_id, request_id, error.resource_hint, error.helper_name, error.stage, error.stream_phase, diagnostics.get("browser_present"), diagnostics.get("proc_alive"), diagnostics.get("cdp_listening"), diagnostics.get("tab_count"), diagnostics.get("active_requests"), error, ) stderr_tail = str(diagnostics.get("stderr_tail") or "").strip() if stderr_tail: logger.warning( "[chat] browser resource invalid stderr tail proxy=%s request_id=%s:\n%s", target.proxy_key.fingerprint_id, request_id, stderr_tail, ) if active_session_id is not None: self._invalidate_session_locked(active_session_id) if error.resource_hint == "transport": logger.warning( "[chat] transport-level stream failure, keep tab/browser and retry proxy=%s request_id=%s", target.proxy_key.fingerprint_id, request_id, ) return self._browser_manager.mark_tab_draining(target.proxy_key, type_name) browser_restart_reason: str | None = None if error.resource_hint == "browser": browser_restart_reason = "resource_hint" # Legacy: page_fetch transport is no longer used by Claude (context_request since v0.x). # Kept for potential future plugins that still use page_fetch transport. elif ( error.helper_name == "stream_raw_via_page_fetch" and error.stage in {"read_timeout", "evaluate_timeout"} ): browser_restart_reason = f"{error.helper_name}:{error.stage}" if browser_restart_reason is not None: logger.warning( "[chat] escalating browser recovery to full restart proxy=%s request_id=%s reason=%s", target.proxy_key.fingerprint_id, request_id, browser_restart_reason, ) closed = await self._browser_manager.close_browser(target.proxy_key) self._apply_closed_tabs_locked(closed) return self._invalidate_tab_sessions_locked(target.proxy_key, type_name) closed = await self._browser_manager.close_tab(target.proxy_key, type_name) if closed is not None: self._apply_closed_tabs_locked([closed]) def _revive_tab_if_possible_locked( self, proxy_key: ProxyKey, type_name: str, ) -> bool: tab = self._browser_manager.get_tab(proxy_key, type_name) if tab is None or tab.active_requests != 0: return False if tab.accepting_new: return True pair = self._pool.get_account_by_id(tab.account_id) if pair is None: return False _, account = pair if not account.is_available(): return False tab.accepting_new = True tab.state = "ready" tab.frozen_until = None tab.last_used_at = time.time() return True async def _reconcile_tabs_locked(self) -> None: """ 收尾所有 non-ready tab: - 若原账号已恢复可用,则恢复 tab - 否则若同组有其他可用账号,则在 drained 后切号 - 否则关闭 tab """ for proxy_key, entry in list(self._browser_manager.list_browser_entries()): for type_name in list(entry.tabs.keys()): tab = entry.tabs[type_name] if tab.accepting_new: continue if tab.active_requests != 0: continue if self._revive_tab_if_possible_locked(proxy_key, type_name): continue group = self._pool.get_group_by_proxy_key(proxy_key) if group is None: await self._clear_tab_domain_cookies_if_supported( proxy_key, type_name ) closed = await self._browser_manager.close_tab(proxy_key, type_name) if closed is not None: self._apply_closed_tabs_locked([closed]) continue next_account = self._pool.next_available_account_in_group( group, type_name, exclude_account_ids={tab.account_id}, ) if next_account is not None: plugin = PluginRegistry.get(type_name) if plugin is None: continue self._invalidate_tab_sessions_locked(proxy_key, type_name) switched = await self._browser_manager.switch_tab_account( proxy_key, type_name, self._pool.account_id(group, next_account), self._make_apply_auth_fn(plugin, next_account), ) if switched: continue await self._clear_tab_domain_cookies_if_supported(proxy_key, type_name) closed = await self._browser_manager.close_tab(proxy_key, type_name) if closed is not None: self._apply_closed_tabs_locked([closed]) async def _reuse_session_target_locked( self, plugin: Any, type_name: str, session_id: str, ) -> _RequestTarget | None: entry = self._session_cache.get(session_id) if entry is None or entry.type_name != type_name: return None pair = self._pool.get_account_by_id(entry.account_id) if pair is None: self._invalidate_session_locked(session_id, entry) return None group, account = pair tab = self._browser_manager.get_tab(entry.proxy_key, type_name) if ( tab is None or tab.account_id != entry.account_id or not plugin.has_session(session_id) ): self._invalidate_session_locked(session_id, entry) return None if not tab.accepting_new: self._invalidate_session_locked(session_id, entry) return None if session_id in self._busy_sessions: raise RuntimeError("当前会话正在处理中,请稍后再试") if tab.active_requests >= self._tab_max_concurrent: raise RuntimeError("当前会话所在 tab 繁忙,请稍后再试") page = self._browser_manager.acquire_tab( entry.proxy_key, type_name, self._tab_max_concurrent, ) if page is None: raise RuntimeError("当前会话暂不可复用,请稍后再试") self._session_cache.touch(session_id) self._busy_sessions.add(session_id) context = await self._browser_manager.ensure_browser( entry.proxy_key, group.proxy_pass, ) return _RequestTarget( proxy_key=entry.proxy_key, group=group, account=account, context=context, page=page, session_id=session_id, full_history=False, ) async def _allocate_new_target_locked( self, type_name: str, ) -> _RequestTarget: # 1. 已打开浏览器里已有该 type 的可服务 tab,直接复用。 existing_tabs: list[tuple[int, float, ProxyKey, TabRuntime]] = [] for proxy_key, entry in self._browser_manager.list_browser_entries(): tab = entry.tabs.get(type_name) if ( tab is not None and tab.accepting_new and tab.active_requests < self._tab_max_concurrent ): existing_tabs.append( (tab.active_requests, tab.last_used_at, proxy_key, tab) ) if existing_tabs: _, _, proxy_key, tab = min(existing_tabs, key=lambda item: item[:2]) pair = self._pool.get_account_by_id(tab.account_id) if pair is None: self._invalidate_tab_sessions_locked(proxy_key, type_name) closed = await self._browser_manager.close_tab(proxy_key, type_name) if closed is not None: self._apply_closed_tabs_locked([closed]) else: group, account = pair page = self._browser_manager.acquire_tab( proxy_key, type_name, self._tab_max_concurrent, ) if page is not None: context = await self._browser_manager.ensure_browser( proxy_key, group.proxy_pass, ) return _RequestTarget( proxy_key=proxy_key, group=group, account=account, context=context, page=page, session_id=None, full_history=True, ) # 2. 已打开浏览器里还没有该 type tab,但该组有可用账号,直接建新 tab。 open_browser_candidates: list[ tuple[int, float, ProxyKey, ProxyGroupConfig] ] = [] for proxy_key, entry in self._browser_manager.list_browser_entries(): if type_name in entry.tabs: continue group = self._pool.get_group_by_proxy_key(proxy_key) if group is None: continue if not self._pool.has_available_account_in_group(group, type_name): continue open_browser_candidates.append( ( self._browser_manager.browser_load(proxy_key), entry.last_used_at, proxy_key, group, ) ) if open_browser_candidates: _, _, proxy_key, group = min( open_browser_candidates, key=lambda item: item[:2] ) account = self._pool.next_available_account_in_group(group, type_name) if account is not None: plugin = PluginRegistry.get(type_name) if plugin is None: raise ValueError(f"未注册的 type: {type_name}") await self._browser_manager.open_tab( proxy_key, group.proxy_pass, type_name, self._pool.account_id(group, account), plugin.create_page, self._make_apply_auth_fn(plugin, account), ) page = self._browser_manager.acquire_tab( proxy_key, type_name, self._tab_max_concurrent, ) if page is None: raise RuntimeError("新建 tab 后仍无法占用请求槽位") context = await self._browser_manager.ensure_browser( proxy_key, group.proxy_pass, ) return _RequestTarget( proxy_key=proxy_key, group=group, account=account, context=context, page=page, session_id=None, full_history=True, ) # 3. 已打开浏览器里该 type tab 已 drained,且同组有备用账号,可在当前 tab 切号。 switch_candidates: list[tuple[float, ProxyKey, ProxyGroupConfig]] = [] for proxy_key, entry in self._browser_manager.list_browser_entries(): tab = entry.tabs.get(type_name) if tab is None or tab.active_requests != 0: continue group = self._pool.get_group_by_proxy_key(proxy_key) if group is None: continue if not self._pool.has_available_account_in_group( group, type_name, exclude_account_ids={tab.account_id}, ): continue switch_candidates.append((tab.last_used_at, proxy_key, group)) if switch_candidates: _, proxy_key, group = min(switch_candidates, key=lambda item: item[0]) tab = self._browser_manager.get_tab(proxy_key, type_name) plugin = PluginRegistry.get(type_name) if tab is not None and plugin is not None: next_account = self._pool.next_available_account_in_group( group, type_name, exclude_account_ids={tab.account_id}, ) if next_account is not None: self._invalidate_tab_sessions_locked(proxy_key, type_name) switched = await self._browser_manager.switch_tab_account( proxy_key, type_name, self._pool.account_id(group, next_account), self._make_apply_auth_fn(plugin, next_account), ) if switched: page = self._browser_manager.acquire_tab( proxy_key, type_name, self._tab_max_concurrent, ) if page is None: raise RuntimeError("切号后仍无法占用请求槽位") context = await self._browser_manager.ensure_browser( proxy_key, group.proxy_pass, ) return _RequestTarget( proxy_key=proxy_key, group=group, account=next_account, context=context, page=page, session_id=None, full_history=True, ) # 4. 开新浏览器。 open_groups = { proxy_key.fingerprint_id for proxy_key in self._browser_manager.current_proxy_keys() } pair = self._pool.next_available_pair( type_name, exclude_fingerprint_ids=open_groups, ) if pair is None: raise ValueError(f"没有类别为 {type_name!r} 的可用账号,请稍后再试") group, account = pair proxy_key = _proxy_key_for_group(group) plugin = PluginRegistry.get(type_name) if plugin is None: raise ValueError(f"未注册的 type: {type_name}") await self._browser_manager.open_tab( proxy_key, group.proxy_pass, type_name, self._pool.account_id(group, account), plugin.create_page, self._make_apply_auth_fn(plugin, account), ) page = self._browser_manager.acquire_tab( proxy_key, type_name, self._tab_max_concurrent, ) if page is None: raise RuntimeError("新浏览器建 tab 后仍无法占用请求槽位") context = await self._browser_manager.ensure_browser( proxy_key, group.proxy_pass ) return _RequestTarget( proxy_key=proxy_key, group=group, account=account, context=context, page=page, session_id=None, full_history=True, ) async def _stream_completion( self, type_name: str, req: OpenAIChatRequest, ) -> AsyncIterator[str]: """ 内部实现:调度 + 插件 stream_completion 字符串流,末尾附加 session_id 零宽编码。 对外仅通过 stream_openai_events() 暴露事件流。 """ plugin = PluginRegistry.get(type_name) if plugin is None: raise ValueError(f"未注册的 type: {type_name}") raw_messages = _request_messages_as_dicts(req) conv_uuid = req.resume_session_id or parse_conv_uuid_from_messages(raw_messages) # Fingerprint matching: when the client doesn't preserve the zero-width # session marker (conv_uuid is None), compute a fingerprint from # system prompt + first user message and look up the matching session. # This replaces sticky session and prevents context pollution. fingerprint = "" if not conv_uuid: fingerprint = compute_conversation_fingerprint(req.messages) if fingerprint: entry = self._conv_index.lookup(fingerprint) if entry is not None: conv_uuid = entry.session_id logger.info("[chat] type=%s parsed conv_uuid=%s fingerprint=%s", type_name, conv_uuid, fingerprint or "n/a") has_tools = bool(req.tools) react_prompt_prefix = format_react_prompt(req.tools or []) if has_tools else "" debug_path = ( Path(__file__).resolve().parent.parent.parent / "debug" / "chat_prompt_debug.json" ) max_retries = 3 for attempt in range(max_retries): target: _RequestTarget | None = None active_session_id: str | None = None request_id = uuid.uuid4().hex try: async with self._schedule_lock: if conv_uuid: target = await self._reuse_session_target_locked( plugin, type_name, conv_uuid, ) if target is None: target = await self._allocate_new_target_locked(type_name) if target.session_id is not None: active_session_id = target.session_id content = extract_user_content( req.messages, has_tools=has_tools, react_prompt_prefix=react_prompt_prefix, full_history=target.full_history, ) if not content.strip() and req.attachment_files: content = "Please analyze the attached image." if not content.strip(): raise ValueError("messages 中需至少有一条带 content 的 user 消息") debug_path.parent.mkdir(parents=True, exist_ok=True) debug_path.write_text( json.dumps( { "prompt": content, "full_history": target.full_history, "type": type_name, }, ensure_ascii=False, indent=2, ), encoding="utf-8", ) account_id = self._pool.account_id(target.group, target.account) session_id = target.session_id if session_id is None: await plugin.ensure_request_ready( target.context, target.page, request_id=request_id, session_id=None, phase="create_conversation", account_id=account_id, ) logger.info( "[chat] create_conversation type=%s proxy=%s account=%s", type_name, target.proxy_key.fingerprint_id, account_id, ) session_id = await plugin.create_conversation( target.context, target.page, timezone=target.group.timezone or getattr(target.proxy_key, "timezone", None) or TIMEZONE, public_model=str(getattr(req, "model", "") or ""), upstream_model=str(getattr(req, "upstream_model", "") or ""), request_id=request_id, ) if not session_id: raise RuntimeError("插件创建会话失败") async with self._schedule_lock: self._session_cache.put( session_id, target.proxy_key, type_name, account_id, ) self._browser_manager.register_session( target.proxy_key, type_name, session_id, ) self._busy_sessions.add(session_id) # Register fingerprint for future matching if fingerprint: self._conv_index.register( fingerprint, session_id, len(req.messages), account_id, ) active_session_id = session_id # Skip pre-stream probe for newly created sessions: # create_conversation already validated page health. if target.session_id is not None: await plugin.ensure_request_ready( target.context, target.page, request_id=request_id, session_id=session_id, phase="stream_completion", account_id=account_id, ) logger.info( "[chat] stream_completion type=%s session_id=%s proxy=%s account=%s full_history=%s", type_name, session_id, target.proxy_key.fingerprint_id, account_id, target.full_history, ) # 根据是否 full_history 选择附件来源: # - 复用会话(full_history=False):仅最后一条 user 的图片(可能为空,则本轮不带图) # - 新建/重建会话(full_history=True):所有历史 user 的图片 attachments = ( req.attachment_files_all_users if target.full_history else req.attachment_files_last_user ) proxy_url = None proxy_auth = None proxy_forwarder = None if plugin.stream_transport() == "context_request": proxy_url, proxy_auth, proxy_forwarder = self._stream_proxy_settings(target) target.proxy_url = proxy_url target.proxy_auth = proxy_auth target.proxy_forwarder = proxy_forwarder try: stream = cast( AsyncIterator[str], plugin.stream_completion( target.context, target.page, session_id, content, request_id=request_id, attachments=attachments, proxy_url=proxy_url, proxy_auth=proxy_auth, ), ) async for chunk in stream: yield chunk finally: if proxy_forwarder is not None: try: proxy_forwarder.stop() except Exception: pass target.proxy_forwarder = None yield session_id_suffix(session_id) return except AccountFrozenError as e: logger.warning( "账号限流/额度用尽(插件上报),切换资源重试: type=%s proxy=%s err=%s", type_name, target.proxy_key.fingerprint_id if target else None, e, ) async with self._schedule_lock: if target is not None: self.report_account_unfreeze( target.group.fingerprint_id, target.account.name, e.unfreeze_at, ) self._browser_manager.mark_tab_draining( target.proxy_key, type_name, frozen_until=e.unfreeze_at, ) self._invalidate_tab_sessions_locked( target.proxy_key, type_name ) if attempt == max_retries - 1: raise RuntimeError( f"已重试 {max_retries} 次仍限流/过载,请稍后再试: {e}" ) from e continue except BrowserResourceInvalidError as e: proxy_for_log = ( target.proxy_key.fingerprint_id if target is not None else getattr(getattr(e, "proxy_key", None), "fingerprint_id", None) ) logger.warning( "[chat] browser resource invalid bubbled type=%s request_id=%s proxy=%s session_id=%s helper=%s stage=%s resource=%s err=%s", type_name, request_id, proxy_for_log, active_session_id, e.helper_name, e.stage, e.resource_hint, e, ) async with self._schedule_lock: if target is not None: await self._recover_browser_resource_invalid_locked( type_name, target, request_id, active_session_id, e, attempt, max_retries, ) elif getattr(e, "proxy_key", None) is not None: closed = await self._browser_manager.close_browser(e.proxy_key) self._apply_closed_tabs_locked(closed) if attempt == max_retries - 1: raise RuntimeError( f"浏览器资源已失效,重试 {max_retries} 次后仍失败: {e}" ) from e continue finally: if target is not None: async with self._schedule_lock: if active_session_id is not None: self._busy_sessions.discard(active_session_id) self._browser_manager.release_tab(target.proxy_key, type_name) async def stream_openai_events( self, type_name: str, req: OpenAIChatRequest, ) -> AsyncIterator[OpenAIStreamEvent]: """ 唯一流式出口:以 OpenAIStreamEvent 为中间态。插件产出字符串流, 在此包装为 content_delta + finish,供协议适配层编码为各协议 SSE。 """ async for chunk in self._stream_completion(type_name, req): # session marker 也作为 content_delta 透传(对事件消费者而言是普通文本片段) yield OpenAIStreamEvent(type="content_delta", content=chunk) yield OpenAIStreamEvent(type="finish", finish_reason="stop")