# web2api / core/plugin/claude.py
# Author: ohmyapi
# Commit 77169b4 — "feat: align hosted Space deployment with latest upstream"
"""
Claude 插件:仅实现站点特有的上下文获取、会话创建、请求体构建、SSE 解析和限流处理。
其余编排逻辑(create_page / apply_auth / stream_completion 流程)全部由 BaseSitePlugin 完成。
调试时可在 config.yaml 的 claude.start_url、claude.api_base 指向 mock。
"""
import datetime
import json
import logging
import re
import time
from asyncio import Lock
from typing import Any
from urllib.parse import urlparse
from playwright.async_api import BrowserContext, Page
from core.api.schemas import InputAttachment
from core.constants import TIMEZONE
from core.plugin.base import BaseSitePlugin, PluginRegistry, SiteConfig
from core.plugin.errors import BrowserResourceInvalidError
from core.plugin.helpers import (
_classify_browser_resource_error,
clear_cookies_for_domain,
clear_page_storage_for_switch,
request_json_via_context_request,
safe_page_reload,
upload_file_via_context_request,
)
logger = logging.getLogger(__name__)
# Probe cache: skip redundant ensure_request_ready probes within this window.
_PROBE_CACHE_TTL_SECONDS = 60.0
def _truncate_url_for_log(value: str, limit: int = 200) -> str:
if len(value) <= limit:
return value
return value[:limit] + "..."
def _safe_page_url(page: Page) -> str:
    """Best-effort read of ``page.url``.

    Returns the page's URL, or ``""`` when the URL is empty/``None`` or when
    accessing it raises. Exceptions are deliberately swallowed so this can be
    used freely in logging and pre-flight checks.
    """
    try:
        current = page.url
        return current if current else ""
    except Exception:
        return ""
# ---------------------------------------------------------------------------
# 站点特有:请求体 & SSE 解析
# ---------------------------------------------------------------------------
def _is_thinking_model(public_model: str) -> bool:
"""Any model ending with -thinking enables extended thinking (paprika_mode)."""
return public_model.endswith("-thinking")
def _base_upstream_model(public_model: str) -> str:
"""Strip -thinking suffix to get the upstream model ID for Claude Web."""
return public_model.removesuffix("-thinking")
def _default_completion_body(
    message: str,
    *,
    is_follow_up: bool = False,
    timezone: str = TIMEZONE,
    public_model: str = "",
) -> dict[str, Any]:
    """Build the JSON body for a Claude Web ``completion`` request.

    Args:
        message: Prompt text for this turn.
        is_follow_up: True when continuing an existing conversation. Follow-up
            bodies must not carry ``create_conversation_params`` (the API
            answers 400 otherwise).
        timezone: Timezone name forwarded to Claude.
        public_model: Public model id. A ``-thinking`` id pins ``model`` to the
            base upstream id and, on the first turn, sets
            ``paprika_mode="extended"``.

    Returns:
        A freshly built dict on every call.
    """
    thinking = _is_thinking_model(public_model)

    default_style = {
        "type": "default",
        "key": "Default",
        "name": "Normal",
        "nameKey": "normal_style_name",
        "prompt": "Normal\n",
        "summary": "Default responses from Claude",
        "summaryKey": "normal_style_summary",
        "isDefault": True,
    }
    widget_names = (
        "weather_fetch",
        "recipe_display_v0",
        "places_map_display_v0",
        "message_compose_v1",
        "ask_user_input_v0",
        "places_search",
        "fetch_sports_data",
    )
    tools: list[dict[str, str]] = [
        {"type": "web_search_v0", "name": "web_search"},
        {"type": "artifacts_v0", "name": "artifacts"},
        {"type": "repl_v0", "name": "repl"},
    ]
    tools.extend({"type": "widget", "name": widget} for widget in widget_names)

    # Key order is kept stable so the serialized JSON matches what Claude Web sends.
    body: dict[str, Any] = dict(
        prompt=message,
        timezone=timezone,
        personalized_styles=[default_style],
        locale="en-US",
        tools=tools,
        attachments=[],
        files=[],
        sync_sources=[],
        rendering_mode="messages",
    )
    if thinking:
        body["model"] = _base_upstream_model(public_model)
    if is_follow_up:
        return body

    conversation_params: dict[str, Any] = {
        "name": "",
        "include_conversation_preferences": True,
        "is_temporary": False,
    }
    if thinking:
        conversation_params["paprika_mode"] = "extended"
    body["create_conversation_params"] = conversation_params
    return body
