| """ |
| 插件通用能力:页面复用、Cookie 登录、在浏览器内发起 fetch 并流式回传。 |
| 接入方只需实现站点特有的 URL/请求体/SSE 解析,其余复用此处逻辑。 |
| """ |
|
|
| import asyncio |
| import base64 |
| import codecs |
| import json |
| import logging |
| from collections.abc import Callable |
| from typing import Any, AsyncIterator |
| from urllib.parse import urlparse |
|
|
| from curl_cffi import requests as curl_requests |
| from playwright.async_api import BrowserContext, Page |
|
|
| from core.plugin.errors import AccountFrozenError, BrowserResourceInvalidError |
|
|
# Contract for a site-specific SSE payload parser:
# raw payload string -> (text_chunks, message_id, error_message).
ParseSseEvent = Callable[[str], tuple[list[str], str | None, str | None]]


logger = logging.getLogger(__name__)
|
|
# Substring patterns matched against the lowercased exception message to decide
# whether a browser resource (page/context/browser/connection) has become
# invalid. Each entry is (pattern, resource_hint, reason_tag); the FIRST match
# wins, so longer, more specific phrasings are listed before their shorter
# substrings (e.g. "browser context has been closed" before "context has been
# closed").
_BROWSER_RESOURCE_ERROR_PATTERNS: tuple[tuple[str, str, str], ...] = (
    # Playwright/CDP lifecycle failures: the page, context or browser went away.
    ("target crashed", "page", "target_crashed"),
    ("page crashed", "browser", "page_crashed"),
    ("execution context was destroyed", "page", "execution_context_destroyed"),
    ("navigating frame was detached", "page", "frame_detached"),
    ("frame was detached", "page", "frame_detached"),
    ("session closed. most likely the page has been closed", "page", "page_closed"),
    ("most likely the page has been closed", "page", "page_closed"),
    ("browser context has been closed", "page", "context_closed"),
    ("context has been closed", "page", "context_closed"),
    ("target page, context or browser has been closed", "page", "page_or_browser_closed"),
    ("page has been closed", "page", "page_closed"),
    ("target closed", "page", "target_closed"),
    ("browser has been closed", "browser", "browser_closed"),
    ("browser closed", "browser", "browser_closed"),
    ("connection closed", "browser", "browser_disconnected"),
    ("connection terminated", "browser", "browser_disconnected"),
    ("has been disconnected", "browser", "browser_disconnected"),

    # Chromium network-stack errors (proxy / connectivity) surfaced via the browser.
    ("err_tunnel_connection_failed", "browser", "proxy_tunnel_failed"),
    ("err_proxy_connection_failed", "browser", "proxy_connection_failed"),
    ("err_connection_refused", "browser", "connection_refused"),
    ("err_connection_timed_out", "browser", "connection_timed_out"),
    ("err_connection_reset", "browser", "connection_reset"),
)
|
|
|
|
def _truncate_for_log(value: str, limit: int = 240) -> str:
    """Return *value* capped at *limit* characters, marking any cut with "..."."""
    return value if len(value) <= limit else value[:limit] + "..."
|
|
|
|
|
|
def _safe_page_url(page: Page | None) -> str:
    """Best-effort read of ``page.url``; "" when the page is None or already dead."""
    try:
        return (page.url or "") if page is not None else ""
    except Exception:
        # Accessing .url on a closed/crashed page can raise; treat as unknown.
        return ""
|
|
|
|
|
|
def _evaluate_timeout_seconds(timeout_ms: int, grace_seconds: float = 5.0) -> float:
    """Convert an in-page timeout (ms) to an outer await timeout (s).

    Adds *grace_seconds* of headroom so the in-page timer fires first, and
    never returns less than 5 seconds.
    """
    candidate = timeout_ms / 1000.0 + grace_seconds
    return candidate if candidate > 5.0 else 5.0
|
|
|
|
|
|
def _consume_background_task_result(task: asyncio.Task[Any]) -> None:
    """Retrieve a finished task's exception so asyncio never logs it as unretrieved."""
    try:
        if task.cancelled():
            return
        task.exception()
    except Exception:
        # e.g. InvalidStateError when the task is somehow still running.
        pass
|
|
|
|
|
|
def _classify_browser_resource_error(
    exc: Exception,
    *,
    helper_name: str,
    operation: str,
    stage: str,
    request_url: str,
    page: Page | None,
    request_id: str | None = None,
    stream_phase: str | None = None,
) -> BrowserResourceInvalidError | None:
    """Map *exc* onto a BrowserResourceInvalidError when its message matches a
    known browser/page/connection failure pattern.

    Returns the constructed (not raised) error so the caller can chain it with
    ``raise classified from exc``, or None when no pattern matches. All keyword
    context (helper/operation/stage/...) is attached to the error and logged.
    """
    # Fall back to the exception class name when the message is empty.
    message = str(exc).strip() or exc.__class__.__name__
    normalized = message.lower()
    # First matching pattern wins; _BROWSER_RESOURCE_ERROR_PATTERNS is ordered
    # from specific to generic for that reason.
    for pattern, resource_hint, reason in _BROWSER_RESOURCE_ERROR_PATTERNS:
        if pattern not in normalized:
            continue
        page_url = _safe_page_url(page)
        logger.warning(
            "[browser-resource-invalid] helper=%s operation=%s stage=%s reason=%s resource=%s request_id=%s stream_phase=%s request_url=%s page.url=%s err=%s",
            helper_name,
            operation,
            stage,
            reason,
            resource_hint,
            request_id,
            stream_phase,
            _truncate_for_log(request_url),
            _truncate_for_log(page_url),
            _truncate_for_log(message, 400),
        )
        return BrowserResourceInvalidError(
            message,
            helper_name=helper_name,
            operation=operation,
            stage=stage,
            resource_hint=resource_hint,
            request_url=request_url,
            page_url=page_url,
            request_id=request_id,
            stream_phase=stream_phase,
        )
    return None
