| """ |
| OpenAI Chat Completions ↔ Anthropic Messages API 格式转换模块。 |
| |
| 提供请求/响应格式的双向转换,以及 DeepSeek SSE 中间解析函数, |
| 使 /v1/chat/completions 和 /v1/messages 可以共享同一套下游逻辑, |
| 而 Anthropic Messages API 作为内部"标准格式"。 |
| |
| 用法: |
| from .converter import openai_to_anthropic, anthropic_to_openai |
| |
| # 将 OpenAI 请求转为 Anthropic 内部格式 |
| anth_body = openai_to_anthropic(openai_body) |
| |
| # 将内部 Anthropic 响应转回 OpenAI 格式 |
| openai_resp = anthropic_to_openai(anth_body, model="deepseek-v4-flash") |
| """ |
|
|
| import json |
| import logging |
| import time |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
| |
| |
def openai_to_anthropic(body: dict) -> dict:
    """Convert an OpenAI Chat Completions request body to Anthropic Messages format.

    Mapping performed:

    - ``system`` messages are removed from ``messages`` and joined with
      newlines into the top-level ``system`` field.
    - List-style message content is flattened to the newline-joined text of
      its ``type == "text"`` parts; other part types are dropped.
    - ``content: None`` (as sent with assistant tool-call turns) becomes an
      empty string instead of the literal text ``"None"``.
    - Any role other than ``user``/``assistant`` (for example ``tool``) is
      sent as ``user``.
    - A ``"."`` user message is prepended when the conversation would
      otherwise start with a non-user turn.
    - ``max_tokens``, ``temperature``, ``top_p`` and ``stream`` are copied
      through unchanged; ``stop`` becomes ``stop_sequences``.
    - ``thinking`` / ``thinking_enabled`` become the ``thinking`` object.
    - OpenAI function tools become Anthropic tools. ``tool_choice`` is not
      translated.

    Args:
        body: The OpenAI-style request body.

    Returns:
        A new dict in Anthropic Messages request shape.
    """
    result: dict = {"model": body.get("model", "deepseek-v4-flash")}

    system_parts: list[str] = []
    anth_messages: list[dict] = []
    for msg in body.get("messages") or []:
        role = msg.get("role", "")
        content = msg.get("content", "")
        if isinstance(content, list):
            # Keep only the text parts; skip malformed (non-dict) entries
            # rather than crashing on them.
            content = "\n".join(
                str(part.get("text") or "")
                for part in content
                if isinstance(part, dict) and part.get("type") == "text"
            )
        elif content is None:
            # str(None) would leak the word "None" into the prompt.
            content = ""

        if role == "system":
            if content:
                system_parts.append(str(content))
            continue

        anth_messages.append({
            "role": role if role in ("user", "assistant") else "user",
            "content": str(content),
        })

    if system_parts:
        result["system"] = "\n".join(system_parts)

    # The conversation must open with a user turn; pad with a placeholder.
    if anth_messages and anth_messages[0]["role"] != "user":
        anth_messages.insert(0, {"role": "user", "content": "."})
    result["messages"] = anth_messages

    for key in ("max_tokens", "temperature", "top_p", "stream"):
        if key in body:
            result[key] = body[key]

    stop = body.get("stop")
    if stop:
        result["stop_sequences"] = [stop] if isinstance(stop, str) else stop

    # An explicit thinking object wins over the legacy ``thinking_enabled``
    # flag. In particular ``{"type": "disabled"}`` must not be overridden by
    # the flag's default of True.
    thinking_obj = body.get("thinking")
    thinking_type = thinking_obj.get("type") if isinstance(thinking_obj, dict) else None
    if thinking_type == "enabled":
        thinking: dict = {"type": "enabled"}
        if "budget_tokens" in thinking_obj:
            thinking["budget_tokens"] = thinking_obj["budget_tokens"]
        result["thinking"] = thinking
    elif thinking_type != "disabled" and body.get("thinking_enabled", True):
        result["thinking"] = {"type": "enabled"}

    anth_tools = []
    for tool in body.get("tools") or []:
        if not isinstance(tool, dict):
            continue
        func = tool.get("function") or {}
        anth_tools.append({
            "name": func.get("name", ""),
            "description": func.get("description", ""),
            "input_schema": func.get("parameters", {}),
        })
    if anth_tools:
        result["tools"] = anth_tools

    return result
