"""
Chat request orchestration: resolve the session_id, schedule browser/tab/session,
invoke the plugin's streaming completion, and append the session ID (encoded with
zero-width characters) to the end of the response.

Current scheduling model:
- One browser corresponds to one proxy group.
- Within a browser, each type has exactly one tab.
- A tab is bound to one account; the account can only be switched once the tab is drained.
- A session is bound to a tab/account; when reuse succeeds the full history is not sent.
- When reuse is not possible, a new conversation is created and the full history is replayed.
"""
from __future__ import annotations
import asyncio
import json
import logging
import time
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any, AsyncIterator, cast
from playwright.async_api import BrowserContext, Page
from core.account.pool import AccountPool
from core.config.repository import ConfigRepository
from core.config.schema import AccountConfig, ProxyGroupConfig
from core.config.settings import get
from core.constants import TIMEZONE
from core.plugin.base import (
AccountFrozenError,
BaseSitePlugin,
BrowserResourceInvalidError,
PluginRegistry,
)
from core.plugin.helpers import clear_cookies_for_domain
from core.runtime.browser_manager import BrowserManager, ClosedTabInfo, TabRuntime
from core.runtime.keys import ProxyKey
from core.runtime.local_proxy_forwarder import LocalProxyForwarder, UpstreamProxy, parse_proxy_server
from core.runtime.session_cache import SessionCache, SessionEntry
from core.api.conv_parser import parse_conv_uuid_from_messages, session_id_suffix
from core.api.fingerprint import compute_conversation_fingerprint
from core.api.react import format_react_prompt
from core.api.schemas import OpenAIChatRequest, extract_user_content
from core.hub.schemas import OpenAIStreamEvent
from core.runtime.conversation_index import ConversationIndex
logger = logging.getLogger(__name__)
def _request_messages_as_dicts(req: OpenAIChatRequest) -> list[dict[str, Any]]:
    """Convert the request messages into the ``list[dict]`` shape conv_parser consumes.

    Each message becomes ``{"role": ..., "content": ...}``. List-valued content
    is expanded by calling ``model_dump()`` on every part; any other content
    value is passed through unchanged.
    """

    def _plain_content(content: Any) -> Any:
        # Multi-part content holds model objects; dump each part to a plain dict.
        if isinstance(content, list):
            return [part.model_dump() for part in content]
        return content

    return [
        {"role": message.role, "content": _plain_content(message.content)}
        for message in req.messages
    ]
def _proxy_key_for_group(group: ProxyGroupConfig) -> ProxyKey:
    """Build the ``ProxyKey`` that identifies the browser of a proxy group.

    The group's own timezone is used when it is set (truthy); otherwise the
    module-level ``TIMEZONE`` constant is used.
    """
    effective_timezone = group.timezone or TIMEZONE
    key = ProxyKey(
        group.proxy_host,
        group.proxy_user,
        group.fingerprint_id,
        group.use_proxy,
        effective_timezone,
    )
    return key
@dataclass
class _RequestTarget:
    # Resources selected by the scheduler to serve one chat request attempt.

    # Key of the browser (proxy group) that serves the request.
    proxy_key: ProxyKey
    # Proxy group configuration the account belongs to.
    group: ProxyGroupConfig
    # Account bound to the selected tab.
    account: AccountConfig
    # Browser context returned by BrowserManager.ensure_browser().
    context: BrowserContext
    # Page returned by BrowserManager.acquire_tab() for this request.
    page: Page
    # Existing session being reused; None means a conversation must be created.
    session_id: str | None
    # True when the full message history is replayed (new/rebuilt conversation).
    full_history: bool
    # Streaming proxy settings; filled in by _stream_completion when the plugin
    # reports the "context_request" stream transport.
    proxy_url: str | None = None
    proxy_auth: tuple[str, str] | None = None
    proxy_forwarder: LocalProxyForwarder | None = None
class ChatHandler:
"""编排一次 chat 请求:会话解析、tab 调度、插件调用。"""
def __init__(
    self,
    pool: AccountPool,
    session_cache: SessionCache,
    browser_manager: BrowserManager,
    config_repo: ConfigRepository | None = None,
) -> None:
    """Store the injected collaborators and read the scheduler settings.

    Args:
        pool: Account pool used to pick groups/accounts.
        session_cache: Cache mapping session ids to their tab/account binding.
        browser_manager: Owner of browsers and tabs.
        config_repo: Optional repository used to persist account unfreeze times.
    """

    def _scheduler_setting(key: str, fallback: float) -> Any:
        # A missing or falsy configured value falls back to the default.
        return get("scheduler", key) or fallback

    # Injected collaborators.
    self._pool = pool
    self._session_cache = session_cache
    self._browser_manager = browser_manager
    self._config_repo = config_repo

    # Runtime state owned by the handler.
    self._conv_index = ConversationIndex()
    self._schedule_lock = asyncio.Lock()
    self._stop_event = asyncio.Event()
    self._busy_sessions: set[str] = set()

    # Scheduler tuning knobs.
    self._tab_max_concurrent = int(_scheduler_setting("tab_max_concurrent", 5))
    self._gc_interval_seconds = float(
        _scheduler_setting("browser_gc_interval_seconds", 300)
    )
    self._tab_idle_seconds = float(_scheduler_setting("tab_idle_seconds", 900))
    # Unlike the settings above, this one passes its default to get() directly.
    self._resident_browser_count = int(
        get("scheduler", "resident_browser_count", 1)
    )