def _parse_one_sse_event(payload: str) -> tuple[list[str], str | None, str | None]:
"""解析单条 Claude SSE data 行,返回 (texts, message_id, error)。"""
result: list[str] = []
message_id: str | None = None
error_message: str | None = None
try:
obj = json.loads(payload)
if not isinstance(obj, dict):
return (result, message_id, error_message)
kind = obj.get("type")
if kind == "error":
err = obj.get("error") or {}
error_message = err.get("message") or err.get("type") or "Unknown error"
return (result, message_id, error_message)
if "text" in obj and obj.get("text"):
result.append(str(obj["text"]))
elif kind == "content_block_delta":
delta = obj.get("delta")
if isinstance(delta, dict) and "text" in delta:
result.append(str(delta["text"]))
elif isinstance(delta, str) and delta:
result.append(delta)
elif kind == "message_start":
msg = obj.get("message")
if isinstance(msg, dict):
for key in ("uuid", "id"):
if msg.get(key):
message_id = str(msg[key])
break
if not message_id:
mid = (
obj.get("message_uuid") or obj.get("uuid") or obj.get("message_id")
)
if mid:
message_id = str(mid)
elif (
kind
and kind
not in (
"ping",
"content_block_start",
"content_block_stop",
"message_stop",
"message_delta",
"message_limit",
)
and not result
):
logger.debug(
"SSE 未解析出正文 type=%s payload=%s",
kind,
payload[:200] if len(payload) > 200 else payload,
)
except json.JSONDecodeError:
pass
return (result, message_id, error_message)
def _is_terminal_sse_event(payload: str) -> bool:
"""Claude 正常流结束时会发送 message_stop。"""
try:
obj = json.loads(payload)
except json.JSONDecodeError:
return False
return isinstance(obj, dict) and obj.get("type") == "message_stop"
# ---------------------------------------------------------------------------
# ClaudePlugin — 只需声明配置 + 实现 5 个 hook
# ---------------------------------------------------------------------------
class ClaudePlugin(BaseSitePlugin):
    """Claude Web2API plugin.

    Only the Claude-specific pieces live here: site context lookup, session
    creation, completion URL/body, SSE parsing, rate-limit handling and
    attachment upload. The generic orchestration is inherited from
    ``BaseSitePlugin``.

    ``auth`` must carry the Claude session cookie; ``site.auth_keys`` lists the
    accepted key names (``sessionKey`` / ``session_key``).
    """

    type_name = "claude"

    # Public model id -> upstream model id. ``-thinking`` ids map to themselves;
    # the suffix is stripped (and extended thinking enabled) when the session
    # and completion request are built.
    DEFAULT_MODEL_MAPPING = {
        "claude-sonnet-4-6": "claude-sonnet-4-6",
        "claude-sonnet-4-5": "claude-sonnet-4-5",
        "claude-sonnet-4-5-thinking": "claude-sonnet-4-5-thinking",
        "claude-sonnet-4-6-thinking": "claude-sonnet-4-6-thinking",
        "claude-haiku-4-5": "claude-haiku-4-5",
        "claude-haiku-4-5-thinking": "claude-haiku-4-5-thinking",
        "claude-opus-4-6": "claude-opus-4-6",
        "claude-opus-4-6-thinking": "claude-opus-4-6-thinking",
    }
    # Models that require a Claude Pro subscription.
    PRO_MODELS = frozenset({
        "claude-haiku-4-5",
        "claude-haiku-4-5-thinking",
        "claude-opus-4-6",
        "claude-opus-4-6-thinking",
    })
    # Aliases accepted by model_mapping() but hidden from listed_model_mapping().
    MODEL_ALIASES = {
        "s4": "claude-sonnet-4-6",
        # dot-notation aliases (e.g. 4.6 / 4.5) -> canonical dash form
        "claude-sonnet-4.6": "claude-sonnet-4-6",
        "claude-sonnet-4.5": "claude-sonnet-4-5",
        "claude-opus-4.6": "claude-opus-4-6",
        "claude-haiku-4.5": "claude-haiku-4-5",
        # thinking variants
        "claude-sonnet-4.6-thinking": "claude-sonnet-4-6-thinking",
        "claude-sonnet-4.5-thinking": "claude-sonnet-4-5-thinking",
        "claude-opus-4.6-thinking": "claude-opus-4-6-thinking",
        "claude-haiku-4.5-thinking": "claude-haiku-4-5-thinking",
    }
    site = SiteConfig(
        start_url="https://claude.ai/login",
        api_base="https://claude.ai/api",
        cookie_name="sessionKey",
        cookie_domain=".claude.ai",
        auth_keys=["sessionKey", "session_key"],
        config_section="claude",
    )

    # Seconds a cached site context (org_uuid) is reused per page: 5 minutes.
    _SITE_CONTEXT_TTL = 300.0

    # Canonical 8-4-4-4-12 hex UUID; only such ids become parent_message_uuid.
    _UUID_RE = re.compile(
        r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
    )

    def __init__(self) -> None:
        super().__init__()
        # Per-page probe cache: id(page) -> time of the last successful probe.
        self._probe_ok_at: dict[int, float] = {}
        # Per-page navigation lock: prevents concurrent page.goto/reload.
        self._nav_locks: dict[int, Lock] = {}
        # Per-page site-context cache: id(page) -> (context dict, fetch time).
        self._site_context_cache: dict[int, tuple[dict[str, Any], float]] = {}

    # ---- model mapping ----

    def model_mapping(self) -> dict[str, str] | None:
        """Return the full lookup table used to resolve requested model ids.

        Defaults are overridden by the configured mapping. Aliases are added
        only where the key is not already present.
        """
        configured = super().model_mapping() or {}
        mapping = dict(self.DEFAULT_MODEL_MAPPING)
        mapping.update(configured)
        for alias, upstream_model in self.MODEL_ALIASES.items():
            mapping.setdefault(alias, upstream_model)
        return mapping

    def listed_model_mapping(self) -> dict[str, str]:
        """Return defaults plus configured overrides, without alias keys."""
        configured = super().model_mapping() or {}
        mapping = dict(self.DEFAULT_MODEL_MAPPING)
        mapping.update(configured)
        for alias in self.MODEL_ALIASES:
            mapping.pop(alias, None)
        return mapping

    # ---- auth / page state ----

    async def apply_auth(
        self,
        context: BrowserContext,
        page: Page,
        auth: dict[str, Any],
        *,
        reload: bool = True,
    ) -> None:
        """Switch *page* to the account in *auth*.

        Claude cookies and page storage are cleared before the base class
        applies the new auth. The page is then optionally reloaded at
        ``start_url``.

        Per-page caches (site context / probe results) are always dropped
        afterwards, even on failure. Otherwise the previous account's org_uuid
        could be reused for up to ``_SITE_CONTEXT_TTL`` seconds.
        """
        try:
            await clear_cookies_for_domain(context, self.site.cookie_domain)
            await clear_page_storage_for_switch(page)
            await super().apply_auth(context, page, auth, reload=False)
            if reload:
                await safe_page_reload(page, url=self.start_url)
        finally:
            self._invalidate_page_caches(page)

    def _invalidate_page_caches(self, page: Page) -> None:
        """Forget cached probe and site-context results for *page*."""
        page_id = id(page)
        self._probe_ok_at.pop(page_id, None)
        self._site_context_cache.pop(page_id, None)

    def _is_claude_domain(self, url: str) -> bool:
        """True if *url*'s host is claude.ai, claude.com, a configured host, or a subdomain of one."""
        host = (urlparse(url).hostname or "").lower().lstrip(".")
        if not host:
            return False
        allowed_hosts = {"claude.ai", "claude.com"}
        for configured_url in (self.start_url, self.api_base):
            configured_host = (urlparse(configured_url).hostname or "").lower().lstrip(".")
            if configured_host:
                allowed_hosts.add(configured_host)
        return any(host == allowed or host.endswith(f".{allowed}") for allowed in allowed_hosts)

    def _suspicious_page_reason(self, url: str) -> str | None:
        """Classify *url*; return a reason string if the page cannot serve requests, else None."""
        if not url:
            return "empty_page_url"
        parsed = urlparse(url)
        if not parsed.scheme or not parsed.netloc:
            return "invalid_page_url"
        if not self._is_claude_domain(url):
            return "non_claude_domain"
        path = parsed.path or "/"
        if path == "/new" or path.startswith("/new/"):
            return "new_chat_page"
        if path in {"/logout", "/auth", "/signed-out"}:
            return "logout_page"
        if path.startswith("/signup"):
            return "signup_page"
        if path == "/app-unavailable-in-region" or path.startswith(
            "/app-unavailable-in-region/"
        ):
            return "app_unavailable_in_region"
        return None

    def _is_suspicious_page_url(self, url: str) -> bool:
        """True if _suspicious_page_reason(url) reports any reason."""
        return self._suspicious_page_reason(url) is not None

    # ---- request pre-flight ----

    async def _probe_request_ready(
        self,
        context: BrowserContext,
        page: Page,
        *,
        request_id: str,
    ) -> tuple[bool, str | None]:
        """Control probe: check the page URL, then fetch the site context.

        Returns:
            ``(ok, reason)``. *reason* is the suspicious-URL reason,
            ``"control_probe_error:<msg>"``, ``"account_probe_empty"``, or
            ``None`` on success.

        Raises:
            BrowserResourceInvalidError: propagated unchanged from
                fetch_site_context.
        """
        current_url = _safe_page_url(page)
        suspicious_reason = self._suspicious_page_reason(current_url)
        if suspicious_reason is not None:
            logger.warning(
                "[%s] request-ready probe sees suspicious page url request_id=%s reason=%s page.url=%s",
                self.type_name,
                request_id,
                suspicious_reason,
                _truncate_url_for_log(current_url),
            )
            return (False, suspicious_reason)
        try:
            site_context = await self.fetch_site_context(
                context,
                page,
                request_id=request_id,
            )
        except BrowserResourceInvalidError:
            raise
        except Exception as e:
            logger.warning(
                "[%s] request-ready probe failed request_id=%s page.url=%s err=%s",
                self.type_name,
                request_id,
                _truncate_url_for_log(current_url),
                str(e)[:240],
            )
            return (False, f"control_probe_error:{str(e)[:120]}")
        if site_context is None:
            return (False, "account_probe_empty")
        return (True, None)

    def _probe_is_fresh(self, page_id: int) -> bool:
        """True if *page_id* had a successful probe within _PROBE_CACHE_TTL_SECONDS."""
        last_ok = self._probe_ok_at.get(page_id, 0.0)
        return (time.time() - last_ok) < _PROBE_CACHE_TTL_SECONDS

    @staticmethod
    def _region_blocked_error(when: str = "") -> RuntimeError:
        """Build the region-block error; *when* (e.g. " after goto") names the stage."""
        return RuntimeError(
            f"Claude page is app-unavailable-in-region{when}; "
            "the runtime IP or region cannot reach Claude Web"
        )

    async def _preflight_navigate(
        self,
        page: Page,
        *,
        stage: str,
        goto_url: str | None,
        request_url: str,
        request_id: str,
        phase: str,
    ) -> None:
        """Reload *page* (or load *goto_url*), classifying browser-resource failures.

        Errors recognised by _classify_browser_resource_error are re-raised in
        classified form. Any other exception propagates unchanged.
        """
        try:
            if goto_url is None:
                await safe_page_reload(page)
            else:
                await safe_page_reload(page, url=goto_url)
        except Exception as e:
            classified = _classify_browser_resource_error(
                e,
                helper_name="claude.ensure_request_ready",
                operation="preflight",
                stage=stage,
                request_url=request_url,
                page=page,
                request_id=request_id or None,
                stream_phase=phase or None,
            )
            if classified is not None:
                raise classified from e
            raise

    async def ensure_request_ready(
        self,
        context: BrowserContext,
        page: Page,
        *,
        request_id: str = "",
        session_id: str | None = None,
        phase: str = "",
        account_id: str = "",
    ) -> None:
        """Make sure *page* can serve a Claude request, repairing it if needed.

        Escalation ladder (it stops at the first successful control probe):

        1. If the URL is suspicious, go to ``start_url``.
        2. Probe in place, but only if the URL now looks sane.
        3. Reload the page and probe again.
        4. Go to ``start_url`` and probe one last time.

        A successful probe is trusted for ``_PROBE_CACHE_TTL_SECONDS`` per
        page. A per-page lock serialises the repair steps.

        Raises:
            RuntimeError: the page is on Claude's app-unavailable-in-region
                page, or the final control probe fails.
            BrowserResourceInvalidError: navigation fails with a classified
                browser-resource error, or the page is still on a suspicious
                (non-region) URL after the final correction.
        """
        initial_url = _safe_page_url(page)
        current_url = initial_url
        probe_request_id = request_id or f"ready:{phase or 'request'}"
        action = "none"
        probe_before = False
        probe_after = False
        probe_reason: str | None = None
        page_id = id(page)

        # Fast path (lock-free): clean URL and a recent successful probe.
        suspicious_reason = self._suspicious_page_reason(current_url)
        if suspicious_reason is None and self._probe_is_fresh(page_id):
            return
        if suspicious_reason == "app_unavailable_in_region":
            raise self._region_blocked_error()

        # Slow path: one repair at a time per page.
        nav_lock = self._nav_locks.setdefault(page_id, Lock())
        async with nav_lock:
            # Re-check: another request may have repaired the page meanwhile.
            current_url = _safe_page_url(page)
            suspicious_reason = self._suspicious_page_reason(current_url)
            if suspicious_reason is None and self._probe_is_fresh(page_id):
                return
            if suspicious_reason == "app_unavailable_in_region":
                raise self._region_blocked_error()
            try:
                # Step 1: leave a suspicious URL before probing.
                if suspicious_reason is not None:
                    action = "goto"
                    await self._preflight_navigate(
                        page,
                        stage="goto_start_url",
                        goto_url=self.start_url,
                        request_url=self.start_url,
                        request_id=request_id,
                        phase=phase,
                    )
                    current_url = _safe_page_url(page)
                    suspicious_reason = self._suspicious_page_reason(current_url)
                    if suspicious_reason == "app_unavailable_in_region":
                        probe_reason = suspicious_reason
                        raise self._region_blocked_error(" after goto")

                # Step 2: probe in place if the URL now looks sane.
                probe_before = suspicious_reason is None
                if probe_before:
                    probe_after, probe_reason = await self._probe_request_ready(
                        context,
                        page,
                        request_id=f"{probe_request_id}:initial",
                    )
                    if probe_after:
                        self._probe_ok_at[page_id] = time.time()
                        return
                    if probe_reason == "app_unavailable_in_region":
                        raise self._region_blocked_error(" during control probe")
                else:
                    probe_after = False
                    probe_reason = suspicious_reason or "suspicious_page_url"

                # Step 3: reload and probe again.
                action = "reload"
                await self._preflight_navigate(
                    page,
                    stage="reload",
                    goto_url=None,
                    request_url=current_url or self.start_url,
                    request_id=request_id,
                    phase=phase,
                )
                current_url = _safe_page_url(page)
                if self._suspicious_page_reason(current_url) == "app_unavailable_in_region":
                    probe_reason = "app_unavailable_in_region"
                    raise self._region_blocked_error(" after reload")
                probe_after, probe_reason = await self._probe_request_ready(
                    context,
                    page,
                    request_id=f"{probe_request_id}:reload",
                )
                if probe_after:
                    self._probe_ok_at[page_id] = time.time()
                    return
                if probe_reason == "app_unavailable_in_region":
                    raise self._region_blocked_error(" after reload probe")

                # Step 4: back to start_url, last probe.
                action = "goto"
                await self._preflight_navigate(
                    page,
                    stage="goto_start_url",
                    goto_url=self.start_url,
                    request_url=self.start_url,
                    request_id=request_id,
                    phase=phase,
                )
                current_url = _safe_page_url(page)
                if self._suspicious_page_reason(current_url) == "app_unavailable_in_region":
                    probe_reason = "app_unavailable_in_region"
                    raise self._region_blocked_error(" after page correction")
                probe_after, probe_reason = await self._probe_request_ready(
                    context,
                    page,
                    request_id=f"{probe_request_id}:goto",
                )
                if not probe_after:
                    current_url = _safe_page_url(page)
                    final_reason = self._suspicious_page_reason(current_url)
                    # _probe_request_ready reports the concrete URL reason
                    # (e.g. "non_claude_domain"), never "suspicious_page_url".
                    # Re-check the URL so a page stuck on a bad URL is reported
                    # as a browser-resource failure. A region block stays a
                    # RuntimeError, as it is everywhere else in this method.
                    if probe_reason == "suspicious_page_url" or (
                        final_reason is not None
                        and final_reason != "app_unavailable_in_region"
                    ):
                        raise BrowserResourceInvalidError(
                            "Claude request preflight failed after page correction: suspicious_page_url",
                            helper_name="claude.ensure_request_ready",
                            operation="preflight",
                            stage="probe_after_goto",
                            resource_hint="page",
                            request_url=self.start_url,
                            page_url=current_url,
                            request_id=request_id or None,
                            stream_phase=phase or None,
                        )
                    raise RuntimeError(
                        f"Claude request control probe failed after page correction: {probe_reason or 'unknown'}"
                    )
                self._probe_ok_at[page_id] = time.time()
            finally:
                logger.info(
                    "[%s] ensure_request_ready phase=%s account=%s session_id=%s action=%s probe_before=%s probe_after=%s probe_reason=%s page.url.before=%s page.url.after=%s",
                    self.type_name,
                    phase,
                    account_id,
                    session_id,
                    action,
                    probe_before,
                    probe_after,
                    probe_reason,
                    _truncate_url_for_log(initial_url),
                    _truncate_url_for_log(current_url),
                )

    # ---- the 5 required hooks ----

    async def fetch_site_context(
        self,
        context: BrowserContext,
        page: Page,
        request_id: str = "",
    ) -> dict[str, Any] | None:
        """Resolve the account's organization via ``GET {api_base}/account``.

        Results are cached per page for ``_SITE_CONTEXT_TTL`` seconds. The
        cache is dropped by apply_auth.

        Returns:
            ``{"org_uuid": ...}`` from the first membership's organization, or
            ``None`` on a non-200 response, non-JSON body, or a payload without
            a usable organization UUID.
        """
        page_id = id(page)
        cached = self._site_context_cache.get(page_id)
        if cached is not None:
            ctx, ts = cached
            if (time.time() - ts) < self._SITE_CONTEXT_TTL:
                return ctx
        resp = await request_json_via_context_request(
            context,
            page,
            f"{self.api_base}/account",
            timeout_ms=15000,
            request_id=request_id or "site-context",
        )
        if int(resp.get("status") or 0) != 200:
            text = str(resp.get("text") or "")[:500]
            logger.warning(
                "[%s] fetch_site_context 失败 status=%s url=%s body=%s",
                self.type_name,
                resp.get("status"),
                resp.get("url"),
                text,
            )
            return None
        data = resp.get("json")
        if not isinstance(data, dict):
            logger.warning("[%s] fetch_site_context 返回非 JSON", self.type_name)
            return None
        memberships = data.get("memberships") or []
        if not isinstance(memberships, list) or not memberships:
            return None
        first = memberships[0]
        # Guard against unexpected shapes instead of raising AttributeError.
        org = first.get("organization") if isinstance(first, dict) else None
        if not isinstance(org, dict):
            return None
        org_uuid = org.get("uuid")
        if not org_uuid:
            return None
        result = {"org_uuid": org_uuid}
        self._site_context_cache[page_id] = (result, time.time())
        return result

    async def create_session(
        self,
        context: BrowserContext,
        page: Page,
        site_context: dict[str, Any],
        **kwargs: Any,
    ) -> str | None:
        """Create a Claude conversation and return its UUID, or None on failure.

        ``kwargs`` may carry ``public_model``, ``upstream_model`` and
        ``request_id``. Thinking models are created with the base model id and
        ``paprika_mode="extended"``.
        """
        org_uuid = site_context["org_uuid"]
        public_model = str(kwargs.get("public_model") or "").strip()
        upstream_model = str(kwargs.get("upstream_model") or "").strip()
        if not upstream_model:
            upstream_model = self.resolve_model(None).upstream_model
        thinking = _is_thinking_model(public_model)
        payload: dict[str, Any] = {
            "name": "",
            "model": _base_upstream_model(public_model) if thinking else upstream_model,
        }
        if thinking:
            payload["paprika_mode"] = "extended"
        url = f"{self.api_base}/organizations/{org_uuid}/chat_conversations"
        request_id = str(kwargs.get("request_id") or "").strip()
        resp = await request_json_via_context_request(
            context,
            page,
            url,
            method="POST",
            body=json.dumps(payload),
            headers={"Content-Type": "application/json"},
            timeout_ms=15000,
            request_id=request_id or f"create-session:{org_uuid}",
        )
        status = int(resp.get("status") or 0)
        if status not in (200, 201):
            text = str(resp.get("text") or "")[:500]
            logger.warning("创建会话失败 %s: %s", status, text)
            return None
        data = resp.get("json")
        if not isinstance(data, dict):
            logger.warning("创建会话返回非 JSON")
            return None
        return data.get("uuid")

    def build_completion_url(self, session_id: str, state: dict[str, Any]) -> str:
        """Return the completion endpoint for *session_id* in the state's org."""
        org_uuid = state["site_context"]["org_uuid"]
        return f"{self.api_base}/organizations/{org_uuid}/chat_conversations/{session_id}/completion"

    def build_completion_body(
        self,
        message: str,
        session_id: str,
        state: dict[str, Any],
        prepared_attachments: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Build the completion body for this turn.

        A turn counts as a follow-up when ``state["parent_message_uuid"]`` is
        not None. Prepared attachment fields overwrite the body's defaults.
        """
        parent = state.get("parent_message_uuid")
        tz = state.get("timezone") or TIMEZONE
        public_model = str(state.get("public_model") or "").strip()
        body = _default_completion_body(
            message,
            is_follow_up=parent is not None,
            timezone=tz,
            public_model=public_model,
        )
        if parent:
            body["parent_message_uuid"] = parent
        if prepared_attachments:
            body.update(prepared_attachments)
        return body

    def parse_stream_event(
        self,
        payload: str,
    ) -> tuple[list[str], str | None, str | None]:
        """Delegate to _parse_one_sse_event: ``(texts, message_id, error)``."""
        return _parse_one_sse_event(payload)

    def is_stream_end_event(self, payload: str) -> bool:
        """True for Claude's ``message_stop`` event."""
        return _is_terminal_sse_event(payload)

    def stream_transport(self) -> str:
        """Tell the base class to stream through the context request API."""
        return "context_request"

    # ---- error handling ----

    @staticmethod
    def _find_header(headers: dict[str, str] | None, name: str) -> str | None:
        """Return the first non-empty value of header *name*, matched case-insensitively."""
        if not headers:
            return None
        wanted = name.lower()
        for key, value in headers.items():
            if str(key).lower() == wanted and value:
                return str(value)
        return None

    @staticmethod
    def _parse_reset_timestamp(raw: str) -> int | None:
        """Parse an ISO-8601 reset time (a trailing ``Z`` is accepted) into a Unix timestamp.

        Naive timestamps are treated as UTC. Returns None if *raw* cannot be parsed.
        """
        text = raw.strip()
        if text.endswith("Z"):
            text = text[:-1] + "+00:00"
        try:
            dt = datetime.datetime.fromisoformat(text)
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=datetime.timezone.utc)
            return int(dt.timestamp())
        except (ValueError, TypeError, OverflowError):
            logger.debug("unparseable rate-limit reset header: %r", raw)
            return None

    def on_http_error(
        self,
        message: str,
        headers: dict[str, str] | None,
    ) -> int | None:
        """Map an upstream HTTP error to a rate-limit reset Unix timestamp.

        Returns None unless *message* contains ``"429"``. For a 429 the
        ``anthropic-ratelimit-requests-reset`` header is used when it parses;
        otherwise the result is now + 5 hours.
        """
        if "429" not in message:
            return None
        reset = self._find_header(headers, "anthropic-ratelimit-requests-reset")
        if reset:
            parsed = self._parse_reset_timestamp(reset)
            if parsed is not None:
                return parsed
        return int(time.time()) + 5 * 3600

    def on_stream_completion_finished(
        self,
        session_id: str,
        message_ids: list[str],
    ) -> None:
        """Record the turn's last UUID-shaped message id as parent_message_uuid.

        Multi-turn continuation on Claude needs ``parent_message_uuid``; ids
        that are not canonical UUIDs are skipped.
        """
        last_uuid = next(
            (m for m in reversed(message_ids) if self._UUID_RE.match(m)), None
        )
        if last_uuid and session_id in self._session_state:
            self._session_state[session_id]["parent_message_uuid"] = last_uuid
            logger.info(
                "[%s] updated parent_message_uuid=%s", self.type_name, last_uuid
            )

    async def prepare_attachments(
        self,
        context: BrowserContext,
        page: Page,
        session_id: str,
        state: dict[str, Any],
        attachments: list[InputAttachment],
        request_id: str = "",
    ) -> dict[str, Any]:
        """Upload *attachments* (at most 5) and return body fields that reference them.

        Returns:
            ``{}`` when there is nothing to upload, otherwise
            ``{"attachments": [], "files": [file_uuid, ...]}``.

        Raises:
            RuntimeError: more than 5 attachments, a non-2xx upload, a
                non-JSON response, or a response without a file UUID.
        """
        if not attachments:
            return {}
        if len(attachments) > 5:
            raise RuntimeError("Claude 单次最多上传 5 张图片")
        org_uuid = state["site_context"]["org_uuid"]
        url = (
            f"{self.api_base}/organizations/{org_uuid}/conversations/"
            f"{session_id}/wiggle/upload-file"
        )
        file_ids: list[str] = []
        for attachment in attachments:
            resp = await upload_file_via_context_request(
                context,
                page,
                url,
                filename=attachment.filename,
                mime_type=attachment.mime_type,
                data=attachment.data,
                field_name="file",
                timeout_ms=30000,
                request_id=request_id or f"upload:{session_id}",
            )
            status = int(resp.get("status") or 0)
            if status not in (200, 201):
                text = str(resp.get("text") or "")[:500]
                raise RuntimeError(f"图片上传失败 {status}: {text}")
            data = resp.get("json")
            if not isinstance(data, dict):
                raise RuntimeError("图片上传返回非 JSON")
            file_uuid = data.get("file_uuid") or data.get("uuid")
            if not file_uuid:
                raise RuntimeError("图片上传未返回 file_uuid")
            file_ids.append(str(file_uuid))
        return {"attachments": [], "files": file_ids}
def register_claude_plugin() -> None:
    """Instantiate ClaudePlugin and add it to the global PluginRegistry."""
    plugin = ClaudePlugin()
    PluginRegistry.register(plugin)