claude-code-proxy / core /anthropic /native_messages_request.py
Yash030's picture
Deploy claude-code-nvidia proxy to Hugging Face Spaces
0157ac7
"""Native Anthropic Messages request body construction (JSON-ready dicts).
Provider adapters supply policy via parameters (defaults, OpenRouter post-steps).
"""
from __future__ import annotations
from collections.abc import Sequence
from typing import Any
from pydantic import BaseModel
_REQUEST_FIELDS = (
"model",
"messages",
"system",
"max_tokens",
"stop_sequences",
"stream",
"temperature",
"top_p",
"top_k",
"metadata",
"tools",
"tool_choice",
"thinking",
"context_management",
"output_config",
"mcp_servers",
"extra_body",
)
# Keys that would override routed canonical request fields if merged from ``extra_body``.
_OPENROUTER_EXTRA_BODY_FORBIDDEN_KEYS = frozenset(
{
"model",
"messages",
"system",
"tools",
"tool_choice",
"stream",
"max_tokens",
"temperature",
"top_p",
"top_k",
"metadata",
"stop_sequences",
"context_management",
"output_config",
"mcp_servers",
}
)
class OpenRouterExtraBodyError(ValueError):
"""``extra_body`` contained reserved keys that would override canonical fields."""
def validate_openrouter_extra_body(extra: Any) -> None:
"""Reject ``extra_body`` keys that must not override routed request fields."""
if not isinstance(extra, dict) or not extra:
return
bad = _OPENROUTER_EXTRA_BODY_FORBIDDEN_KEYS & extra.keys()
if bad:
raise OpenRouterExtraBodyError(
f"extra_body must not override canonical request fields: {sorted(bad)}"
)
_INTERNAL_FIELDS = {
"thinking",
"extra_body",
}
def _serialize_value(value: Any) -> Any:
"""Convert Pydantic models and lightweight objects into JSON-ready values."""
if isinstance(value, BaseModel):
return value.model_dump(exclude_none=True)
if isinstance(value, dict):
return {
key: _serialize_value(item)
for key, item in value.items()
if item is not None
}
if isinstance(value, Sequence) and not isinstance(value, str | bytes | bytearray):
return [_serialize_value(item) for item in value]
if value is None or isinstance(value, str | int | float | bool):
return value
if hasattr(value, "__dict__"):
return {
key: _serialize_value(item)
for key, item in vars(value).items()
if not key.startswith("_") and item is not None
}
return value
def _dump_request_fields(request_data: Any) -> dict[str, Any]:
"""Extract the public request fields (OpenRouter-style explicit field list)."""
if isinstance(request_data, BaseModel):
return request_data.model_dump(exclude_none=True)
dumped: dict[str, Any] = {}
for field in _REQUEST_FIELDS:
value = getattr(request_data, field, None)
if value is not None:
dumped[field] = _serialize_value(value)
return dumped
def dump_raw_messages_request(request_data: Any) -> dict[str, Any]:
"""Public JSON-ready dict of Anthropic public request fields (for native adapters)."""
return _dump_request_fields(request_data)
def sanitize_native_messages_thinking_policy(
messages: Any, *, thinking_enabled: bool
) -> Any:
"""Filter assistant message thinking blocks for upstream native Anthropic JSON.
When ``thinking_enabled`` is false, remove ``thinking`` and ``redacted_thinking``
history so disabled policy is not undermined by prior turns.
When true, keep ``redacted_thinking`` and signed ``thinking``; remove only
unsigned plain ``thinking`` blocks (not replayable).
"""
if not isinstance(messages, list):
return messages
sanitized_messages: list[Any] = []
for message in messages:
if not isinstance(message, dict):
sanitized_messages.append(message)
continue
if message.get("role") != "assistant":
sanitized_messages.append(message)
continue
content = message.get("content")
if not isinstance(content, list):
sanitized_messages.append(message)
continue
if not thinking_enabled:
sanitized_content = [
block
for block in content
if not (
isinstance(block, dict)
and block.get("type") in ("thinking", "redacted_thinking")
)
]
else:
sanitized_content = [
block
for block in content
if not (
isinstance(block, dict)
and block.get("type") == "thinking"
and not isinstance(block.get("signature"), str)
)
]
sanitized_message = dict(message)
sanitized_message["content"] = sanitized_content or ""
sanitized_messages.append(sanitized_message)
return sanitized_messages
def _normalize_system_prompt_for_openrouter(system: Any) -> Any:
"""Flatten Claude SDK system blocks for OpenRouter's native endpoint."""
if not isinstance(system, list):
return system
text_parts: list[str] = []
for block in system:
if not isinstance(block, dict):
continue
if block.get("type") == "text" and isinstance(block.get("text"), str):
text_parts.append(block["text"])
return "\n\n".join(text_parts).strip() if text_parts else system
def _apply_openrouter_reasoning_policy(body: dict[str, Any], thinking_cfg: Any) -> None:
"""Map Anthropic thinking controls onto OpenRouter reasoning controls."""
reasoning = body.setdefault("reasoning", {"enabled": True})
if not isinstance(reasoning, dict):
return
reasoning.setdefault("enabled", True)
if not isinstance(thinking_cfg, dict):
return
budget_tokens = thinking_cfg.get("budget_tokens")
if isinstance(budget_tokens, int):
reasoning.setdefault("max_tokens", budget_tokens)
def build_base_native_anthropic_request_body(
request: Any,
*,
default_max_tokens: int,
thinking_enabled: bool,
) -> dict[str, Any]:
"""Serialize a Pydantic messages request to a generic native Anthropic body."""
body = request.model_dump(exclude_none=True)
body.pop("extra_body", None)
if "thinking" in body:
thinking_cfg = body.pop("thinking")
if thinking_enabled and isinstance(thinking_cfg, dict):
thinking_payload: dict[str, Any] = {"type": "enabled"}
budget_tokens = thinking_cfg.get("budget_tokens")
if isinstance(budget_tokens, int):
thinking_payload["budget_tokens"] = budget_tokens
body["thinking"] = thinking_payload
if "max_tokens" not in body:
body["max_tokens"] = default_max_tokens
if "messages" in body:
body["messages"] = sanitize_native_messages_thinking_policy(
body["messages"],
thinking_enabled=thinking_enabled,
)
return body
def build_openrouter_native_request_body(
request_data: Any,
*,
thinking_enabled: bool,
default_max_tokens: int,
) -> dict[str, Any]:
"""Build an Anthropic-format request body for OpenRouter (policy hooks built-in)."""
dumped_request = _dump_request_fields(request_data)
request_extra = dumped_request.pop("extra_body", None)
thinking_cfg = dumped_request.get("thinking")
body: dict[str, Any] = {
key: value
for key, value in dumped_request.items()
if key not in _INTERNAL_FIELDS
}
if isinstance(request_extra, dict):
validate_openrouter_extra_body(request_extra)
body.update(request_extra)
body["messages"] = sanitize_native_messages_thinking_policy(
body.get("messages"),
thinking_enabled=thinking_enabled,
)
if "system" in body:
body["system"] = _normalize_system_prompt_for_openrouter(body["system"])
body["stream"] = True
if body.get("max_tokens") is None:
body["max_tokens"] = default_max_tokens
if thinking_enabled:
_apply_openrouter_reasoning_policy(body, thinking_cfg)
return body