|
|
| |
| |
# JS evaluated in-page: POSTs `body` to `url` and streams response chunks back
# to Python through the CDP binding `bindingName`. Sentinel messages drive the
# protocol: "__headers__:<json>", "__error__:<msg>", and "__done__" terminates.
#
# Bug fix: `effectiveTimeoutMs` used to be `const`-declared inside the `try`
# block while the `catch` handler also reads it; on AbortError the catch then
# threw a ReferenceError, "__done__" was never sent, and the Python reader
# hung until its read timeout. The declaration is now hoisted above `try`.
PAGE_FETCH_STREAM_JS = """
async ({ url, body, bindingName, timeoutMs }) => {
    const send = globalThis[bindingName];
    const done = "__done__";
    const errPrefix = "__error__:";
    // Hoisted out of the try block: the catch handler below reads it when
    // reporting an AbortError, so it must be in scope there.
    const effectiveTimeoutMs = timeoutMs || 90000;
    try {
        const ctrl = new AbortController();
        const t = setTimeout(() => ctrl.abort(), effectiveTimeoutMs);
        const resp = await fetch(url, {
            method: "POST",
            body: body,
            headers: { "Content-Type": "application/json", "Accept": "text/event-stream" },
            credentials: "include",
            signal: ctrl.signal
        });
        clearTimeout(t);
        if (!resp.ok) {
            const errText = await resp.text();
            const errSnippet = (errText && errText.length > 800) ? errText.slice(0, 800) + "..." : (errText || "");
            await send(errPrefix + "HTTP " + resp.status + " " + errSnippet);
            await send(done);
            return;
        }
        if (!resp.body) {
            await send(errPrefix + "No response body");
            await send(done);
            return;
        }
        const headersObj = {};
        resp.headers.forEach((v, k) => { headersObj[k] = v; });
        await send("__headers__:" + JSON.stringify(headersObj));
        const reader = resp.body.getReader();
        const dec = new TextDecoder();
        while (true) {
            const { done: streamDone, value } = await reader.read();
            if (streamDone) break;
            await send(dec.decode(value));
        }
    } catch (e) {
        const msg = e.name === "AbortError" ? `请求超时(${Math.floor(effectiveTimeoutMs / 1000)}s)` : (e.message || String(e));
        await send(errPrefix + msg);
    }
    await send(done);
}
"""
|
|
|
|
# JS evaluated in-page: one non-streaming fetch. Resolves to a plain object
# with ok/status/statusText/url/redirected/headers/text, or { error: msg } on
# failure/timeout (default 15s, enforced with an AbortController).
PAGE_FETCH_JSON_JS = """
async ({ url, method, body, headers, timeoutMs }) => {
    const ctrl = new AbortController();
    const t = setTimeout(() => ctrl.abort(), timeoutMs || 15000);
    try {
        const resp = await fetch(url, {
            method: method || "GET",
            body: body ?? undefined,
            headers: headers || {},
            credentials: "include",
            signal: ctrl.signal
        });
        clearTimeout(t);
        const text = await resp.text();
        const headersObj = {};
        resp.headers.forEach((v, k) => { headersObj[k] = v; });
        return {
            ok: resp.ok,
            status: resp.status,
            statusText: resp.statusText,
            url: resp.url,
            redirected: resp.redirected,
            headers: headersObj,
            text,
        };
    } catch (e) {
        clearTimeout(t);
        const msg = e.name === "AbortError" ? `请求超时(${Math.floor((timeoutMs || 15000) / 1000)}s)` : (e.message || String(e));
        return { error: msg };
    }
}
"""
|
|
|
|
# JS evaluated in-page: multipart upload. The file bytes arrive base64-encoded
# (page.evaluate only passes JSON-serialisable args), are rebuilt into a File,
# appended to a FormData alongside extraFields, and POSTed with cookies.
# Resolves to the same result shape as PAGE_FETCH_JSON_JS.
PAGE_FETCH_MULTIPART_JS = """
async ({ url, filename, mimeType, dataBase64, fieldName, extraFields, timeoutMs }) => {
    const ctrl = new AbortController();
    const t = setTimeout(() => ctrl.abort(), timeoutMs || 30000);
    try {
        const binary = atob(dataBase64);
        const bytes = new Uint8Array(binary.length);
        for (let i = 0; i < binary.length; i += 1) {
            bytes[i] = binary.charCodeAt(i);
        }
        const form = new FormData();
        if (extraFields) {
            Object.entries(extraFields).forEach(([k, v]) => {
                if (v !== undefined && v !== null) form.append(k, String(v));
            });
        }
        const file = new File([bytes], filename, { type: mimeType || "application/octet-stream" });
        form.append(fieldName || "file", file);
        const resp = await fetch(url, {
            method: "POST",
            body: form,
            credentials: "include",
            signal: ctrl.signal
        });
        clearTimeout(t);
        const text = await resp.text();
        const headersObj = {};
        resp.headers.forEach((v, k) => { headersObj[k] = v; });
        return {
            ok: resp.ok,
            status: resp.status,
            statusText: resp.statusText,
            url: resp.url,
            redirected: resp.redirected,
            headers: headersObj,
            text,
        };
    } catch (e) {
        clearTimeout(t);
        const msg = e.name === "AbortError" ? `请求超时(${Math.floor((timeoutMs || 30000) / 1000)}s)` : (e.message || String(e));
        return { error: msg };
    }
}
"""
|
|
|
|
async def ensure_page_for_site(
    context: BrowserContext,
    url_contains: str,
    start_url: str,
    *,
    timeout: int = 45000,
) -> Page:
    """
    Reuse any open page whose URL contains *url_contains*; otherwise open a
    new page and navigate it to *start_url*. Callers only supply the site
    marker and the entry URL.
    """
    for candidate in context.pages or []:
        if url_contains in (candidate.url or ""):
            return candidate
    fresh = await context.new_page()
    await fresh.goto(start_url, wait_until="commit", timeout=timeout)
    return fresh