def reload_pool(
self,
groups: list[ProxyGroupConfig],
config_repo: ConfigRepository | None = None,
) -> None:
"""配置热更新后替换账号池与 repository。"""
self._pool.reload(groups)
if config_repo is not None:
self._config_repo = config_repo
async def refresh_configuration(
    self,
    groups: list[ProxyGroupConfig],
    config_repo: ConfigRepository | None = None,
) -> None:
    """Apply a configuration hot reload.

    Replaces the account pool, prunes resources that are no longer valid,
    reconciles non-ready tabs, and finally re-warms the resident browsers.
    """
    async with self._schedule_lock:
        self.reload_pool(groups, config_repo)
        await self._prune_invalid_resources_locked()
        await self._reconcile_tabs_locked()
    # Must run after the lock is released: prewarm_resident_browsers()
    # acquires the same scheduling lock itself.
    await self.prewarm_resident_browsers()
async def prewarm_resident_browsers(self) -> None:
    """Pre-warm the resident browsers and open a tab for each usable type.

    Groups are visited in pool order until ``_resident_browser_count`` groups
    have been warmed. A group counts as warmed once its browser is ensured,
    which requires at least one available account with a registered plugin.
    """
    async with self._schedule_lock:
        warmed_browsers = 0
        for group in self._pool.groups():
            if warmed_browsers >= self._resident_browser_count:
                break

            # Types that have an available account and a registered plugin.
            type_names = sorted(
                {
                    acct.type
                    for acct in group.accounts
                    if acct.is_available()
                    and PluginRegistry.get(acct.type) is not None
                }
            )
            if not type_names:
                continue

            proxy_key = _proxy_key_for_group(group)
            await self._browser_manager.ensure_browser(proxy_key, group.proxy_pass)

            for type_name in type_names:
                # A tab for this type is already open: nothing to do.
                if self._browser_manager.get_tab(proxy_key, type_name) is not None:
                    continue
                candidates = self._pool.available_accounts_in_group(group, type_name)
                plugin = PluginRegistry.get(type_name) if candidates else None
                if plugin is None:
                    continue
                first_account = candidates[0]
                await self._browser_manager.open_tab(
                    proxy_key,
                    group.proxy_pass,
                    type_name,
                    self._pool.account_id(group, first_account),
                    plugin.create_page,
                    self._make_apply_auth_fn(plugin, first_account),
                )

            warmed_browsers += 1
async def run_maintenance_loop(self) -> None:
    """Periodically evict stale sessions, reconcile tabs and collect idle browsers.

    Runs until ``_stop_event`` is set. Any exception raised by one
    maintenance pass is logged and the loop continues with the next pass.
    """
    while not self._stop_event.is_set():
        try:
            # Sleep for one GC interval, waking up early when a stop is requested.
            await asyncio.wait_for(
                self._stop_event.wait(),
                timeout=self._gc_interval_seconds,
            )
            break
        except asyncio.TimeoutError:
            # Interval elapsed without a stop request: run one maintenance pass.
            pass
        try:
            async with self._schedule_lock:
                # Evict stale sessions to prevent unbounded accumulation.
                stale_ids = self._session_cache.evict_stale()
                # Evict stale fingerprint entries in sync.
                stale_fp = self._conv_index.evict_stale(ttl=1800.0)
                if stale_fp:
                    logger.info(
                        "[maintenance] evicted %d stale fingerprint entries",
                        len(stale_fp),
                    )
                if stale_ids:
                    for sid in stale_ids:
                        # Detach the session from the tab that holds it and
                        # remember that tab's type so the plugin can drop it too.
                        plugin_type = None
                        for pk, entry in self._browser_manager.list_browser_entries():
                            for tn, tab in entry.tabs.items():
                                if sid in tab.sessions:
                                    tab.sessions.discard(sid)
                                    plugin_type = tn
                                    # NOTE(review): this only leaves the inner tab
                                    # loop; the remaining browsers are still scanned.
                                    break
                        if plugin_type:
                            plugin = PluginRegistry.get(plugin_type)
                            if plugin is not None:
                                plugin.drop_session(sid)
                    logger.info(
                        "[maintenance] evicted %d stale sessions, cache size=%d",
                        len(stale_ids),
                        len(self._session_cache),
                    )
                await self._reconcile_tabs_locked()
                closed = await self._browser_manager.collect_idle_browsers(
                    idle_seconds=self._tab_idle_seconds,
                    resident_browser_count=self._resident_browser_count,
                )
                self._apply_closed_tabs_locked(closed)
        except Exception:
            logger.exception("维护循环执行失败")
