Spaces:
Paused
Paused
| """ | |
| 浏览器管理器:按 ProxyKey 管理浏览器进程;每个浏览器内每个 type 仅保留一个 tab。 | |
| 当前实现的职责: | |
| - 一个 ProxyKey 对应一个 Chromium 进程 | |
| - 一个浏览器内,一个 type 只允许一个 page/tab | |
| - tab 绑定一个 account,只有 drained 后才能切号 | |
| - tab 可承载多个 session,并记录活跃请求数与最近使用时间 | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import logging | |
| import os | |
| import subprocess | |
| import tempfile | |
| import time | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import TYPE_CHECKING, Any, Callable, Coroutine | |
| if TYPE_CHECKING: | |
| from core.runtime.local_proxy_forwarder import LocalProxyForwarder | |
| from playwright.async_api import Browser, BrowserContext, Page, async_playwright | |
| from core.constants import CDP_PORT_RANGE, CHROMIUM_BIN, TIMEZONE, user_data_dir | |
| from core.plugin.errors import BrowserResourceInvalidError | |
| from core.runtime.keys import ProxyKey | |
| logger = logging.getLogger(__name__) | |
| CreatePageFn = Callable[[BrowserContext, Page | None], Coroutine[Any, Any, Page]] | |
| ApplyAuthFn = Callable[[BrowserContext, Page], Coroutine[Any, Any, None]] | |
| async def _wait_for_cdp( | |
| host: str, | |
| port: int, | |
| max_attempts: int = 60, | |
| interval: float = 2.0, | |
| connect_timeout: float = 2.0, | |
| ) -> bool: | |
| for _ in range(max_attempts): | |
| try: | |
| _, writer = await asyncio.wait_for( | |
| asyncio.open_connection(host, port), timeout=connect_timeout | |
| ) | |
| writer.close() | |
| await writer.wait_closed() | |
| return True | |
| except (OSError, asyncio.TimeoutError): | |
| await asyncio.sleep(interval) | |
| return False | |
| def _is_cdp_listening(port: int) -> bool: | |
| import socket | |
| try: | |
| with socket.create_connection(("127.0.0.1", port), timeout=1.0): | |
| pass | |
| return True | |
| except OSError: | |
| return False | |
| class TabRuntime: | |
| """浏览器中的一个 type tab。""" | |
| type_name: str | |
| page: Page | |
| account_id: str | |
| active_requests: int = 0 | |
| accepting_new: bool = True | |
| state: str = "ready" | |
| last_used_at: float = field(default_factory=time.time) | |
| frozen_until: int | None = None | |
| sessions: set[str] = field(default_factory=set) | |
| class BrowserEntry: | |
| """单个 ProxyKey 对应的浏览器运行时。""" | |
| proc: subprocess.Popen[Any] | |
| port: int | |
| browser: Browser | |
| context: BrowserContext | |
| stderr_path: Path | None = None | |
| tabs: dict[str, TabRuntime] = field(default_factory=dict) | |
| last_used_at: float = field(default_factory=time.time) | |
| proxy_forwarder: Any = None # LocalProxyForwarder | None,仅 use_proxy 时非空 | |
| class ClosedTabInfo: | |
| """关闭 tab/browser 时回传的 session 清理信息。""" | |
| proxy_key: ProxyKey | |
| type_name: str | |
| account_id: str | |
| session_ids: list[str] | |
| class BrowserManager: | |
| """按代理组管理浏览器及其 type -> tab 映射。""" | |
| def __init__( | |
| self, | |
| chromium_bin: str = CHROMIUM_BIN, | |
| headless: bool = False, | |
| no_sandbox: bool = False, | |
| disable_gpu: bool = False, | |
| disable_gpu_sandbox: bool = False, | |
| port_range: list[int] | None = None, | |
| cdp_wait_max_attempts: int = 90, | |
| cdp_wait_interval_seconds: float = 2.0, | |
| cdp_wait_connect_timeout_seconds: float = 2.0, | |
| ) -> None: | |
| self._chromium_bin = chromium_bin | |
| self._headless = headless | |
| self._no_sandbox = no_sandbox | |
| self._disable_gpu = disable_gpu | |
| self._disable_gpu_sandbox = disable_gpu_sandbox | |
| self._port_range = port_range or list(CDP_PORT_RANGE) | |
| self._entries: dict[ProxyKey, BrowserEntry] = {} | |
| self._available_ports: set[int] = set(self._port_range) | |
| self._playwright: Any = None | |
| self._cdp_wait_max_attempts = max(1, int(cdp_wait_max_attempts)) | |
| self._cdp_wait_interval_seconds = max(0.05, float(cdp_wait_interval_seconds)) | |
| self._cdp_wait_connect_timeout_seconds = max( | |
| 0.2, float(cdp_wait_connect_timeout_seconds) | |
| ) | |
| def _stderr_log_path(self, proxy_key: ProxyKey, port: int) -> Path: | |
| log_dir = Path(tempfile.gettempdir()) / "web2api-browser-logs" | |
| log_dir.mkdir(parents=True, exist_ok=True) | |
| return log_dir / ( | |
| f"{proxy_key.fingerprint_id}-{port}-{int(time.time())}.stderr.log" | |
| ) | |
| def _read_stderr_tail(stderr_path: Path | None, max_chars: int = 4000) -> str: | |
| if stderr_path is None or not stderr_path.exists(): | |
| return "" | |
| try: | |
| content = stderr_path.read_text(encoding="utf-8", errors="replace") | |
| except Exception: | |
| return "" | |
| content = content.strip() | |
| if not content: | |
| return "" | |
| return content[-max_chars:] | |
| def _cleanup_stderr_log(stderr_path: Path | None) -> None: | |
| if stderr_path is None: | |
| return | |
| try: | |
| stderr_path.unlink(missing_ok=True) | |
| except Exception: | |
| pass | |
| def current_proxy_keys(self) -> list[ProxyKey]: | |
| return list(self._entries.keys()) | |
| def browser_count(self) -> int: | |
| return len(self._entries) | |
| def list_browser_entries(self) -> list[tuple[ProxyKey, BrowserEntry]]: | |
| return list(self._entries.items()) | |
| def get_browser_entry(self, proxy_key: ProxyKey) -> BrowserEntry | None: | |
| return self._entries.get(proxy_key) | |
| def get_tab(self, proxy_key: ProxyKey, type_name: str) -> TabRuntime | None: | |
| entry = self._entries.get(proxy_key) | |
| if entry is None: | |
| return None | |
| return entry.tabs.get(type_name) | |
| def browser_load(self, proxy_key: ProxyKey) -> int: | |
| entry = self._entries.get(proxy_key) | |
| if entry is None: | |
| return 0 | |
| return sum(tab.active_requests for tab in entry.tabs.values()) | |
| def browser_diagnostics(self, proxy_key: ProxyKey) -> dict[str, Any]: | |
| entry = self._entries.get(proxy_key) | |
| if entry is None: | |
| return { | |
| "browser_present": False, | |
| "proc_alive": False, | |
| "cdp_listening": False, | |
| "stderr_tail": "", | |
| "tab_count": 0, | |
| "active_requests": 0, | |
| "tabs": [], | |
| } | |
| tabs = [ | |
| { | |
| "type": type_name, | |
| "state": tab.state, | |
| "accepting_new": tab.accepting_new, | |
| "active_requests": tab.active_requests, | |
| "session_count": len(tab.sessions), | |
| } | |
| for type_name, tab in entry.tabs.items() | |
| ] | |
| return { | |
| "browser_present": True, | |
| "proc_alive": entry.proc.poll() is None, | |
| "cdp_listening": _is_cdp_listening(entry.port), | |
| "stderr_tail": self._read_stderr_tail(entry.stderr_path), | |
| "tab_count": len(entry.tabs), | |
| "active_requests": sum(tab.active_requests for tab in entry.tabs.values()), | |
| "tabs": tabs, | |
| } | |
| def _raise_browser_resource_invalid( | |
| self, | |
| proxy_key: ProxyKey, | |
| *, | |
| detail: str, | |
| helper_name: str, | |
| stage: str, | |
| resource_hint: str = "browser", | |
| request_url: str = "", | |
| page_url: str = "", | |
| request_id: str | None = None, | |
| stream_phase: str | None = None, | |
| type_name: str | None = None, | |
| account_id: str | None = None, | |
| ) -> None: | |
| diagnostics = self.browser_diagnostics(proxy_key) | |
| logger.warning( | |
| "[browser-resource-invalid] helper=%s stage=%s proxy=%s resource=%s request_id=%s type=%s account=%s proc_alive=%s cdp_listening=%s tab_count=%s active_requests=%s stderr_tail=%s detail=%s", | |
| helper_name, | |
| stage, | |
| proxy_key.fingerprint_id, | |
| resource_hint, | |
| request_id, | |
| type_name, | |
| account_id, | |
| diagnostics.get("proc_alive"), | |
| diagnostics.get("cdp_listening"), | |
| diagnostics.get("tab_count"), | |
| diagnostics.get("active_requests"), | |
| diagnostics.get("stderr_tail"), | |
| detail, | |
| ) | |
| raise BrowserResourceInvalidError( | |
| detail, | |
| helper_name=helper_name, | |
| operation="browser_manager", | |
| stage=stage, | |
| resource_hint=resource_hint, | |
| request_url=request_url, | |
| page_url=page_url, | |
| request_id=request_id, | |
| stream_phase=stream_phase, | |
| proxy_key=proxy_key, | |
| type_name=type_name, | |
| account_id=account_id, | |
| ) | |
| def touch_browser(self, proxy_key: ProxyKey) -> None: | |
| entry = self._entries.get(proxy_key) | |
| if entry is not None: | |
| entry.last_used_at = time.time() | |
| def _launch_process( | |
| self, | |
| proxy_key: ProxyKey, | |
| proxy_pass: str, | |
| port: int, | |
| ) -> tuple[subprocess.Popen[Any], Path, LocalProxyForwarder | None]: | |
| """启动 Chromium 进程(代理时使用本地转发鉴权,无扩展),使用指定 port。""" | |
| udd = user_data_dir(proxy_key.fingerprint_id) | |
| udd.mkdir(parents=True, exist_ok=True) | |
| if not Path(self._chromium_bin).exists(): | |
| raise RuntimeError(f"Chromium 不存在: {self._chromium_bin}") | |
| args = [ | |
| self._chromium_bin, | |
| f"--remote-debugging-port={port}", | |
| f"--fingerprint={proxy_key.fingerprint_id}", | |
| "--fingerprint-platform=windows", | |
| "--fingerprint-brand=Edge", | |
| f"--user-data-dir={udd}", | |
| f"--timezone={proxy_key.timezone or TIMEZONE}", | |
| "--force-webrtc-ip-handling-policy", | |
| "--webrtc-ip-handling-policy=disable_non_proxied_udp", | |
| "--disable-features=AsyncDNS,MediaRouter,TranslateUI", | |
| "--disable-dev-shm-usage", | |
| "--no-first-run", | |
| "--no-default-browser-check", | |
| # Memory optimization for constrained environments (HF Spaces cpu-basic) | |
| "--renderer-process-limit=1", | |
| "--disable-extensions", | |
| "--disable-background-networking", | |
| "--disable-component-update", | |
| "--disable-sync", | |
| "--disable-translate", | |
| "--js-flags=--max-old-space-size=512", | |
| ] | |
| proxy_forwarder = None | |
| if proxy_key.use_proxy: | |
| if proxy_key.proxy_user: | |
| # Upstream HTTP proxy with Basic auth — use local forwarder | |
| from core.runtime.local_proxy_forwarder import ( | |
| LocalProxyForwarder, | |
| UpstreamProxy, | |
| parse_proxy_server, | |
| ) | |
| upstream_host, upstream_port = parse_proxy_server(proxy_key.proxy_host) | |
| upstream = UpstreamProxy( | |
| host=upstream_host, | |
| port=upstream_port, | |
| username=proxy_key.proxy_user, | |
| password=proxy_pass, | |
| ) | |
| proxy_forwarder = LocalProxyForwarder( | |
| upstream, | |
| listen_host="127.0.0.1", | |
| listen_port=0, | |
| on_log=lambda msg: logger.debug("[proxy] %s", msg), | |
| ) | |
| proxy_forwarder.start() | |
| args.append(f"--proxy-server={proxy_forwarder.proxy_url}") | |
| else: | |
| # Direct proxy without auth (e.g. local xray socks5/http) | |
| args.append(f"--proxy-server={proxy_key.proxy_host}") | |
| if self._headless: | |
| args.extend( | |
| [ | |
| "--headless=new", | |
| "--window-size=1920,1080", | |
| ] | |
| ) | |
| if self._headless or self._disable_gpu: | |
| args.append("--disable-gpu") | |
| if self._disable_gpu_sandbox: | |
| args.append("--disable-gpu-sandbox") | |
| if self._no_sandbox: | |
| args.extend( | |
| [ | |
| "--no-sandbox", | |
| "--disable-setuid-sandbox", | |
| ] | |
| ) | |
| env = os.environ.copy() | |
| env["NODE_OPTIONS"] = ( | |
| env.get("NODE_OPTIONS") or "" | |
| ).strip() + " --no-deprecation" | |
| env.setdefault("DBUS_SESSION_BUS_ADDRESS", "/dev/null") | |
| stderr_path = self._stderr_log_path(proxy_key, port) | |
| stderr_fp = stderr_path.open("ab") | |
| try: | |
| proc = subprocess.Popen( | |
| args, | |
| stdin=subprocess.DEVNULL, | |
| stdout=subprocess.DEVNULL, | |
| stderr=stderr_fp, | |
| env=env, | |
| ) | |
| finally: | |
| stderr_fp.close() | |
| return proc, stderr_path, proxy_forwarder | |
| async def ensure_browser( | |
| self, | |
| proxy_key: ProxyKey, | |
| proxy_pass: str, | |
| ) -> BrowserContext: | |
| """ | |
| 确保存在对应 proxy_key 的浏览器;若已有且存活则直接复用。 | |
| """ | |
| entry = self._entries.get(proxy_key) | |
| if entry is not None: | |
| if entry.proc.poll() is not None or not _is_cdp_listening(entry.port): | |
| await self._close_entry_async(proxy_key) | |
| else: | |
| entry.last_used_at = time.time() | |
| return entry.context | |
| if not self._available_ports: | |
| raise RuntimeError( | |
| "无可用 CDP 端口,当前并发浏览器数已达上限,请稍后重试或增大 cdp_port_count" | |
| ) | |
| port = self._available_ports.pop() | |
| proc, stderr_path, proxy_forwarder = self._launch_process( | |
| proxy_key, proxy_pass, port | |
| ) | |
| logger.info( | |
| "已启动 Chromium PID=%s port=%s mode=%s headless=%s no_sandbox=%s disable_gpu=%s disable_gpu_sandbox=%s,等待 CDP 就绪...", | |
| proc.pid, | |
| port, | |
| "proxy" if proxy_key.use_proxy else "direct", | |
| self._headless, | |
| self._no_sandbox, | |
| self._disable_gpu, | |
| self._disable_gpu_sandbox, | |
| ) | |
| ok = await _wait_for_cdp( | |
| "127.0.0.1", | |
| port, | |
| max_attempts=self._cdp_wait_max_attempts, | |
| interval=self._cdp_wait_interval_seconds, | |
| connect_timeout=self._cdp_wait_connect_timeout_seconds, | |
| ) | |
| if not ok: | |
| self._available_ports.add(port) | |
| if proxy_forwarder is not None: | |
| try: | |
| proxy_forwarder.stop() | |
| except Exception: | |
| pass | |
| try: | |
| proc.terminate() | |
| proc.wait(timeout=5) | |
| except Exception: | |
| pass | |
| stderr_tail = self._read_stderr_tail(stderr_path) | |
| self._cleanup_stderr_log(stderr_path) | |
| if stderr_tail: | |
| logger.error( | |
| "Chromium 启动失败,CDP 未就绪。stderr tail:\n%s", | |
| stderr_tail, | |
| ) | |
| raise RuntimeError("CDP 未在预期时间内就绪") | |
| if self._playwright is None: | |
| self._playwright = await async_playwright().start() | |
| endpoint = f"http://127.0.0.1:{port}" | |
| try: | |
| browser = await self._playwright.chromium.connect_over_cdp( | |
| endpoint, timeout=10000 | |
| ) | |
| except Exception: | |
| self._available_ports.add(port) | |
| if proxy_forwarder is not None: | |
| try: | |
| proxy_forwarder.stop() | |
| except Exception: | |
| pass | |
| try: | |
| proc.terminate() | |
| proc.wait(timeout=5) | |
| except Exception: | |
| pass | |
| stderr_tail = self._read_stderr_tail(stderr_path) | |
| self._cleanup_stderr_log(stderr_path) | |
| if stderr_tail: | |
| logger.error( | |
| "Chromium 已监听 CDP 但 connect_over_cdp 失败。stderr tail:\n%s", | |
| stderr_tail, | |
| ) | |
| raise | |
| context = browser.contexts[0] if browser.contexts else None | |
| if context is None: | |
| await browser.close() | |
| self._available_ports.add(port) | |
| if proxy_forwarder is not None: | |
| try: | |
| proxy_forwarder.stop() | |
| except Exception: | |
| pass | |
| try: | |
| proc.terminate() | |
| proc.wait(timeout=5) | |
| except Exception: | |
| pass | |
| self._cleanup_stderr_log(stderr_path) | |
| raise RuntimeError("浏览器无默认 context") | |
| self._entries[proxy_key] = BrowserEntry( | |
| proc=proc, | |
| port=port, | |
| browser=browser, | |
| context=context, | |
| stderr_path=stderr_path, | |
| proxy_forwarder=proxy_forwarder, | |
| ) | |
| return context | |
| async def open_tab( | |
| self, | |
| proxy_key: ProxyKey, | |
| proxy_pass: str, | |
| type_name: str, | |
| account_id: str, | |
| create_page_fn: CreatePageFn, | |
| apply_auth_fn: ApplyAuthFn, | |
| ) -> TabRuntime: | |
| """在指定浏览器中创建一个 type tab,并绑定到 account。""" | |
| context = await self.ensure_browser(proxy_key, proxy_pass) | |
| entry = self._entries.get(proxy_key) | |
| if entry is None: | |
| raise RuntimeError("ensure_browser 未创建 entry") | |
| existing = entry.tabs.get(type_name) | |
| if existing is not None: | |
| return existing | |
| logger.info( | |
| "[tab] opening proxy=%s type=%s account=%s reuse_blank=%s tab_count=%s active_requests=%s", | |
| proxy_key.fingerprint_id, | |
| type_name, | |
| account_id, | |
| bool(len(entry.tabs) == 0 and context.pages), | |
| len(entry.tabs), | |
| sum(tab.active_requests for tab in entry.tabs.values()), | |
| ) | |
| # 首个 tab 时复用 Chromium 默认空白页,避免多一个无用标签 | |
| reuse_page = ( | |
| context.pages[0] if (len(entry.tabs) == 0 and context.pages) else None | |
| ) | |
| try: | |
| page = await create_page_fn(context, reuse_page) | |
| except Exception as e: | |
| msg = str(e) | |
| normalized = msg.lower() | |
| if "target.createtarget" in normalized or "failed to open a new tab" in normalized: | |
| self._raise_browser_resource_invalid( | |
| proxy_key, | |
| detail=msg, | |
| helper_name="open_tab", | |
| stage="create_page", | |
| resource_hint="browser", | |
| type_name=type_name, | |
| account_id=account_id, | |
| ) | |
| raise | |
| try: | |
| await apply_auth_fn(context, page) | |
| except Exception as e: | |
| try: | |
| await page.close() | |
| except Exception: | |
| pass | |
| msg = str(e) | |
| normalized = msg.lower() | |
| if ( | |
| "target crashed" in normalized | |
| or "page has been closed" in normalized | |
| or "browser has been closed" in normalized | |
| or "has been disconnected" in normalized | |
| ): | |
| self._raise_browser_resource_invalid( | |
| proxy_key, | |
| detail=msg, | |
| helper_name="open_tab", | |
| stage="apply_auth", | |
| resource_hint="browser", | |
| page_url=getattr(page, "url", "") or "", | |
| type_name=type_name, | |
| account_id=account_id, | |
| ) | |
| raise | |
| tab = TabRuntime( | |
| type_name=type_name, | |
| page=page, | |
| account_id=account_id, | |
| ) | |
| entry.tabs[type_name] = tab | |
| entry.last_used_at = time.time() | |
| logger.info( | |
| "[tab] opened mode=%s proxy=%s type=%s account=%s", | |
| "proxy" if proxy_key.use_proxy else "direct", | |
| proxy_key.fingerprint_id, | |
| type_name, | |
| account_id, | |
| ) | |
| return tab | |
| async def switch_tab_account( | |
| self, | |
| proxy_key: ProxyKey, | |
| type_name: str, | |
| account_id: str, | |
| apply_auth_fn: ApplyAuthFn, | |
| ) -> bool: | |
| """ | |
| 在同一个 page 上切换账号。只有 drained 后(active_requests==0)才允许切号。 | |
| """ | |
| entry = self._entries.get(proxy_key) | |
| if entry is None: | |
| return False | |
| tab = entry.tabs.get(type_name) | |
| if tab is None or tab.active_requests != 0: | |
| return False | |
| tab.accepting_new = False | |
| tab.state = "switching" | |
| try: | |
| await apply_auth_fn(entry.context, tab.page) | |
| except Exception: | |
| tab.state = "draining" | |
| return False | |
| tab.account_id = account_id | |
| tab.accepting_new = True | |
| tab.state = "ready" | |
| tab.frozen_until = None | |
| tab.last_used_at = time.time() | |
| tab.sessions.clear() | |
| entry.last_used_at = time.time() | |
| logger.info( | |
| "[tab] switched account mode=%s proxy=%s type=%s account=%s", | |
| "proxy" if proxy_key.use_proxy else "direct", | |
| proxy_key.fingerprint_id, | |
| type_name, | |
| account_id, | |
| ) | |
| return True | |
| def acquire_tab( | |
| self, | |
| proxy_key: ProxyKey, | |
| type_name: str, | |
| max_concurrent: int, | |
| ) -> Page | None: | |
| """ | |
| 为一次请求占用 tab;tab 必须存在、可接新请求且未达到并发上限。 | |
| """ | |
| entry = self._entries.get(proxy_key) | |
| if entry is None: | |
| return None | |
| tab = entry.tabs.get(type_name) | |
| if tab is None: | |
| return None | |
| if not tab.accepting_new or tab.active_requests >= max_concurrent: | |
| return None | |
| tab.active_requests += 1 | |
| tab.last_used_at = time.time() | |
| entry.last_used_at = tab.last_used_at | |
| tab.state = "busy" | |
| return tab.page | |
| def release_tab(self, proxy_key: ProxyKey, type_name: str) -> None: | |
| """释放一次请求占用。""" | |
| entry = self._entries.get(proxy_key) | |
| if entry is None: | |
| return | |
| tab = entry.tabs.get(type_name) | |
| if tab is None: | |
| return | |
| if tab.active_requests > 0: | |
| tab.active_requests -= 1 | |
| tab.last_used_at = time.time() | |
| entry.last_used_at = tab.last_used_at | |
| if tab.active_requests == 0: | |
| if tab.accepting_new: | |
| tab.state = "ready" | |
| elif tab.frozen_until is not None: | |
| tab.state = "frozen" | |
| else: | |
| tab.state = "draining" | |
| def mark_tab_draining( | |
| self, | |
| proxy_key: ProxyKey, | |
| type_name: str, | |
| *, | |
| frozen_until: int | None = None, | |
| ) -> None: | |
| """禁止 tab 接受新请求,并标记为 draining/frozen。""" | |
| entry = self._entries.get(proxy_key) | |
| if entry is None: | |
| return | |
| tab = entry.tabs.get(type_name) | |
| if tab is None: | |
| return | |
| tab.accepting_new = False | |
| tab.frozen_until = frozen_until | |
| tab.last_used_at = time.time() | |
| entry.last_used_at = tab.last_used_at | |
| if frozen_until is not None: | |
| tab.state = "frozen" | |
| else: | |
| tab.state = "draining" | |
| def register_session( | |
| self, | |
| proxy_key: ProxyKey, | |
| type_name: str, | |
| session_id: str, | |
| ) -> None: | |
| entry = self._entries.get(proxy_key) | |
| if entry is None: | |
| return | |
| tab = entry.tabs.get(type_name) | |
| if tab is None: | |
| return | |
| tab.sessions.add(session_id) | |
| tab.last_used_at = time.time() | |
| entry.last_used_at = tab.last_used_at | |
| def unregister_session( | |
| self, | |
| proxy_key: ProxyKey, | |
| type_name: str, | |
| session_id: str, | |
| ) -> None: | |
| entry = self._entries.get(proxy_key) | |
| if entry is None: | |
| return | |
| tab = entry.tabs.get(type_name) | |
| if tab is None: | |
| return | |
| tab.sessions.discard(session_id) | |
| async def close_tab( | |
| self, | |
| proxy_key: ProxyKey, | |
| type_name: str, | |
| ) -> ClosedTabInfo | None: | |
| """关闭某个 type 的 tab,并返回需要失效的 session 列表。""" | |
| entry = self._entries.get(proxy_key) | |
| if entry is None: | |
| return None | |
| tab = entry.tabs.pop(type_name, None) | |
| if tab is None: | |
| return None | |
| try: | |
| await tab.page.close() | |
| except Exception: | |
| pass | |
| entry.last_used_at = time.time() | |
| logger.info( | |
| "[tab] closed mode=%s proxy=%s type=%s", | |
| "proxy" if proxy_key.use_proxy else "direct", | |
| proxy_key.fingerprint_id, | |
| type_name, | |
| ) | |
| return ClosedTabInfo( | |
| proxy_key=proxy_key, | |
| type_name=type_name, | |
| account_id=tab.account_id, | |
| session_ids=list(tab.sessions), | |
| ) | |
| async def close_browser(self, proxy_key: ProxyKey) -> list[ClosedTabInfo]: | |
| return await self._close_entry_async(proxy_key) | |
| async def _close_entry_async(self, proxy_key: ProxyKey) -> list[ClosedTabInfo]: | |
| entry = self._entries.get(proxy_key) | |
| if entry is None: | |
| return [] | |
| closed_tabs = [ | |
| ClosedTabInfo( | |
| proxy_key=proxy_key, | |
| type_name=type_name, | |
| account_id=tab.account_id, | |
| session_ids=list(tab.sessions), | |
| ) | |
| for type_name, tab in entry.tabs.items() | |
| ] | |
| for tab in list(entry.tabs.values()): | |
| try: | |
| await tab.page.close() | |
| except Exception: | |
| pass | |
| entry.tabs.clear() | |
| if entry.proxy_forwarder is not None: | |
| try: | |
| entry.proxy_forwarder.stop() | |
| except Exception as e: | |
| logger.warning("关闭本地代理转发时异常: %s", e) | |
| if entry.browser is not None: | |
| try: | |
| await entry.browser.close() | |
| except Exception as e: | |
| logger.warning("关闭 CDP 浏览器时异常: %s", e) | |
| try: | |
| entry.proc.terminate() | |
| entry.proc.wait(timeout=8) | |
| except subprocess.TimeoutExpired: | |
| entry.proc.kill() | |
| entry.proc.wait(timeout=3) | |
| except Exception as e: | |
| logger.warning("关闭浏览器进程时异常: %s", e) | |
| self._cleanup_stderr_log(entry.stderr_path) | |
| self._available_ports.add(entry.port) | |
| del self._entries[proxy_key] | |
| logger.info( | |
| "[browser] closed mode=%s proxy=%s", | |
| "proxy" if proxy_key.use_proxy else "direct", | |
| proxy_key.fingerprint_id, | |
| ) | |
| return closed_tabs | |
| async def collect_idle_browsers( | |
| self, | |
| *, | |
| idle_seconds: float, | |
| resident_browser_count: int, | |
| ) -> list[ClosedTabInfo]: | |
| """ | |
| 关闭空闲浏览器: | |
| - 浏览器下所有 tab 都没有活跃请求 | |
| - 所有 tab 均已空闲超过 idle_seconds | |
| - 当前浏览器数 > resident_browser_count | |
| """ | |
| if len(self._entries) <= resident_browser_count: | |
| return [] | |
| now = time.time() | |
| candidates: list[tuple[float, ProxyKey]] = [] | |
| for proxy_key, entry in self._entries.items(): | |
| if any(tab.active_requests > 0 for tab in entry.tabs.values()): | |
| continue | |
| if entry.tabs: | |
| last_tab_used = max(tab.last_used_at for tab in entry.tabs.values()) | |
| else: | |
| last_tab_used = entry.last_used_at | |
| if now - last_tab_used < idle_seconds: | |
| continue | |
| candidates.append((last_tab_used, proxy_key)) | |
| if not candidates: | |
| return [] | |
| closed: list[ClosedTabInfo] = [] | |
| max_close = max(0, len(self._entries) - resident_browser_count) | |
| for _, proxy_key in sorted(candidates, key=lambda item: item[0])[:max_close]: | |
| closed.extend(await self._close_entry_async(proxy_key)) | |
| return closed | |
| async def close_all(self) -> list[ClosedTabInfo]: | |
| """关闭全部浏览器和 tab。""" | |
| closed: list[ClosedTabInfo] = [] | |
| for proxy_key in list(self._entries.keys()): | |
| closed.extend(await self._close_entry_async(proxy_key)) | |
| return closed | |