# web2api/core/plugin/helpers.py
# (upstream commit 77169b4 — "feat: align hosted Space deployment with latest upstream", author: ohmyapi)
"""
插件通用能力:页面复用、Cookie 登录、在浏览器内发起 fetch 并流式回传。
接入方只需实现站点特有的 URL/请求体/SSE 解析,其余复用此处逻辑。
"""
import asyncio
import base64
import codecs
import json
import logging
from collections.abc import Callable
from typing import Any, AsyncIterator
from urllib.parse import urlparse
from curl_cffi import requests as curl_requests
from playwright.async_api import BrowserContext, Page
from core.plugin.errors import AccountFrozenError, BrowserResourceInvalidError
ParseSseEvent = Callable[[str], tuple[list[str], str | None, str | None]]
logger = logging.getLogger(__name__)
_BROWSER_RESOURCE_ERROR_PATTERNS: tuple[tuple[str, str, str], ...] = (
("target crashed", "page", "target_crashed"),
("page crashed", "browser", "page_crashed"),
("execution context was destroyed", "page", "execution_context_destroyed"),
("navigating frame was detached", "page", "frame_detached"),
("frame was detached", "page", "frame_detached"),
("session closed. most likely the page has been closed", "page", "page_closed"),
("most likely the page has been closed", "page", "page_closed"),
("browser context has been closed", "page", "context_closed"),
("context has been closed", "page", "context_closed"),
("target page, context or browser has been closed", "page", "page_or_browser_closed"),
("page has been closed", "page", "page_closed"),
("target closed", "page", "target_closed"),
("browser has been closed", "browser", "browser_closed"),
("browser closed", "browser", "browser_closed"),
("connection closed", "browser", "browser_disconnected"),
("connection terminated", "browser", "browser_disconnected"),
("has been disconnected", "browser", "browser_disconnected"),
# Proxy / network tunnel errors — retryable via browser re-launch
("err_tunnel_connection_failed", "browser", "proxy_tunnel_failed"),
("err_proxy_connection_failed", "browser", "proxy_connection_failed"),
("err_connection_refused", "browser", "connection_refused"),
("err_connection_timed_out", "browser", "connection_timed_out"),
("err_connection_reset", "browser", "connection_reset"),
)
def _truncate_for_log(value: str, limit: int = 240) -> str:
if len(value) <= limit:
return value
return value[:limit] + "..."
def _safe_page_url(page: Page | None) -> str:
if page is None:
return ""
try:
return page.url or ""
except Exception:
return ""
def _evaluate_timeout_seconds(timeout_ms: int, grace_seconds: float = 5.0) -> float:
return max(5.0, float(timeout_ms) / 1000.0 + grace_seconds)
def _consume_background_task_result(task: asyncio.Task[Any]) -> None:
try:
if not task.cancelled():
task.exception()
except Exception:
pass
def _classify_browser_resource_error(
exc: Exception,
*,
helper_name: str,
operation: str,
stage: str,
request_url: str,
page: Page | None,
request_id: str | None = None,
stream_phase: str | None = None,
) -> BrowserResourceInvalidError | None:
message = str(exc).strip() or exc.__class__.__name__
normalized = message.lower()
for pattern, resource_hint, reason in _BROWSER_RESOURCE_ERROR_PATTERNS:
if pattern not in normalized:
continue
page_url = _safe_page_url(page)
logger.warning(
"[browser-resource-invalid] helper=%s operation=%s stage=%s reason=%s resource=%s request_id=%s stream_phase=%s request_url=%s page.url=%s err=%s",
helper_name,
operation,
stage,
reason,
resource_hint,
request_id,
stream_phase,
_truncate_for_log(request_url),
_truncate_for_log(page_url),
_truncate_for_log(message, 400),
)
return BrowserResourceInvalidError(
message,
helper_name=helper_name,
operation=operation,
stage=stage,
resource_hint=resource_hint,
request_url=request_url,
page_url=page_url,
request_id=request_id,
stream_phase=stream_phase,
)
return None
# 在页面内 POST 请求并流式回传:成功时逐块发送响应体,失败时发送 __error__: 前缀 + 信息,最后发送 __done__
# bindingName 按请求唯一,同一 page 多并发时互不串数据
PAGE_FETCH_STREAM_JS = """
async ({ url, body, bindingName, timeoutMs }) => {
const send = globalThis[bindingName];
const done = "__done__";
const errPrefix = "__error__:";
try {
const ctrl = new AbortController();
const effectiveTimeoutMs = timeoutMs || 90000;
const t = setTimeout(() => ctrl.abort(), effectiveTimeoutMs);
const resp = await fetch(url, {
method: "POST",
body: body,
headers: { "Content-Type": "application/json", "Accept": "text/event-stream" },
credentials: "include",
signal: ctrl.signal
});
clearTimeout(t);
if (!resp.ok) {
const errText = await resp.text();
const errSnippet = (errText && errText.length > 800) ? errText.slice(0, 800) + "..." : (errText || "");
await send(errPrefix + "HTTP " + resp.status + " " + errSnippet);
await send(done);
return;
}
if (!resp.body) {
await send(errPrefix + "No response body");
await send(done);
return;
}
const headersObj = {};
resp.headers.forEach((v, k) => { headersObj[k] = v; });
await send("__headers__:" + JSON.stringify(headersObj));
const reader = resp.body.getReader();
const dec = new TextDecoder();
while (true) {
const { done: streamDone, value } = await reader.read();
if (streamDone) break;
await send(dec.decode(value));
}
} catch (e) {
const msg = e.name === "AbortError" ? `请求超时(${Math.floor(effectiveTimeoutMs / 1000)}s)` : (e.message || String(e));
await send(errPrefix + msg);
}
await send(done);
}
"""
PAGE_FETCH_JSON_JS = """
async ({ url, method, body, headers, timeoutMs }) => {
const ctrl = new AbortController();
const t = setTimeout(() => ctrl.abort(), timeoutMs || 15000);
try {
const resp = await fetch(url, {
method: method || "GET",
body: body ?? undefined,
headers: headers || {},
credentials: "include",
signal: ctrl.signal
});
clearTimeout(t);
const text = await resp.text();
const headersObj = {};
resp.headers.forEach((v, k) => { headersObj[k] = v; });
return {
ok: resp.ok,
status: resp.status,
statusText: resp.statusText,
url: resp.url,
redirected: resp.redirected,
headers: headersObj,
text,
};
} catch (e) {
clearTimeout(t);
const msg = e.name === "AbortError" ? `请求超时(${Math.floor((timeoutMs || 15000) / 1000)}s)` : (e.message || String(e));
return { error: msg };
}
}
"""
PAGE_FETCH_MULTIPART_JS = """
async ({ url, filename, mimeType, dataBase64, fieldName, extraFields, timeoutMs }) => {
const ctrl = new AbortController();
const t = setTimeout(() => ctrl.abort(), timeoutMs || 30000);
try {
const binary = atob(dataBase64);
const bytes = new Uint8Array(binary.length);
for (let i = 0; i < binary.length; i += 1) {
bytes[i] = binary.charCodeAt(i);
}
const form = new FormData();
if (extraFields) {
Object.entries(extraFields).forEach(([k, v]) => {
if (v !== undefined && v !== null) form.append(k, String(v));
});
}
const file = new File([bytes], filename, { type: mimeType || "application/octet-stream" });
form.append(fieldName || "file", file);
const resp = await fetch(url, {
method: "POST",
body: form,
credentials: "include",
signal: ctrl.signal
});
clearTimeout(t);
const text = await resp.text();
const headersObj = {};
resp.headers.forEach((v, k) => { headersObj[k] = v; });
return {
ok: resp.ok,
status: resp.status,
statusText: resp.statusText,
url: resp.url,
redirected: resp.redirected,
headers: headersObj,
text,
};
} catch (e) {
clearTimeout(t);
const msg = e.name === "AbortError" ? `请求超时(${Math.floor((timeoutMs || 30000) / 1000)}s)` : (e.message || String(e));
return { error: msg };
}
}
"""
async def ensure_page_for_site(
context: BrowserContext,
url_contains: str,
start_url: str,
*,
timeout: int = 45000,
) -> Page:
"""
若已有页面 URL 包含 url_contains 则复用,否则 new_page 并 goto start_url。
接入方只需提供「站点特征」和「入口 URL」。
"""
if context.pages:
for p in context.pages:
if url_contains in (p.url or ""):
return p
page = await context.new_page()
await page.goto(start_url, wait_until="commit", timeout=timeout)
return page
async def create_page_for_site(
context: BrowserContext,
start_url: str,
*,
reuse_page: Page | None = None,
timeout: int = 45000,
) -> Page:
"""
若传入 reuse_page 则在其上 goto start_url,否则 new_page 再 goto。
用于复用浏览器默认空白页或 page 池的初始化与补回。
"""
if reuse_page is not None:
await reuse_page.goto(start_url, wait_until="commit", timeout=timeout)
return reuse_page
page = await context.new_page()
await page.goto(start_url, wait_until="commit", timeout=timeout)
return page
def _cookie_domain_matches(cookie_domain: str, site_domain: str) -> bool:
"""判断 cookie 的 domain 是否属于站点 domain(如 .claude.ai 与 claude.ai 视为同一域)。"""
a = cookie_domain if cookie_domain.startswith(".") else f".{cookie_domain}"
b = site_domain if site_domain.startswith(".") else f".{site_domain}"
return a == b
def _cookie_to_set_param(c: Any) -> dict[str, str]:
"""将 context.cookies() 返回的项转为 add_cookies 接受的 SetCookieParam 格式。"""
return {
"name": c["name"],
"value": c["value"],
"domain": c.get("domain") or "",
"path": c.get("path") or "/",
}
async def clear_cookies_for_domain(
context: BrowserContext,
site_domain: str,
) -> None:
"""清除 context 内属于指定站点域的所有 cookie,保留其他域。"""
cookies = await context.cookies()
keep = [
c
for c in cookies
if not _cookie_domain_matches(c.get("domain", ""), site_domain)
]
await context.clear_cookies()
if keep:
await context.add_cookies([_cookie_to_set_param(c) for c in keep]) # type: ignore[arg-type]
logger.info(
"[auth] cleared cookies for domain=%s (kept %s cookies)", site_domain, len(keep)
)
async def clear_page_storage_for_switch(page: Page) -> None:
"""切号前清空当前页面的 localStorage(当前 origin)。"""
try:
await page.evaluate("() => { window.localStorage.clear(); }")
logger.info("[auth] cleared localStorage for switch")
except Exception as e:
logger.warning("[auth] clear localStorage failed (page may be detached): %s", e)
async def safe_page_reload(page: Page, url: str | None = None) -> None:
"""安全地 reload 或 goto(url),忽略因 ERR_ABORTED / frame detached 导致的异常。"""
try:
if url:
await page.goto(url, wait_until="commit", timeout=45000)
else:
await page.reload(wait_until="domcontentloaded", timeout=45000)
except Exception as e:
err_msg = str(e)
if "ERR_ABORTED" in err_msg or "detached" in err_msg.lower():
logger.warning(
"[auth] page.reload/goto 被中止或 frame 已分离: %s", err_msg[:200]
)
else:
raise
async def apply_cookie_auth(
context: BrowserContext,
page: Page,
auth: dict[str, Any],
cookie_name: str,
auth_keys: list[str],
domain: str,
*,
path: str = "/",
reload: bool = True,
) -> None:
"""
从 auth 中按 auth_keys 顺序取第一个非空值作为 cookie 值,写入 context 并可选 reload。
接入方只需提供 cookie 名、auth 里的 key 列表、域名。
仅写 cookie 不 reload 时,同 context 内的 fetch() 仍会带上 cookie;reload 仅在需要页面文档同步登录态时用。
"""
value = None
for k in auth_keys:
v = auth.get(k)
if v is not None and v != "":
value = str(v).strip()
if value:
break
if not value:
raise ValueError(f"auth 需包含以下其一且非空: {auth_keys}")
logger.info(
"[auth] context.add_cookies domain=%s name=%s reload=%s page.url=%s",
domain,
cookie_name,
reload,
page.url,
)
await context.add_cookies(
[
{
"name": cookie_name,
"value": value,
"domain": domain,
"path": path,
"secure": True,
"httpOnly": True,
}
]
)
if reload:
await safe_page_reload(page)
def _attach_json_body(result: dict[str, Any], *, invalid_message: str) -> dict[str, Any]:
if not isinstance(result, dict):
raise RuntimeError(invalid_message)
error = result.get("error")
if error:
raise RuntimeError(str(error))
text = result.get("text")
if isinstance(text, str) and text:
try:
result["json"] = json.loads(text)
except json.JSONDecodeError:
result["json"] = None
else:
result["json"] = None
return result
def _cookie_domain_matches_url(cookie_domain: str, target_url: str) -> bool:
host = (urlparse(target_url).hostname or "").lower().lstrip(".")
domain = (cookie_domain or "").lower().lstrip(".")
if not host or not domain:
return False
return host == domain or host.endswith(f".{domain}")
def _cookies_for_url(cookies: list[dict[str, Any]], target_url: str) -> dict[str, str]:
target_host = (urlparse(target_url).hostname or "").lower().lstrip(".")
if not target_host:
return {}
selected: dict[str, str] = {}
for cookie in cookies:
name = str(cookie.get("name") or "").strip()
value = str(cookie.get("value") or "")
domain = str(cookie.get("domain") or "").strip()
if not name or not _cookie_domain_matches_url(domain, target_url):
continue
selected[name] = value
return selected
async def _stream_via_http_client(
context: BrowserContext,
page: Page | None,
url: str,
body: str,
request_id: str,
*,
on_http_error: Callable[[str, dict[str, str] | None], int | None] | None = None,
on_headers: Callable[[dict[str, str]], None] | None = None,
connect_timeout: float = 30.0,
read_timeout: float = 300.0,
impersonate: str = "chrome142",
proxy_url: str | None = None,
proxy_auth: tuple[str, str] | None = None,
) -> AsyncIterator[str]:
logger.info(
"[fetch] helper=stream_raw_via_context_request request_id=%s stage=http_client url=%s page.url=%s",
request_id,
_truncate_for_log(url, 120),
_truncate_for_log(_safe_page_url(page), 120),
)
parsed = urlparse(url)
referer = ""
if parsed.scheme and parsed.netloc:
referer = f"{parsed.scheme}://{parsed.netloc}/"
try:
cookies = await context.cookies([url])
except Exception as e:
classified = _classify_browser_resource_error(
e,
helper_name="stream_raw_via_context_request",
operation="context.cookies",
stage="load_cookies",
request_url=url,
page=page,
request_id=request_id,
stream_phase="fetch",
)
if classified is not None:
raise classified from e
raise BrowserResourceInvalidError(
str(e),
helper_name="stream_raw_via_context_request",
operation="context.cookies",
stage="load_cookies",
resource_hint="page",
request_url=url,
page_url=_safe_page_url(page),
request_id=request_id,
stream_phase="fetch",
) from e
cookie_jar = _cookies_for_url(cookies, url)
session_kwargs: dict[str, Any] = {
"impersonate": impersonate,
"timeout": (connect_timeout, read_timeout),
"verify": True,
"allow_redirects": True,
"default_headers": True,
}
if cookie_jar:
session_kwargs["cookies"] = cookie_jar
if proxy_url:
session_kwargs["proxy"] = proxy_url
if proxy_auth:
session_kwargs["proxy_auth"] = proxy_auth
response = None
try:
async with curl_requests.AsyncSession(**session_kwargs) as session:
try:
request_headers = {
"Content-Type": "application/json",
"Accept": "text/event-stream",
}
if referer:
request_headers["Origin"] = referer.rstrip("/")
async with session.stream(
"POST",
url,
data=body.encode("utf-8"),
headers=request_headers,
) as response:
headers = {
str(k).lower(): str(v) for k, v in response.headers.items()
}
if on_headers:
on_headers(headers)
status = int(response.status_code)
if status < 200 or status >= 300:
body_parts: list[str] = []
decoder = codecs.getincrementaldecoder("utf-8")("replace")
async for chunk in response.aiter_content():
if not chunk:
continue
body_parts.append(decoder.decode(chunk))
if sum(len(part) for part in body_parts) >= 800:
break
body_parts.append(decoder.decode(b"", final=True))
snippet = "".join(body_parts)
if len(snippet) > 800:
snippet = snippet[:800] + "..."
msg = f"HTTP {status} {snippet}".strip()
if on_http_error:
unfreeze_at = on_http_error(msg, headers)
if isinstance(unfreeze_at, int):
logger.warning("[fetch] HTTP error from context request: %s", msg)
raise AccountFrozenError(msg, unfreeze_at)
raise RuntimeError(msg)
decoder = codecs.getincrementaldecoder("utf-8")("replace")
async for chunk in response.aiter_content():
if not chunk:
continue
text = decoder.decode(chunk)
if text:
yield text
tail = decoder.decode(b"", final=True)
if tail:
yield tail
except Exception as e:
classified = _classify_browser_resource_error(
e,
helper_name="stream_raw_via_context_request",
operation="http_client",
stage="stream",
request_url=url,
page=page,
request_id=request_id,
stream_phase="body",
)
if classified is not None:
raise classified from e
raise BrowserResourceInvalidError(
str(e),
helper_name="stream_raw_via_context_request",
operation="http_client",
stage="stream",
resource_hint="transport",
request_url=url,
page_url=_safe_page_url(page),
request_id=request_id,
stream_phase="body",
) from e
except AccountFrozenError:
raise
except BrowserResourceInvalidError:
raise
except Exception as e:
classified = _classify_browser_resource_error(
e,
helper_name="stream_raw_via_context_request",
operation="http_client",
stage="request",
request_url=url,
page=page,
request_id=request_id,
stream_phase="fetch",
)
if classified is not None:
raise classified from e
logger.warning(
"[fetch] helper=stream_raw_via_context_request request_id=%s http_client failed url=%s page.url=%s err=%s",
request_id,
_truncate_for_log(url, 120),
_truncate_for_log(_safe_page_url(page), 120),
_truncate_for_log(str(e), 400),
)
raise BrowserResourceInvalidError(
str(e),
helper_name="stream_raw_via_context_request",
operation="http_client",
stage="request",
resource_hint="transport",
request_url=url,
page_url=_safe_page_url(page),
request_id=request_id,
stream_phase="fetch",
) from e
async def _request_via_context_request(
context: BrowserContext,
page: Page | None,
url: str,
*,
method: str = "GET",
body: str | None = None,
headers: dict[str, str] | None = None,
multipart: dict[str, Any] | None = None,
timeout_ms: int = 15000,
request_id: str | None = None,
helper_name: str,
) -> dict[str, Any]:
logger.info(
"[fetch] helper=%s method=%s request_id=%s url=%s page.url=%s",
helper_name,
method,
request_id,
_truncate_for_log(url, 120),
_truncate_for_log(_safe_page_url(page), 120),
)
response = None
try:
response = await context.request.fetch(
url,
method=method,
headers=headers or None,
data=body,
multipart=multipart,
timeout=timeout_ms,
fail_on_status_code=False,
)
text = await response.text()
return {
"ok": bool(response.ok),
"status": int(response.status),
"statusText": str(response.status_text),
"url": str(response.url),
"redirected": str(response.url) != url,
"headers": {str(k): str(v) for k, v in response.headers.items()},
"text": text,
}
except Exception as e:
classified = _classify_browser_resource_error(
e,
helper_name=helper_name,
operation="context.request",
stage="fetch",
request_url=url,
page=page,
request_id=request_id,
)
if classified is not None:
raise classified from e
logger.warning(
"[fetch] helper=%s request_id=%s context.request failed url=%s page.url=%s err=%s",
helper_name,
request_id,
_truncate_for_log(url, 120),
_truncate_for_log(_safe_page_url(page), 120),
_truncate_for_log(str(e), 400),
)
raise RuntimeError(str(e)) from e
finally:
if response is not None:
try:
await response.dispose()
except Exception:
pass
async def request_json_via_context_request(
context: BrowserContext,
page: Page | None,
url: str,
*,
method: str = "GET",
body: str | None = None,
headers: dict[str, str] | None = None,
timeout_ms: int = 15000,
request_id: str | None = None,
) -> dict[str, Any]:
result = await _request_via_context_request(
context,
page,
url,
method=method,
body=body,
headers=headers,
timeout_ms=timeout_ms,
request_id=request_id,
helper_name="request_json_via_context_request",
)
return _attach_json_body(result, invalid_message="控制请求返回结果异常")
async def request_json_via_page_fetch(
page: Page,
url: str,
*,
method: str = "GET",
body: str | None = None,
headers: dict[str, str] | None = None,
timeout_ms: int = 15000,
request_id: str | None = None,
) -> dict[str, Any]:
"""
在页面内发起非流式 fetch,请求结果按 JSON 优先解析返回。
这样能复用浏览器真实网络栈、cookie 与代理扩展能力。
"""
logger.info(
"[fetch] helper=request_json_via_page_fetch method=%s request_id=%s url=%s page.url=%s",
method,
request_id,
_truncate_for_log(url, 120),
_truncate_for_log(_safe_page_url(page), 120),
)
try:
result = await asyncio.wait_for(
page.evaluate(
PAGE_FETCH_JSON_JS,
{
"url": url,
"method": method,
"body": body,
"headers": headers or {},
"timeoutMs": timeout_ms,
},
),
timeout=_evaluate_timeout_seconds(timeout_ms),
)
except asyncio.TimeoutError as e:
logger.warning(
"[fetch] helper=request_json_via_page_fetch request_id=%s evaluate timeout url=%s page.url=%s",
request_id,
_truncate_for_log(url, 120),
_truncate_for_log(_safe_page_url(page), 120),
)
raise BrowserResourceInvalidError(
f"page.evaluate timeout after {_evaluate_timeout_seconds(timeout_ms):.1f}s",
helper_name="request_json_via_page_fetch",
operation="page.evaluate",
stage="evaluate_timeout",
resource_hint="page",
request_url=url,
page_url=_safe_page_url(page),
request_id=request_id,
) from e
except Exception as e:
classified = _classify_browser_resource_error(
e,
helper_name="request_json_via_page_fetch",
operation="page.evaluate",
stage="evaluate",
request_url=url,
page=page,
request_id=request_id,
)
if classified is not None:
raise classified from e
raise
return _attach_json_body(result, invalid_message="页面 fetch 返回结果异常")
async def upload_file_via_context_request(
context: BrowserContext,
page: Page | None,
url: str,
*,
filename: str,
mime_type: str,
data: bytes,
field_name: str = "file",
extra_fields: dict[str, str] | None = None,
timeout_ms: int = 30000,
request_id: str | None = None,
) -> dict[str, Any]:
multipart: dict[str, Any] = dict(extra_fields or {})
multipart[field_name] = {
"name": filename,
"mimeType": mime_type or "application/octet-stream",
"buffer": data,
}
result = await _request_via_context_request(
context,
page,
url,
method="POST",
multipart=multipart,
timeout_ms=timeout_ms,
request_id=request_id,
helper_name="upload_file_via_context_request",
)
return _attach_json_body(result, invalid_message="控制上传返回结果异常")
async def upload_file_via_page_fetch(
page: Page,
url: str,
*,
filename: str,
mime_type: str,
data: bytes,
field_name: str = "file",
extra_fields: dict[str, str] | None = None,
timeout_ms: int = 30000,
request_id: str | None = None,
) -> dict[str, Any]:
logger.info(
"[fetch] helper=upload_file_via_page_fetch filename=%s mime=%s request_id=%s url=%s page.url=%s",
filename,
mime_type,
request_id,
_truncate_for_log(url, 120),
_truncate_for_log(_safe_page_url(page), 120),
)
try:
result = await asyncio.wait_for(
page.evaluate(
PAGE_FETCH_MULTIPART_JS,
{
"url": url,
"filename": filename,
"mimeType": mime_type,
"dataBase64": base64.b64encode(data).decode("ascii"),
"fieldName": field_name,
"extraFields": extra_fields or {},
"timeoutMs": timeout_ms,
},
),
timeout=_evaluate_timeout_seconds(timeout_ms),
)
except asyncio.TimeoutError as e:
logger.warning(
"[fetch] helper=upload_file_via_page_fetch request_id=%s evaluate timeout url=%s page.url=%s",
request_id,
_truncate_for_log(url, 120),
_truncate_for_log(_safe_page_url(page), 120),
)
raise BrowserResourceInvalidError(
f"page.evaluate timeout after {_evaluate_timeout_seconds(timeout_ms):.1f}s",
helper_name="upload_file_via_page_fetch",
operation="page.evaluate",
stage="evaluate_timeout",
resource_hint="page",
request_url=url,
page_url=_safe_page_url(page),
request_id=request_id,
) from e
except Exception as e:
classified = _classify_browser_resource_error(
e,
helper_name="upload_file_via_page_fetch",
operation="page.evaluate",
stage="evaluate",
request_url=url,
page=page,
request_id=request_id,
)
if classified is not None:
raise classified from e
raise
return _attach_json_body(result, invalid_message="页面上传返回结果异常")
async def stream_raw_via_context_request(
context: BrowserContext,
page: Page | None,
url: str,
body: str,
request_id: str,
*,
on_http_error: Callable[[str, dict[str, str] | None], int | None] | None = None,
on_headers: Callable[[dict[str, str]], None] | None = None,
fetch_timeout: float = 90.0,
body_timeout: float = 300.0,
proxy_url: str | None = None,
proxy_auth: tuple[str, str] | None = None,
) -> AsyncIterator[str]:
"""通过真实流式 HTTP client 发起 completion 请求,避免先读完整 body。"""
del fetch_timeout
async for chunk in _stream_via_http_client(
context,
page,
url,
body,
request_id,
on_http_error=on_http_error,
on_headers=on_headers,
read_timeout=body_timeout,
proxy_url=proxy_url,
proxy_auth=proxy_auth,
):
yield chunk
async def stream_raw_via_page_fetch(
context: BrowserContext,
page: Page,
url: str,
body: str,
request_id: str,
*,
on_http_error: Callable[[str, dict[str, str] | None], int | None] | None = None,
on_headers: Callable[[dict[str, str]], None] | None = None,
error_state: dict[str, bool] | None = None,
fetch_timeout: int = 90,
read_timeout: float = 60.0,
) -> AsyncIterator[str]:
"""
在浏览器内对 url 发起 POST body,流式回传原始字符串块(含 SSE 等)。
同一 page 多请求用 request_id 区分 binding,互不串数据。
通过 CDP Runtime.addBinding 注入 sendChunk_<request_id>,用 Runtime.bindingCalled 接收。
收到 __headers__: 时解析 JSON 并调用 on_headers(headers);收到 __error__: 时调用 on_http_error(msg);收到 __done__ 结束。
"""
chunk_queue: asyncio.Queue[str] = asyncio.Queue()
binding_name = "sendChunk_" + request_id
stream_phase = "cdp_setup"
def on_binding_called(event: dict[str, Any]) -> None:
name = event.get("name")
payload = event.get("payload", "")
if name == binding_name:
chunk_queue.put_nowait(
payload if isinstance(payload, str) else str(payload)
)
def classify_stream_error(
exc: Exception,
*,
stage: str,
) -> BrowserResourceInvalidError | None:
return _classify_browser_resource_error(
exc,
helper_name="stream_raw_via_page_fetch",
operation="stream",
stage=stage,
request_url=url,
page=page,
request_id=request_id,
stream_phase=stream_phase,
)
cdp = None
fetch_task: asyncio.Task[None] | None = None
try:
try:
cdp = await context.new_cdp_session(page)
except Exception as e:
classified = classify_stream_error(e, stage="new_cdp_session")
if classified is not None:
raise classified from e
raise
cdp.on("Runtime.bindingCalled", on_binding_called)
try:
await cdp.send("Runtime.addBinding", {"name": binding_name})
except Exception as e:
classified = classify_stream_error(e, stage="add_binding")
if classified is not None:
raise classified from e
raise
logger.info(
"[fetch] helper=stream_raw_via_page_fetch request_id=%s stage=page.evaluate url=%s page.url=%s",
request_id,
_truncate_for_log(url, 120),
_truncate_for_log(_safe_page_url(page), 120),
)
async def run_fetch() -> None:
nonlocal stream_phase
try:
stream_phase = "page_evaluate"
await asyncio.wait_for(
page.evaluate(
PAGE_FETCH_STREAM_JS,
{
"url": url,
"body": body,
"bindingName": binding_name,
"timeoutMs": max(1, int(fetch_timeout * 1000)),
},
),
timeout=max(float(fetch_timeout) + 5.0, 10.0),
)
except asyncio.TimeoutError as e:
logger.warning(
"[fetch] helper=stream_raw_via_page_fetch request_id=%s stage=page.evaluate evaluate timeout url=%s page.url=%s",
request_id,
_truncate_for_log(url, 120),
_truncate_for_log(_safe_page_url(page), 120),
)
raise BrowserResourceInvalidError(
f"page.evaluate timeout after {max(float(fetch_timeout) + 5.0, 10.0):.1f}s",
helper_name="stream_raw_via_page_fetch",
operation="stream",
stage="evaluate_timeout",
resource_hint="page",
request_url=url,
page_url=_safe_page_url(page),
request_id=request_id,
stream_phase=stream_phase,
) from e
except Exception as e:
classified = classify_stream_error(e, stage="page.evaluate")
if classified is not None:
raise classified from e
raise
fetch_task = asyncio.create_task(run_fetch())
try:
headers = None
while True:
if fetch_task.done():
exc = fetch_task.exception()
if exc is not None:
raise exc
try:
chunk = await asyncio.wait_for(
chunk_queue.get(), timeout=read_timeout
)
except asyncio.TimeoutError as e:
stream_phase = "body"
logger.warning(
"[fetch] helper=stream_raw_via_page_fetch request_id=%s stream_phase=%s read timeout url=%s page.url=%s",
request_id,
stream_phase,
_truncate_for_log(url, 120),
_truncate_for_log(_safe_page_url(page), 120),
)
raise BrowserResourceInvalidError(
f"stream read timeout after {read_timeout:.1f}s",
helper_name="stream_raw_via_page_fetch",
operation="stream",
stage="read_timeout",
resource_hint="page",
request_url=url,
page_url=_safe_page_url(page),
request_id=request_id,
stream_phase=stream_phase,
) from e
if chunk == "__done__":
break
if chunk.startswith("__headers__:"):
stream_phase = "headers"
try:
headers = json.loads(chunk[12:])
if on_headers and isinstance(headers, dict):
on_headers({k: str(v) for k, v in headers.items()})
except (json.JSONDecodeError, TypeError) as e:
logger.debug("[fetch] 解析 __headers__ 失败: %s", e)
continue
if chunk.startswith("__error__:"):
msg = chunk[10:].strip()
saw_terminal = bool(error_state and error_state.get("terminal"))
stream_phase = "terminal_event" if saw_terminal else ("body" if headers else "before_headers")
if on_http_error:
unfreeze_at = on_http_error(msg, headers)
if isinstance(unfreeze_at, int):
logger.warning("[fetch] __error__ from page: %s", msg)
raise AccountFrozenError(msg, unfreeze_at)
classified = _classify_browser_resource_error(
RuntimeError(msg),
helper_name="stream_raw_via_page_fetch",
operation="page_fetch_stream",
stage="page_error_event",
request_url=url,
page=page,
request_id=request_id,
stream_phase=stream_phase,
)
if classified is not None:
raise classified
if saw_terminal:
logger.info(
"[fetch] page fetch disconnected after terminal event request_id=%s stream_phase=%s: %s",
request_id,
stream_phase,
msg,
)
continue
logger.warning(
"[fetch] __error__ from page before terminal event request_id=%s stream_phase=%s: %s",
request_id,
stream_phase,
msg,
)
raise RuntimeError(msg)
stream_phase = "body"
yield chunk
finally:
if fetch_task is not None:
done, pending = await asyncio.wait({fetch_task}, timeout=5.0)
if pending:
fetch_task.cancel()
fetch_task.add_done_callback(_consume_background_task_result)
else:
try:
fetch_task.result()
except asyncio.CancelledError:
pass
except BrowserResourceInvalidError:
pass
finally:
if cdp is not None:
try:
await asyncio.wait_for(cdp.detach(), timeout=2.0)
except asyncio.TimeoutError:
logger.warning(
"[fetch] helper=stream_raw_via_page_fetch request_id=%s detach CDP session timeout page.url=%s",
request_id,
_truncate_for_log(_safe_page_url(page), 120),
)
except Exception as e:
logger.debug("detach CDP session 时异常: %s", e)
def parse_sse_to_events(buffer: str, chunk: str) -> tuple[str, list[str]]:
"""
把 chunk 追加到 buffer,按行拆出 data: 后的 payload 列表,返回 (剩余 buffer, payload 列表)。
接入方对每个 payload 自行 JSON 解析并抽取 text / message_id / error。
"""
buffer += chunk
lines = buffer.split("\n")
buffer = lines[-1]
payloads: list[str] = []
for line in lines[:-1]:
line = line.strip()
if not line.startswith("data: "):
continue
payload = line[6:].strip()
if payload == "[DONE]" or not payload:
continue
payloads.append(payload)
return (buffer, payloads)
async def stream_completion_via_sse(
context: BrowserContext,
page: Page,
url: str,
body: str,
parse_event: ParseSseEvent,
request_id: str,
*,
on_http_error: Callable,
is_terminal_event: Callable[[str], bool] | None = None,
collect_message_id: list[str] | None = None,
first_token_timeout: float = 30.0,
transport: str = "page_fetch",
transport_options: dict[str, Any] | None = None,
) -> AsyncIterator[str]:
"""
在浏览器内 POST 拿到流,按 SSE 行拆成 data 事件,用 parse_event(payload) 解析每条;
逐块 yield 文本,可选把 message_id 收集到 collect_message_id。
parse_event(payload) 返回 (texts, message_id, error),error 非空时仅打 debug 日志不抛错。
"""
buffer = ""
stream_state: dict[str, bool] = {"terminal": False}
saw_text = False
loop = asyncio.get_running_loop()
started_at = loop.time()
opts = dict(transport_options or {})
if transport == "context_request":
raw_stream = stream_raw_via_context_request(
context,
page,
url,
body,
request_id,
on_http_error=on_http_error,
**opts,
)
resource_hint = "transport"
else:
raw_stream = stream_raw_via_page_fetch(
context,
page,
url,
body,
request_id,
on_http_error=on_http_error,
error_state=stream_state,
)
resource_hint = "page"
async for chunk in raw_stream:
buffer, payloads = parse_sse_to_events(buffer, chunk)
for payload in payloads:
if is_terminal_event and is_terminal_event(payload):
stream_state["terminal"] = True
try:
texts, message_id, error = parse_event(payload)
except Exception as e:
logger.debug("parse_stream_event 单条解析异常: %s", e)
continue
if error:
logger.warning("SSE error from upstream: %s", error)
raise RuntimeError(error)
if message_id and collect_message_id is not None:
collect_message_id.append(message_id)
for t in texts:
saw_text = True
yield t
if (
not saw_text
and not stream_state["terminal"]
and loop.time() - started_at >= first_token_timeout
):
raise BrowserResourceInvalidError(
f"no text token received within {first_token_timeout:.1f}s",
helper_name="stream_completion_via_sse",
operation="parse_stream",
stage="first_token_timeout",
resource_hint=resource_hint,
request_url=url,
page_url=_safe_page_url(page),
request_id=request_id,
stream_phase="before_first_text",
)