|
|
|
|
async def create_page_for_site(
    context: BrowserContext,
    start_url: str,
    *,
    reuse_page: Page | None = None,
    timeout: int = 45000,
) -> Page:
    """
    Navigate *reuse_page* to *start_url* when given, otherwise open a fresh
    page first. Used to initialise (or refill) the browser's default blank
    page or a page pool.
    """
    target = reuse_page if reuse_page is not None else await context.new_page()
    await target.goto(start_url, wait_until="commit", timeout=timeout)
    return target
|
|
|
|
def _cookie_domain_matches(cookie_domain: str, site_domain: str) -> bool:
    """Return True when both domains name the same site after normalising the
    leading dot (e.g. ".claude.ai" and "claude.ai" are considered equal)."""

    def dotted(domain: str) -> str:
        return domain if domain.startswith(".") else f".{domain}"

    return dotted(cookie_domain) == dotted(site_domain)
|
|
|
|
def _cookie_to_set_param(c: Any) -> dict[str, str]:
    """Convert one ``context.cookies()`` item into the SetCookieParam shape
    accepted by ``add_cookies`` (missing domain/path get safe defaults)."""
    param = {"name": c["name"], "value": c["value"]}
    param["domain"] = c.get("domain") or ""
    param["path"] = c.get("path") or "/"
    return param
|
|
|
|
async def clear_cookies_for_domain(
    context: BrowserContext,
    site_domain: str,
) -> None:
    """Drop every cookie belonging to *site_domain* from the context while
    preserving cookies of all other domains."""
    all_cookies = await context.cookies()
    survivors = [
        cookie
        for cookie in all_cookies
        if not _cookie_domain_matches(cookie.get("domain", ""), site_domain)
    ]
    # clear_cookies() wipes everything, so re-add the unrelated cookies after.
    await context.clear_cookies()
    if survivors:
        await context.add_cookies([_cookie_to_set_param(c) for c in survivors])
    logger.info(
        "[auth] cleared cookies for domain=%s (kept %s cookies)", site_domain, len(survivors)
    )
|
|
|
|
async def clear_page_storage_for_switch(page: Page) -> None:
    """Clear the current origin's localStorage before switching accounts."""
    try:
        await page.evaluate("() => { window.localStorage.clear(); }")
        logger.info("[auth] cleared localStorage for switch")
    except Exception as exc:
        # Best-effort: a detached page must not break the account switch.
        logger.warning("[auth] clear localStorage failed (page may be detached): %s", exc)
|
|
|
|
async def safe_page_reload(page: Page, url: str | None = None) -> None:
    """Reload the page (or ``goto(url)`` when given), swallowing the
    ERR_ABORTED / frame-detached errors that navigation races produce."""
    try:
        if url:
            await page.goto(url, wait_until="commit", timeout=45000)
        else:
            await page.reload(wait_until="domcontentloaded", timeout=45000)
    except Exception as exc:
        detail = str(exc)
        if "ERR_ABORTED" not in detail and "detached" not in detail.lower():
            raise
        logger.warning(
            "[auth] page.reload/goto 被中止或 frame 已分离: %s", detail[:200]
        )
|
|
|
|
async def apply_cookie_auth(
    context: BrowserContext,
    page: Page,
    auth: dict[str, Any],
    cookie_name: str,
    auth_keys: list[str],
    domain: str,
    *,
    path: str = "/",
    reload: bool = True,
) -> None:
    """
    Pick the first non-empty value among *auth_keys* in *auth*, write it into
    the context as cookie *cookie_name* for *domain*, then optionally reload.

    Writing the cookie alone is enough for fetch() calls made in the same
    context; reload only when the page document itself must pick up the new
    login state.
    """
    value: str | None = None
    for key in auth_keys:
        raw = auth.get(key)
        if raw is None or raw == "":
            continue
        value = str(raw).strip()
        if value:
            break
    if not value:
        raise ValueError(f"auth 需包含以下其一且非空: {auth_keys}")

    logger.info(
        "[auth] context.add_cookies domain=%s name=%s reload=%s page.url=%s",
        domain,
        cookie_name,
        reload,
        page.url,
    )
    cookie_param = {
        "name": cookie_name,
        "value": value,
        "domain": domain,
        "path": path,
        "secure": True,
        "httpOnly": True,
    }
    await context.add_cookies([cookie_param])
    if reload:
        await safe_page_reload(page)
|
|
|
|
def _attach_json_body(result: dict[str, Any], *, invalid_message: str) -> dict[str, Any]:
    """Attach a parsed ``json`` field to a fetch-result dict.

    Raises RuntimeError(*invalid_message*) when *result* is not a dict, and
    RuntimeError(error) when the result carries a truthy ``error``. The
    ``json`` field is None whenever the body is empty or not valid JSON.
    """
    if not isinstance(result, dict):
        raise RuntimeError(invalid_message)
    if result.get("error"):
        raise RuntimeError(str(result["error"]))
    parsed = None
    text = result.get("text")
    if isinstance(text, str) and text:
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            parsed = None
    result["json"] = parsed
    return result
|
|
|
|
def _cookie_domain_matches_url(cookie_domain: str, target_url: str) -> bool:
    """True when *cookie_domain* covers *target_url*'s host — either exactly
    or as a parent domain (leading dots are ignored on both sides)."""
    host = (urlparse(target_url).hostname or "").lower().lstrip(".")
    domain = (cookie_domain or "").lower().lstrip(".")
    if host and domain:
        return host == domain or host.endswith("." + domain)
    return False
|
|
|
|
def _cookies_for_url(cookies: list[dict[str, Any]], target_url: str) -> dict[str, str]:
    """Build a name->value jar from *cookies* whose domain covers
    *target_url*'s host; later duplicates overwrite earlier ones."""
    host = (urlparse(target_url).hostname or "").lower().lstrip(".")
    if not host:
        return {}
    jar: dict[str, str] = {}
    for entry in cookies:
        name = str(entry.get("name") or "").strip()
        if not name:
            continue
        domain = str(entry.get("domain") or "").strip()
        if not _cookie_domain_matches_url(domain, target_url):
            continue
        jar[name] = str(entry.get("value") or "")
    return jar
