File size: 6,549 Bytes
"""Bridge canonical requests onto the OpenAI semantic event stream (the single intermediate representation)."""
from __future__ import annotations
from collections.abc import AsyncIterator
from core.api.chat_handler import ChatHandler
from core.api.schemas import (
InputAttachment,
OpenAIChatRequest,
OpenAIContentPart,
OpenAIMessage,
)
from core.protocol.images import (
MAX_IMAGE_COUNT,
download_remote_image,
parse_base64_image,
parse_data_url,
)
from core.hub.schemas import OpenAIStreamEvent
from core.protocol.schemas import CanonicalChatRequest, CanonicalContentBlock, CanonicalMessage
class CanonicalChatService:
    """Translate canonical chat requests into OpenAI-style requests and events.

    The service converts a ``CanonicalChatRequest`` into an
    ``OpenAIChatRequest`` (messages, tools, image attachments) and delegates
    the actual streaming to ``ChatHandler.stream_openai_events``.
    """

    def __init__(self, handler: ChatHandler) -> None:
        """Store the handler that produces the OpenAI-style event stream."""
        self._handler = handler

    async def stream_raw(
        self, req: CanonicalChatRequest
    ) -> AsyncIterator[OpenAIStreamEvent]:
        """Convert *req* and yield the handler's events one by one."""
        openai_req = await self._to_openai_request(req)
        async for event in self._handler.stream_openai_events(
            req.provider, openai_req
        ):
            yield event

    async def collect_raw(self, req: CanonicalChatRequest) -> list[OpenAIStreamEvent]:
        """Drain ``stream_raw`` and return every event as a list."""
        return [event async for event in self.stream_raw(req)]

    async def _to_openai_request(self, req: CanonicalChatRequest) -> OpenAIChatRequest:
        """Build the ``OpenAIChatRequest`` equivalent of a canonical request.

        The optional system prompt becomes a leading ``system`` message, each
        canonical message is converted in order, tools are wrapped in the
        ``{"type": "function", ...}`` envelope, and image attachments are
        resolved for both the last user message and all user messages.

        Raises:
            ValueError: propagated from ``_resolve_attachments`` when there
                are too many images or an image block carries no usable data.
        """
        messages: list[OpenAIMessage] = []
        if req.system:
            messages.append(
                OpenAIMessage(
                    role="system",
                    content=self._to_openai_content(req.system),
                )
            )
        for msg in req.messages:
            messages.append(
                OpenAIMessage(
                    role=msg.role,
                    content=self._to_openai_content(msg.content),
                    # Only tool messages carry a tool_call_id; it is taken
                    # from the first content block of the message.
                    tool_call_id=msg.content[0].tool_use_id
                    if msg.role == "tool" and msg.content
                    else None,
                )
            )

        openai_tools = [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.input_schema,
                    "strict": tool.strict,
                },
            }
            for tool in req.tools
        ]

        last_user_attachments, all_attachments = await self._resolve_attachments(req)
        return OpenAIChatRequest(
            model=req.model,
            messages=messages,
            stream=req.stream,
            # An empty tool list is sent as None rather than [].
            tools=openai_tools or None,
            tool_choice=req.tool_choice,
            resume_session_id=req.resume_session_id,
            # Missing or falsy metadata values collapse to None.
            upstream_model=str(req.metadata.get("upstream_model") or "") or None,
            # ChatHandler decides, depending on whether full_history is in
            # effect, which of the two lists below ends up in attachment_files.
            attachment_files=[],
            attachment_files_last_user=last_user_attachments,
            attachment_files_all_users=all_attachments,
        )

    async def _resolve_attachments(
        self, req: CanonicalChatRequest
    ) -> tuple[list[InputAttachment], list[InputAttachment]]:
        """Resolve image blocks into ``(last_user_attachments, all_user_attachments)``.

        - When a session is reused (full_history=False) only the images of the
          last user message are needed.
        - When a session is rebuilt (full_history=True) the images of every
          historical user message have to be supplied again.

        Every image is prepared exactly once. The last user message's image
        blocks are by construction the tail of the all-users list, so its
        attachments are a slice of the full result instead of a second round
        of downloads/decoding.

        NOTE(review): as a consequence the last-user attachments carry the
        request-wide image index in their filename prefix instead of
        restarting at 1 — confirm no consumer depends on the old numbering.

        Raises:
            ValueError: if the user messages contain more than
                ``MAX_IMAGE_COUNT`` images, or an image block has neither a
                URL nor decodable data.
        """
        user_messages = [msg for msg in req.messages if msg.role == "user"]

        # Images from every user message, in conversation order.
        all_image_blocks: list[CanonicalContentBlock] = [
            block
            for msg in user_messages
            for block in msg.content
            if block.type == "image"
        ]
        if len(all_image_blocks) > MAX_IMAGE_COUNT:
            raise ValueError(f"单次最多上传 {MAX_IMAGE_COUNT} 张图片")

        all_attachments = await self._prepare_attachments(all_image_blocks)

        last_image_count = 0
        if user_messages:
            last_image_count = sum(
                1 for block in user_messages[-1].content if block.type == "image"
            )
        # Slice from an explicit start index: a negative-zero slice ([-0:])
        # would return the whole list when the last message has no images.
        tail_start = len(all_attachments) - last_image_count
        return all_attachments[tail_start:], all_attachments

    @staticmethod
    async def _prepare_attachments(
        blocks: list[CanonicalContentBlock],
    ) -> list[InputAttachment]:
        """Turn image blocks into ``InputAttachment`` objects, preserving order.

        Source precedence per block: remote URL, then ``data:`` URL, then raw
        base64 data accompanied by a MIME type.

        Raises:
            ValueError: if a block offers none of the supported sources.
        """
        attachments: list[InputAttachment] = []
        for idx, block in enumerate(blocks, start=1):
            prefix = f"message_image_{idx}"
            if block.url:
                prepared = await download_remote_image(block.url, prefix=prefix)
            elif block.data and block.data.startswith("data:"):
                prepared = parse_data_url(block.data, prefix=prefix)
            elif block.data and block.mime_type:
                prepared = parse_base64_image(
                    block.data,
                    block.mime_type,
                    prefix=prefix,
                )
            else:
                raise ValueError("图片块缺少可用数据")
            attachments.append(
                InputAttachment(
                    filename=prepared.filename,
                    mime_type=prepared.mime_type,
                    data=prepared.data,
                )
            )
        return attachments

    @staticmethod
    def _to_openai_content(
        blocks: list[CanonicalContentBlock],
    ) -> str | list[OpenAIContentPart]:
        """Convert canonical content blocks into OpenAI message content.

        ``text``, ``thinking`` and ``tool_result`` blocks become text parts,
        ``image`` blocks become ``image_url`` parts, and any other block type
        is skipped.

        Returns:
            ``""`` when nothing was converted, a plain string when the result
            is a single text part, otherwise the list of parts.
        """
        if not blocks:
            return ""
        parts: list[OpenAIContentPart] = []
        for block in blocks:
            if block.type in {"text", "thinking", "tool_result"}:
                parts.append(OpenAIContentPart(type="text", text=block.text or ""))
            elif block.type == "image":
                # Prefer the remote URL; fall back to inline data.
                url = block.url or block.data or ""
                parts.append(
                    OpenAIContentPart(
                        type="image_url",
                        image_url={"url": url},
                    )
                )
        if not parts:
            return ""
        # A lone text part is flattened to a bare string.
        if len(parts) == 1 and parts[0].type == "text":
            return parts[0].text or ""
        return parts
|