""" 浏览器管理器:按 ProxyKey 管理浏览器进程;每个浏览器内每个 type 仅保留一个 tab。 当前实现的职责: - 一个 ProxyKey 对应一个 Chromium 进程 - 一个浏览器内,一个 type 只允许一个 page/tab - tab 绑定一个 account,只有 drained 后才能切号 - tab 可承载多个 session,并记录活跃请求数与最近使用时间 """ from __future__ import annotations import asyncio import logging import os import subprocess import tempfile import time from dataclasses import dataclass, field from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, Coroutine if TYPE_CHECKING: from core.runtime.local_proxy_forwarder import LocalProxyForwarder from playwright.async_api import Browser, BrowserContext, Page, async_playwright from core.constants import CDP_PORT_RANGE, CHROMIUM_BIN, TIMEZONE, user_data_dir from core.plugin.errors import BrowserResourceInvalidError from core.runtime.keys import ProxyKey logger = logging.getLogger(__name__) CreatePageFn = Callable[[BrowserContext, Page | None], Coroutine[Any, Any, Page]] ApplyAuthFn = Callable[[BrowserContext, Page], Coroutine[Any, Any, None]] async def _wait_for_cdp( host: str, port: int, max_attempts: int = 60, interval: float = 2.0, connect_timeout: float = 2.0, ) -> bool: for _ in range(max_attempts): try: _, writer = await asyncio.wait_for( asyncio.open_connection(host, port), timeout=connect_timeout ) writer.close() await writer.wait_closed() return True except (OSError, asyncio.TimeoutError): await asyncio.sleep(interval) return False def _is_cdp_listening(port: int) -> bool: import socket try: with socket.create_connection(("127.0.0.1", port), timeout=1.0): pass return True except OSError: return False @dataclass class TabRuntime: """浏览器中的一个 type tab。""" type_name: str page: Page account_id: str active_requests: int = 0 accepting_new: bool = True state: str = "ready" last_used_at: float = field(default_factory=time.time) frozen_until: int | None = None sessions: set[str] = field(default_factory=set) @dataclass class BrowserEntry: """单个 ProxyKey 对应的浏览器运行时。""" proc: subprocess.Popen[Any] port: int browser: Browser context: BrowserContext stderr_path: Path | None = None tabs: dict[str, TabRuntime] = field(default_factory=dict) last_used_at: float = field(default_factory=time.time) proxy_forwarder: Any = None # LocalProxyForwarder | None,仅 use_proxy 时非空 @dataclass class ClosedTabInfo: """关闭 tab/browser 时回传的 session 清理信息。""" proxy_key: ProxyKey type_name: str account_id: str session_ids: list[str] class BrowserManager: """按代理组管理浏览器及其 type -> tab 映射。""" def __init__( self, chromium_bin: str = CHROMIUM_BIN, headless: bool = False, no_sandbox: bool = False, disable_gpu: bool = False, disable_gpu_sandbox: bool = False, port_range: list[int] | None = None, cdp_wait_max_attempts: int = 90, cdp_wait_interval_seconds: float = 2.0, cdp_wait_connect_timeout_seconds: float = 2.0, ) -> None: self._chromium_bin = chromium_bin self._headless = headless self._no_sandbox = no_sandbox self._disable_gpu = disable_gpu self._disable_gpu_sandbox = disable_gpu_sandbox self._port_range = port_range or list(CDP_PORT_RANGE) self._entries: dict[ProxyKey, BrowserEntry] = {} self._available_ports: set[int] = set(self._port_range) self._playwright: Any = None self._cdp_wait_max_attempts = max(1, int(cdp_wait_max_attempts)) self._cdp_wait_interval_seconds = max(0.05, float(cdp_wait_interval_seconds)) self._cdp_wait_connect_timeout_seconds = max( 0.2, float(cdp_wait_connect_timeout_seconds) ) def _stderr_log_path(self, proxy_key: ProxyKey, port: int) -> Path: log_dir = Path(tempfile.gettempdir()) / "web2api-browser-logs" log_dir.mkdir(parents=True, exist_ok=True) return log_dir / ( f"{proxy_key.fingerprint_id}-{port}-{int(time.time())}.stderr.log" ) @staticmethod def _read_stderr_tail(stderr_path: Path | None, max_chars: int = 4000) -> str: if stderr_path is None or not stderr_path.exists(): return "" try: content = stderr_path.read_text(encoding="utf-8", errors="replace") except Exception: return "" content = content.strip() if not content: return "" return content[-max_chars:] @staticmethod def _cleanup_stderr_log(stderr_path: Path | None) -> None: if stderr_path is None: return try: stderr_path.unlink(missing_ok=True) except Exception: pass def current_proxy_keys(self) -> list[ProxyKey]: return list(self._entries.keys()) def browser_count(self) -> int: return len(self._entries) def list_browser_entries(self) -> list[tuple[ProxyKey, BrowserEntry]]: return list(self._entries.items()) def get_browser_entry(self, proxy_key: ProxyKey) -> BrowserEntry | None: return self._entries.get(proxy_key) def get_tab(self, proxy_key: ProxyKey, type_name: str) -> TabRuntime | None: entry = self._entries.get(proxy_key) if entry is None: return None return entry.tabs.get(type_name) def browser_load(self, proxy_key: ProxyKey) -> int: entry = self._entries.get(proxy_key) if entry is None: return 0 return sum(tab.active_requests for tab in entry.tabs.values()) def browser_diagnostics(self, proxy_key: ProxyKey) -> dict[str, Any]: entry = self._entries.get(proxy_key) if entry is None: return { "browser_present": False, "proc_alive": False, "cdp_listening": False, "stderr_tail": "", "tab_count": 0, "active_requests": 0, "tabs": [], } tabs = [ { "type": type_name, "state": tab.state, "accepting_new": tab.accepting_new, "active_requests": tab.active_requests, "session_count": len(tab.sessions), } for type_name, tab in entry.tabs.items() ] return { "browser_present": True, "proc_alive": entry.proc.poll() is None, "cdp_listening": _is_cdp_listening(entry.port), "stderr_tail": self._read_stderr_tail(entry.stderr_path), "tab_count": len(entry.tabs), "active_requests": sum(tab.active_requests for tab in entry.tabs.values()), "tabs": tabs, } def _raise_browser_resource_invalid( self, proxy_key: ProxyKey, *, detail: str, helper_name: str, stage: str, resource_hint: str = "browser", request_url: str = "", page_url: str = "", request_id: str | None = None, stream_phase: str | None = None, type_name: str | None = None, account_id: str | None = None, ) -> None: diagnostics = self.browser_diagnostics(proxy_key) logger.warning( "[browser-resource-invalid] helper=%s stage=%s proxy=%s resource=%s request_id=%s type=%s account=%s proc_alive=%s cdp_listening=%s tab_count=%s active_requests=%s stderr_tail=%s detail=%s", helper_name, stage, proxy_key.fingerprint_id, resource_hint, request_id, type_name, account_id, diagnostics.get("proc_alive"), diagnostics.get("cdp_listening"), diagnostics.get("tab_count"), diagnostics.get("active_requests"), diagnostics.get("stderr_tail"), detail, ) raise BrowserResourceInvalidError( detail, helper_name=helper_name, operation="browser_manager", stage=stage, resource_hint=resource_hint, request_url=request_url, page_url=page_url, request_id=request_id, stream_phase=stream_phase, proxy_key=proxy_key, type_name=type_name, account_id=account_id, ) def touch_browser(self, proxy_key: ProxyKey) -> None: entry = self._entries.get(proxy_key) if entry is not None: entry.last_used_at = time.time() def _launch_process( self, proxy_key: ProxyKey, proxy_pass: str, port: int, ) -> tuple[subprocess.Popen[Any], Path, LocalProxyForwarder | None]: """启动 Chromium 进程(代理时使用本地转发鉴权,无扩展),使用指定 port。""" udd = user_data_dir(proxy_key.fingerprint_id) udd.mkdir(parents=True, exist_ok=True) if not Path(self._chromium_bin).exists(): raise RuntimeError(f"Chromium 不存在: {self._chromium_bin}") args = [ self._chromium_bin, f"--remote-debugging-port={port}", f"--fingerprint={proxy_key.fingerprint_id}", "--fingerprint-platform=windows", "--fingerprint-brand=Edge", f"--user-data-dir={udd}", f"--timezone={proxy_key.timezone or TIMEZONE}", "--force-webrtc-ip-handling-policy", "--webrtc-ip-handling-policy=disable_non_proxied_udp", "--disable-features=AsyncDNS", "--disable-dev-shm-usage", "--no-first-run", "--no-default-browser-check", # Memory optimization for constrained environments (HF Spaces cpu-basic) "--renderer-process-limit=1", "--disable-extensions", "--disable-background-networking", "--disable-component-update", "--disable-sync", "--disable-translate", "--disable-features=MediaRouter,TranslateUI", "--js-flags=--max-old-space-size=256", ] proxy_forwarder = None if proxy_key.use_proxy: from core.runtime.local_proxy_forwarder import ( LocalProxyForwarder, UpstreamProxy, parse_proxy_server, ) upstream_host, upstream_port = parse_proxy_server(proxy_key.proxy_host) upstream = UpstreamProxy( host=upstream_host, port=upstream_port, username=proxy_key.proxy_user, password=proxy_pass, ) proxy_forwarder = LocalProxyForwarder( upstream, listen_host="127.0.0.1", listen_port=0, on_log=lambda msg: logger.debug("[proxy] %s", msg), ) proxy_forwarder.start() args.append(f"--proxy-server={proxy_forwarder.proxy_url}") if self._headless: args.extend( [ "--headless=new", "--window-size=1920,1080", ] ) if self._headless or self._disable_gpu: args.append("--disable-gpu") if self._disable_gpu_sandbox: args.append("--disable-gpu-sandbox") if self._no_sandbox: args.extend( [ "--no-sandbox", "--disable-setuid-sandbox", ] ) env = os.environ.copy() env["NODE_OPTIONS"] = ( env.get("NODE_OPTIONS") or "" ).strip() + " --no-deprecation" env.setdefault("DBUS_SESSION_BUS_ADDRESS", "/dev/null") stderr_path = self._stderr_log_path(proxy_key, port) stderr_fp = stderr_path.open("ab") try: proc = subprocess.Popen( args, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=stderr_fp, env=env, ) finally: stderr_fp.close() return proc, stderr_path, proxy_forwarder async def ensure_browser( self, proxy_key: ProxyKey, proxy_pass: str, ) -> BrowserContext: """ 确保存在对应 proxy_key 的浏览器;若已有且存活则直接复用。 """ entry = self._entries.get(proxy_key) if entry is not None: if entry.proc.poll() is not None or not _is_cdp_listening(entry.port): await self._close_entry_async(proxy_key) else: entry.last_used_at = time.time() return entry.context if not self._available_ports: raise RuntimeError( "无可用 CDP 端口,当前并发浏览器数已达上限,请稍后重试或增大 cdp_port_count" ) port = self._available_ports.pop() proc, stderr_path, proxy_forwarder = self._launch_process( proxy_key, proxy_pass, port ) logger.info( "已启动 Chromium PID=%s port=%s mode=%s headless=%s no_sandbox=%s disable_gpu=%s disable_gpu_sandbox=%s,等待 CDP 就绪...", proc.pid, port, "proxy" if proxy_key.use_proxy else "direct", self._headless, self._no_sandbox, self._disable_gpu, self._disable_gpu_sandbox, ) ok = await _wait_for_cdp( "127.0.0.1", port, max_attempts=self._cdp_wait_max_attempts, interval=self._cdp_wait_interval_seconds, connect_timeout=self._cdp_wait_connect_timeout_seconds, ) if not ok: self._available_ports.add(port) if proxy_forwarder is not None: try: proxy_forwarder.stop() except Exception: pass try: proc.terminate() proc.wait(timeout=5) except Exception: pass stderr_tail = self._read_stderr_tail(stderr_path) self._cleanup_stderr_log(stderr_path) if stderr_tail: logger.error( "Chromium 启动失败,CDP 未就绪。stderr tail:\n%s", stderr_tail, ) raise RuntimeError("CDP 未在预期时间内就绪") if self._playwright is None: self._playwright = await async_playwright().start() endpoint = f"http://127.0.0.1:{port}" try: browser = await self._playwright.chromium.connect_over_cdp( endpoint, timeout=10000 ) except Exception: self._available_ports.add(port) if proxy_forwarder is not None: try: proxy_forwarder.stop() except Exception: pass try: proc.terminate() proc.wait(timeout=5) except Exception: pass stderr_tail = self._read_stderr_tail(stderr_path) self._cleanup_stderr_log(stderr_path) if stderr_tail: logger.error( "Chromium 已监听 CDP 但 connect_over_cdp 失败。stderr tail:\n%s", stderr_tail, ) raise context = browser.contexts[0] if browser.contexts else None if context is None: await browser.close() self._available_ports.add(port) if proxy_forwarder is not None: try: proxy_forwarder.stop() except Exception: pass try: proc.terminate() proc.wait(timeout=5) except Exception: pass self._cleanup_stderr_log(stderr_path) raise RuntimeError("浏览器无默认 context") self._entries[proxy_key] = BrowserEntry( proc=proc, port=port, browser=browser, context=context, stderr_path=stderr_path, proxy_forwarder=proxy_forwarder, ) return context async def open_tab( self, proxy_key: ProxyKey, proxy_pass: str, type_name: str, account_id: str, create_page_fn: CreatePageFn, apply_auth_fn: ApplyAuthFn, ) -> TabRuntime: """在指定浏览器中创建一个 type tab,并绑定到 account。""" context = await self.ensure_browser(proxy_key, proxy_pass) entry = self._entries.get(proxy_key) if entry is None: raise RuntimeError("ensure_browser 未创建 entry") existing = entry.tabs.get(type_name) if existing is not None: return existing logger.info( "[tab] opening proxy=%s type=%s account=%s reuse_blank=%s tab_count=%s active_requests=%s", proxy_key.fingerprint_id, type_name, account_id, bool(len(entry.tabs) == 0 and context.pages), len(entry.tabs), sum(tab.active_requests for tab in entry.tabs.values()), ) # 首个 tab 时复用 Chromium 默认空白页,避免多一个无用标签 reuse_page = ( context.pages[0] if (len(entry.tabs) == 0 and context.pages) else None ) try: page = await create_page_fn(context, reuse_page) except Exception as e: msg = str(e) normalized = msg.lower() if "target.createtarget" in normalized or "failed to open a new tab" in normalized: self._raise_browser_resource_invalid( proxy_key, detail=msg, helper_name="open_tab", stage="create_page", resource_hint="browser", type_name=type_name, account_id=account_id, ) raise try: await apply_auth_fn(context, page) except Exception as e: try: await page.close() except Exception: pass msg = str(e) normalized = msg.lower() if ( "target crashed" in normalized or "page has been closed" in normalized or "browser has been closed" in normalized or "has been disconnected" in normalized ): self._raise_browser_resource_invalid( proxy_key, detail=msg, helper_name="open_tab", stage="apply_auth", resource_hint="browser", page_url=getattr(page, "url", "") or "", type_name=type_name, account_id=account_id, ) raise tab = TabRuntime( type_name=type_name, page=page, account_id=account_id, ) entry.tabs[type_name] = tab entry.last_used_at = time.time() logger.info( "[tab] opened mode=%s proxy=%s type=%s account=%s", "proxy" if proxy_key.use_proxy else "direct", proxy_key.fingerprint_id, type_name, account_id, ) return tab async def switch_tab_account( self, proxy_key: ProxyKey, type_name: str, account_id: str, apply_auth_fn: ApplyAuthFn, ) -> bool: """ 在同一个 page 上切换账号。只有 drained 后(active_requests==0)才允许切号。 """ entry = self._entries.get(proxy_key) if entry is None: return False tab = entry.tabs.get(type_name) if tab is None or tab.active_requests != 0: return False tab.accepting_new = False tab.state = "switching" try: await apply_auth_fn(entry.context, tab.page) except Exception: tab.state = "draining" return False tab.account_id = account_id tab.accepting_new = True tab.state = "ready" tab.frozen_until = None tab.last_used_at = time.time() tab.sessions.clear() entry.last_used_at = time.time() logger.info( "[tab] switched account mode=%s proxy=%s type=%s account=%s", "proxy" if proxy_key.use_proxy else "direct", proxy_key.fingerprint_id, type_name, account_id, ) return True def acquire_tab( self, proxy_key: ProxyKey, type_name: str, max_concurrent: int, ) -> Page | None: """ 为一次请求占用 tab;tab 必须存在、可接新请求且未达到并发上限。 """ entry = self._entries.get(proxy_key) if entry is None: return None tab = entry.tabs.get(type_name) if tab is None: return None if not tab.accepting_new or tab.active_requests >= max_concurrent: return None tab.active_requests += 1 tab.last_used_at = time.time() entry.last_used_at = tab.last_used_at tab.state = "busy" return tab.page def release_tab(self, proxy_key: ProxyKey, type_name: str) -> None: """释放一次请求占用。""" entry = self._entries.get(proxy_key) if entry is None: return tab = entry.tabs.get(type_name) if tab is None: return if tab.active_requests > 0: tab.active_requests -= 1 tab.last_used_at = time.time() entry.last_used_at = tab.last_used_at if tab.active_requests == 0: if tab.accepting_new: tab.state = "ready" elif tab.frozen_until is not None: tab.state = "frozen" else: tab.state = "draining" def mark_tab_draining( self, proxy_key: ProxyKey, type_name: str, *, frozen_until: int | None = None, ) -> None: """禁止 tab 接受新请求,并标记为 draining/frozen。""" entry = self._entries.get(proxy_key) if entry is None: return tab = entry.tabs.get(type_name) if tab is None: return tab.accepting_new = False tab.frozen_until = frozen_until tab.last_used_at = time.time() entry.last_used_at = tab.last_used_at if frozen_until is not None: tab.state = "frozen" else: tab.state = "draining" def register_session( self, proxy_key: ProxyKey, type_name: str, session_id: str, ) -> None: entry = self._entries.get(proxy_key) if entry is None: return tab = entry.tabs.get(type_name) if tab is None: return tab.sessions.add(session_id) tab.last_used_at = time.time() entry.last_used_at = tab.last_used_at def unregister_session( self, proxy_key: ProxyKey, type_name: str, session_id: str, ) -> None: entry = self._entries.get(proxy_key) if entry is None: return tab = entry.tabs.get(type_name) if tab is None: return tab.sessions.discard(session_id) async def close_tab( self, proxy_key: ProxyKey, type_name: str, ) -> ClosedTabInfo | None: """关闭某个 type 的 tab,并返回需要失效的 session 列表。""" entry = self._entries.get(proxy_key) if entry is None: return None tab = entry.tabs.pop(type_name, None) if tab is None: return None try: await tab.page.close() except Exception: pass entry.last_used_at = time.time() logger.info( "[tab] closed mode=%s proxy=%s type=%s", "proxy" if proxy_key.use_proxy else "direct", proxy_key.fingerprint_id, type_name, ) return ClosedTabInfo( proxy_key=proxy_key, type_name=type_name, account_id=tab.account_id, session_ids=list(tab.sessions), ) async def close_browser(self, proxy_key: ProxyKey) -> list[ClosedTabInfo]: return await self._close_entry_async(proxy_key) async def _close_entry_async(self, proxy_key: ProxyKey) -> list[ClosedTabInfo]: entry = self._entries.get(proxy_key) if entry is None: return [] closed_tabs = [ ClosedTabInfo( proxy_key=proxy_key, type_name=type_name, account_id=tab.account_id, session_ids=list(tab.sessions), ) for type_name, tab in entry.tabs.items() ] for tab in list(entry.tabs.values()): try: await tab.page.close() except Exception: pass entry.tabs.clear() if entry.proxy_forwarder is not None: try: entry.proxy_forwarder.stop() except Exception as e: logger.warning("关闭本地代理转发时异常: %s", e) if entry.browser is not None: try: await entry.browser.close() except Exception as e: logger.warning("关闭 CDP 浏览器时异常: %s", e) try: entry.proc.terminate() entry.proc.wait(timeout=8) except subprocess.TimeoutExpired: entry.proc.kill() entry.proc.wait(timeout=3) except Exception as e: logger.warning("关闭浏览器进程时异常: %s", e) self._cleanup_stderr_log(entry.stderr_path) self._available_ports.add(entry.port) del self._entries[proxy_key] logger.info( "[browser] closed mode=%s proxy=%s", "proxy" if proxy_key.use_proxy else "direct", proxy_key.fingerprint_id, ) return closed_tabs async def collect_idle_browsers( self, *, idle_seconds: float, resident_browser_count: int, ) -> list[ClosedTabInfo]: """ 关闭空闲浏览器: - 浏览器下所有 tab 都没有活跃请求 - 所有 tab 均已空闲超过 idle_seconds - 当前浏览器数 > resident_browser_count """ if len(self._entries) <= resident_browser_count: return [] now = time.time() candidates: list[tuple[float, ProxyKey]] = [] for proxy_key, entry in self._entries.items(): if any(tab.active_requests > 0 for tab in entry.tabs.values()): continue if entry.tabs: last_tab_used = max(tab.last_used_at for tab in entry.tabs.values()) else: last_tab_used = entry.last_used_at if now - last_tab_used < idle_seconds: continue candidates.append((last_tab_used, proxy_key)) if not candidates: return [] closed: list[ClosedTabInfo] = [] max_close = max(0, len(self._entries) - resident_browser_count) for _, proxy_key in sorted(candidates, key=lambda item: item[0])[:max_close]: closed.extend(await self._close_entry_async(proxy_key)) return closed async def close_all(self) -> list[ClosedTabInfo]: """关闭全部浏览器和 tab。""" closed: list[ClosedTabInfo] = [] for proxy_key in list(self._entries.keys()): closed.extend(await self._close_entry_async(proxy_key)) return closed