|
|
|
|
async def _stream_via_http_client(
    context: BrowserContext,
    page: Page | None,
    url: str,
    body: str,
    request_id: str,
    *,
    on_http_error: Callable[[str, dict[str, str] | None], int | None] | None = None,
    on_headers: Callable[[dict[str, str]], None] | None = None,
    connect_timeout: float = 30.0,
    read_timeout: float = 300.0,
    impersonate: str = "chrome142",
    proxy_url: str | None = None,
    proxy_auth: tuple[str, str] | None = None,
) -> AsyncIterator[str]:
    """POST *body* to *url* with a real streaming HTTP client (curl_cffi),
    reusing the browser context's cookies, and yield decoded text chunks.

    Raises:
        AccountFrozenError: when ``on_http_error`` maps a non-2xx response to
            an unfreeze timestamp.
        BrowserResourceInvalidError: for cookie-loading, transport or
            browser-resource failures (classified when a known pattern hits).

    Bug fix: the broad ``except Exception`` around the stream used to also
    catch the AccountFrozenError / BrowserResourceInvalidError raised inside
    it and re-wrap them, which made the outer re-raise handlers unreachable;
    they are now re-raised unchanged before the broad handler runs.
    """
    logger.info(
        "[fetch] helper=stream_raw_via_context_request request_id=%s stage=http_client url=%s page.url=%s",
        request_id,
        _truncate_for_log(url, 120),
        _truncate_for_log(_safe_page_url(page), 120),
    )

    # Derive "<scheme>://<host>/" from the target URL; sent as the Origin header.
    parsed = urlparse(url)
    referer = ""
    if parsed.scheme and parsed.netloc:
        referer = f"{parsed.scheme}://{parsed.netloc}/"

    # Cookies come from the live browser context so this client shares the
    # page's login state; failures here mean the context itself is gone.
    try:
        cookies = await context.cookies([url])
    except Exception as e:
        classified = _classify_browser_resource_error(
            e,
            helper_name="stream_raw_via_context_request",
            operation="context.cookies",
            stage="load_cookies",
            request_url=url,
            page=page,
            request_id=request_id,
            stream_phase="fetch",
        )
        if classified is not None:
            raise classified from e
        raise BrowserResourceInvalidError(
            str(e),
            helper_name="stream_raw_via_context_request",
            operation="context.cookies",
            stage="load_cookies",
            resource_hint="page",
            request_url=url,
            page_url=_safe_page_url(page),
            request_id=request_id,
            stream_phase="fetch",
        ) from e
    cookie_jar = _cookies_for_url(cookies, url)
    session_kwargs: dict[str, Any] = {
        "impersonate": impersonate,
        "timeout": (connect_timeout, read_timeout),
        "verify": True,
        "allow_redirects": True,
        "default_headers": True,
    }
    if cookie_jar:
        session_kwargs["cookies"] = cookie_jar
    if proxy_url:
        session_kwargs["proxy"] = proxy_url
    if proxy_auth:
        session_kwargs["proxy_auth"] = proxy_auth

    response = None
    try:
        async with curl_requests.AsyncSession(**session_kwargs) as session:
            try:
                request_headers = {
                    "Content-Type": "application/json",
                    "Accept": "text/event-stream",
                }
                if referer:
                    request_headers["Origin"] = referer.rstrip("/")
                async with session.stream(
                    "POST",
                    url,
                    data=body.encode("utf-8"),
                    headers=request_headers,
                ) as response:
                    headers = {
                        str(k).lower(): str(v) for k, v in response.headers.items()
                    }
                    if on_headers:
                        on_headers(headers)

                    status = int(response.status_code)
                    if status < 200 or status >= 300:
                        # Read at most ~800 chars of the error body for the snippet.
                        body_parts: list[str] = []
                        decoder = codecs.getincrementaldecoder("utf-8")("replace")
                        async for chunk in response.aiter_content():
                            if not chunk:
                                continue
                            body_parts.append(decoder.decode(chunk))
                            if sum(len(part) for part in body_parts) >= 800:
                                break
                        body_parts.append(decoder.decode(b"", final=True))
                        snippet = "".join(body_parts)
                        if len(snippet) > 800:
                            snippet = snippet[:800] + "..."
                        msg = f"HTTP {status} {snippet}".strip()
                        if on_http_error:
                            unfreeze_at = on_http_error(msg, headers)
                            if isinstance(unfreeze_at, int):
                                logger.warning("[fetch] HTTP error from context request: %s", msg)
                                raise AccountFrozenError(msg, unfreeze_at)
                        raise RuntimeError(msg)

                    # Incremental decoder tolerates UTF-8 sequences split across chunks.
                    decoder = codecs.getincrementaldecoder("utf-8")("replace")
                    async for chunk in response.aiter_content():
                        if not chunk:
                            continue
                        text = decoder.decode(chunk)
                        if text:
                            yield text
                    tail = decoder.decode(b"", final=True)
                    if tail:
                        yield tail
            except AccountFrozenError:
                # Raised above for frozen accounts — must reach the caller as-is.
                raise
            except BrowserResourceInvalidError:
                raise
            except Exception as e:
                classified = _classify_browser_resource_error(
                    e,
                    helper_name="stream_raw_via_context_request",
                    operation="http_client",
                    stage="stream",
                    request_url=url,
                    page=page,
                    request_id=request_id,
                    stream_phase="body",
                )
                if classified is not None:
                    raise classified from e
                raise BrowserResourceInvalidError(
                    str(e),
                    helper_name="stream_raw_via_context_request",
                    operation="http_client",
                    stage="stream",
                    resource_hint="transport",
                    request_url=url,
                    page_url=_safe_page_url(page),
                    request_id=request_id,
                    stream_phase="body",
                ) from e
    except AccountFrozenError:
        raise
    except BrowserResourceInvalidError:
        raise
    except Exception as e:
        classified = _classify_browser_resource_error(
            e,
            helper_name="stream_raw_via_context_request",
            operation="http_client",
            stage="request",
            request_url=url,
            page=page,
            request_id=request_id,
            stream_phase="fetch",
        )
        if classified is not None:
            raise classified from e
        logger.warning(
            "[fetch] helper=stream_raw_via_context_request request_id=%s http_client failed url=%s page.url=%s err=%s",
            request_id,
            _truncate_for_log(url, 120),
            _truncate_for_log(_safe_page_url(page), 120),
            _truncate_for_log(str(e), 400),
        )
        raise BrowserResourceInvalidError(
            str(e),
            helper_name="stream_raw_via_context_request",
            operation="http_client",
            stage="request",
            resource_hint="transport",
            request_url=url,
            page_url=_safe_page_url(page),
            request_id=request_id,
            stream_phase="fetch",
        ) from e