async def shutdown(self) -> None:
"""停止维护循环并关闭全部浏览器。"""
self._stop_event.set()
async with self._schedule_lock:
closed = await self._browser_manager.close_all()
self._apply_closed_tabs_locked(closed)
def report_account_unfreeze(
self,
fingerprint_id: str,
account_name: str,
unfreeze_at: int,
) -> None:
"""记录账号解冻时间,并同步更新内存账号池。"""
if self._config_repo is None:
return
self._config_repo.update_account_unfreeze_at(
fingerprint_id, account_name, unfreeze_at
)
self._pool.update_account_unfreeze_at(
fingerprint_id,
account_name,
unfreeze_at,
)
def get_account_runtime_status(self) -> dict[str, dict[str, Any]]:
"""返回当前账号运行时状态,供配置页展示角标。"""
status: dict[str, dict[str, Any]] = {}
for proxy_key, entry in self._browser_manager.list_browser_entries():
for type_name, tab in entry.tabs.items():
status[tab.account_id] = {
"fingerprint_id": proxy_key.fingerprint_id,
"type": type_name,
"is_active": True,
"tab_state": tab.state,
"accepting_new": tab.accepting_new,
"active_requests": tab.active_requests,
"frozen_until": tab.frozen_until,
}
return status
def _make_apply_auth_fn(
    self,
    plugin: Any,
    account: AccountConfig,
) -> Any:
    """Return an async callback that applies ``account``'s auth via ``plugin``.

    The callback takes ``(context, page)`` and is handed to the browser
    manager when opening a tab or switching a tab's account.
    """

    async def _apply_auth(context: BrowserContext, page: Page) -> None:
        # account.auth is read when the callback runs, not when it is created.
        await plugin.apply_auth(context, page, account.auth)

    return _apply_auth
def _apply_closed_tabs_locked(self, closed_tabs: list[ClosedTabInfo]) -> None:
    """Forget every session that lived in the given closed tabs.

    For each closed tab the sessions are removed from the session cache, from
    the conversation fingerprint index and from the owning plugin (when the
    plugin is still registered).

    Args:
        closed_tabs: Descriptions of the tabs that were just closed.
    """
    for info in closed_tabs:
        session_ids = list(info.session_ids)
        self._session_cache.delete_many(session_ids)
        # Keep the fingerprint index consistent with single-session
        # invalidation, so a fingerprint never resolves to a dropped session.
        for session_id in session_ids:
            self._conv_index.remove_session(session_id)
        plugin = PluginRegistry.get(info.type_name)
        if plugin is not None:
            plugin.drop_sessions(session_ids)
def _stream_proxy_settings(
self,
target: _RequestTarget,
) -> tuple[str | None, tuple[str, str] | None, LocalProxyForwarder | None]:
if not target.proxy_key.use_proxy:
return (None, None, None)
upstream_host, upstream_port = parse_proxy_server(target.proxy_key.proxy_host)
forwarder = LocalProxyForwarder(
UpstreamProxy(
host=upstream_host,
port=upstream_port,
username=target.proxy_key.proxy_user,
password=target.group.proxy_pass,
),
listen_host="127.0.0.1",
listen_port=0,
on_log=lambda msg: logger.debug("[stream-proxy] %s", msg),
)
forwarder.start()
return (
forwarder.proxy_url,
None,
forwarder,
)
async def _clear_tab_domain_cookies_if_supported(
self, proxy_key: ProxyKey, type_name: str
) -> None:
"""关 tab 前清该 type 对应域名的 cookie(仅支持带 site.cookie_domain 的插件)。"""
entry = self._browser_manager.get_browser_entry(proxy_key)
if entry is None:
return
plugin = PluginRegistry.get(type_name)
if not isinstance(plugin, BaseSitePlugin) or not getattr(plugin, "site", None):
return
try:
await clear_cookies_for_domain(entry.context, plugin.site.cookie_domain)
except Exception as e:
logger.debug("关 tab 前清 cookie 失败 type=%s: %s", type_name, e)
async def _prune_invalid_resources_locked(self) -> None:
    """Close browsers/tabs that no longer match the configuration.

    Called after a hot reload so that stale resources are not used again.
    The ``_locked`` suffix suggests the caller holds the scheduling lock,
    as ``refresh_configuration`` does.
    """
    for proxy_key, entry in list(self._browser_manager.list_browser_entries()):
        group = self._pool.get_group_by_proxy_key(proxy_key)
        if group is None:
            # The whole proxy group is gone: close its browser and forget its sessions.
            self._apply_closed_tabs_locked(
                await self._browser_manager.close_browser(proxy_key)
            )
            continue
        for type_name in list(entry.tabs.keys()):
            tab = entry.tabs[type_name]
            pair = self._pool.get_account_by_id(tab.account_id)
            # The tab is invalid when its account is unknown, moved to another
            # group, changed type, or was disabled.
            if (
                pair is None
                or pair[0] is not group
                or pair[1].type != type_name
                or not pair[1].enabled
            ):
                self._invalidate_tab_sessions_locked(proxy_key, type_name)
                if tab.active_requests == 0:
                    # Same policy as reconcile: first try to re-auth the same
                    # page with another account of the group; close the tab
                    # when that fails or no account is available.
                    switched = False
                    # NOTE(review): rebinds the outer `group` used by the
                    # `pair[0] is not group` check of later iterations.
                    group = self._pool.get_group_by_proxy_key(proxy_key)
                    if group is not None:
                        next_account = self._pool.next_available_account_in_group(
                            group,
                            type_name,
                            exclude_account_ids={tab.account_id},
                        )
                        if next_account is not None:
                            plugin = PluginRegistry.get(type_name)
                            if plugin is not None:
                                switched = (
                                    await self._browser_manager.switch_tab_account(
                                        proxy_key,
                                        type_name,
                                        self._pool.account_id(group, next_account),
                                        self._make_apply_auth_fn(
                                            plugin,
                                            next_account,
                                        ),
                                    )
                                )
                    if not switched:
                        await self._clear_tab_domain_cookies_if_supported(
                            proxy_key, type_name
                        )
                        closed = await self._browser_manager.close_tab(
                            proxy_key, type_name
                        )
                        if closed is not None:
                            self._apply_closed_tabs_locked([closed])
                else:
                    # Requests are still in flight: stop accepting new ones and
                    # let a later reconcile pass finish the job.
                    self._browser_manager.mark_tab_draining(proxy_key, type_name)
