# DS2api / app / converter.py
# cjovs — Deploy DS2api Docker Space with bucket-backed runtime
# 33f27ba (verified)
"""
OpenAI Chat Completions ↔ Anthropic Messages API 格式转换模块。
提供请求/响应格式的双向转换,以及 DeepSeek SSE 中间解析函数,
使 /v1/chat/completions 和 /v1/messages 可以共享同一套下游逻辑,
而 Anthropic Messages API 作为内部"标准格式"。
用法:
from .converter import openai_to_anthropic, anthropic_to_openai
# 将 OpenAI 请求转为 Anthropic 内部格式
anth_body = openai_to_anthropic(openai_body)
# 将内部 Anthropic 响应转回 OpenAI 格式
openai_resp = anthropic_to_openai(anth_body, model="deepseek-v4-flash")
"""
import json
import logging
import time
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# 请求格式转换:OpenAI Chat Completions → Anthropic Messages API
# ---------------------------------------------------------------------------
def openai_to_anthropic(body: dict) -> dict:
    """Convert an OpenAI Chat Completions request body to Anthropic Messages format.

    Mapping performed:

    - ``system`` role messages are joined with newlines into the top-level
      ``system`` field.
    - List-style ``content`` is flattened to the newline-joined text of its
      ``type == "text"`` parts; a missing or ``None`` content becomes ``""``.
    - Any role other than ``user`` / ``assistant`` is rewritten to ``user``.
    - A placeholder ``user`` message is inserted when the conversation would
      otherwise start with a non-``user`` message.
    - ``max_tokens``, ``temperature``, ``top_p`` and ``stream`` are copied
      verbatim; ``stop`` becomes ``stop_sequences``.
    - ``thinking`` / ``thinking_enabled`` become a ``thinking`` object.
    - OpenAI function tools become Anthropic tools. ``tool_choice`` is not
      converted.

    Args:
        body: OpenAI-style request body.

    Returns:
        A new dict shaped like an Anthropic Messages request.
    """

    def _as_text(content) -> str:
        """Flatten an OpenAI ``content`` value (str, part list or None) to text."""
        if content is None:
            # Assistant tool-call turns carry ``content: null``; str(None)
            # would leak the literal word "None" into the prompt.
            return ""
        if isinstance(content, list):
            return "\n".join(
                str(part.get("text") or "")
                for part in content
                if isinstance(part, dict) and part.get("type") == "text"
            )
        return str(content)

    result: dict = {"model": body.get("model", "deepseek-v4-flash")}

    # -- system / messages --
    system_parts: list[str] = []
    anth_messages: list[dict] = []
    for msg in body.get("messages", []):
        role = msg.get("role", "")
        text = _as_text(msg.get("content", ""))
        if role == "system":
            if text:
                system_parts.append(text)
            continue
        # Only user / assistant roles are kept; everything else becomes user.
        if role not in ("user", "assistant"):
            role = "user"
        anth_messages.append({"role": role, "content": text})

    if system_parts:
        result["system"] = "\n".join(system_parts)

    if anth_messages and anth_messages[0]["role"] != "user":
        # The conversation must open with a user turn.
        anth_messages.insert(0, {"role": "user", "content": "."})
    result["messages"] = anth_messages

    # -- scalar parameters --
    for key in ("max_tokens", "temperature", "top_p", "stream"):
        if key in body:
            result[key] = body[key]

    stop = body.get("stop")
    if stop:
        result["stop_sequences"] = [stop] if isinstance(stop, str) else stop

    # -- thinking --
    thinking_obj = body.get("thinking")
    thinking_type = thinking_obj.get("type") if isinstance(thinking_obj, dict) else None
    if thinking_type == "enabled":
        thinking: dict = {"type": "enabled"}
        if "budget_tokens" in thinking_obj:
            thinking["budget_tokens"] = thinking_obj["budget_tokens"]
        result["thinking"] = thinking
    elif thinking_type != "disabled" and body.get("thinking_enabled", True):
        # An explicit {"type": "disabled"} wins over the enabled-by-default
        # ``thinking_enabled`` flag.
        result["thinking"] = {"type": "enabled"}

    # -- tools --
    tools = body.get("tools") or []
    if tools:
        anth_tools = []
        for tool in tools:
            func = tool.get("function") or {}
            anth_tools.append({
                "name": func.get("name", ""),
                "description": func.get("description", ""),
                "input_schema": func.get("parameters", {}),
            })
        result["tools"] = anth_tools

    return result
