| """ |
| 浏览器管理器:按 ProxyKey 管理浏览器进程;每个浏览器内每个 type 仅保留一个 tab。 |
| |
| 当前实现的职责: |
| |
| - 一个 ProxyKey 对应一个 Chromium 进程 |
| - 一个浏览器内,一个 type 只允许一个 page/tab |
| - tab 绑定一个 account,只有 drained 后才能切号 |
| - tab 可承载多个 session,并记录活跃请求数与最近使用时间 |
| """ |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import logging |
| import os |
| import subprocess |
| import tempfile |
| import time |
| from dataclasses import dataclass, field |
| from pathlib import Path |
| from typing import TYPE_CHECKING, Any, Callable, Coroutine |
|
|
| if TYPE_CHECKING: |
| from core.runtime.local_proxy_forwarder import LocalProxyForwarder |
|
|
| from playwright.async_api import Browser, BrowserContext, Page, async_playwright |
|
|
| from core.constants import CDP_PORT_RANGE, CHROMIUM_BIN, TIMEZONE, user_data_dir |
| from core.plugin.errors import BrowserResourceInvalidError |
| from core.runtime.keys import ProxyKey |
|
|
| logger = logging.getLogger(__name__) |
|
|
| CreatePageFn = Callable[[BrowserContext, Page | None], Coroutine[Any, Any, Page]] |
| ApplyAuthFn = Callable[[BrowserContext, Page], Coroutine[Any, Any, None]] |
|
|
|
|
| async def _wait_for_cdp( |
| host: str, |
| port: int, |
| max_attempts: int = 60, |
| interval: float = 2.0, |
| connect_timeout: float = 2.0, |
| ) -> bool: |
| for _ in range(max_attempts): |
| try: |
| _, writer = await asyncio.wait_for( |
| asyncio.open_connection(host, port), timeout=connect_timeout |
| ) |
| writer.close() |
| await writer.wait_closed() |
| return True |
| except (OSError, asyncio.TimeoutError): |
| await asyncio.sleep(interval) |
| return False |
|
|
|
|
| def _is_cdp_listening(port: int) -> bool: |
| import socket |
|
|
| try: |
| with socket.create_connection(("127.0.0.1", port), timeout=1.0): |
| pass |
| return True |
| except OSError: |
| return False |
|
|
|
|
| @dataclass |
| class TabRuntime: |
| """浏览器中的一个 type tab。""" |
|
|
| type_name: str |
| page: Page |
| account_id: str |
| active_requests: int = 0 |
| accepting_new: bool = True |
| state: str = "ready" |
| last_used_at: float = field(default_factory=time.time) |
| frozen_until: int | None = None |
| sessions: set[str] = field(default_factory=set) |
|
|
|
|
| @dataclass |
| class BrowserEntry: |
| """单个 ProxyKey 对应的浏览器运行时。""" |
|
|
| proc: subprocess.Popen[Any] |
| port: int |
| browser: Browser |
| context: BrowserContext |
| stderr_path: Path | None = None |
| tabs: dict[str, TabRuntime] = field(default_factory=dict) |
| last_used_at: float = field(default_factory=time.time) |
| proxy_forwarder: Any = None |
|
|
|
|
| @dataclass |
| class ClosedTabInfo: |
| """关闭 tab/browser 时回传的 session 清理信息。""" |
|
|
| proxy_key: ProxyKey |
| type_name: str |
| account_id: str |
| session_ids: list[str] |
|
|
|
|
| class BrowserManager: |
| """按代理组管理浏览器及其 type -> tab 映射。""" |
|
|
| def __init__( |
| self, |
| chromium_bin: str = CHROMIUM_BIN, |
| headless: bool = False, |
| no_sandbox: bool = False, |
| disable_gpu: bool = False, |
| disable_gpu_sandbox: bool = False, |
| port_range: list[int] | None = None, |
| cdp_wait_max_attempts: int = 90, |
| cdp_wait_interval_seconds: float = 2.0, |
| cdp_wait_connect_timeout_seconds: float = 2.0, |
| ) -> None: |
| self._chromium_bin = chromium_bin |
| self._headless = headless |
| self._no_sandbox = no_sandbox |
| self._disable_gpu = disable_gpu |
| self._disable_gpu_sandbox = disable_gpu_sandbox |
| self._port_range = port_range or list(CDP_PORT_RANGE) |
| self._entries: dict[ProxyKey, BrowserEntry] = {} |
| self._available_ports: set[int] = set(self._port_range) |
| self._playwright: Any = None |
| self._cdp_wait_max_attempts = max(1, int(cdp_wait_max_attempts)) |
| self._cdp_wait_interval_seconds = max(0.05, float(cdp_wait_interval_seconds)) |
| self._cdp_wait_connect_timeout_seconds = max( |
| 0.2, float(cdp_wait_connect_timeout_seconds) |
| ) |
|
|
| def _stderr_log_path(self, proxy_key: ProxyKey, port: int) -> Path: |
| log_dir = Path(tempfile.gettempdir()) / "web2api-browser-logs" |
| log_dir.mkdir(parents=True, exist_ok=True) |
| return log_dir / ( |
| f"{proxy_key.fingerprint_id}-{port}-{int(time.time())}.stderr.log" |
| ) |
|
|
| @staticmethod |
| def _read_stderr_tail(stderr_path: Path | None, max_chars: int = 4000) -> str: |
| if stderr_path is None or not stderr_path.exists(): |
| return "" |
| try: |
| content = stderr_path.read_text(encoding="utf-8", errors="replace") |
| except Exception: |
| return "" |
| content = content.strip() |
| if not content: |
| return "" |
| return content[-max_chars:] |
|
|
| @staticmethod |
| def _cleanup_stderr_log(stderr_path: Path | None) -> None: |
| if stderr_path is None: |
| return |
| try: |
| stderr_path.unlink(missing_ok=True) |
| except Exception: |
| pass |
|
|
| def current_proxy_keys(self) -> list[ProxyKey]: |
| return list(self._entries.keys()) |
|
|
| def browser_count(self) -> int: |
| return len(self._entries) |
|
|
| def list_browser_entries(self) -> list[tuple[ProxyKey, BrowserEntry]]: |
| return list(self._entries.items()) |
|
|
| def get_browser_entry(self, proxy_key: ProxyKey) -> BrowserEntry | None: |
| return self._entries.get(proxy_key) |
|
|
| def get_tab(self, proxy_key: ProxyKey, type_name: str) -> TabRuntime | None: |
| entry = self._entries.get(proxy_key) |
| if entry is None: |
| return None |
| return entry.tabs.get(type_name) |
|
|
| def browser_load(self, proxy_key: ProxyKey) -> int: |
| entry = self._entries.get(proxy_key) |
| if entry is None: |
| return 0 |
| return sum(tab.active_requests for tab in entry.tabs.values()) |
|
|
| def browser_diagnostics(self, proxy_key: ProxyKey) -> dict[str, Any]: |
| entry = self._entries.get(proxy_key) |
| if entry is None: |
| return { |
| "browser_present": False, |
| "proc_alive": False, |
| "cdp_listening": False, |
| "stderr_tail": "", |
| "tab_count": 0, |
| "active_requests": 0, |
| "tabs": [], |
| } |
| tabs = [ |
| { |
| "type": type_name, |
| "state": tab.state, |
| "accepting_new": tab.accepting_new, |
| "active_requests": tab.active_requests, |
| "session_count": len(tab.sessions), |
| } |
| for type_name, tab in entry.tabs.items() |
| ] |
| return { |
| "browser_present": True, |
| "proc_alive": entry.proc.poll() is None, |
| "cdp_listening": _is_cdp_listening(entry.port), |
| "stderr_tail": self._read_stderr_tail(entry.stderr_path), |
| "tab_count": len(entry.tabs), |
| "active_requests": sum(tab.active_requests for tab in entry.tabs.values()), |
| "tabs": tabs, |
| } |
|
|
| def _raise_browser_resource_invalid( |
| self, |
| proxy_key: ProxyKey, |
| *, |
| detail: str, |
| helper_name: str, |
| stage: str, |
| resource_hint: str = "browser", |
| request_url: str = "", |
| page_url: str = "", |
| request_id: str | None = None, |
| stream_phase: str | None = None, |
| type_name: str | None = None, |
| account_id: str | None = None, |
| ) -> None: |
| diagnostics = self.browser_diagnostics(proxy_key) |
| logger.warning( |
| "[browser-resource-invalid] helper=%s stage=%s proxy=%s resource=%s request_id=%s type=%s account=%s proc_alive=%s cdp_listening=%s tab_count=%s active_requests=%s stderr_tail=%s detail=%s", |
| helper_name, |
| stage, |
| proxy_key.fingerprint_id, |
| resource_hint, |
| request_id, |
| type_name, |
| account_id, |
| diagnostics.get("proc_alive"), |
| diagnostics.get("cdp_listening"), |
| diagnostics.get("tab_count"), |
| diagnostics.get("active_requests"), |
| diagnostics.get("stderr_tail"), |
| detail, |
| ) |
| raise BrowserResourceInvalidError( |
| detail, |
| helper_name=helper_name, |
| operation="browser_manager", |
| stage=stage, |
| resource_hint=resource_hint, |
| request_url=request_url, |
| page_url=page_url, |
| request_id=request_id, |
| stream_phase=stream_phase, |
| proxy_key=proxy_key, |
| type_name=type_name, |
| account_id=account_id, |
| ) |
|
|
| def touch_browser(self, proxy_key: ProxyKey) -> None: |
| entry = self._entries.get(proxy_key) |
| if entry is not None: |
| entry.last_used_at = time.time() |
|
|
| def _launch_process( |
| self, |
| proxy_key: ProxyKey, |
| proxy_pass: str, |
| port: int, |
| ) -> tuple[subprocess.Popen[Any], Path, LocalProxyForwarder | None]: |
| """启动 Chromium 进程(代理时使用本地转发鉴权,无扩展),使用指定 port。""" |
| udd = user_data_dir(proxy_key.fingerprint_id) |
| udd.mkdir(parents=True, exist_ok=True) |
|
|
| if not Path(self._chromium_bin).exists(): |
| raise RuntimeError(f"Chromium 不存在: {self._chromium_bin}") |
|
|
| args = [ |
| self._chromium_bin, |
| f"--remote-debugging-port={port}", |
| f"--fingerprint={proxy_key.fingerprint_id}", |
| "--fingerprint-platform=windows", |
| "--fingerprint-brand=Edge", |
| f"--user-data-dir={udd}", |
| f"--timezone={proxy_key.timezone or TIMEZONE}", |
| "--force-webrtc-ip-handling-policy", |
| "--webrtc-ip-handling-policy=disable_non_proxied_udp", |
| "--disable-features=AsyncDNS", |
| "--disable-dev-shm-usage", |
| "--no-first-run", |
| "--no-default-browser-check", |
| |
| "--renderer-process-limit=1", |
| "--disable-extensions", |
| "--disable-background-networking", |
| "--disable-component-update", |
| "--disable-sync", |
| "--disable-translate", |
| "--disable-features=MediaRouter,TranslateUI", |
| "--js-flags=--max-old-space-size=256", |
| ] |
| proxy_forwarder = None |
| if proxy_key.use_proxy: |
| from core.runtime.local_proxy_forwarder import ( |
| LocalProxyForwarder, |
| UpstreamProxy, |
| parse_proxy_server, |
| ) |
|
|
| upstream_host, upstream_port = parse_proxy_server(proxy_key.proxy_host) |
| upstream = UpstreamProxy( |
| host=upstream_host, |
| port=upstream_port, |
| username=proxy_key.proxy_user, |
| password=proxy_pass, |
| ) |
| proxy_forwarder = LocalProxyForwarder( |
| upstream, |
| listen_host="127.0.0.1", |
| listen_port=0, |
| on_log=lambda msg: logger.debug("[proxy] %s", msg), |
| ) |
| proxy_forwarder.start() |
| args.append(f"--proxy-server={proxy_forwarder.proxy_url}") |
| if self._headless: |
| args.extend( |
| [ |
| "--headless=new", |
| "--window-size=1920,1080", |
| ] |
| ) |
| if self._headless or self._disable_gpu: |
| args.append("--disable-gpu") |
| if self._disable_gpu_sandbox: |
| args.append("--disable-gpu-sandbox") |
| if self._no_sandbox: |
| args.extend( |
| [ |
| "--no-sandbox", |
| "--disable-setuid-sandbox", |
| ] |
| ) |
| env = os.environ.copy() |
| env["NODE_OPTIONS"] = ( |
| env.get("NODE_OPTIONS") or "" |
| ).strip() + " --no-deprecation" |
| env.setdefault("DBUS_SESSION_BUS_ADDRESS", "/dev/null") |
| stderr_path = self._stderr_log_path(proxy_key, port) |
| stderr_fp = stderr_path.open("ab") |
| try: |
| proc = subprocess.Popen( |
| args, |
| stdin=subprocess.DEVNULL, |
| stdout=subprocess.DEVNULL, |
| stderr=stderr_fp, |
| env=env, |
| ) |
| finally: |
| stderr_fp.close() |
| return proc, stderr_path, proxy_forwarder |
|
|
| async def ensure_browser( |
| self, |
| proxy_key: ProxyKey, |
| proxy_pass: str, |
| ) -> BrowserContext: |
| """ |
| 确保存在对应 proxy_key 的浏览器;若已有且存活则直接复用。 |
| """ |
| entry = self._entries.get(proxy_key) |
| if entry is not None: |
| if entry.proc.poll() is not None or not _is_cdp_listening(entry.port): |
| await self._close_entry_async(proxy_key) |
| else: |
| entry.last_used_at = time.time() |
| return entry.context |
|
|
| if not self._available_ports: |
| raise RuntimeError( |
| "无可用 CDP 端口,当前并发浏览器数已达上限,请稍后重试或增大 cdp_port_count" |
| ) |
| port = self._available_ports.pop() |
| proc, stderr_path, proxy_forwarder = self._launch_process( |
| proxy_key, proxy_pass, port |
| ) |
| logger.info( |
| "已启动 Chromium PID=%s port=%s mode=%s headless=%s no_sandbox=%s disable_gpu=%s disable_gpu_sandbox=%s,等待 CDP 就绪...", |
| proc.pid, |
| port, |
| "proxy" if proxy_key.use_proxy else "direct", |
| self._headless, |
| self._no_sandbox, |
| self._disable_gpu, |
| self._disable_gpu_sandbox, |
| ) |
| ok = await _wait_for_cdp( |
| "127.0.0.1", |
| port, |
| max_attempts=self._cdp_wait_max_attempts, |
| interval=self._cdp_wait_interval_seconds, |
| connect_timeout=self._cdp_wait_connect_timeout_seconds, |
| ) |
| if not ok: |
| self._available_ports.add(port) |
| if proxy_forwarder is not None: |
| try: |
| proxy_forwarder.stop() |
| except Exception: |
| pass |
| try: |
| proc.terminate() |
| proc.wait(timeout=5) |
| except Exception: |
| pass |
| stderr_tail = self._read_stderr_tail(stderr_path) |
| self._cleanup_stderr_log(stderr_path) |
| if stderr_tail: |
| logger.error( |
| "Chromium 启动失败,CDP 未就绪。stderr tail:\n%s", |
| stderr_tail, |
| ) |
| raise RuntimeError("CDP 未在预期时间内就绪") |
|
|
| if self._playwright is None: |
| self._playwright = await async_playwright().start() |
| endpoint = f"http://127.0.0.1:{port}" |
| try: |
| browser = await self._playwright.chromium.connect_over_cdp( |
| endpoint, timeout=10000 |
| ) |
| except Exception: |
| self._available_ports.add(port) |
| if proxy_forwarder is not None: |
| try: |
| proxy_forwarder.stop() |
| except Exception: |
| pass |
| try: |
| proc.terminate() |
| proc.wait(timeout=5) |
| except Exception: |
| pass |
| stderr_tail = self._read_stderr_tail(stderr_path) |
| self._cleanup_stderr_log(stderr_path) |
| if stderr_tail: |
| logger.error( |
| "Chromium 已监听 CDP 但 connect_over_cdp 失败。stderr tail:\n%s", |
| stderr_tail, |
| ) |
| raise |
| context = browser.contexts[0] if browser.contexts else None |
| if context is None: |
| await browser.close() |
| self._available_ports.add(port) |
| if proxy_forwarder is not None: |
| try: |
| proxy_forwarder.stop() |
| except Exception: |
| pass |
| try: |
| proc.terminate() |
| proc.wait(timeout=5) |
| except Exception: |
| pass |
| self._cleanup_stderr_log(stderr_path) |
| raise RuntimeError("浏览器无默认 context") |
| self._entries[proxy_key] = BrowserEntry( |
| proc=proc, |
| port=port, |
| browser=browser, |
| context=context, |
| stderr_path=stderr_path, |
| proxy_forwarder=proxy_forwarder, |
| ) |
| return context |
|
|
| async def open_tab( |
| self, |
| proxy_key: ProxyKey, |
| proxy_pass: str, |
| type_name: str, |
| account_id: str, |
| create_page_fn: CreatePageFn, |
| apply_auth_fn: ApplyAuthFn, |
| ) -> TabRuntime: |
| """在指定浏览器中创建一个 type tab,并绑定到 account。""" |
| context = await self.ensure_browser(proxy_key, proxy_pass) |
| entry = self._entries.get(proxy_key) |
| if entry is None: |
| raise RuntimeError("ensure_browser 未创建 entry") |
| existing = entry.tabs.get(type_name) |
| if existing is not None: |
| return existing |
|
|
| logger.info( |
| "[tab] opening proxy=%s type=%s account=%s reuse_blank=%s tab_count=%s active_requests=%s", |
| proxy_key.fingerprint_id, |
| type_name, |
| account_id, |
| bool(len(entry.tabs) == 0 and context.pages), |
| len(entry.tabs), |
| sum(tab.active_requests for tab in entry.tabs.values()), |
| ) |
| |
| reuse_page = ( |
| context.pages[0] if (len(entry.tabs) == 0 and context.pages) else None |
| ) |
| try: |
| page = await create_page_fn(context, reuse_page) |
| except Exception as e: |
| msg = str(e) |
| normalized = msg.lower() |
| if "target.createtarget" in normalized or "failed to open a new tab" in normalized: |
| self._raise_browser_resource_invalid( |
| proxy_key, |
| detail=msg, |
| helper_name="open_tab", |
| stage="create_page", |
| resource_hint="browser", |
| type_name=type_name, |
| account_id=account_id, |
| ) |
| raise |
| try: |
| await apply_auth_fn(context, page) |
| except Exception as e: |
| try: |
| await page.close() |
| except Exception: |
| pass |
| msg = str(e) |
| normalized = msg.lower() |
| if ( |
| "target crashed" in normalized |
| or "page has been closed" in normalized |
| or "browser has been closed" in normalized |
| or "has been disconnected" in normalized |
| ): |
| self._raise_browser_resource_invalid( |
| proxy_key, |
| detail=msg, |
| helper_name="open_tab", |
| stage="apply_auth", |
| resource_hint="browser", |
| page_url=getattr(page, "url", "") or "", |
| type_name=type_name, |
| account_id=account_id, |
| ) |
| raise |
|
|
| tab = TabRuntime( |
| type_name=type_name, |
| page=page, |
| account_id=account_id, |
| ) |
| entry.tabs[type_name] = tab |
| entry.last_used_at = time.time() |
| logger.info( |
| "[tab] opened mode=%s proxy=%s type=%s account=%s", |
| "proxy" if proxy_key.use_proxy else "direct", |
| proxy_key.fingerprint_id, |
| type_name, |
| account_id, |
| ) |
| return tab |
|
|
| async def switch_tab_account( |
| self, |
| proxy_key: ProxyKey, |
| type_name: str, |
| account_id: str, |
| apply_auth_fn: ApplyAuthFn, |
| ) -> bool: |
| """ |
| 在同一个 page 上切换账号。只有 drained 后(active_requests==0)才允许切号。 |
| """ |
| entry = self._entries.get(proxy_key) |
| if entry is None: |
| return False |
| tab = entry.tabs.get(type_name) |
| if tab is None or tab.active_requests != 0: |
| return False |
|
|
| tab.accepting_new = False |
| tab.state = "switching" |
| try: |
| await apply_auth_fn(entry.context, tab.page) |
| except Exception: |
| tab.state = "draining" |
| return False |
|
|
| tab.account_id = account_id |
| tab.accepting_new = True |
| tab.state = "ready" |
| tab.frozen_until = None |
| tab.last_used_at = time.time() |
| tab.sessions.clear() |
| entry.last_used_at = time.time() |
| logger.info( |
| "[tab] switched account mode=%s proxy=%s type=%s account=%s", |
| "proxy" if proxy_key.use_proxy else "direct", |
| proxy_key.fingerprint_id, |
| type_name, |
| account_id, |
| ) |
| return True |
|
|
| def acquire_tab( |
| self, |
| proxy_key: ProxyKey, |
| type_name: str, |
| max_concurrent: int, |
| ) -> Page | None: |
| """ |
| 为一次请求占用 tab;tab 必须存在、可接新请求且未达到并发上限。 |
| """ |
| entry = self._entries.get(proxy_key) |
| if entry is None: |
| return None |
| tab = entry.tabs.get(type_name) |
| if tab is None: |
| return None |
| if not tab.accepting_new or tab.active_requests >= max_concurrent: |
| return None |
| tab.active_requests += 1 |
| tab.last_used_at = time.time() |
| entry.last_used_at = tab.last_used_at |
| tab.state = "busy" |
| return tab.page |
|
|
| def release_tab(self, proxy_key: ProxyKey, type_name: str) -> None: |
| """释放一次请求占用。""" |
| entry = self._entries.get(proxy_key) |
| if entry is None: |
| return |
| tab = entry.tabs.get(type_name) |
| if tab is None: |
| return |
| if tab.active_requests > 0: |
| tab.active_requests -= 1 |
| tab.last_used_at = time.time() |
| entry.last_used_at = tab.last_used_at |
| if tab.active_requests == 0: |
| if tab.accepting_new: |
| tab.state = "ready" |
| elif tab.frozen_until is not None: |
| tab.state = "frozen" |
| else: |
| tab.state = "draining" |
|
|
| def mark_tab_draining( |
| self, |
| proxy_key: ProxyKey, |
| type_name: str, |
| *, |
| frozen_until: int | None = None, |
| ) -> None: |
| """禁止 tab 接受新请求,并标记为 draining/frozen。""" |
| entry = self._entries.get(proxy_key) |
| if entry is None: |
| return |
| tab = entry.tabs.get(type_name) |
| if tab is None: |
| return |
| tab.accepting_new = False |
| tab.frozen_until = frozen_until |
| tab.last_used_at = time.time() |
| entry.last_used_at = tab.last_used_at |
| if frozen_until is not None: |
| tab.state = "frozen" |
| else: |
| tab.state = "draining" |
|
|
| def register_session( |
| self, |
| proxy_key: ProxyKey, |
| type_name: str, |
| session_id: str, |
| ) -> None: |
| entry = self._entries.get(proxy_key) |
| if entry is None: |
| return |
| tab = entry.tabs.get(type_name) |
| if tab is None: |
| return |
| tab.sessions.add(session_id) |
| tab.last_used_at = time.time() |
| entry.last_used_at = tab.last_used_at |
|
|
| def unregister_session( |
| self, |
| proxy_key: ProxyKey, |
| type_name: str, |
| session_id: str, |
| ) -> None: |
| entry = self._entries.get(proxy_key) |
| if entry is None: |
| return |
| tab = entry.tabs.get(type_name) |
| if tab is None: |
| return |
| tab.sessions.discard(session_id) |
|
|
| async def close_tab( |
| self, |
| proxy_key: ProxyKey, |
| type_name: str, |
| ) -> ClosedTabInfo | None: |
| """关闭某个 type 的 tab,并返回需要失效的 session 列表。""" |
| entry = self._entries.get(proxy_key) |
| if entry is None: |
| return None |
| tab = entry.tabs.pop(type_name, None) |
| if tab is None: |
| return None |
| try: |
| await tab.page.close() |
| except Exception: |
| pass |
| entry.last_used_at = time.time() |
| logger.info( |
| "[tab] closed mode=%s proxy=%s type=%s", |
| "proxy" if proxy_key.use_proxy else "direct", |
| proxy_key.fingerprint_id, |
| type_name, |
| ) |
| return ClosedTabInfo( |
| proxy_key=proxy_key, |
| type_name=type_name, |
| account_id=tab.account_id, |
| session_ids=list(tab.sessions), |
| ) |
|
|
| async def close_browser(self, proxy_key: ProxyKey) -> list[ClosedTabInfo]: |
| return await self._close_entry_async(proxy_key) |
|
|
| async def _close_entry_async(self, proxy_key: ProxyKey) -> list[ClosedTabInfo]: |
| entry = self._entries.get(proxy_key) |
| if entry is None: |
| return [] |
|
|
| closed_tabs = [ |
| ClosedTabInfo( |
| proxy_key=proxy_key, |
| type_name=type_name, |
| account_id=tab.account_id, |
| session_ids=list(tab.sessions), |
| ) |
| for type_name, tab in entry.tabs.items() |
| ] |
| for tab in list(entry.tabs.values()): |
| try: |
| await tab.page.close() |
| except Exception: |
| pass |
| entry.tabs.clear() |
| if entry.proxy_forwarder is not None: |
| try: |
| entry.proxy_forwarder.stop() |
| except Exception as e: |
| logger.warning("关闭本地代理转发时异常: %s", e) |
| if entry.browser is not None: |
| try: |
| await entry.browser.close() |
| except Exception as e: |
| logger.warning("关闭 CDP 浏览器时异常: %s", e) |
| try: |
| entry.proc.terminate() |
| entry.proc.wait(timeout=8) |
| except subprocess.TimeoutExpired: |
| entry.proc.kill() |
| entry.proc.wait(timeout=3) |
| except Exception as e: |
| logger.warning("关闭浏览器进程时异常: %s", e) |
| self._cleanup_stderr_log(entry.stderr_path) |
| self._available_ports.add(entry.port) |
| del self._entries[proxy_key] |
| logger.info( |
| "[browser] closed mode=%s proxy=%s", |
| "proxy" if proxy_key.use_proxy else "direct", |
| proxy_key.fingerprint_id, |
| ) |
| return closed_tabs |
|
|
| async def collect_idle_browsers( |
| self, |
| *, |
| idle_seconds: float, |
| resident_browser_count: int, |
| ) -> list[ClosedTabInfo]: |
| """ |
| 关闭空闲浏览器: |
| |
| - 浏览器下所有 tab 都没有活跃请求 |
| - 所有 tab 均已空闲超过 idle_seconds |
| - 当前浏览器数 > resident_browser_count |
| """ |
| if len(self._entries) <= resident_browser_count: |
| return [] |
|
|
| now = time.time() |
| candidates: list[tuple[float, ProxyKey]] = [] |
| for proxy_key, entry in self._entries.items(): |
| if any(tab.active_requests > 0 for tab in entry.tabs.values()): |
| continue |
| if entry.tabs: |
| last_tab_used = max(tab.last_used_at for tab in entry.tabs.values()) |
| else: |
| last_tab_used = entry.last_used_at |
| if now - last_tab_used < idle_seconds: |
| continue |
| candidates.append((last_tab_used, proxy_key)) |
|
|
| if not candidates: |
| return [] |
|
|
| closed: list[ClosedTabInfo] = [] |
| max_close = max(0, len(self._entries) - resident_browser_count) |
| for _, proxy_key in sorted(candidates, key=lambda item: item[0])[:max_close]: |
| closed.extend(await self._close_entry_async(proxy_key)) |
| return closed |
|
|
| async def close_all(self) -> list[ClosedTabInfo]: |
| """关闭全部浏览器和 tab。""" |
| closed: list[ClosedTabInfo] = [] |
| for proxy_key in list(self._entries.keys()): |
| closed.extend(await self._close_entry_async(proxy_key)) |
| return closed |
|
|