def _invalidate_session_locked(
self,
session_id: str,
entry: SessionEntry | None = None,
) -> None:
entry = entry or self._session_cache.get(session_id)
if entry is None:
return
self._session_cache.delete(session_id)
self._conv_index.remove_session(session_id)
self._browser_manager.unregister_session(
entry.proxy_key,
entry.type_name,
session_id,
)
plugin = PluginRegistry.get(entry.type_name)
if plugin is not None:
plugin.drop_session(session_id)
def _invalidate_tab_sessions_locked(
self,
proxy_key: ProxyKey,
type_name: str,
) -> None:
tab = self._browser_manager.get_tab(proxy_key, type_name)
if tab is None or not tab.sessions:
return
session_ids = list(tab.sessions)
self._session_cache.delete_many(session_ids)
plugin = PluginRegistry.get(type_name)
if plugin is not None:
plugin.drop_sessions(session_ids)
tab.sessions.clear()
async def _recover_browser_resource_invalid_locked(
    self,
    type_name: str,
    target: _RequestTarget,
    request_id: str,
    active_session_id: str | None,
    error: BrowserResourceInvalidError,
    attempt: int,
    max_retries: int,
) -> None:
    """Clean up after a ``BrowserResourceInvalidError`` so the request can retry.

    Recovery escalates with the error's hints:
    - ``resource_hint == "transport"``: only the active session is dropped.
    - ``resource_hint == "browser"``, or a page-fetch read/evaluate timeout:
      the whole browser is closed.
    - anything else: the tab's sessions are dropped and the tab is closed.

    Args:
        type_name: Plugin type of the failing tab.
        target: Resources that were serving the failed attempt.
        request_id: Id of the failed attempt (logging only).
        active_session_id: Session in use when the error occurred, if any.
        error: The error reported by the plugin.
        attempt: Zero-based attempt index (logging only).
        max_retries: Total number of attempts (logging only).
    """
    account_id = self._pool.account_id(target.group, target.account)
    diagnostics = self._browser_manager.browser_diagnostics(target.proxy_key)
    logger.warning(
        "[chat] browser resource invalid attempt=%s/%s type=%s proxy=%s account=%s session_id=%s request_id=%s resource=%s helper=%s stage=%s stream_phase=%s browser_present=%s proc_alive=%s cdp_listening=%s tab_count=%s active_requests=%s err=%s",
        attempt + 1,
        max_retries,
        type_name,
        target.proxy_key.fingerprint_id,
        account_id,
        active_session_id,
        request_id,
        error.resource_hint,
        error.helper_name,
        error.stage,
        error.stream_phase,
        diagnostics.get("browser_present"),
        diagnostics.get("proc_alive"),
        diagnostics.get("cdp_listening"),
        diagnostics.get("tab_count"),
        diagnostics.get("active_requests"),
        error,
    )
    stderr_tail = str(diagnostics.get("stderr_tail") or "").strip()
    if stderr_tail:
        logger.warning(
            "[chat] browser resource invalid stderr tail proxy=%s request_id=%s:\n%s",
            target.proxy_key.fingerprint_id,
            request_id,
            stderr_tail,
        )
    # The session that hit the error is dropped in every recovery path.
    if active_session_id is not None:
        self._invalidate_session_locked(active_session_id)
    if error.resource_hint == "transport":
        # Transport-level failure: the tab and browser are kept for the retry.
        logger.warning(
            "[chat] transport-level stream failure, keep tab/browser and retry proxy=%s request_id=%s",
            target.proxy_key.fingerprint_id,
            request_id,
        )
        return
    # Stop routing new requests to this tab while it is being torn down.
    self._browser_manager.mark_tab_draining(target.proxy_key, type_name)
    browser_restart_reason: str | None = None
    if error.resource_hint == "browser":
        browser_restart_reason = "resource_hint"
    # Legacy: page_fetch transport is no longer used by Claude (context_request since v0.x).
    # Kept for potential future plugins that still use page_fetch transport.
    elif (
        error.helper_name == "stream_raw_via_page_fetch"
        and error.stage in {"read_timeout", "evaluate_timeout"}
    ):
        browser_restart_reason = f"{error.helper_name}:{error.stage}"
    if browser_restart_reason is not None:
        logger.warning(
            "[chat] escalating browser recovery to full restart proxy=%s request_id=%s reason=%s",
            target.proxy_key.fingerprint_id,
            request_id,
            browser_restart_reason,
        )
        closed = await self._browser_manager.close_browser(target.proxy_key)
        self._apply_closed_tabs_locked(closed)
        return
    # Default: only the failing tab is discarded.
    self._invalidate_tab_sessions_locked(target.proxy_key, type_name)
    closed = await self._browser_manager.close_tab(target.proxy_key, type_name)
    if closed is not None:
        self._apply_closed_tabs_locked([closed])