|
|
|
|
async def _request_via_context_request(
    context: BrowserContext,
    page: Page | None,
    url: str,
    *,
    method: str = "GET",
    body: str | None = None,
    headers: dict[str, str] | None = None,
    multipart: dict[str, Any] | None = None,
    timeout_ms: int = 15000,
    request_id: str | None = None,
    helper_name: str,
) -> dict[str, Any]:
    """Perform one non-streaming request via Playwright's ``context.request``
    and normalise the response into a plain dict.

    Returns keys: ok/status/statusText/url/redirected/headers/text. *page* is
    used for logging/diagnosis only. Raises BrowserResourceInvalidError for
    recognised browser failures, otherwise RuntimeError wrapping the cause.
    """
    logger.info(
        "[fetch] helper=%s method=%s request_id=%s url=%s page.url=%s",
        helper_name,
        method,
        request_id,
        _truncate_for_log(url, 120),
        _truncate_for_log(_safe_page_url(page), 120),
    )
    response = None
    try:
        response = await context.request.fetch(
            url,
            method=method,
            headers=headers or None,
            data=body,
            multipart=multipart,
            timeout=timeout_ms,
            # Non-2xx statuses are returned to the caller, not raised here.
            fail_on_status_code=False,
        )
        text = await response.text()
        return {
            "ok": bool(response.ok),
            "status": int(response.status),
            "statusText": str(response.status_text),
            "url": str(response.url),
            # "redirected" is inferred: the final URL differs from the request URL.
            "redirected": str(response.url) != url,
            "headers": {str(k): str(v) for k, v in response.headers.items()},
            "text": text,
        }
    except Exception as e:
        classified = _classify_browser_resource_error(
            e,
            helper_name=helper_name,
            operation="context.request",
            stage="fetch",
            request_url=url,
            page=page,
            request_id=request_id,
        )
        if classified is not None:
            raise classified from e
        logger.warning(
            "[fetch] helper=%s request_id=%s context.request failed url=%s page.url=%s err=%s",
            helper_name,
            request_id,
            _truncate_for_log(url, 120),
            _truncate_for_log(_safe_page_url(page), 120),
            _truncate_for_log(str(e), 400),
        )
        raise RuntimeError(str(e)) from e
    finally:
        # Always release the APIResponse buffer, even on failure paths.
        if response is not None:
            try:
                await response.dispose()
            except Exception:
                pass
|
|
|
async def request_json_via_context_request(
    context: BrowserContext,
    page: Page | None,
    url: str,
    *,
    method: str = "GET",
    body: str | None = None,
    headers: dict[str, str] | None = None,
    timeout_ms: int = 15000,
    request_id: str | None = None,
) -> dict[str, Any]:
    """Issue a non-streaming request through ``context.request`` and return
    the result dict with a parsed ``json`` field attached."""
    raw = await _request_via_context_request(
        context,
        page,
        url,
        method=method,
        body=body,
        headers=headers,
        timeout_ms=timeout_ms,
        request_id=request_id,
        helper_name="request_json_via_context_request",
    )
    return _attach_json_body(raw, invalid_message="控制请求返回结果异常")
|
|
|
|
async def request_json_via_page_fetch(
    page: Page,
    url: str,
    *,
    method: str = "GET",
    body: str | None = None,
    headers: dict[str, str] | None = None,
    timeout_ms: int = 15000,
    request_id: str | None = None,
) -> dict[str, Any]:
    """
    Issue a non-streaming fetch inside the page and return the result with the
    body parsed JSON-first (available as ``result["json"]``).

    Going through the page reuses the browser's real network stack, cookies
    and proxy-extension capabilities.
    """
    logger.info(
        "[fetch] helper=request_json_via_page_fetch method=%s request_id=%s url=%s page.url=%s",
        method,
        request_id,
        _truncate_for_log(url, 120),
        _truncate_for_log(_safe_page_url(page), 120),
    )
    try:
        result = await asyncio.wait_for(
            page.evaluate(
                PAGE_FETCH_JSON_JS,
                {
                    "url": url,
                    "method": method,
                    "body": body,
                    "headers": headers or {},
                    "timeoutMs": timeout_ms,
                },
            ),
            # Outer guard in case the evaluate call itself wedges; slightly
            # longer than the in-page timer so the JS AbortController fires first.
            timeout=_evaluate_timeout_seconds(timeout_ms),
        )
    except asyncio.TimeoutError as e:
        logger.warning(
            "[fetch] helper=request_json_via_page_fetch request_id=%s evaluate timeout url=%s page.url=%s",
            request_id,
            _truncate_for_log(url, 120),
            _truncate_for_log(_safe_page_url(page), 120),
        )
        raise BrowserResourceInvalidError(
            f"page.evaluate timeout after {_evaluate_timeout_seconds(timeout_ms):.1f}s",
            helper_name="request_json_via_page_fetch",
            operation="page.evaluate",
            stage="evaluate_timeout",
            resource_hint="page",
            request_url=url,
            page_url=_safe_page_url(page),
            request_id=request_id,
        ) from e
    except Exception as e:
        classified = _classify_browser_resource_error(
            e,
            helper_name="request_json_via_page_fetch",
            operation="page.evaluate",
            stage="evaluate",
            request_url=url,
            page=page,
            request_id=request_id,
        )
        if classified is not None:
            raise classified from e
        raise
    return _attach_json_body(result, invalid_message="页面 fetch 返回结果异常")