|
|
|
|
| |
| |
| |
def anthropic_to_openai_request(anth_body: dict) -> dict:
    """Convert an Anthropic Messages request body to OpenAI Chat Completions format.

    Mapping performed:

    - The top-level ``system`` field (string or list of text blocks) becomes
      a leading ``system`` message.
    - Block-list message content is flattened to the newline-joined text of
      its ``type == "text"`` blocks; other block types are dropped.
    - ``max_tokens``, ``temperature``, ``top_p`` and ``stream`` are copied
      through unchanged; ``stop_sequences`` becomes ``stop``.
    - ``thinking`` becomes the boolean ``thinking_enabled`` flag.
    - Anthropic tools become OpenAI function tools.

    Args:
        anth_body: The Anthropic-style request body.

    Returns:
        A new dict in OpenAI Chat Completions request shape.
    """

    def _as_text(content) -> str:
        """Flatten string / block-list / None content to plain text."""
        if content is None:
            return ""
        if isinstance(content, list):
            return "\n".join(
                str(block.get("text") or "")
                for block in content
                if isinstance(block, dict) and block.get("type") == "text"
            )
        return str(content)

    result: dict = {"model": anth_body.get("model", "deepseek-v4-flash")}

    messages: list[dict] = []
    # ``system`` may be a list of text blocks; flatten it the same way as
    # message content so the output is always a plain string.
    system_text = _as_text(anth_body.get("system", ""))
    if system_text:
        messages.append({"role": "system", "content": system_text})

    for msg in anth_body.get("messages") or []:
        messages.append({
            "role": msg.get("role", "user"),
            "content": _as_text(msg.get("content", "")),
        })
    result["messages"] = messages

    for key in ("max_tokens", "temperature", "top_p", "stream"):
        if key in anth_body:
            result[key] = anth_body[key]

    stop_sequences = anth_body.get("stop_sequences")
    if stop_sequences:
        result["stop"] = list(stop_sequences)

    thinking = anth_body.get("thinking")
    if isinstance(thinking, bool):
        result["thinking_enabled"] = thinking
    elif isinstance(thinking, dict):
        thinking_type = thinking.get("type")
        if thinking_type == "enabled":
            result["thinking_enabled"] = True
        elif thinking_type == "disabled":
            # Must be stated explicitly: a missing flag may be read as
            # "enabled" by the consumer of the OpenAI-style body.
            result["thinking_enabled"] = False

    oai_tools = [
        {
            "type": "function",
            "function": {
                "name": tool.get("name", ""),
                "description": tool.get("description", ""),
                "parameters": tool.get("input_schema", {}),
            },
        }
        for tool in anth_body.get("tools") or []
        if isinstance(tool, dict)
    ]
    if oai_tools:
        result["tools"] = oai_tools

    return result