def _revive_tab_if_possible_locked(
self,
proxy_key: ProxyKey,
type_name: str,
) -> bool:
tab = self._browser_manager.get_tab(proxy_key, type_name)
if tab is None or tab.active_requests != 0:
return False
if tab.accepting_new:
return True
pair = self._pool.get_account_by_id(tab.account_id)
if pair is None:
return False
_, account = pair
if not account.is_available():
return False
tab.accepting_new = True
tab.state = "ready"
tab.frozen_until = None
tab.last_used_at = time.time()
return True
async def _reconcile_tabs_locked(self) -> None:
    """
    Finish off every non-ready tab that has no active requests:
    - if the original account is available again, revive the tab;
    - otherwise, if the group has another available account, switch the
      drained tab to that account;
    - otherwise close the tab.
    """
    for proxy_key, entry in list(self._browser_manager.list_browser_entries()):
        for type_name in list(entry.tabs.keys()):
            tab = entry.tabs[type_name]
            # Tabs that still accept requests need no reconciliation.
            if tab.accepting_new:
                continue
            # Not drained yet: leave the tab for a later pass.
            if tab.active_requests != 0:
                continue
            if self._revive_tab_if_possible_locked(proxy_key, type_name):
                continue
            group = self._pool.get_group_by_proxy_key(proxy_key)
            if group is None:
                # The group no longer exists: close the tab.
                await self._clear_tab_domain_cookies_if_supported(
                    proxy_key, type_name
                )
                closed = await self._browser_manager.close_tab(proxy_key, type_name)
                if closed is not None:
                    self._apply_closed_tabs_locked([closed])
                continue
            next_account = self._pool.next_available_account_in_group(
                group,
                type_name,
                exclude_account_ids={tab.account_id},
            )
            if next_account is not None:
                plugin = PluginRegistry.get(type_name)
                if plugin is None:
                    continue
                # Sessions are dropped before the account switch is attempted.
                self._invalidate_tab_sessions_locked(proxy_key, type_name)
                switched = await self._browser_manager.switch_tab_account(
                    proxy_key,
                    type_name,
                    self._pool.account_id(group, next_account),
                    self._make_apply_auth_fn(plugin, next_account),
                )
                if switched:
                    continue
            # No replacement account, or the switch failed: close the tab.
            await self._clear_tab_domain_cookies_if_supported(proxy_key, type_name)
            closed = await self._browser_manager.close_tab(proxy_key, type_name)
            if closed is not None:
                self._apply_closed_tabs_locked([closed])