|
|
|
|
async def upload_file_via_context_request(
    context: BrowserContext,
    page: Page | None,
    url: str,
    *,
    filename: str,
    mime_type: str,
    data: bytes,
    field_name: str = "file",
    extra_fields: dict[str, str] | None = None,
    timeout_ms: int = 30000,
    request_id: str | None = None,
) -> dict[str, Any]:
    """Upload *data* as a multipart form file through ``context.request`` and
    return the JSON-augmented result dict."""
    form: dict[str, Any] = {**(extra_fields or {})}
    form[field_name] = {
        "name": filename,
        "mimeType": mime_type or "application/octet-stream",
        "buffer": data,
    }
    raw = await _request_via_context_request(
        context,
        page,
        url,
        method="POST",
        multipart=form,
        timeout_ms=timeout_ms,
        request_id=request_id,
        helper_name="upload_file_via_context_request",
    )
    return _attach_json_body(raw, invalid_message="控制上传返回结果异常")
|
|
|
|
async def upload_file_via_page_fetch(
    page: Page,
    url: str,
    *,
    filename: str,
    mime_type: str,
    data: bytes,
    field_name: str = "file",
    extra_fields: dict[str, str] | None = None,
    timeout_ms: int = 30000,
    request_id: str | None = None,
) -> dict[str, Any]:
    """
    Upload *data* from inside the page: the bytes travel as base64, are
    rebuilt into a File by PAGE_FETCH_MULTIPART_JS, and POSTed via fetch with
    the page's cookies. Returns the result with a parsed ``json`` field.
    """
    logger.info(
        "[fetch] helper=upload_file_via_page_fetch filename=%s mime=%s request_id=%s url=%s page.url=%s",
        filename,
        mime_type,
        request_id,
        _truncate_for_log(url, 120),
        _truncate_for_log(_safe_page_url(page), 120),
    )
    try:
        result = await asyncio.wait_for(
            page.evaluate(
                PAGE_FETCH_MULTIPART_JS,
                {
                    "url": url,
                    "filename": filename,
                    "mimeType": mime_type,
                    # page.evaluate args must be JSON-serialisable, hence base64.
                    "dataBase64": base64.b64encode(data).decode("ascii"),
                    "fieldName": field_name,
                    "extraFields": extra_fields or {},
                    "timeoutMs": timeout_ms,
                },
            ),
            # Outer guard slightly above the in-page timer.
            timeout=_evaluate_timeout_seconds(timeout_ms),
        )
    except asyncio.TimeoutError as e:
        logger.warning(
            "[fetch] helper=upload_file_via_page_fetch request_id=%s evaluate timeout url=%s page.url=%s",
            request_id,
            _truncate_for_log(url, 120),
            _truncate_for_log(_safe_page_url(page), 120),
        )
        raise BrowserResourceInvalidError(
            f"page.evaluate timeout after {_evaluate_timeout_seconds(timeout_ms):.1f}s",
            helper_name="upload_file_via_page_fetch",
            operation="page.evaluate",
            stage="evaluate_timeout",
            resource_hint="page",
            request_url=url,
            page_url=_safe_page_url(page),
            request_id=request_id,
        ) from e
    except Exception as e:
        classified = _classify_browser_resource_error(
            e,
            helper_name="upload_file_via_page_fetch",
            operation="page.evaluate",
            stage="evaluate",
            request_url=url,
            page=page,
            request_id=request_id,
        )
        if classified is not None:
            raise classified from e
        raise
    return _attach_json_body(result, invalid_message="页面上传返回结果异常")
|
|
|
|
async def stream_raw_via_context_request(
    context: BrowserContext,
    page: Page | None,
    url: str,
    body: str,
    request_id: str,
    *,
    on_http_error: Callable[[str, dict[str, str] | None], int | None] | None = None,
    on_headers: Callable[[dict[str, str]], None] | None = None,
    fetch_timeout: float = 90.0,
    body_timeout: float = 300.0,
    proxy_url: str | None = None,
    proxy_auth: tuple[str, str] | None = None,
) -> AsyncIterator[str]:
    """Stream a completion request through a real streaming HTTP client so the
    body is never buffered in full before being forwarded."""
    # Kept for interface compatibility; the HTTP-client path only honours the
    # read (body) timeout.
    del fetch_timeout
    stream = _stream_via_http_client(
        context,
        page,
        url,
        body,
        request_id,
        on_http_error=on_http_error,
        on_headers=on_headers,
        read_timeout=body_timeout,
        proxy_url=proxy_url,
        proxy_auth=proxy_auth,
    )
    async for piece in stream:
        yield piece
