File size: 6,549 Bytes
77169b4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
"""Canonical 请求桥接到 OpenAI 语义事件流(唯一中间态)。"""

from __future__ import annotations

from collections.abc import AsyncIterator

from core.api.chat_handler import ChatHandler
from core.api.schemas import (
    InputAttachment,
    OpenAIChatRequest,
    OpenAIContentPart,
    OpenAIMessage,
)
from core.protocol.images import (
    MAX_IMAGE_COUNT,
    download_remote_image,
    parse_base64_image,
    parse_data_url,
)
from core.hub.schemas import OpenAIStreamEvent
from core.protocol.schemas import CanonicalChatRequest, CanonicalContentBlock, CanonicalMessage


class CanonicalChatService:
    """Bridge canonical chat requests onto the OpenAI semantic event stream.

    A ``CanonicalChatRequest`` is converted into an ``OpenAIChatRequest`` and
    handed to the wrapped ``ChatHandler``, whose stream events are passed
    through unchanged.
    """

    def __init__(self, handler: ChatHandler) -> None:
        """Keep a reference to the handler that produces the stream events."""
        self._handler = handler

    async def stream_raw(
        self, req: CanonicalChatRequest
    ) -> AsyncIterator[OpenAIStreamEvent]:
        """Yield the handler's stream events for ``req``.

        The request is converted (including attachment resolution) before the
        handler is invoked, so a conversion error such as the ``ValueError``
        raised by ``_resolve_attachments`` surfaces on the first iteration.
        """
        openai_req = await self._to_openai_request(req)
        async for event in self._handler.stream_openai_events(req.provider, openai_req):
            yield event

    async def collect_raw(self, req: CanonicalChatRequest) -> list[OpenAIStreamEvent]:
        """Drain ``stream_raw(req)`` and return every event as a list."""
        return [event async for event in self.stream_raw(req)]

    async def _to_openai_request(self, req: CanonicalChatRequest) -> OpenAIChatRequest:
        """Translate a canonical request into an ``OpenAIChatRequest``.

        The optional system prompt becomes a leading ``system`` message, each
        canonical message is mapped one-to-one, tools are wrapped as OpenAI
        ``function`` tools, and image attachments are resolved for both the
        last user message and the whole history.
        """
        messages: list[OpenAIMessage] = []
        if req.system:
            messages.append(
                OpenAIMessage(
                    role="system",
                    content=self._to_openai_content(req.system),
                )
            )
        messages.extend(
            OpenAIMessage(
                role=msg.role,
                content=self._to_openai_content(msg.content),
                tool_call_id=self._tool_call_id(msg),
            )
            for msg in req.messages
        )

        openai_tools = [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.input_schema,
                    "strict": tool.strict,
                },
            }
            for tool in req.tools
        ]
        last_user_attachments, all_attachments = await self._resolve_attachments(req)
        return OpenAIChatRequest(
            model=req.model,
            messages=messages,
            stream=req.stream,
            # An empty tool list is sent as None rather than [].
            tools=openai_tools or None,
            tool_choice=req.tool_choice,
            resume_session_id=req.resume_session_id,
            # Blank / missing upstream_model metadata collapses to None.
            upstream_model=str(req.metadata.get("upstream_model") or "") or None,
            # ChatHandler decides, depending on whether full_history applies,
            # which of the two lists below actually becomes attachment_files.
            attachment_files=[],
            attachment_files_last_user=last_user_attachments,
            attachment_files_all_users=all_attachments,
        )

    @staticmethod
    def _tool_call_id(msg: CanonicalMessage) -> str | None:
        """Return the tool-call id carried by a ``tool`` message, if any.

        The first content block with a non-empty ``tool_use_id`` wins, so a
        tool message whose leading block is not the tool result still keeps
        its id. Non-tool messages and empty content yield ``None``.
        """
        if msg.role != "tool":
            return None
        return next(
            (block.tool_use_id for block in (msg.content or ()) if block.tool_use_id),
            None,
        )

    async def _resolve_attachments(
        self, req: CanonicalChatRequest
    ) -> tuple[list[InputAttachment], list[InputAttachment]]:
        """Resolve image blocks into ``(last_user_attachments, all_user_attachments)``.

        - When the session is reused (full_history=False) only the images of
          the last user message are needed.
        - When the session is rebuilt (full_history=True) the images of every
          historical user message have to be supplied as well.

        Raises:
            ValueError: if the user messages carry more than
                ``MAX_IMAGE_COUNT`` images in total, or an image block has
                neither a URL nor usable inline data.
        """
        user_messages = [msg for msg in req.messages if msg.role == "user"]

        # Images from every user message (used to replay history on rebuild).
        all_image_blocks: list[CanonicalContentBlock] = [
            block
            for msg in user_messages
            for block in msg.content
            if block.type == "image"
        ]
        last_user_blocks: list[CanonicalContentBlock] = []
        if user_messages:
            last_user_blocks = [
                block for block in user_messages[-1].content if block.type == "image"
            ]

        if len(all_image_blocks) > MAX_IMAGE_COUNT:
            raise ValueError(f"单次最多上传 {MAX_IMAGE_COUNT} 张图片")

        # The last user's images are always the tail of the full list. When the
        # two lists have the same length they are the very same blocks with the
        # same 1-based indices, so one preparation pass (one download per remote
        # image) serves both results.
        if len(last_user_blocks) == len(all_image_blocks):
            prepared = await self._prepare_images(all_image_blocks)
            return self._as_attachments(prepared), self._as_attachments(prepared)

        # Otherwise the per-list index used in the filename prefix differs, so
        # each list is prepared on its own to keep filenames unchanged.
        last_prepared = await self._prepare_images(last_user_blocks)
        all_prepared = await self._prepare_images(all_image_blocks)
        return self._as_attachments(last_prepared), self._as_attachments(all_prepared)

    @staticmethod
    async def _prepare_images(blocks: list[CanonicalContentBlock]) -> list:
        """Load every image block, in order, via the matching image helper.

        A block is resolved from its URL first, then from a ``data:`` URL held
        in ``data``, then from raw base64 ``data`` plus ``mime_type``.

        Raises:
            ValueError: if a block offers none of those sources.
        """
        prepared_images = []
        for idx, block in enumerate(blocks, start=1):
            prefix = f"message_image_{idx}"
            if block.url:
                prepared = await download_remote_image(block.url, prefix=prefix)
            elif block.data and block.data.startswith("data:"):
                prepared = parse_data_url(block.data, prefix=prefix)
            elif block.data and block.mime_type:
                prepared = parse_base64_image(
                    block.data,
                    block.mime_type,
                    prefix=prefix,
                )
            else:
                raise ValueError("图片块缺少可用数据")
            prepared_images.append(prepared)
        return prepared_images

    @staticmethod
    def _as_attachments(prepared_images: list) -> list[InputAttachment]:
        """Wrap prepared images in fresh ``InputAttachment`` objects."""
        return [
            InputAttachment(
                filename=prepared.filename,
                mime_type=prepared.mime_type,
                data=prepared.data,
            )
            for prepared in prepared_images
        ]

    @staticmethod
    def _to_openai_content(
        blocks: list[CanonicalContentBlock],
    ) -> str | list[OpenAIContentPart]:
        """Convert canonical content blocks into OpenAI message content.

        ``text``, ``thinking`` and ``tool_result`` blocks become text parts,
        ``image`` blocks become ``image_url`` parts, and any other block type
        is dropped. Returns ``""`` when nothing remains, a plain string when
        the only part is a text part, and the list of parts otherwise.
        """
        if not blocks:
            return ""
        parts: list[OpenAIContentPart] = []
        for block in blocks:
            if block.type in {"text", "thinking", "tool_result"}:
                parts.append(OpenAIContentPart(type="text", text=block.text or ""))
            elif block.type == "image":
                # Prefer the remote URL; fall back to the inline data as-is.
                url = block.url or block.data or ""
                parts.append(
                    OpenAIContentPart(
                        type="image_url",
                        image_url={"url": url},
                    )
                )
        if not parts:
            return ""
        # A lone text part is flattened to the simple string form.
        if len(parts) == 1 and parts[0].type == "text":
            return parts[0].text or ""
        return parts