""" 插件通用能力:页面复用、Cookie 登录、在浏览器内发起 fetch 并流式回传。 接入方只需实现站点特有的 URL/请求体/SSE 解析,其余复用此处逻辑。 """ import asyncio import base64 import codecs import json import logging from collections.abc import Callable from typing import Any, AsyncIterator from urllib.parse import urlparse from curl_cffi import requests as curl_requests from playwright.async_api import BrowserContext, Page from core.plugin.errors import AccountFrozenError, BrowserResourceInvalidError ParseSseEvent = Callable[[str], tuple[list[str], str | None, str | None]] logger = logging.getLogger(__name__) _BROWSER_RESOURCE_ERROR_PATTERNS: tuple[tuple[str, str, str], ...] = ( ("target crashed", "page", "target_crashed"), ("page crashed", "browser", "page_crashed"), ("execution context was destroyed", "page", "execution_context_destroyed"), ("navigating frame was detached", "page", "frame_detached"), ("frame was detached", "page", "frame_detached"), ("session closed. most likely the page has been closed", "page", "page_closed"), ("most likely the page has been closed", "page", "page_closed"), ("browser context has been closed", "page", "context_closed"), ("context has been closed", "page", "context_closed"), ("target page, context or browser has been closed", "page", "page_or_browser_closed"), ("page has been closed", "page", "page_closed"), ("target closed", "page", "target_closed"), ("browser has been closed", "browser", "browser_closed"), ("browser closed", "browser", "browser_closed"), ("connection closed", "browser", "browser_disconnected"), ("connection terminated", "browser", "browser_disconnected"), ("has been disconnected", "browser", "browser_disconnected"), # Proxy / network tunnel errors — retryable via browser re-launch ("err_tunnel_connection_failed", "browser", "proxy_tunnel_failed"), ("err_proxy_connection_failed", "browser", "proxy_connection_failed"), ("err_connection_refused", "browser", "connection_refused"), ("err_connection_timed_out", "browser", "connection_timed_out"), ("err_connection_reset", "browser", "connection_reset"), ) def _truncate_for_log(value: str, limit: int = 240) -> str: if len(value) <= limit: return value return value[:limit] + "..." def _safe_page_url(page: Page | None) -> str: if page is None: return "" try: return page.url or "" except Exception: return "" def _evaluate_timeout_seconds(timeout_ms: int, grace_seconds: float = 5.0) -> float: return max(5.0, float(timeout_ms) / 1000.0 + grace_seconds) def _consume_background_task_result(task: asyncio.Task[Any]) -> None: try: if not task.cancelled(): task.exception() except Exception: pass def _classify_browser_resource_error( exc: Exception, *, helper_name: str, operation: str, stage: str, request_url: str, page: Page | None, request_id: str | None = None, stream_phase: str | None = None, ) -> BrowserResourceInvalidError | None: message = str(exc).strip() or exc.__class__.__name__ normalized = message.lower() for pattern, resource_hint, reason in _BROWSER_RESOURCE_ERROR_PATTERNS: if pattern not in normalized: continue page_url = _safe_page_url(page) logger.warning( "[browser-resource-invalid] helper=%s operation=%s stage=%s reason=%s resource=%s request_id=%s stream_phase=%s request_url=%s page.url=%s err=%s", helper_name, operation, stage, reason, resource_hint, request_id, stream_phase, _truncate_for_log(request_url), _truncate_for_log(page_url), _truncate_for_log(message, 400), ) return BrowserResourceInvalidError( message, helper_name=helper_name, operation=operation, stage=stage, resource_hint=resource_hint, request_url=request_url, page_url=page_url, request_id=request_id, stream_phase=stream_phase, ) return None # 在页面内 POST 请求并流式回传:成功时逐块发送响应体,失败时发送 __error__: 前缀 + 信息,最后发送 __done__ # bindingName 按请求唯一,同一 page 多并发时互不串数据 PAGE_FETCH_STREAM_JS = """ async ({ url, body, bindingName, timeoutMs }) => { const send = globalThis[bindingName]; const done = "__done__"; const errPrefix = "__error__:"; try { const ctrl = new AbortController(); const effectiveTimeoutMs = timeoutMs || 90000; const t = setTimeout(() => ctrl.abort(), effectiveTimeoutMs); const resp = await fetch(url, { method: "POST", body: body, headers: { "Content-Type": "application/json", "Accept": "text/event-stream" }, credentials: "include", signal: ctrl.signal }); clearTimeout(t); if (!resp.ok) { const errText = await resp.text(); const errSnippet = (errText && errText.length > 800) ? errText.slice(0, 800) + "..." : (errText || ""); await send(errPrefix + "HTTP " + resp.status + " " + errSnippet); await send(done); return; } if (!resp.body) { await send(errPrefix + "No response body"); await send(done); return; } const headersObj = {}; resp.headers.forEach((v, k) => { headersObj[k] = v; }); await send("__headers__:" + JSON.stringify(headersObj)); const reader = resp.body.getReader(); const dec = new TextDecoder(); while (true) { const { done: streamDone, value } = await reader.read(); if (streamDone) break; await send(dec.decode(value)); } } catch (e) { const msg = e.name === "AbortError" ? `请求超时(${Math.floor(effectiveTimeoutMs / 1000)}s)` : (e.message || String(e)); await send(errPrefix + msg); } await send(done); } """ PAGE_FETCH_JSON_JS = """ async ({ url, method, body, headers, timeoutMs }) => { const ctrl = new AbortController(); const t = setTimeout(() => ctrl.abort(), timeoutMs || 15000); try { const resp = await fetch(url, { method: method || "GET", body: body ?? undefined, headers: headers || {}, credentials: "include", signal: ctrl.signal }); clearTimeout(t); const text = await resp.text(); const headersObj = {}; resp.headers.forEach((v, k) => { headersObj[k] = v; }); return { ok: resp.ok, status: resp.status, statusText: resp.statusText, url: resp.url, redirected: resp.redirected, headers: headersObj, text, }; } catch (e) { clearTimeout(t); const msg = e.name === "AbortError" ? `请求超时(${Math.floor((timeoutMs || 15000) / 1000)}s)` : (e.message || String(e)); return { error: msg }; } } """ PAGE_FETCH_MULTIPART_JS = """ async ({ url, filename, mimeType, dataBase64, fieldName, extraFields, timeoutMs }) => { const ctrl = new AbortController(); const t = setTimeout(() => ctrl.abort(), timeoutMs || 30000); try { const binary = atob(dataBase64); const bytes = new Uint8Array(binary.length); for (let i = 0; i < binary.length; i += 1) { bytes[i] = binary.charCodeAt(i); } const form = new FormData(); if (extraFields) { Object.entries(extraFields).forEach(([k, v]) => { if (v !== undefined && v !== null) form.append(k, String(v)); }); } const file = new File([bytes], filename, { type: mimeType || "application/octet-stream" }); form.append(fieldName || "file", file); const resp = await fetch(url, { method: "POST", body: form, credentials: "include", signal: ctrl.signal }); clearTimeout(t); const text = await resp.text(); const headersObj = {}; resp.headers.forEach((v, k) => { headersObj[k] = v; }); return { ok: resp.ok, status: resp.status, statusText: resp.statusText, url: resp.url, redirected: resp.redirected, headers: headersObj, text, }; } catch (e) { clearTimeout(t); const msg = e.name === "AbortError" ? `请求超时(${Math.floor((timeoutMs || 30000) / 1000)}s)` : (e.message || String(e)); return { error: msg }; } } """ async def ensure_page_for_site( context: BrowserContext, url_contains: str, start_url: str, *, timeout: int = 45000, ) -> Page: """ 若已有页面 URL 包含 url_contains 则复用,否则 new_page 并 goto start_url。 接入方只需提供「站点特征」和「入口 URL」。 """ if context.pages: for p in context.pages: if url_contains in (p.url or ""): return p page = await context.new_page() await page.goto(start_url, wait_until="commit", timeout=timeout) return page async def create_page_for_site( context: BrowserContext, start_url: str, *, reuse_page: Page | None = None, timeout: int = 45000, ) -> Page: """ 若传入 reuse_page 则在其上 goto start_url,否则 new_page 再 goto。 用于复用浏览器默认空白页或 page 池的初始化与补回。 """ if reuse_page is not None: await reuse_page.goto(start_url, wait_until="commit", timeout=timeout) return reuse_page page = await context.new_page() await page.goto(start_url, wait_until="commit", timeout=timeout) return page def _cookie_domain_matches(cookie_domain: str, site_domain: str) -> bool: """判断 cookie 的 domain 是否属于站点 domain(如 .claude.ai 与 claude.ai 视为同一域)。""" a = cookie_domain if cookie_domain.startswith(".") else f".{cookie_domain}" b = site_domain if site_domain.startswith(".") else f".{site_domain}" return a == b def _cookie_to_set_param(c: Any) -> dict[str, str]: """将 context.cookies() 返回的项转为 add_cookies 接受的 SetCookieParam 格式。""" return { "name": c["name"], "value": c["value"], "domain": c.get("domain") or "", "path": c.get("path") or "/", } async def clear_cookies_for_domain( context: BrowserContext, site_domain: str, ) -> None: """清除 context 内属于指定站点域的所有 cookie,保留其他域。""" cookies = await context.cookies() keep = [ c for c in cookies if not _cookie_domain_matches(c.get("domain", ""), site_domain) ] await context.clear_cookies() if keep: await context.add_cookies([_cookie_to_set_param(c) for c in keep]) # type: ignore[arg-type] logger.info( "[auth] cleared cookies for domain=%s (kept %s cookies)", site_domain, len(keep) ) async def clear_page_storage_for_switch(page: Page) -> None: """切号前清空当前页面的 localStorage(当前 origin)。""" try: await page.evaluate("() => { window.localStorage.clear(); }") logger.info("[auth] cleared localStorage for switch") except Exception as e: logger.warning("[auth] clear localStorage failed (page may be detached): %s", e) async def safe_page_reload(page: Page, url: str | None = None) -> None: """安全地 reload 或 goto(url),忽略因 ERR_ABORTED / frame detached 导致的异常。""" try: if url: await page.goto(url, wait_until="commit", timeout=45000) else: await page.reload(wait_until="domcontentloaded", timeout=45000) except Exception as e: err_msg = str(e) if "ERR_ABORTED" in err_msg or "detached" in err_msg.lower(): logger.warning( "[auth] page.reload/goto 被中止或 frame 已分离: %s", err_msg[:200] ) else: raise async def apply_cookie_auth( context: BrowserContext, page: Page, auth: dict[str, Any], cookie_name: str, auth_keys: list[str], domain: str, *, path: str = "/", reload: bool = True, ) -> None: """ 从 auth 中按 auth_keys 顺序取第一个非空值作为 cookie 值,写入 context 并可选 reload。 接入方只需提供 cookie 名、auth 里的 key 列表、域名。 仅写 cookie 不 reload 时,同 context 内的 fetch() 仍会带上 cookie;reload 仅在需要页面文档同步登录态时用。 """ value = None for k in auth_keys: v = auth.get(k) if v is not None and v != "": value = str(v).strip() if value: break if not value: raise ValueError(f"auth 需包含以下其一且非空: {auth_keys}") logger.info( "[auth] context.add_cookies domain=%s name=%s reload=%s page.url=%s", domain, cookie_name, reload, page.url, ) await context.add_cookies( [ { "name": cookie_name, "value": value, "domain": domain, "path": path, "secure": True, "httpOnly": True, } ] ) if reload: await safe_page_reload(page) def _attach_json_body(result: dict[str, Any], *, invalid_message: str) -> dict[str, Any]: if not isinstance(result, dict): raise RuntimeError(invalid_message) error = result.get("error") if error: raise RuntimeError(str(error)) text = result.get("text") if isinstance(text, str) and text: try: result["json"] = json.loads(text) except json.JSONDecodeError: result["json"] = None else: result["json"] = None return result def _cookie_domain_matches_url(cookie_domain: str, target_url: str) -> bool: host = (urlparse(target_url).hostname or "").lower().lstrip(".") domain = (cookie_domain or "").lower().lstrip(".") if not host or not domain: return False return host == domain or host.endswith(f".{domain}") def _cookies_for_url(cookies: list[dict[str, Any]], target_url: str) -> dict[str, str]: target_host = (urlparse(target_url).hostname or "").lower().lstrip(".") if not target_host: return {} selected: dict[str, str] = {} for cookie in cookies: name = str(cookie.get("name") or "").strip() value = str(cookie.get("value") or "") domain = str(cookie.get("domain") or "").strip() if not name or not _cookie_domain_matches_url(domain, target_url): continue selected[name] = value return selected async def _stream_via_http_client( context: BrowserContext, page: Page | None, url: str, body: str, request_id: str, *, on_http_error: Callable[[str, dict[str, str] | None], int | None] | None = None, on_headers: Callable[[dict[str, str]], None] | None = None, connect_timeout: float = 30.0, read_timeout: float = 300.0, impersonate: str = "chrome142", proxy_url: str | None = None, proxy_auth: tuple[str, str] | None = None, ) -> AsyncIterator[str]: logger.info( "[fetch] helper=stream_raw_via_context_request request_id=%s stage=http_client url=%s page.url=%s", request_id, _truncate_for_log(url, 120), _truncate_for_log(_safe_page_url(page), 120), ) parsed = urlparse(url) referer = "" if parsed.scheme and parsed.netloc: referer = f"{parsed.scheme}://{parsed.netloc}/" try: cookies = await context.cookies([url]) except Exception as e: classified = _classify_browser_resource_error( e, helper_name="stream_raw_via_context_request", operation="context.cookies", stage="load_cookies", request_url=url, page=page, request_id=request_id, stream_phase="fetch", ) if classified is not None: raise classified from e raise BrowserResourceInvalidError( str(e), helper_name="stream_raw_via_context_request", operation="context.cookies", stage="load_cookies", resource_hint="page", request_url=url, page_url=_safe_page_url(page), request_id=request_id, stream_phase="fetch", ) from e cookie_jar = _cookies_for_url(cookies, url) session_kwargs: dict[str, Any] = { "impersonate": impersonate, "timeout": (connect_timeout, read_timeout), "verify": True, "allow_redirects": True, "default_headers": True, } if cookie_jar: session_kwargs["cookies"] = cookie_jar if proxy_url: session_kwargs["proxy"] = proxy_url if proxy_auth: session_kwargs["proxy_auth"] = proxy_auth response = None try: async with curl_requests.AsyncSession(**session_kwargs) as session: try: request_headers = { "Content-Type": "application/json", "Accept": "text/event-stream", } if referer: request_headers["Origin"] = referer.rstrip("/") async with session.stream( "POST", url, data=body.encode("utf-8"), headers=request_headers, ) as response: headers = { str(k).lower(): str(v) for k, v in response.headers.items() } if on_headers: on_headers(headers) status = int(response.status_code) if status < 200 or status >= 300: body_parts: list[str] = [] decoder = codecs.getincrementaldecoder("utf-8")("replace") async for chunk in response.aiter_content(): if not chunk: continue body_parts.append(decoder.decode(chunk)) if sum(len(part) for part in body_parts) >= 800: break body_parts.append(decoder.decode(b"", final=True)) snippet = "".join(body_parts) if len(snippet) > 800: snippet = snippet[:800] + "..." msg = f"HTTP {status} {snippet}".strip() if on_http_error: unfreeze_at = on_http_error(msg, headers) if isinstance(unfreeze_at, int): logger.warning("[fetch] HTTP error from context request: %s", msg) raise AccountFrozenError(msg, unfreeze_at) raise RuntimeError(msg) decoder = codecs.getincrementaldecoder("utf-8")("replace") async for chunk in response.aiter_content(): if not chunk: continue text = decoder.decode(chunk) if text: yield text tail = decoder.decode(b"", final=True) if tail: yield tail except Exception as e: classified = _classify_browser_resource_error( e, helper_name="stream_raw_via_context_request", operation="http_client", stage="stream", request_url=url, page=page, request_id=request_id, stream_phase="body", ) if classified is not None: raise classified from e raise BrowserResourceInvalidError( str(e), helper_name="stream_raw_via_context_request", operation="http_client", stage="stream", resource_hint="transport", request_url=url, page_url=_safe_page_url(page), request_id=request_id, stream_phase="body", ) from e except AccountFrozenError: raise except BrowserResourceInvalidError: raise except Exception as e: classified = _classify_browser_resource_error( e, helper_name="stream_raw_via_context_request", operation="http_client", stage="request", request_url=url, page=page, request_id=request_id, stream_phase="fetch", ) if classified is not None: raise classified from e logger.warning( "[fetch] helper=stream_raw_via_context_request request_id=%s http_client failed url=%s page.url=%s err=%s", request_id, _truncate_for_log(url, 120), _truncate_for_log(_safe_page_url(page), 120), _truncate_for_log(str(e), 400), ) raise BrowserResourceInvalidError( str(e), helper_name="stream_raw_via_context_request", operation="http_client", stage="request", resource_hint="transport", request_url=url, page_url=_safe_page_url(page), request_id=request_id, stream_phase="fetch", ) from e async def _request_via_context_request( context: BrowserContext, page: Page | None, url: str, *, method: str = "GET", body: str | None = None, headers: dict[str, str] | None = None, multipart: dict[str, Any] | None = None, timeout_ms: int = 15000, request_id: str | None = None, helper_name: str, ) -> dict[str, Any]: logger.info( "[fetch] helper=%s method=%s request_id=%s url=%s page.url=%s", helper_name, method, request_id, _truncate_for_log(url, 120), _truncate_for_log(_safe_page_url(page), 120), ) response = None try: response = await context.request.fetch( url, method=method, headers=headers or None, data=body, multipart=multipart, timeout=timeout_ms, fail_on_status_code=False, ) text = await response.text() return { "ok": bool(response.ok), "status": int(response.status), "statusText": str(response.status_text), "url": str(response.url), "redirected": str(response.url) != url, "headers": {str(k): str(v) for k, v in response.headers.items()}, "text": text, } except Exception as e: classified = _classify_browser_resource_error( e, helper_name=helper_name, operation="context.request", stage="fetch", request_url=url, page=page, request_id=request_id, ) if classified is not None: raise classified from e logger.warning( "[fetch] helper=%s request_id=%s context.request failed url=%s page.url=%s err=%s", helper_name, request_id, _truncate_for_log(url, 120), _truncate_for_log(_safe_page_url(page), 120), _truncate_for_log(str(e), 400), ) raise RuntimeError(str(e)) from e finally: if response is not None: try: await response.dispose() except Exception: pass async def request_json_via_context_request( context: BrowserContext, page: Page | None, url: str, *, method: str = "GET", body: str | None = None, headers: dict[str, str] | None = None, timeout_ms: int = 15000, request_id: str | None = None, ) -> dict[str, Any]: result = await _request_via_context_request( context, page, url, method=method, body=body, headers=headers, timeout_ms=timeout_ms, request_id=request_id, helper_name="request_json_via_context_request", ) return _attach_json_body(result, invalid_message="控制请求返回结果异常") async def request_json_via_page_fetch( page: Page, url: str, *, method: str = "GET", body: str | None = None, headers: dict[str, str] | None = None, timeout_ms: int = 15000, request_id: str | None = None, ) -> dict[str, Any]: """ 在页面内发起非流式 fetch,请求结果按 JSON 优先解析返回。 这样能复用浏览器真实网络栈、cookie 与代理扩展能力。 """ logger.info( "[fetch] helper=request_json_via_page_fetch method=%s request_id=%s url=%s page.url=%s", method, request_id, _truncate_for_log(url, 120), _truncate_for_log(_safe_page_url(page), 120), ) try: result = await asyncio.wait_for( page.evaluate( PAGE_FETCH_JSON_JS, { "url": url, "method": method, "body": body, "headers": headers or {}, "timeoutMs": timeout_ms, }, ), timeout=_evaluate_timeout_seconds(timeout_ms), ) except asyncio.TimeoutError as e: logger.warning( "[fetch] helper=request_json_via_page_fetch request_id=%s evaluate timeout url=%s page.url=%s", request_id, _truncate_for_log(url, 120), _truncate_for_log(_safe_page_url(page), 120), ) raise BrowserResourceInvalidError( f"page.evaluate timeout after {_evaluate_timeout_seconds(timeout_ms):.1f}s", helper_name="request_json_via_page_fetch", operation="page.evaluate", stage="evaluate_timeout", resource_hint="page", request_url=url, page_url=_safe_page_url(page), request_id=request_id, ) from e except Exception as e: classified = _classify_browser_resource_error( e, helper_name="request_json_via_page_fetch", operation="page.evaluate", stage="evaluate", request_url=url, page=page, request_id=request_id, ) if classified is not None: raise classified from e raise return _attach_json_body(result, invalid_message="页面 fetch 返回结果异常") async def upload_file_via_context_request( context: BrowserContext, page: Page | None, url: str, *, filename: str, mime_type: str, data: bytes, field_name: str = "file", extra_fields: dict[str, str] | None = None, timeout_ms: int = 30000, request_id: str | None = None, ) -> dict[str, Any]: multipart: dict[str, Any] = dict(extra_fields or {}) multipart[field_name] = { "name": filename, "mimeType": mime_type or "application/octet-stream", "buffer": data, } result = await _request_via_context_request( context, page, url, method="POST", multipart=multipart, timeout_ms=timeout_ms, request_id=request_id, helper_name="upload_file_via_context_request", ) return _attach_json_body(result, invalid_message="控制上传返回结果异常") async def upload_file_via_page_fetch( page: Page, url: str, *, filename: str, mime_type: str, data: bytes, field_name: str = "file", extra_fields: dict[str, str] | None = None, timeout_ms: int = 30000, request_id: str | None = None, ) -> dict[str, Any]: logger.info( "[fetch] helper=upload_file_via_page_fetch filename=%s mime=%s request_id=%s url=%s page.url=%s", filename, mime_type, request_id, _truncate_for_log(url, 120), _truncate_for_log(_safe_page_url(page), 120), ) try: result = await asyncio.wait_for( page.evaluate( PAGE_FETCH_MULTIPART_JS, { "url": url, "filename": filename, "mimeType": mime_type, "dataBase64": base64.b64encode(data).decode("ascii"), "fieldName": field_name, "extraFields": extra_fields or {}, "timeoutMs": timeout_ms, }, ), timeout=_evaluate_timeout_seconds(timeout_ms), ) except asyncio.TimeoutError as e: logger.warning( "[fetch] helper=upload_file_via_page_fetch request_id=%s evaluate timeout url=%s page.url=%s", request_id, _truncate_for_log(url, 120), _truncate_for_log(_safe_page_url(page), 120), ) raise BrowserResourceInvalidError( f"page.evaluate timeout after {_evaluate_timeout_seconds(timeout_ms):.1f}s", helper_name="upload_file_via_page_fetch", operation="page.evaluate", stage="evaluate_timeout", resource_hint="page", request_url=url, page_url=_safe_page_url(page), request_id=request_id, ) from e except Exception as e: classified = _classify_browser_resource_error( e, helper_name="upload_file_via_page_fetch", operation="page.evaluate", stage="evaluate", request_url=url, page=page, request_id=request_id, ) if classified is not None: raise classified from e raise return _attach_json_body(result, invalid_message="页面上传返回结果异常") async def stream_raw_via_context_request( context: BrowserContext, page: Page | None, url: str, body: str, request_id: str, *, on_http_error: Callable[[str, dict[str, str] | None], int | None] | None = None, on_headers: Callable[[dict[str, str]], None] | None = None, fetch_timeout: float = 90.0, body_timeout: float = 300.0, proxy_url: str | None = None, proxy_auth: tuple[str, str] | None = None, ) -> AsyncIterator[str]: """通过真实流式 HTTP client 发起 completion 请求,避免先读完整 body。""" del fetch_timeout async for chunk in _stream_via_http_client( context, page, url, body, request_id, on_http_error=on_http_error, on_headers=on_headers, read_timeout=body_timeout, proxy_url=proxy_url, proxy_auth=proxy_auth, ): yield chunk async def stream_raw_via_page_fetch( context: BrowserContext, page: Page, url: str, body: str, request_id: str, *, on_http_error: Callable[[str, dict[str, str] | None], int | None] | None = None, on_headers: Callable[[dict[str, str]], None] | None = None, error_state: dict[str, bool] | None = None, fetch_timeout: int = 90, read_timeout: float = 60.0, ) -> AsyncIterator[str]: """ 在浏览器内对 url 发起 POST body,流式回传原始字符串块(含 SSE 等)。 同一 page 多请求用 request_id 区分 binding,互不串数据。 通过 CDP Runtime.addBinding 注入 sendChunk_,用 Runtime.bindingCalled 接收。 收到 __headers__: 时解析 JSON 并调用 on_headers(headers);收到 __error__: 时调用 on_http_error(msg);收到 __done__ 结束。 """ chunk_queue: asyncio.Queue[str] = asyncio.Queue() binding_name = "sendChunk_" + request_id stream_phase = "cdp_setup" def on_binding_called(event: dict[str, Any]) -> None: name = event.get("name") payload = event.get("payload", "") if name == binding_name: chunk_queue.put_nowait( payload if isinstance(payload, str) else str(payload) ) def classify_stream_error( exc: Exception, *, stage: str, ) -> BrowserResourceInvalidError | None: return _classify_browser_resource_error( exc, helper_name="stream_raw_via_page_fetch", operation="stream", stage=stage, request_url=url, page=page, request_id=request_id, stream_phase=stream_phase, ) cdp = None fetch_task: asyncio.Task[None] | None = None try: try: cdp = await context.new_cdp_session(page) except Exception as e: classified = classify_stream_error(e, stage="new_cdp_session") if classified is not None: raise classified from e raise cdp.on("Runtime.bindingCalled", on_binding_called) try: await cdp.send("Runtime.addBinding", {"name": binding_name}) except Exception as e: classified = classify_stream_error(e, stage="add_binding") if classified is not None: raise classified from e raise logger.info( "[fetch] helper=stream_raw_via_page_fetch request_id=%s stage=page.evaluate url=%s page.url=%s", request_id, _truncate_for_log(url, 120), _truncate_for_log(_safe_page_url(page), 120), ) async def run_fetch() -> None: nonlocal stream_phase try: stream_phase = "page_evaluate" await asyncio.wait_for( page.evaluate( PAGE_FETCH_STREAM_JS, { "url": url, "body": body, "bindingName": binding_name, "timeoutMs": max(1, int(fetch_timeout * 1000)), }, ), timeout=max(float(fetch_timeout) + 5.0, 10.0), ) except asyncio.TimeoutError as e: logger.warning( "[fetch] helper=stream_raw_via_page_fetch request_id=%s stage=page.evaluate evaluate timeout url=%s page.url=%s", request_id, _truncate_for_log(url, 120), _truncate_for_log(_safe_page_url(page), 120), ) raise BrowserResourceInvalidError( f"page.evaluate timeout after {max(float(fetch_timeout) + 5.0, 10.0):.1f}s", helper_name="stream_raw_via_page_fetch", operation="stream", stage="evaluate_timeout", resource_hint="page", request_url=url, page_url=_safe_page_url(page), request_id=request_id, stream_phase=stream_phase, ) from e except Exception as e: classified = classify_stream_error(e, stage="page.evaluate") if classified is not None: raise classified from e raise fetch_task = asyncio.create_task(run_fetch()) try: headers = None while True: if fetch_task.done(): exc = fetch_task.exception() if exc is not None: raise exc try: chunk = await asyncio.wait_for( chunk_queue.get(), timeout=read_timeout ) except asyncio.TimeoutError as e: stream_phase = "body" logger.warning( "[fetch] helper=stream_raw_via_page_fetch request_id=%s stream_phase=%s read timeout url=%s page.url=%s", request_id, stream_phase, _truncate_for_log(url, 120), _truncate_for_log(_safe_page_url(page), 120), ) raise BrowserResourceInvalidError( f"stream read timeout after {read_timeout:.1f}s", helper_name="stream_raw_via_page_fetch", operation="stream", stage="read_timeout", resource_hint="page", request_url=url, page_url=_safe_page_url(page), request_id=request_id, stream_phase=stream_phase, ) from e if chunk == "__done__": break if chunk.startswith("__headers__:"): stream_phase = "headers" try: headers = json.loads(chunk[12:]) if on_headers and isinstance(headers, dict): on_headers({k: str(v) for k, v in headers.items()}) except (json.JSONDecodeError, TypeError) as e: logger.debug("[fetch] 解析 __headers__ 失败: %s", e) continue if chunk.startswith("__error__:"): msg = chunk[10:].strip() saw_terminal = bool(error_state and error_state.get("terminal")) stream_phase = "terminal_event" if saw_terminal else ("body" if headers else "before_headers") if on_http_error: unfreeze_at = on_http_error(msg, headers) if isinstance(unfreeze_at, int): logger.warning("[fetch] __error__ from page: %s", msg) raise AccountFrozenError(msg, unfreeze_at) classified = _classify_browser_resource_error( RuntimeError(msg), helper_name="stream_raw_via_page_fetch", operation="page_fetch_stream", stage="page_error_event", request_url=url, page=page, request_id=request_id, stream_phase=stream_phase, ) if classified is not None: raise classified if saw_terminal: logger.info( "[fetch] page fetch disconnected after terminal event request_id=%s stream_phase=%s: %s", request_id, stream_phase, msg, ) continue logger.warning( "[fetch] __error__ from page before terminal event request_id=%s stream_phase=%s: %s", request_id, stream_phase, msg, ) raise RuntimeError(msg) stream_phase = "body" yield chunk finally: if fetch_task is not None: done, pending = await asyncio.wait({fetch_task}, timeout=5.0) if pending: fetch_task.cancel() fetch_task.add_done_callback(_consume_background_task_result) else: try: fetch_task.result() except asyncio.CancelledError: pass except BrowserResourceInvalidError: pass finally: if cdp is not None: try: await asyncio.wait_for(cdp.detach(), timeout=2.0) except asyncio.TimeoutError: logger.warning( "[fetch] helper=stream_raw_via_page_fetch request_id=%s detach CDP session timeout page.url=%s", request_id, _truncate_for_log(_safe_page_url(page), 120), ) except Exception as e: logger.debug("detach CDP session 时异常: %s", e) def parse_sse_to_events(buffer: str, chunk: str) -> tuple[str, list[str]]: """ 把 chunk 追加到 buffer,按行拆出 data: 后的 payload 列表,返回 (剩余 buffer, payload 列表)。 接入方对每个 payload 自行 JSON 解析并抽取 text / message_id / error。 """ buffer += chunk lines = buffer.split("\n") buffer = lines[-1] payloads: list[str] = [] for line in lines[:-1]: line = line.strip() if not line.startswith("data: "): continue payload = line[6:].strip() if payload == "[DONE]" or not payload: continue payloads.append(payload) return (buffer, payloads) async def stream_completion_via_sse( context: BrowserContext, page: Page, url: str, body: str, parse_event: ParseSseEvent, request_id: str, *, on_http_error: Callable, is_terminal_event: Callable[[str], bool] | None = None, collect_message_id: list[str] | None = None, first_token_timeout: float = 30.0, transport: str = "page_fetch", transport_options: dict[str, Any] | None = None, ) -> AsyncIterator[str]: """ 在浏览器内 POST 拿到流,按 SSE 行拆成 data 事件,用 parse_event(payload) 解析每条; 逐块 yield 文本,可选把 message_id 收集到 collect_message_id。 parse_event(payload) 返回 (texts, message_id, error),error 非空时仅打 debug 日志不抛错。 """ buffer = "" stream_state: dict[str, bool] = {"terminal": False} saw_text = False loop = asyncio.get_running_loop() started_at = loop.time() opts = dict(transport_options or {}) if transport == "context_request": raw_stream = stream_raw_via_context_request( context, page, url, body, request_id, on_http_error=on_http_error, **opts, ) resource_hint = "transport" else: raw_stream = stream_raw_via_page_fetch( context, page, url, body, request_id, on_http_error=on_http_error, error_state=stream_state, ) resource_hint = "page" async for chunk in raw_stream: buffer, payloads = parse_sse_to_events(buffer, chunk) for payload in payloads: if is_terminal_event and is_terminal_event(payload): stream_state["terminal"] = True try: texts, message_id, error = parse_event(payload) except Exception as e: logger.debug("parse_stream_event 单条解析异常: %s", e) continue if error: logger.warning("SSE error from upstream: %s", error) raise RuntimeError(error) if message_id and collect_message_id is not None: collect_message_id.append(message_id) for t in texts: saw_text = True yield t if ( not saw_text and not stream_state["terminal"] and loop.time() - started_at >= first_token_timeout ): raise BrowserResourceInvalidError( f"no text token received within {first_token_timeout:.1f}s", helper_name="stream_completion_via_sse", operation="parse_stream", stage="first_token_timeout", resource_hint=resource_hint, request_url=url, page_url=_safe_page_url(page), request_id=request_id, stream_phase="before_first_text", )