|
|
|
|
async def stream_raw_via_page_fetch(
    context: BrowserContext,
    page: Page,
    url: str,
    body: str,
    request_id: str,
    *,
    on_http_error: Callable[[str, dict[str, str] | None], int | None] | None = None,
    on_headers: Callable[[dict[str, str]], None] | None = None,
    error_state: dict[str, bool] | None = None,
    fetch_timeout: int = 90,
    read_timeout: float = 60.0,
) -> AsyncIterator[str]:
    """
    POST *body* to *url* from inside the browser page and stream the raw
    response chunks (e.g. SSE) back to Python.

    Concurrent requests on the same page stay isolated because each uses its
    own CDP binding named ``sendChunk_<request_id>``, injected via
    ``Runtime.addBinding`` and received through ``Runtime.bindingCalled``.
    Sentinel chunks drive the protocol: ``__headers__:<json>`` triggers
    ``on_headers``, ``__error__:<msg>`` triggers ``on_http_error``, and
    ``__done__`` terminates the stream.
    """
    chunk_queue: asyncio.Queue[str] = asyncio.Queue()
    binding_name = "sendChunk_" + request_id
    # Tracks how far the stream progressed; attached to any raised error.
    stream_phase = "cdp_setup"

    def on_binding_called(event: dict[str, Any]) -> None:
        # CDP delivers every binding call on this session; keep only ours.
        name = event.get("name")
        payload = event.get("payload", "")
        if name == binding_name:
            chunk_queue.put_nowait(
                payload if isinstance(payload, str) else str(payload)
            )

    def classify_stream_error(
        exc: Exception,
        *,
        stage: str,
    ) -> BrowserResourceInvalidError | None:
        # Shared classification, snapshotting the current stream_phase.
        return _classify_browser_resource_error(
            exc,
            helper_name="stream_raw_via_page_fetch",
            operation="stream",
            stage=stage,
            request_url=url,
            page=page,
            request_id=request_id,
            stream_phase=stream_phase,
        )

    cdp = None
    fetch_task: asyncio.Task[None] | None = None
    try:
        try:
            cdp = await context.new_cdp_session(page)
        except Exception as e:
            classified = classify_stream_error(e, stage="new_cdp_session")
            if classified is not None:
                raise classified from e
            raise
        cdp.on("Runtime.bindingCalled", on_binding_called)
        try:
            await cdp.send("Runtime.addBinding", {"name": binding_name})
        except Exception as e:
            classified = classify_stream_error(e, stage="add_binding")
            if classified is not None:
                raise classified from e
            raise

        logger.info(
            "[fetch] helper=stream_raw_via_page_fetch request_id=%s stage=page.evaluate url=%s page.url=%s",
            request_id,
            _truncate_for_log(url, 120),
            _truncate_for_log(_safe_page_url(page), 120),
        )

        async def run_fetch() -> None:
            # Drives the in-page fetch; chunks arrive via the CDP binding.
            nonlocal stream_phase
            try:
                stream_phase = "page_evaluate"
                await asyncio.wait_for(
                    page.evaluate(
                        PAGE_FETCH_STREAM_JS,
                        {
                            "url": url,
                            "body": body,
                            "bindingName": binding_name,
                            "timeoutMs": max(1, int(fetch_timeout * 1000)),
                        },
                    ),
                    # Outer guard slightly above the in-page timer so the JS
                    # AbortController fires first.
                    timeout=max(float(fetch_timeout) + 5.0, 10.0),
                )
            except asyncio.TimeoutError as e:
                logger.warning(
                    "[fetch] helper=stream_raw_via_page_fetch request_id=%s stage=page.evaluate evaluate timeout url=%s page.url=%s",
                    request_id,
                    _truncate_for_log(url, 120),
                    _truncate_for_log(_safe_page_url(page), 120),
                )
                raise BrowserResourceInvalidError(
                    f"page.evaluate timeout after {max(float(fetch_timeout) + 5.0, 10.0):.1f}s",
                    helper_name="stream_raw_via_page_fetch",
                    operation="stream",
                    stage="evaluate_timeout",
                    resource_hint="page",
                    request_url=url,
                    page_url=_safe_page_url(page),
                    request_id=request_id,
                    stream_phase=stream_phase,
                ) from e
            except Exception as e:
                classified = classify_stream_error(e, stage="page.evaluate")
                if classified is not None:
                    raise classified from e
                raise

        fetch_task = asyncio.create_task(run_fetch())
        try:
            headers = None
            while True:
                # Surface failures from the evaluate task as soon as it ends.
                if fetch_task.done():
                    exc = fetch_task.exception()
                    if exc is not None:
                        raise exc
                try:
                    chunk = await asyncio.wait_for(
                        chunk_queue.get(), timeout=read_timeout
                    )
                except asyncio.TimeoutError as e:
                    stream_phase = "body"
                    logger.warning(
                        "[fetch] helper=stream_raw_via_page_fetch request_id=%s stream_phase=%s read timeout url=%s page.url=%s",
                        request_id,
                        stream_phase,
                        _truncate_for_log(url, 120),
                        _truncate_for_log(_safe_page_url(page), 120),
                    )
                    raise BrowserResourceInvalidError(
                        f"stream read timeout after {read_timeout:.1f}s",
                        helper_name="stream_raw_via_page_fetch",
                        operation="stream",
                        stage="read_timeout",
                        resource_hint="page",
                        request_url=url,
                        page_url=_safe_page_url(page),
                        request_id=request_id,
                        stream_phase=stream_phase,
                    ) from e
                if chunk == "__done__":
                    break
                if chunk.startswith("__headers__:"):
                    stream_phase = "headers"
                    try:
                        # len("__headers__:") == 12
                        headers = json.loads(chunk[12:])
                        if on_headers and isinstance(headers, dict):
                            on_headers({k: str(v) for k, v in headers.items()})
                    except (json.JSONDecodeError, TypeError) as e:
                        logger.debug("[fetch] 解析 __headers__ 失败: %s", e)
                    continue
                if chunk.startswith("__error__:"):
                    # len("__error__:") == 10
                    msg = chunk[10:].strip()
                    # A disconnect AFTER the site already emitted its terminal
                    # SSE event is benign; error_state is the caller's signal.
                    saw_terminal = bool(error_state and error_state.get("terminal"))
                    stream_phase = "terminal_event" if saw_terminal else ("body" if headers else "before_headers")
                    if on_http_error:
                        unfreeze_at = on_http_error(msg, headers)
                        if isinstance(unfreeze_at, int):
                            logger.warning("[fetch] __error__ from page: %s", msg)
                            raise AccountFrozenError(msg, unfreeze_at)
                    classified = _classify_browser_resource_error(
                        RuntimeError(msg),
                        helper_name="stream_raw_via_page_fetch",
                        operation="page_fetch_stream",
                        stage="page_error_event",
                        request_url=url,
                        page=page,
                        request_id=request_id,
                        stream_phase=stream_phase,
                    )
                    if classified is not None:
                        raise classified
                    if saw_terminal:
                        logger.info(
                            "[fetch] page fetch disconnected after terminal event request_id=%s stream_phase=%s: %s",
                            request_id,
                            stream_phase,
                            msg,
                        )
                        continue
                    logger.warning(
                        "[fetch] __error__ from page before terminal event request_id=%s stream_phase=%s: %s",
                        request_id,
                        stream_phase,
                        msg,
                    )
                    raise RuntimeError(msg)
                stream_phase = "body"
                yield chunk
        finally:
            # Settle the background evaluate task so its exception is consumed
            # instead of being logged as an unretrieved task failure.
            if fetch_task is not None:
                done, pending = await asyncio.wait({fetch_task}, timeout=5.0)
                if pending:
                    fetch_task.cancel()
                    fetch_task.add_done_callback(_consume_background_task_result)
                else:
                    try:
                        fetch_task.result()
                    except asyncio.CancelledError:
                        pass
                    except BrowserResourceInvalidError:
                        pass
    finally:
        # Detach the per-request CDP session; bounded so cleanup cannot hang.
        if cdp is not None:
            try:
                await asyncio.wait_for(cdp.detach(), timeout=2.0)
            except asyncio.TimeoutError:
                logger.warning(
                    "[fetch] helper=stream_raw_via_page_fetch request_id=%s detach CDP session timeout page.url=%s",
                    request_id,
                    _truncate_for_log(_safe_page_url(page), 120),
                )
            except Exception as e:
                logger.debug("detach CDP session 时异常: %s", e)