# ---------------------------------------------------------------------------
# 请求格式转换:Anthropic Messages API → OpenAI Chat Completions
# ---------------------------------------------------------------------------
def anthropic_to_openai_request(anth_body: dict) -> dict:
    """Convert an Anthropic Messages request body to OpenAI Chat Completions format.

    Mapping performed:

    - Top-level ``system`` (string or list of text blocks) becomes a leading
      ``system`` message.
    - List-style message ``content`` is flattened to the newline-joined text
      of its ``type == "text"`` blocks; ``None`` content becomes ``""``.
    - ``max_tokens``, ``temperature``, ``top_p`` and ``stream`` are copied
      verbatim; ``stop_sequences`` becomes ``stop``.
    - ``thinking`` (``{"type": "enabled" | "disabled"}`` or a bool) becomes
      ``thinking_enabled``.
    - Anthropic tools become OpenAI function tools.

    Args:
        anth_body: Anthropic-style request body.

    Returns:
        A new dict shaped like an OpenAI Chat Completions request.
    """

    def _as_text(content) -> str:
        """Flatten an Anthropic content value (str, block list or None) to text."""
        if content is None:
            return ""
        if isinstance(content, list):
            return "\n".join(
                str(block.get("text") or "")
                for block in content
                if isinstance(block, dict) and block.get("type") == "text"
            )
        return str(content)

    result: dict = {"model": anth_body.get("model", "deepseek-v4-flash")}

    messages: list[dict] = []
    # ``system`` may be a plain string or a list of text blocks; flatten both
    # the same way message content is flattened.
    system_text = _as_text(anth_body.get("system", ""))
    if system_text:
        messages.append({"role": "system", "content": system_text})

    for msg in anth_body.get("messages", []):
        messages.append({
            "role": msg.get("role", "user"),
            "content": _as_text(msg.get("content", "")),
        })
    result["messages"] = messages

    for key in ("max_tokens", "temperature", "top_p", "stream"):
        if key in anth_body:
            result[key] = anth_body[key]

    # Inverse of the stop -> stop_sequences mapping done for OpenAI requests.
    stop_sequences = anth_body.get("stop_sequences")
    if stop_sequences:
        result["stop"] = stop_sequences

    # thinking -> thinking_enabled
    thinking = anth_body.get("thinking")
    if isinstance(thinking, bool):
        result["thinking_enabled"] = thinking
    elif isinstance(thinking, dict):
        thinking_type = thinking.get("type")
        if thinking_type == "enabled":
            result["thinking_enabled"] = True
        elif thinking_type == "disabled":
            # Carry an explicit opt-out across instead of dropping it.
            result["thinking_enabled"] = False

    # Anthropic tools -> OpenAI function tools
    tools = anth_body.get("tools") or []
    if tools:
        result["tools"] = [
            {
                "type": "function",
                "function": {
                    "name": tool.get("name", ""),
                    "description": tool.get("description", ""),
                    "parameters": tool.get("input_schema", {}),
                },
            }
            for tool in tools
        ]

    return result