|
|
|
|
| |
| |
| |
| def anthropic_to_openai(anth_body: dict, model: str | None = None) -> dict: |
| """将 Anthropic Messages API **响应体** 转换为 OpenAI Chat Completions 响应体。 |
| |
| content blocks 映射: |
| type=text → assistant content |
| type=thinking → reasoning_content |
| type=tool_use → tool_calls |
| """ |
| if model is None: |
| model = anth_body.get("model", "deepseek-v4-flash") |
|
|
| content_blocks = anth_body.get("content", []) |
| text_parts: list[str] = [] |
| reasoning_parts: list[str] = [] |
| tool_calls: list[dict] = [] |
|
|
| for block in content_blocks: |
| btype = block.get("type", "") |
| if btype == "text": |
| text_parts.append(block.get("text", "")) |
| elif btype == "thinking": |
| reasoning_parts.append(block.get("thinking", "")) |
| elif btype == "tool_use": |
| tool_calls.append({ |
| "id": block.get("id", "call_001"), |
| "type": "function", |
| "function": { |
| "name": block.get("name", ""), |
| "arguments": json.dumps(block.get("input", {}), ensure_ascii=False), |
| }, |
| }) |
|
|
| content = "".join(text_parts) or None |
| reasoning = "".join(reasoning_parts) |
|
|
| message: dict = {"role": "assistant", "content": content} |
| if reasoning: |
| message["reasoning_content"] = reasoning |
| if tool_calls: |
| message["tool_calls"] = tool_calls |
|
|
| |
| stop_reason = anth_body.get("stop_reason", "end_turn") |
| finish_reason = "stop" |
| if stop_reason == "tool_use": |
| finish_reason = "tool_calls" |
| elif stop_reason in ("max_tokens", "max_tokens_reached"): |
| finish_reason = "length" |
| elif stop_reason == "stop_sequence": |
| finish_reason = "stop" |
| elif stop_reason == "end_turn": |
| finish_reason = "stop" |
|
|
| usage = anth_body.get("usage", {}) |
| in_tokens = usage.get("input_tokens", 0) |
| out_tokens = usage.get("output_tokens", 0) |
|
|
| response: dict = { |
| "id": anth_body.get("id", f"chatcmpl-{int(time.time())}"), |
| "object": "chat.completion", |
| "created": int(time.time()), |
| "model": model, |
| "choices": [{ |
| "index": 0, |
| "message": message, |
| "finish_reason": finish_reason, |
| }], |
| "usage": { |
| "prompt_tokens": in_tokens, |
| "completion_tokens": out_tokens, |
| "total_tokens": in_tokens + out_tokens, |
| }, |
| } |
| if reasoning: |
| response["usage"]["completion_tokens_details"] = { |
| "reasoning_tokens": len(reasoning) // 4, |
| } |
|
|
| return response |
|
|
|
|
| |
| |
| |
def deepseek_line_to_anthropic_events(line: str, state: dict) -> list[dict]:
    """Parse one DeepSeek SSE line into zero or more Anthropic SSE event dicts.

    Each returned event has the shape
    ``{"event": "<event_name>", "data": <json-serializable>}``. Besides the
    ``content_block_start`` / ``content_block_delta`` / ``content_block_stop``
    events, a sentinel ``{"event": "__FINISHED__", "data": {}}`` is emitted
    when the upstream reports status ``FINISHED``.

    ``state`` is a mutable dict carried across calls and updated in place.
    The caller initializes it as::

        state = {"ptype": "text", "next_block_index": 0, "active_block_index": None,
                 "block_active": False, "has_thinking": False}

    Lines that are empty, do not start with ``data:``, are ``[DONE]``, are
    not valid JSON, or do not decode to a JSON object produce no events.

    Args:
        line: One raw SSE line from the upstream stream.
        state: The per-stream parser state described above.

    Returns:
        The list of events produced by this line (possibly empty).
    """
    events: list[dict] = []

    if not line or not line.startswith("data:"):
        return events

    data_str = line[5:].strip()
    if data_str == "[DONE]":
        return events

    try:
        chunk = json.loads(data_str)
    except json.JSONDecodeError:
        return events
    # Valid JSON that is not an object (number, string, list, null) carries
    # nothing to parse, and the key lookups below would raise on it.
    if not isinstance(chunk, dict):
        return events

    def _start_block(btype: str) -> dict:
        """Allocate the next block index and build ``content_block_start``."""
        idx = state.get("next_block_index", 0)
        state["next_block_index"] = idx + 1
        state["active_block_index"] = idx
        state["block_active"] = True
        state["current_block_type"] = btype
        block: dict = {"type": btype}
        if btype == "thinking":
            state["has_thinking"] = True
            block["thinking"] = ""
        elif btype == "text":
            block["text"] = ""
        elif btype == "tool_use":
            block["id"] = ""
            block["name"] = ""
            block["input"] = {}
        return {
            "event": "content_block_start",
            "data": {"type": "content_block_start", "index": idx, "content_block": block},
        }

    def _delta(btype: str, text: str) -> dict:
        """Build ``content_block_delta`` for the currently active block."""
        delta_type = {
            "thinking": "thinking_delta",
            "text": "text_delta",
            "tool_use": "input_json_delta",
        }.get(btype, "text_delta")
        delta: dict = {"type": delta_type}
        if btype == "thinking":
            delta["thinking"] = text
        elif btype == "text":
            delta["text"] = text
        elif btype == "tool_use":
            delta["partial_json"] = text
        return {
            "event": "content_block_delta",
            "data": {
                "type": "content_block_delta",
                "index": state.get("active_block_index", 0),
                "delta": delta,
            },
        }

    def _stop_block() -> dict:
        """Mark the active block closed and build ``content_block_stop``."""
        idx = state.get("active_block_index", 0)
        state["block_active"] = False
        state["active_block_index"] = None
        return {
            "event": "content_block_stop",
            "data": {"type": "content_block_stop", "index": idx},
        }

    def _switch_to(btype: str) -> None:
        """Make ``btype`` the active block type, closing the old block first."""
        old = state.get("ptype", "text")
        if old != btype and state.get("block_active"):
            events.append(_stop_block())
        if old != btype or not state.get("block_active"):
            events.append(_start_block(btype))
        state["ptype"] = btype

    def _emit_first_fragment(fragments) -> None:
        """Open the block for ``fragments[0]`` and emit its content, if any.

        Only the first fragment is considered; malformed input is ignored.
        """
        if not isinstance(fragments, list) or not fragments:
            return
        first = fragments[0]
        if not isinstance(first, dict):
            return
        btype = "thinking" if first.get("type", "") == "THINK" else "text"
        _switch_to(btype)
        frag_content = first.get("content", "")
        if frag_content:
            events.append(_delta(btype, frag_content))

    def _finish() -> None:
        """Close any open block and emit the ``__FINISHED__`` sentinel."""
        if state.get("block_active"):
            events.append(_stop_block())
        events.append({"event": "__FINISHED__", "data": {}})

    v_value = chunk.get("v")
    p_value = chunk.get("p")

    # Full response snapshot: {"v": {"response": {"fragments": [...]}}}
    if isinstance(v_value, dict) and "response" in v_value:
        response = v_value["response"]
        if isinstance(response, dict):
            _emit_first_fragment(response.get("fragments", []))
        return events

    # A new fragment appended to the response.
    if p_value == "response/fragments" and chunk.get("o") == "APPEND":
        _emit_first_fragment(v_value)
        return events

    if p_value == "response/thinking_content":
        _switch_to("thinking")
    elif p_value == "response/content":
        _switch_to("text")
    elif p_value == "response/status":
        if v_value == "FINISHED":
            _finish()
        # Status values are metadata; never let them fall through to the
        # text handling below and show up as content.
        return events
    elif p_value == "response/search_status":
        return events

    # Re-read: _switch_to above may have changed the active type.
    ptype = state.get("ptype", "text")

    if isinstance(v_value, str):
        # Bare text continues whatever block type is current.
        if not state.get("block_active"):
            events.append(_start_block(ptype))
        events.append(_delta(ptype, v_value))
    elif isinstance(v_value, list):
        # Batched updates: only a FINISHED status entry matters here, and
        # the sentinel is emitted once however many entries match.
        if any(
            isinstance(item, dict)
            and item.get("p") == "status"
            and item.get("v") == "FINISHED"
            for item in v_value
        ):
            _finish()

    return events