|
|
|
|
def parse_sse_to_events(buffer: str, chunk: str) -> tuple[str, list[str]]:
    """
    Append *chunk* to *buffer*, split off complete lines, and collect the
    payload of every SSE ``data`` field found.

    Returns ``(remaining_buffer, payloads)``: ``remaining_buffer`` is the
    trailing partial line (no newline seen yet) to carry into the next call;
    each caller JSON-decodes the payloads and extracts text / message_id /
    error itself.

    Per the SSE spec the colon after the field name may be followed by at
    most one optional space, so both ``data: x`` and ``data:x`` are accepted
    (previously only the spaced form matched). Comment/heartbeat lines
    (``: ping``), empty payloads and the ``[DONE]`` sentinel are dropped.
    """
    buffer += chunk
    # Everything before the last "\n" is complete; the tail is kept for later.
    *complete_lines, buffer = buffer.split("\n")
    payloads: list[str] = []
    for line in complete_lines:
        line = line.strip()  # also removes a trailing "\r" from CRLF streams
        if not line.startswith("data:"):
            continue
        payload = line.removeprefix("data:").strip()
        if payload == "[DONE]" or not payload:
            continue
        payloads.append(payload)
    return (buffer, payloads)
|
|
|
|
async def stream_completion_via_sse(
    context: BrowserContext,
    page: Page,
    url: str,
    body: str,
    parse_event: ParseSseEvent,
    request_id: str,
    *,
    on_http_error: Callable,
    is_terminal_event: Callable[[str], bool] | None = None,
    collect_message_id: list[str] | None = None,
    first_token_timeout: float = 30.0,
    transport: str = "page_fetch",
    transport_options: dict[str, Any] | None = None,
) -> AsyncIterator[str]:
    """
    POST inside the browser, split the response into SSE ``data`` events and
    run each payload through ``parse_event(payload)``, yielding text pieces
    as they arrive; message ids are appended to ``collect_message_id`` when
    that list is supplied.

    ``parse_event(payload)`` returns ``(texts, message_id, error)``. A payload
    that raises during parsing is logged at debug level and skipped, while a
    non-empty ``error`` aborts the stream with ``RuntimeError``. If no text
    has been yielded within ``first_token_timeout`` seconds and no terminal
    event was observed, ``BrowserResourceInvalidError`` is raised.
    """
    pending = ""
    # Shared with the page-fetch transport so it can tell a disconnect after
    # the terminal event apart from a premature one.
    shared_state: dict[str, bool] = {"terminal": False}
    got_text = False
    event_loop = asyncio.get_running_loop()
    t0 = event_loop.time()
    extra = dict(transport_options or {})

    if transport == "context_request":
        resource_hint = "transport"
        raw_stream = stream_raw_via_context_request(
            context,
            page,
            url,
            body,
            request_id,
            on_http_error=on_http_error,
            **extra,
        )
    else:
        resource_hint = "page"
        raw_stream = stream_raw_via_page_fetch(
            context,
            page,
            url,
            body,
            request_id,
            on_http_error=on_http_error,
            error_state=shared_state,
        )

    async for fragment in raw_stream:
        pending, events = parse_sse_to_events(pending, fragment)
        for event_payload in events:
            # Flip the terminal flag before parsing so the transport sees it
            # even when parse_event rejects this payload.
            if is_terminal_event is not None and is_terminal_event(event_payload):
                shared_state["terminal"] = True
            try:
                texts, message_id, error = parse_event(event_payload)
            except Exception as e:
                logger.debug("parse_stream_event 单条解析异常: %s", e)
                continue
            if error:
                logger.warning("SSE error from upstream: %s", error)
                raise RuntimeError(error)
            if collect_message_id is not None and message_id:
                collect_message_id.append(message_id)
            for piece in texts:
                got_text = True
                yield piece
        # First-token watchdog: only armed while nothing has been yielded and
        # the stream has not already reached its terminal event.
        if got_text or shared_state["terminal"]:
            continue
        if event_loop.time() - t0 >= first_token_timeout:
            raise BrowserResourceInvalidError(
                f"no text token received within {first_token_timeout:.1f}s",
                helper_name="stream_completion_via_sse",
                operation="parse_stream",
                stage="first_token_timeout",
                resource_hint=resource_hint,
                request_url=url,
                page_url=_safe_page_url(page),
                request_id=request_id,
                stream_phase="before_first_text",
            )
|
|