# ---------------------------------------------------------------------------
# 响应格式转换:Anthropic Messages API → OpenAI Chat Completions
# ---------------------------------------------------------------------------
def anthropic_to_openai(anth_body: dict, model: str | None = None) -> dict:
"""将 Anthropic Messages API **响应体** 转换为 OpenAI Chat Completions 响应体。
content blocks 映射:
type=text → assistant content
type=thinking → reasoning_content
type=tool_use → tool_calls
"""
if model is None:
model = anth_body.get("model", "deepseek-v4-flash")
content_blocks = anth_body.get("content", [])
text_parts: list[str] = []
reasoning_parts: list[str] = []
tool_calls: list[dict] = []
for block in content_blocks:
btype = block.get("type", "")
if btype == "text":
text_parts.append(block.get("text", ""))
elif btype == "thinking":
reasoning_parts.append(block.get("thinking", ""))
elif btype == "tool_use":
tool_calls.append({
"id": block.get("id", "call_001"),
"type": "function",
"function": {
"name": block.get("name", ""),
"arguments": json.dumps(block.get("input", {}), ensure_ascii=False),
},
})
content = "".join(text_parts) or None
reasoning = "".join(reasoning_parts)
message: dict = {"role": "assistant", "content": content}
if reasoning:
message["reasoning_content"] = reasoning
if tool_calls:
message["tool_calls"] = tool_calls
# finish_reason 映射
stop_reason = anth_body.get("stop_reason", "end_turn")
finish_reason = "stop"
if stop_reason == "tool_use":
finish_reason = "tool_calls"
elif stop_reason in ("max_tokens", "max_tokens_reached"):
finish_reason = "length"
elif stop_reason == "stop_sequence":
finish_reason = "stop"
elif stop_reason == "end_turn":
finish_reason = "stop"
usage = anth_body.get("usage", {})
in_tokens = usage.get("input_tokens", 0)
out_tokens = usage.get("output_tokens", 0)
response: dict = {
"id": anth_body.get("id", f"chatcmpl-{int(time.time())}"),
"object": "chat.completion",
"created": int(time.time()),
"model": model,
"choices": [{
"index": 0,
"message": message,
"finish_reason": finish_reason,
}],
"usage": {
"prompt_tokens": in_tokens,
"completion_tokens": out_tokens,
"total_tokens": in_tokens + out_tokens,
},
}
if reasoning:
response["usage"]["completion_tokens_details"] = {
"reasoning_tokens": len(reasoning) // 4,
}
return response
# ---------------------------------------------------------------------------
# DeepSeek SSE → Anthropic SSE 事件构建器
# ---------------------------------------------------------------------------
def deepseek_line_to_anthropic_events(line: str, state: dict) -> list[dict]:
"""解析 DeepSeek SSE 行,生成 0~N 个 Anthropic SSE event dict。
每个 event dict 格式:{"event": "<event_name>", "data": <json-serializable>}
``state`` 是一个可变字典,调用方初始化为::
state = {"ptype": "text", "next_block_index": 0, "active_block_index": None,
"block_active": False, "has_thinking": False}
"""
events: list[dict] = []
if not line or not line.startswith("data:"):
return events
data_str = line[5:].strip()
if data_str == "[DONE]":
return events
try:
chunk = json.loads(data_str)
except json.JSONDecodeError:
return events
# ---- 辅助函数 ----
def _start_block(btype: str) -> dict:
idx = state.get("next_block_index", 0)
state["next_block_index"] = idx + 1
state["active_block_index"] = idx
state["block_active"] = True
state["current_block_type"] = btype
if btype == "thinking":
state["has_thinking"] = True
block: dict = {"type": btype}
if btype == "thinking":
block["thinking"] = ""
elif btype == "text":
block["text"] = ""
elif btype == "tool_use":
block["type"] = "tool_use"
block["id"] = ""
block["name"] = ""
block["input"] = {}
return {
"event": "content_block_start",
"data": {"type": "content_block_start", "index": idx, "content_block": block},
}
def _delta(btype: str, text: str) -> dict:
delta_type = {
"thinking": "thinking_delta",
"text": "text_delta",
"tool_use": "input_json_delta",
}.get(btype, "text_delta")
delta: dict = {"type": delta_type}
if btype == "thinking":
delta["thinking"] = text
elif btype == "text":
delta["text"] = text
elif btype == "tool_use":
delta["partial_json"] = text
return {
"event": "content_block_delta",
"data": {
"type": "content_block_delta",
"index": state.get("active_block_index", 0),
"delta": delta,
},
}
def _stop_block(_btype: str = None) -> dict:
idx = state.get("active_block_index", 0)
state["block_active"] = False
state["active_block_index"] = None
return {
"event": "content_block_stop",
"data": {"type": "content_block_stop", "index": idx},
}
def _switch_to(btype: str):
"""切换到新的 content block 类型,必要时先 close old block。"""
old = state.get("ptype", "text")
if old != btype and state.get("block_active"):
events.append(_stop_block(old))
if old != btype or not state.get("block_active"):
events.append(_start_block(btype))
state["ptype"] = btype
# ---- 判断内容类型 ----
ptype = state.get("ptype", "text")
# 新格式:fragments 中的 type 决定内容类型
if "v" in chunk and isinstance(chunk["v"], dict) and "response" in chunk["v"]:
fragments = chunk["v"]["response"].get("fragments", [])
if fragments and isinstance(fragments, list) and len(fragments) > 0:
frag_type = fragments[0].get("type", "")
new_ptype = "thinking" if frag_type == "THINK" else "text"
frag_content = fragments[0].get("content", "")
_switch_to(new_ptype)
if frag_content:
events.append(_delta(new_ptype, frag_content))
return events
if "p" in chunk and chunk.get("p") == "response/fragments" and chunk.get("o") == "APPEND":
new_frags = chunk.get("v", [])
if new_frags and isinstance(new_frags, list) and len(new_frags) > 0:
frag_type = new_frags[0].get("type", "")
new_ptype = "thinking" if frag_type == "THINK" else "text"
frag_content = new_frags[0].get("content", "")
_switch_to(new_ptype)
if frag_content:
events.append(_delta(new_ptype, frag_content))
return events
# ---- 兼容旧格式:路径标记 ----
if "p" in chunk:
p_value = chunk["p"]
if p_value == "response/thinking_content":
_switch_to("thinking")
elif p_value == "response/content":
_switch_to("text")
elif p_value == "response/status":
if chunk.get("v") == "FINISHED":
if state.get("block_active"):
events.append(_stop_block(state.get("ptype", "text")))
events.append({"event": "__FINISHED__", "data": {}})
return events
elif p_value == "response/search_status":
return events
# 刷新 ptype
ptype = state.get("ptype", "text")
# ---- v 字段中的文本内容 ----
if "v" in chunk:
v_value = chunk["v"]
if isinstance(v_value, str):
content = v_value
if not state.get("block_active"):
events.append(_start_block(ptype))
events.append(_delta(ptype, content))
elif isinstance(v_value, list):
for item in v_value:
if item.get("p") == "status" and item.get("v") == "FINISHED":
if state.get("block_active"):
events.append(_stop_block(state.get("ptype", "text")))
events.append({"event": "__FINISHED__", "data": {}})
return events
# ---------------------------------------------------------------------------
# 构建 Anthropic message_start event
# ---------------------------------------------------------------------------
def make_message_start_event(msg_id: str, model: str, usage: dict | None = None) -> dict:
"""生成 ``message_start`` SSE 事件。"""
if usage is None:
usage = {"input_tokens": 0, "output_tokens": 0}
return {
"event": "message_start",
"data": {
"type": "message_start",
"message": {
"id": msg_id or f"msg_{int(time.time())}",
"type": "message",
"role": "assistant",
"content": [],
"model": model,
"stop_reason": None,
"stop_sequence": None,
"usage": usage,
},
},
}
# ---------------------------------------------------------------------------
# 构建 Anthropic message_delta + message_stop events
# ---------------------------------------------------------------------------
def make_message_delta_event(stop_reason: str = "end_turn",
stop_sequence: str | None = None,
output_tokens: int = 0) -> dict:
"""生成 ``message_delta`` SSE 事件。"""
return {
"event": "message_delta",
"data": {
"type": "message_delta",
"delta": {"stop_reason": stop_reason, "stop_sequence": stop_sequence},
"usage": {"output_tokens": output_tokens},
},
}
def make_message_stop_event() -> dict:
    """Build the ``message_stop`` SSE event dict (takes no inputs)."""
    kind = "message_stop"
    return {"event": kind, "data": {"type": kind}}
