claude-code-proxy / core /anthropic /conversion.py
Yash030's picture
$(cat <<EOF
0ba585f
"""Message and tool format converters."""
import json
from dataclasses import dataclass, field
from enum import StrEnum
from typing import Any
from pydantic import BaseModel
from .content import get_block_attr, get_block_type
from .utils import set_if_not_none
class OpenAIConversionError(Exception):
"""Raised when Anthropic content cannot be converted to OpenAI chat without data loss."""
class ReasoningReplayMode(StrEnum):
"""How assistant reasoning history is replayed to OpenAI-compatible providers."""
DISABLED = "disabled"
THINK_TAGS = "think_tags"
REASONING_CONTENT = "reasoning_content"
def _openai_reject_native_only_top_level_fields(request_data: Any) -> None:
"""OpenAI chat providers may only convert known top-level request fields.
First-class model fields (e.g. ``context_management``) are not forwarded to
the OpenAI API but are allowed so clients do not hit spurious 400s.
Unknown extra keys (``__pydantic_extra__``) are still rejected.
"""
if not isinstance(request_data, BaseModel):
return
extra = getattr(request_data, "__pydantic_extra__", None)
if not extra:
return
raise OpenAIConversionError(
"OpenAI chat conversion does not support these top-level request fields: "
f"{sorted(str(k) for k in extra)}. Use a native Anthropic transport provider."
)
def _tool_name(tool: Any) -> str:
return str(getattr(tool, "name", "") or "")
def _tool_input_schema(tool: Any) -> dict[str, Any]:
schema = getattr(tool, "input_schema", None)
if isinstance(schema, dict):
return schema
return {"type": "object", "properties": {}}
def _serialize_tool_result_content(tool_content: Any) -> str:
"""Serialize tool_result content for OpenAI ``role: tool`` messages (stable JSON for structured values)."""
if tool_content is None:
return ""
if isinstance(tool_content, str):
return tool_content
if isinstance(tool_content, dict):
return json.dumps(tool_content, ensure_ascii=False)
if isinstance(tool_content, list):
parts: list[str] = []
for item in tool_content:
if isinstance(item, dict) and item.get("type") == "text":
parts.append(str(item.get("text", "")))
elif isinstance(item, dict):
parts.append(json.dumps(item, ensure_ascii=False))
else:
parts.append(str(item))
return "\n".join(parts)
return str(tool_content)
def _clean_reasoning_content(value: Any) -> str | None:
if not isinstance(value, str):
return None
return value if value else None
def _think_tag_content(reasoning: str) -> str:
return f"<think>\n{reasoning}\n</think>"
@dataclass
class _PendingAfterTools:
"""Assistant content that appears after ``tool_use`` in an Anthropic message.
OpenAI ``chat.completions`` cannot place assistant text after ``tool_calls`` in the
same message, so it is deferred until the corresponding ``role: tool`` results have
been replayed in order.
"""
# Tool use IDs still missing a ``role: tool`` result before post-tool text may be replayed.
remaining_tool_ids: set[str] = field(default_factory=set)
deferred_blocks: list[Any] = field(default_factory=list)
top_level_reasoning: str | None = None
reasoning_replay: ReasoningReplayMode = ReasoningReplayMode.THINK_TAGS
# True after deferred assistant text has been added to the OpenAI transcript.
deferred_emitted: bool = False
def needs_deferred(self) -> bool:
return bool(self.deferred_blocks) and not self.deferred_emitted
def _index_first_tool_use(blocks: list[Any]) -> int | None:
for i, block in enumerate(blocks):
if get_block_type(block) == "tool_use":
return i
return None
def _iter_tool_uses_in_order(blocks: list[Any]) -> list[dict[str, Any]]:
tool_calls: list[dict[str, Any]] = []
for block in blocks:
if get_block_type(block) == "tool_use":
tool_input = get_block_attr(block, "input", {})
tool_calls.append(
{
"id": get_block_attr(block, "id"),
"type": "function",
"function": {
"name": get_block_attr(block, "name"),
"arguments": json.dumps(tool_input)
if isinstance(tool_input, dict)
else str(tool_input),
},
}
)
return tool_calls
def _deferred_post_tool_blocks(
content: list[Any], *, first_tool_index: int
) -> list[Any]:
return [
b
for i, b in enumerate(content)
if i > first_tool_index and get_block_type(b) != "tool_use"
]
def _assert_no_forbidden_assistant_block(block: Any) -> None:
block_type = get_block_type(block)
if block_type == "image":
raise OpenAIConversionError(
"Assistant image blocks are not supported for OpenAI chat conversion."
)
if block_type in (
"server_tool_use",
"web_search_tool_result",
"web_fetch_tool_result",
):
raise OpenAIConversionError(
"OpenAI chat conversion does not support Anthropic server tool blocks "
f"({block_type!r} in an assistant message). Use a native Anthropic transport provider."
)
class AnthropicToOpenAIConverter:
"""Convert Anthropic message format to OpenAI-compatible format."""
@staticmethod
def convert_messages(
messages: list[Any],
*,
reasoning_replay: ReasoningReplayMode = ReasoningReplayMode.THINK_TAGS,
) -> list[dict[str, Any]]:
result: list[dict[str, Any]] = []
pending: _PendingAfterTools | None = None
for msg in messages:
role = msg.role
content = msg.content
reasoning_content = _clean_reasoning_content(
getattr(msg, "reasoning_content", None)
)
if role == "assistant" and isinstance(content, list):
if pending is not None and pending.needs_deferred():
# Orphan: expected tool result; emit deferred to avoid a stuck session.
result.extend(
AnthropicToOpenAIConverter._deferred_post_tool_to_messages(
pending,
)
)
pending.deferred_emitted = True
pending = None
if (first_i := _index_first_tool_use(content)) is not None:
for block in content:
if get_block_type(block) == "tool_use":
continue
_assert_no_forbidden_assistant_block(block)
out, new_pending = (
AnthropicToOpenAIConverter._convert_assistant_message_with_split(
content,
first_tool_index=first_i,
reasoning_content=reasoning_content,
reasoning_replay=reasoning_replay,
)
)
result.extend(out)
if new_pending is not None:
pending = new_pending
else:
for block in content:
_assert_no_forbidden_assistant_block(block)
result.extend(
AnthropicToOpenAIConverter._convert_assistant_message(
content,
reasoning_content=reasoning_content,
reasoning_replay=reasoning_replay,
)
)
elif isinstance(content, str):
if role == "user" and pending is not None and pending.needs_deferred():
result.extend(
AnthropicToOpenAIConverter._deferred_post_tool_to_messages(
pending
)
)
pending.deferred_emitted = True
pending = None
converted = {"role": role, "content": content}
if role == "assistant" and reasoning_content:
if reasoning_replay == ReasoningReplayMode.REASONING_CONTENT:
converted["reasoning_content"] = reasoning_content
elif reasoning_replay == ReasoningReplayMode.THINK_TAGS:
content_parts = [_think_tag_content(reasoning_content)]
if content:
content_parts.append(content)
converted["content"] = "\n\n".join(content_parts)
result.append(converted)
elif isinstance(content, list):
if role == "user":
if pending is not None and pending.needs_deferred():
if not pending.remaining_tool_ids:
result.extend(
AnthropicToOpenAIConverter._deferred_post_tool_to_messages(
pending
)
)
pending.deferred_emitted = True
pending = None
result.extend(
AnthropicToOpenAIConverter._convert_user_message(
content
)
)
else:
pieces = AnthropicToOpenAIConverter._convert_user_message_with_injection(
content, pending
)
result.extend(pieces["messages"])
if pieces["cleared_pending"]:
pending = None
else:
result.extend(
AnthropicToOpenAIConverter._convert_user_message(content)
)
else:
if role == "user" and pending is not None and pending.needs_deferred():
result.extend(
AnthropicToOpenAIConverter._deferred_post_tool_to_messages(
pending
)
)
pending.deferred_emitted = True
pending = None
result.append({"role": role, "content": str(content)})
if pending is not None and pending.needs_deferred():
result.extend(
AnthropicToOpenAIConverter._deferred_post_tool_to_messages(pending)
)
return result
@staticmethod
def _convert_assistant_message_with_split(
content: list[Any],
*,
first_tool_index: int,
reasoning_content: str | None,
reasoning_replay: ReasoningReplayMode,
) -> tuple[list[dict[str, Any]], _PendingAfterTools | None]:
pre = content[:first_tool_index]
tool_calls = _iter_tool_uses_in_order(content)
if not tool_calls:
return (
AnthropicToOpenAIConverter._convert_assistant_message(
content,
reasoning_content=reasoning_content,
reasoning_replay=reasoning_replay,
),
None,
)
deferred_blocks = _deferred_post_tool_blocks(
content, first_tool_index=first_tool_index
)
pre_msg: dict[str, Any]
if not pre:
pre_msg = {
"role": "assistant",
"content": "",
}
if reasoning_replay == ReasoningReplayMode.REASONING_CONTENT:
replay = reasoning_content
if replay:
pre_msg["reasoning_content"] = replay
else:
pre_msg = AnthropicToOpenAIConverter._convert_assistant_message(
pre,
reasoning_content=reasoning_content,
reasoning_replay=reasoning_replay,
)[0]
pre_msg["tool_calls"] = tool_calls
if tool_calls and pre_msg.get("content") == " ":
pre_msg["content"] = ""
pnd: _PendingAfterTools | None = None
if deferred_blocks:
res_ids: set[str] = set()
for tc in tool_calls:
tid = tc.get("id")
if tid is not None and str(tid).strip() != "":
res_ids.add(str(tid))
pnd = _PendingAfterTools(
remaining_tool_ids=res_ids,
deferred_blocks=deferred_blocks,
top_level_reasoning=reasoning_content,
reasoning_replay=reasoning_replay,
)
return [pre_msg], pnd
@staticmethod
def _convert_assistant_message(
content: list[Any],
*,
reasoning_content: str | None = None,
reasoning_replay: ReasoningReplayMode = ReasoningReplayMode.THINK_TAGS,
) -> list[dict[str, Any]]:
content_parts: list[str] = []
thinking_parts: list[str] = []
tool_calls: list[dict[str, Any]] = []
for block in content:
block_type = get_block_type(block)
if block_type == "text":
content_parts.append(get_block_attr(block, "text", ""))
elif block_type == "thinking":
if reasoning_replay == ReasoningReplayMode.DISABLED:
continue
thinking = get_block_attr(block, "thinking", "")
if reasoning_replay == ReasoningReplayMode.THINK_TAGS:
content_parts.append(_think_tag_content(thinking))
elif reasoning_content is None:
thinking_parts.append(thinking)
elif block_type == "redacted_thinking":
# Opaque provider continuation data; do not materialize as model-visible text
# or reasoning_content for OpenAI chat upstreams.
continue
elif block_type == "tool_use":
tool_input = get_block_attr(block, "input", {})
tool_calls.append(
{
"id": get_block_attr(block, "id"),
"type": "function",
"function": {
"name": get_block_attr(block, "name"),
"arguments": json.dumps(tool_input)
if isinstance(tool_input, dict)
else str(tool_input),
},
}
)
else:
_assert_no_forbidden_assistant_block(block)
content_str = "\n\n".join(content_parts)
if not content_str and not tool_calls:
content_str = " "
msg: dict[str, Any] = {
"role": "assistant",
"content": content_str,
}
if tool_calls:
msg["tool_calls"] = tool_calls
if reasoning_replay == ReasoningReplayMode.REASONING_CONTENT:
replay_reasoning = reasoning_content or "\n".join(thinking_parts)
if replay_reasoning:
msg["reasoning_content"] = replay_reasoning
return [msg]
@staticmethod
def _deferred_post_tool_to_messages(
pending: _PendingAfterTools,
) -> list[dict[str, Any]]:
if not pending.deferred_blocks:
return []
return AnthropicToOpenAIConverter._convert_assistant_message(
pending.deferred_blocks,
reasoning_content=pending.top_level_reasoning,
reasoning_replay=pending.reasoning_replay,
)
@staticmethod
def _convert_user_message_with_injection(
content: list[Any], pending: _PendingAfterTools
) -> dict[str, Any]:
"""Convert user list blocks, emitting deferred assistant after all tool results."""
if not pending.needs_deferred() or not pending.remaining_tool_ids:
return {
"messages": AnthropicToOpenAIConverter._convert_user_message(content),
"cleared_pending": False,
}
result: list[dict[str, Any]] = []
text_parts: list[str] = []
cleared = False
def flush_text() -> None:
if text_parts:
result.append({"role": "user", "content": "\n".join(text_parts)})
text_parts.clear()
for block in content:
block_type = get_block_type(block)
if block_type == "text":
text_parts.append(get_block_attr(block, "text", ""))
elif block_type == "image":
# Convert Anthropic image block to OpenAI image_url format
source = get_block_attr(block, "source", {})
source_type = source.get("type", "base64")
if source_type == "base64":
media_type = source.get("media_type", "image/png")
data = source.get("data", "")
# Size guard - check estimated decoded size
estimated_size = len(data) * 4 // 3
# Use a reasonable default (20MB) as max image size
max_image_bytes = 20 * 1024 * 1024
if estimated_size > max_image_bytes:
raise OpenAIConversionError(
f"Image size ({estimated_size / 1024 / 1024:.1f}MB) exceeds limit "
f"({max_image_bytes / 1024 / 1024:.1f}MB)"
)
image_url = f"data:{media_type};base64,{data}"
result.append(
{"type": "image_url", "image_url": {"url": image_url}}
)
elif source_type == "url":
# Handle URL-based images
url = source.get("url", "")
result.append({"type": "image_url", "image_url": {"url": url}})
else:
logger.warning("Unsupported image source type: {}", source_type)
elif block_type == "tool_result":
flush_text()
tool_content = get_block_attr(block, "content", "")
serialized = _serialize_tool_result_content(tool_content)
tuid = get_block_attr(block, "tool_use_id")
tuid_s = str(tuid) if tuid is not None else ""
result.append(
{
"role": "tool",
"tool_call_id": tuid,
"content": serialized if serialized else "",
}
)
if tuid_s in pending.remaining_tool_ids:
pending.remaining_tool_ids.discard(tuid_s)
if not pending.remaining_tool_ids:
result.extend(
AnthropicToOpenAIConverter._deferred_post_tool_to_messages(
pending
)
)
pending.deferred_emitted = True
cleared = True
else:
pass
flush_text()
return {"messages": result, "cleared_pending": cleared}
@staticmethod
def _convert_user_message(content: list[Any]) -> list[dict[str, Any]]:
result: list[dict[str, Any]] = []
text_parts: list[str] = []
def flush_text() -> None:
if text_parts:
result.append({"role": "user", "content": "\n".join(text_parts)})
text_parts.clear()
for block in content:
block_type = get_block_type(block)
if block_type == "text":
text_parts.append(get_block_attr(block, "text", ""))
elif block_type == "image":
# Convert Anthropic image block to OpenAI image_url format
source = get_block_attr(block, "source", {})
source_type = source.get("type", "base64")
if source_type == "base64":
media_type = source.get("media_type", "image/png")
data = source.get("data", "")
# Size guard - check estimated decoded size
estimated_size = len(data) * 4 // 3
# Use a reasonable default (20MB) as max image size
max_image_bytes = 20 * 1024 * 1024
if estimated_size > max_image_bytes:
raise OpenAIConversionError(
f"Image size ({estimated_size / 1024 / 1024:.1f}MB) exceeds limit "
f"({max_image_bytes / 1024 / 1024:.1f}MB)"
)
image_url = f"data:{media_type};base64,{data}"
result.append(
{"type": "image_url", "image_url": {"url": image_url}}
)
elif source_type == "url":
# Handle URL-based images
url = source.get("url", "")
result.append({"type": "image_url", "image_url": {"url": url}})
else:
logger.warning("Unsupported image source type: {}", source_type)
elif block_type == "tool_result":
flush_text()
tool_content = get_block_attr(block, "content", "")
serialized = _serialize_tool_result_content(tool_content)
result.append(
{
"role": "tool",
"tool_call_id": get_block_attr(block, "tool_use_id"),
"content": serialized if serialized else "",
}
)
flush_text()
return result
@staticmethod
def convert_tools(tools: list[Any]) -> list[dict[str, Any]]:
return [
{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description or "",
"parameters": _tool_input_schema(tool),
},
}
for tool in tools
]
@staticmethod
def convert_tool_choice(tool_choice: Any) -> Any:
if not isinstance(tool_choice, dict):
return tool_choice
choice_type = tool_choice.get("type")
if choice_type == "tool":
name = tool_choice.get("name")
if name:
return {"type": "function", "function": {"name": name}}
if choice_type == "any":
return "required"
if choice_type in {"auto", "none", "required"}:
return choice_type
if choice_type == "function" and isinstance(tool_choice.get("function"), dict):
return tool_choice
return tool_choice
@staticmethod
def convert_system_prompt(system: Any) -> dict[str, str] | None:
if isinstance(system, str):
return {"role": "system", "content": system}
if isinstance(system, list):
text_parts = [
get_block_attr(block, "text", "")
for block in system
if get_block_type(block) == "text"
]
if text_parts:
return {"role": "system", "content": "\n\n".join(text_parts).strip()}
return None
def build_base_request_body(
request_data: Any,
*,
default_max_tokens: int | None = None,
reasoning_replay: ReasoningReplayMode = ReasoningReplayMode.THINK_TAGS,
) -> dict[str, Any]:
"""Build the common parts of an OpenAI-format request body."""
_openai_reject_native_only_top_level_fields(request_data)
messages = AnthropicToOpenAIConverter.convert_messages(
request_data.messages,
reasoning_replay=reasoning_replay,
)
system = getattr(request_data, "system", None)
if system:
system_msg = AnthropicToOpenAIConverter.convert_system_prompt(system)
if system_msg:
messages.insert(0, system_msg)
body: dict[str, Any] = {"model": request_data.model, "messages": messages}
max_tokens = getattr(request_data, "max_tokens", None)
set_if_not_none(body, "max_tokens", max_tokens or default_max_tokens)
set_if_not_none(body, "temperature", getattr(request_data, "temperature", None))
set_if_not_none(body, "top_p", getattr(request_data, "top_p", None))
stop_sequences = getattr(request_data, "stop_sequences", None)
if stop_sequences:
body["stop"] = stop_sequences
tools = getattr(request_data, "tools", None)
if tools:
body["tools"] = AnthropicToOpenAIConverter.convert_tools(tools)
tool_choice = getattr(request_data, "tool_choice", None)
if tool_choice:
body["tool_choice"] = AnthropicToOpenAIConverter.convert_tool_choice(
tool_choice
)
return body