async def _reuse_session_target_locked(
self,
plugin: Any,
type_name: str,
session_id: str,
) -> _RequestTarget | None:
entry = self._session_cache.get(session_id)
if entry is None or entry.type_name != type_name:
return None
pair = self._pool.get_account_by_id(entry.account_id)
if pair is None:
self._invalidate_session_locked(session_id, entry)
return None
group, account = pair
tab = self._browser_manager.get_tab(entry.proxy_key, type_name)
if (
tab is None
or tab.account_id != entry.account_id
or not plugin.has_session(session_id)
):
self._invalidate_session_locked(session_id, entry)
return None
if not tab.accepting_new:
self._invalidate_session_locked(session_id, entry)
return None
if session_id in self._busy_sessions:
raise RuntimeError("当前会话正在处理中,请稍后再试")
if tab.active_requests >= self._tab_max_concurrent:
raise RuntimeError("当前会话所在 tab 繁忙,请稍后再试")
page = self._browser_manager.acquire_tab(
entry.proxy_key,
type_name,
self._tab_max_concurrent,
)
if page is None:
raise RuntimeError("当前会话暂不可复用,请稍后再试")
self._session_cache.touch(session_id)
self._busy_sessions.add(session_id)
context = await self._browser_manager.ensure_browser(
entry.proxy_key,
group.proxy_pass,
)
return _RequestTarget(
proxy_key=entry.proxy_key,
group=group,
account=account,
context=context,
page=page,
session_id=session_id,
full_history=False,
)
async def _allocate_new_target_locked(
self,
type_name: str,
) -> _RequestTarget:
# 1. 已打开浏览器里已有该 type 的可服务 tab,直接复用。
existing_tabs: list[tuple[int, float, ProxyKey, TabRuntime]] = []
for proxy_key, entry in self._browser_manager.list_browser_entries():
tab = entry.tabs.get(type_name)
if (
tab is not None
and tab.accepting_new
and tab.active_requests < self._tab_max_concurrent
):
existing_tabs.append(
(tab.active_requests, tab.last_used_at, proxy_key, tab)
)
if existing_tabs:
_, _, proxy_key, tab = min(existing_tabs, key=lambda item: item[:2])
pair = self._pool.get_account_by_id(tab.account_id)
if pair is None:
self._invalidate_tab_sessions_locked(proxy_key, type_name)
closed = await self._browser_manager.close_tab(proxy_key, type_name)
if closed is not None:
self._apply_closed_tabs_locked([closed])
else:
group, account = pair
page = self._browser_manager.acquire_tab(
proxy_key,
type_name,
self._tab_max_concurrent,
)
if page is not None:
context = await self._browser_manager.ensure_browser(
proxy_key,
group.proxy_pass,
)
return _RequestTarget(
proxy_key=proxy_key,
group=group,
account=account,
context=context,
page=page,
session_id=None,
full_history=True,
)
# 2. 已打开浏览器里还没有该 type tab,但该组有可用账号,直接建新 tab。
open_browser_candidates: list[
tuple[int, float, ProxyKey, ProxyGroupConfig]
] = []
for proxy_key, entry in self._browser_manager.list_browser_entries():
if type_name in entry.tabs:
continue
group = self._pool.get_group_by_proxy_key(proxy_key)
if group is None:
continue
if not self._pool.has_available_account_in_group(group, type_name):
continue
open_browser_candidates.append(
(
self._browser_manager.browser_load(proxy_key),
entry.last_used_at,
proxy_key,
group,
)
)
if open_browser_candidates:
_, _, proxy_key, group = min(
open_browser_candidates, key=lambda item: item[:2]
)
account = self._pool.next_available_account_in_group(group, type_name)
if account is not None:
plugin = PluginRegistry.get(type_name)
if plugin is None:
raise ValueError(f"未注册的 type: {type_name}")
await self._browser_manager.open_tab(
proxy_key,
group.proxy_pass,
type_name,
self._pool.account_id(group, account),
plugin.create_page,
self._make_apply_auth_fn(plugin, account),
)
page = self._browser_manager.acquire_tab(
proxy_key,
type_name,
self._tab_max_concurrent,
)
if page is None:
raise RuntimeError("新建 tab 后仍无法占用请求槽位")
context = await self._browser_manager.ensure_browser(
proxy_key,
group.proxy_pass,
)
return _RequestTarget(
proxy_key=proxy_key,
group=group,
account=account,
context=context,
page=page,
session_id=None,
full_history=True,
)
# 3. 已打开浏览器里该 type tab 已 drained,且同组有备用账号,可在当前 tab 切号。
switch_candidates: list[tuple[float, ProxyKey, ProxyGroupConfig]] = []
for proxy_key, entry in self._browser_manager.list_browser_entries():
tab = entry.tabs.get(type_name)
if tab is None or tab.active_requests != 0:
continue
group = self._pool.get_group_by_proxy_key(proxy_key)
if group is None:
continue
if not self._pool.has_available_account_in_group(
group,
type_name,
exclude_account_ids={tab.account_id},
):
continue
switch_candidates.append((tab.last_used_at, proxy_key, group))
if switch_candidates:
_, proxy_key, group = min(switch_candidates, key=lambda item: item[0])
tab = self._browser_manager.get_tab(proxy_key, type_name)
plugin = PluginRegistry.get(type_name)
if tab is not None and plugin is not None:
next_account = self._pool.next_available_account_in_group(
group,
type_name,
exclude_account_ids={tab.account_id},
)
if next_account is not None:
self._invalidate_tab_sessions_locked(proxy_key, type_name)
switched = await self._browser_manager.switch_tab_account(
proxy_key,
type_name,
self._pool.account_id(group, next_account),
self._make_apply_auth_fn(plugin, next_account),
)
if switched:
page = self._browser_manager.acquire_tab(
proxy_key,
type_name,
self._tab_max_concurrent,
)
if page is None:
raise RuntimeError("切号后仍无法占用请求槽位")
context = await self._browser_manager.ensure_browser(
proxy_key,
group.proxy_pass,
)
return _RequestTarget(
proxy_key=proxy_key,
group=group,
account=next_account,
context=context,
page=page,
session_id=None,
full_history=True,
)
# 4. 开新浏览器。
open_groups = {
proxy_key.fingerprint_id
for proxy_key in self._browser_manager.current_proxy_keys()
}
pair = self._pool.next_available_pair(
type_name,
exclude_fingerprint_ids=open_groups,
)
if pair is None:
raise ValueError(f"没有类别为 {type_name!r} 的可用账号,请稍后再试")
group, account = pair
proxy_key = _proxy_key_for_group(group)
plugin = PluginRegistry.get(type_name)
if plugin is None:
raise ValueError(f"未注册的 type: {type_name}")
await self._browser_manager.open_tab(
proxy_key,
group.proxy_pass,
type_name,
self._pool.account_id(group, account),
plugin.create_page,
self._make_apply_auth_fn(plugin, account),
)
page = self._browser_manager.acquire_tab(
proxy_key,
type_name,
self._tab_max_concurrent,
)
if page is None:
raise RuntimeError("新浏览器建 tab 后仍无法占用请求槽位")
context = await self._browser_manager.ensure_browser(
proxy_key, group.proxy_pass
)
return _RequestTarget(
proxy_key=proxy_key,
group=group,
account=account,
context=context,
page=page,
session_id=None,
full_history=True,
)
async def _stream_completion(
    self,
    type_name: str,
    req: OpenAIChatRequest,
) -> AsyncIterator[str]:
    """
    Internal implementation: schedule a target, stream the plugin's
    completion as strings, and append the zero-width encoded session id.
    Exposed to callers only through stream_openai_events().

    Each attempt (up to three) picks a target under the scheduling lock,
    creates a conversation when no session is reused, and streams the reply.
    ``AccountFrozenError`` and ``BrowserResourceInvalidError`` trigger cleanup
    and a retry on fresh resources.

    Raises:
        ValueError: Unregistered type, no usable user content, or no account
            available.
        RuntimeError: The plugin could not create a conversation, the session
            or tab is busy, or all retries were exhausted.
    """
    plugin = PluginRegistry.get(type_name)
    if plugin is None:
        raise ValueError(f"未注册的 type: {type_name}")
    raw_messages = _request_messages_as_dicts(req)
    conv_uuid = req.resume_session_id or parse_conv_uuid_from_messages(raw_messages)
    # Fingerprint matching: when the client doesn't preserve the zero-width
    # session marker (conv_uuid is None), compute a fingerprint from
    # system prompt + first user message and look up the matching session.
    # This replaces sticky session and prevents context pollution.
    fingerprint = ""
    if not conv_uuid:
        fingerprint = compute_conversation_fingerprint(req.messages)
        if fingerprint:
            entry = self._conv_index.lookup(fingerprint)
            if entry is not None:
                conv_uuid = entry.session_id
    logger.info("[chat] type=%s parsed conv_uuid=%s fingerprint=%s", type_name, conv_uuid, fingerprint or "n/a")
    has_tools = bool(req.tools)
    react_prompt_prefix = format_react_prompt(req.tools or []) if has_tools else ""
    debug_path = (
        Path(__file__).resolve().parent.parent.parent
        / "debug"
        / "chat_prompt_debug.json"
    )
    max_retries = 3
    for attempt in range(max_retries):
        target: _RequestTarget | None = None
        active_session_id: str | None = None
        request_id = uuid.uuid4().hex
        try:
            async with self._schedule_lock:
                if conv_uuid:
                    target = await self._reuse_session_target_locked(
                        plugin,
                        type_name,
                        conv_uuid,
                    )
                if target is None:
                    target = await self._allocate_new_target_locked(type_name)
                if target.session_id is not None:
                    active_session_id = target.session_id
            content = extract_user_content(
                req.messages,
                has_tools=has_tools,
                react_prompt_prefix=react_prompt_prefix,
                full_history=target.full_history,
            )
            if not content.strip() and req.attachment_files:
                content = "Please analyze the attached image."
            if not content.strip():
                raise ValueError("messages 中需至少有一条带 content 的 user 消息")
            # The prompt dump is a debugging aid only: a failed write (read-only
            # or full disk, permissions) must not abort the chat request.
            try:
                debug_path.parent.mkdir(parents=True, exist_ok=True)
                debug_path.write_text(
                    json.dumps(
                        {
                            "prompt": content,
                            "full_history": target.full_history,
                            "type": type_name,
                        },
                        ensure_ascii=False,
                        indent=2,
                    ),
                    encoding="utf-8",
                )
            except OSError as dump_error:
                logger.debug(
                    "[chat] failed to write prompt debug dump path=%s err=%s",
                    debug_path,
                    dump_error,
                )
            account_id = self._pool.account_id(target.group, target.account)
            session_id = target.session_id
            if session_id is None:
                await plugin.ensure_request_ready(
                    target.context,
                    target.page,
                    request_id=request_id,
                    session_id=None,
                    phase="create_conversation",
                    account_id=account_id,
                )
                logger.info(
                    "[chat] create_conversation type=%s proxy=%s account=%s",
                    type_name,
                    target.proxy_key.fingerprint_id,
                    account_id,
                )
                session_id = await plugin.create_conversation(
                    target.context,
                    target.page,
                    timezone=target.group.timezone
                    or getattr(target.proxy_key, "timezone", None)
                    or TIMEZONE,
                    public_model=str(getattr(req, "model", "") or ""),
                    upstream_model=str(getattr(req, "upstream_model", "") or ""),
                    request_id=request_id,
                )
                if not session_id:
                    raise RuntimeError("插件创建会话失败")
                async with self._schedule_lock:
                    self._session_cache.put(
                        session_id,
                        target.proxy_key,
                        type_name,
                        account_id,
                    )
                    self._browser_manager.register_session(
                        target.proxy_key,
                        type_name,
                        session_id,
                    )
                    self._busy_sessions.add(session_id)
                    # Register fingerprint for future matching
                    if fingerprint:
                        self._conv_index.register(
                            fingerprint,
                            session_id,
                            len(req.messages),
                            account_id,
                        )
                    active_session_id = session_id
            # Skip pre-stream probe for newly created sessions:
            # create_conversation already validated page health.
            if target.session_id is not None:
                await plugin.ensure_request_ready(
                    target.context,
                    target.page,
                    request_id=request_id,
                    session_id=session_id,
                    phase="stream_completion",
                    account_id=account_id,
                )
            logger.info(
                "[chat] stream_completion type=%s session_id=%s proxy=%s account=%s full_history=%s",
                type_name,
                session_id,
                target.proxy_key.fingerprint_id,
                account_id,
                target.full_history,
            )
            # Attachment source depends on full_history:
            # - reused session (full_history=False): only the images of the last
            #   user message (may be empty, then this turn carries no image)
            # - new/rebuilt session (full_history=True): images of all user messages
            attachments = (
                req.attachment_files_all_users
                if target.full_history
                else req.attachment_files_last_user
            )
            proxy_url = None
            proxy_auth = None
            proxy_forwarder = None
            if plugin.stream_transport() == "context_request":
                proxy_url, proxy_auth, proxy_forwarder = self._stream_proxy_settings(target)
            target.proxy_url = proxy_url
            target.proxy_auth = proxy_auth
            target.proxy_forwarder = proxy_forwarder
            try:
                stream = cast(
                    AsyncIterator[str],
                    plugin.stream_completion(
                        target.context,
                        target.page,
                        session_id,
                        content,
                        request_id=request_id,
                        attachments=attachments,
                        proxy_url=proxy_url,
                        proxy_auth=proxy_auth,
                    ),
                )
                async for chunk in stream:
                    yield chunk
            finally:
                # Always stop the local forwarder, even when streaming failed.
                if proxy_forwarder is not None:
                    try:
                        proxy_forwarder.stop()
                    except Exception:
                        # Best effort: keep going, but leave a trace for debugging.
                        logger.debug(
                            "[chat] failed to stop stream proxy forwarder request_id=%s",
                            request_id,
                            exc_info=True,
                        )
                target.proxy_forwarder = None
            yield session_id_suffix(session_id)
            return
        except AccountFrozenError as e:
            logger.warning(
                "账号限流/额度用尽(插件上报),切换资源重试: type=%s proxy=%s err=%s",
                type_name,
                target.proxy_key.fingerprint_id if target else None,
                e,
            )
            async with self._schedule_lock:
                if target is not None:
                    # Persist the unfreeze time, stop routing to the tab, and
                    # drop its sessions so the retry lands elsewhere.
                    self.report_account_unfreeze(
                        target.group.fingerprint_id,
                        target.account.name,
                        e.unfreeze_at,
                    )
                    self._browser_manager.mark_tab_draining(
                        target.proxy_key,
                        type_name,
                        frozen_until=e.unfreeze_at,
                    )
                    self._invalidate_tab_sessions_locked(
                        target.proxy_key, type_name
                    )
            if attempt == max_retries - 1:
                raise RuntimeError(
                    f"已重试 {max_retries} 次仍限流/过载,请稍后再试: {e}"
                ) from e
            continue
        except BrowserResourceInvalidError as e:
            proxy_for_log = (
                target.proxy_key.fingerprint_id
                if target is not None
                else getattr(getattr(e, "proxy_key", None), "fingerprint_id", None)
            )
            logger.warning(
                "[chat] browser resource invalid bubbled type=%s request_id=%s proxy=%s session_id=%s helper=%s stage=%s resource=%s err=%s",
                type_name,
                request_id,
                proxy_for_log,
                active_session_id,
                e.helper_name,
                e.stage,
                e.resource_hint,
                e,
            )
            async with self._schedule_lock:
                if target is not None:
                    await self._recover_browser_resource_invalid_locked(
                        type_name,
                        target,
                        request_id,
                        active_session_id,
                        e,
                        attempt,
                        max_retries,
                    )
                elif getattr(e, "proxy_key", None) is not None:
                    # The error surfaced before a target existed: close the
                    # browser named by the error itself.
                    closed = await self._browser_manager.close_browser(e.proxy_key)
                    self._apply_closed_tabs_locked(closed)
            if attempt == max_retries - 1:
                raise RuntimeError(
                    f"浏览器资源已失效,重试 {max_retries} 次后仍失败: {e}"
                ) from e
            continue
        finally:
            # Runs on success, on retry and on error: free the busy marker
            # and the request slot taken for this attempt.
            if target is not None:
                async with self._schedule_lock:
                    if active_session_id is not None:
                        self._busy_sessions.discard(active_session_id)
                    self._browser_manager.release_tab(target.proxy_key, type_name)
async def stream_openai_events(
    self,
    type_name: str,
    req: OpenAIChatRequest,
) -> AsyncIterator[OpenAIStreamEvent]:
    """
    The single streaming exit point, using OpenAIStreamEvent as the
    intermediate form: every string produced by the plugin stream is wrapped
    as a content_delta event, followed by one finish event, for the protocol
    adapters to encode as SSE.
    """
    text_stream = self._stream_completion(type_name, req)
    async for piece in text_stream:
        # The session marker is forwarded as a content_delta too; to event
        # consumers it is just another text fragment.
        delta_event = OpenAIStreamEvent(type="content_delta", content=piece)
        yield delta_event
    yield OpenAIStreamEvent(type="finish", finish_reason="stop")
|