# ---------------------------------------------------------------------------
# 构建非流式 Anthropic 响应体(从已收集的 content blocks)
# ---------------------------------------------------------------------------
def build_anthropic_response(msg_id: str, model: str,
                             content_blocks: list[dict],
                             stop_reason: str = "end_turn",
                             input_tokens: int = 0,
                             output_tokens: int = 0) -> dict:
    """Assemble already-collected content blocks into an Anthropic Messages response body.

    Args:
        msg_id: Message id; when falsy, ``msg_<unix-seconds>`` is generated.
        model: Model name to report.
        content_blocks: Content blocks, embedded as-is (not copied).
        stop_reason: Value for ``stop_reason``.
        input_tokens: Value for ``usage.input_tokens``.
        output_tokens: Value for ``usage.output_tokens``.

    Returns:
        A dict with ``type == "message"``, ``role == "assistant"`` and
        ``stop_sequence`` fixed to ``None``.
    """
    response_id = msg_id if msg_id else f"msg_{int(time.time())}"
    token_usage = {"input_tokens": input_tokens, "output_tokens": output_tokens}
    return {
        "id": response_id,
        "type": "message",
        "role": "assistant",
        "content": content_blocks,
        "model": model,
        "stop_reason": stop_reason,
        "stop_sequence": None,
        "usage": token_usage,
    }