|
|
|
|
| |
| |
| |
| def make_message_start_event(msg_id: str, model: str, usage: dict | None = None) -> dict: |
| """生成 ``message_start`` SSE 事件。""" |
| if usage is None: |
| usage = {"input_tokens": 0, "output_tokens": 0} |
| return { |
| "event": "message_start", |
| "data": { |
| "type": "message_start", |
| "message": { |
| "id": msg_id or f"msg_{int(time.time())}", |
| "type": "message", |
| "role": "assistant", |
| "content": [], |
| "model": model, |
| "stop_reason": None, |
| "stop_sequence": None, |
| "usage": usage, |
| }, |
| }, |
| } |
|
|
|
|
| |
| |
| |
| def make_message_delta_event(stop_reason: str = "end_turn", |
| stop_sequence: str | None = None, |
| output_tokens: int = 0) -> dict: |
| """生成 ``message_delta`` SSE 事件。""" |
| return { |
| "event": "message_delta", |
| "data": { |
| "type": "message_delta", |
| "delta": {"stop_reason": stop_reason, "stop_sequence": stop_sequence}, |
| "usage": {"output_tokens": output_tokens}, |
| }, |
| } |
|
|
|
|
def make_message_stop_event() -> dict:
    """Build the ``message_stop`` SSE event (takes no input)."""
    payload = {"type": "message_stop"}
    return {"event": "message_stop", "data": payload}
|
|
|
|
| |
| |
| |
def build_anthropic_response(msg_id: str, model: str,
                             content_blocks: list[dict],
                             stop_reason: str = "end_turn",
                             input_tokens: int = 0,
                             output_tokens: int = 0) -> dict:
    """Assemble collected content blocks into an Anthropic Messages response body.

    Args:
        msg_id: Message id; when falsy, ``msg_<unix-seconds>`` is used.
        model: Model name to report.
        content_blocks: Content blocks, embedded as given (not copied).
        stop_reason: Value for ``stop_reason``.
        input_tokens: Value for ``usage.input_tokens``.
        output_tokens: Value for ``usage.output_tokens``.

    Returns:
        The response dict; ``stop_sequence`` is always ``None``.
    """
    message_id = msg_id if msg_id else f"msg_{int(time.time())}"
    usage = {"input_tokens": input_tokens, "output_tokens": output_tokens}

    response: dict = {"id": message_id, "type": "message", "role": "assistant"}
    response["content"] = content_blocks
    response["model"] = model
    response["stop_reason"] = stop_reason
    response["stop_sequence"] = None
    response["usage"] = usage
    return response
|
|