| """ |
| Function Call 层:解析模型输出的 <tool_call> 格式,转换为 OpenAI tool_calls; |
| 将 tools 和 tool 结果拼入 prompt。对外统一使用 OpenAI 格式。 |
| """ |
|
|
| import json |
| import re |
| import uuid |
| from collections.abc import Callable |
| from typing import Any |
|
|
| TOOL_CALL_PREFIX = "<tool_call>" |
| TOOL_CALL_PREFIX_LEN = len(TOOL_CALL_PREFIX) |
| TOOL_CALL_PATTERN = re.compile( |
| r"<tool_call>\s*(.*?)\s*</tool_call>", |
| re.DOTALL, |
| ) |
|
|
|
|
| def parse_tool_calls(text: str) -> list[dict[str, Any]]: |
| """ |
| 从文本中解析所有 <tool_call>...</tool_call> 块。 |
| 返回 [{"name": str, "arguments": dict | str}, ...] |
| """ |
| if not text or not text.strip(): |
| return [] |
| matches = TOOL_CALL_PATTERN.findall(text) |
| result: list[dict[str, Any]] = [] |
| for m in matches: |
| try: |
| obj = json.loads(m.strip()) |
| if isinstance(obj, dict) and "name" in obj: |
| args = obj.get("arguments", {}) |
| if isinstance(args, str): |
| try: |
| args = json.loads(args) |
| except json.JSONDecodeError: |
| args = {} |
| result.append({"name": obj["name"], "arguments": args}) |
| except json.JSONDecodeError: |
| pass |
| return result |
|
|
|
|
| def detect_tool_call_mode(buffer: str, *, strip_session_id: bool = True) -> bool | None: |
| """ |
| 根据 buffer 内容判断是否为 tool_call 模式。 |
| None=尚未确定,True=tool_call,False=普通文本。 |
| strip_session_id: 若 True,先去掉开头的零宽 session_id 前缀再判断。 |
| """ |
| content = buffer |
| if strip_session_id: |
| from core.api.conv_parser import strip_session_id_suffix |
|
|
| content = strip_session_id_suffix(buffer) |
| stripped = content.lstrip() |
| if stripped.startswith(TOOL_CALL_PREFIX): |
| return True |
| if len(stripped) > TOOL_CALL_PREFIX_LEN: |
| return False |
| return None |
|
|
|
|
| def format_tools_for_prompt(tools: list[dict[str, Any]]) -> str: |
| """ |
| 将 OpenAI 格式的 tools 转为可读文本,用于 prompt。 |
| 兼容 OpenAI 格式 {type, function: {name, description, parameters}} |
| 和 Cursor 格式 {name, description, input_schema}。 |
| """ |
| if not tools: |
| return "" |
| lines: list[str] = [] |
| for t in tools: |
| if not isinstance(t, dict): |
| continue |
| fn = t.get("function") if t.get("type") == "function" else t |
| if not isinstance(fn, dict): |
| fn = t |
| name = fn.get("name") |
| if not name: |
| continue |
| desc = fn.get("description") or fn.get("summary") or "" |
| params = fn.get("parameters") or fn.get("input_schema") or {} |
| if isinstance(params, str): |
| try: |
| params = json.loads(params) |
| except json.JSONDecodeError: |
| params = {} |
| props = params.get("properties") or {} |
| required = params.get("required") or [] |
| args_desc = ", ".join( |
| f"{k}: {v.get('type', 'any')}" + (" (必填)" if k in required else "") |
| for k, v in props.items() |
| ) |
| lines.append( |
| f"- {name}({args_desc}): {desc[:200]}" + ("..." if len(desc) > 200 else "") |
| ) |
| return "\n".join(lines) if lines else "" |
|
|
|
|
| def build_tool_calls_response( |
| tool_calls_list: list[dict[str, Any]], |
| chat_id: str, |
| model: str, |
| created: int, |
| *, |
| text_content: str = "", |
| ) -> dict[str, Any]: |
| """返回 OpenAI 格式的 chat.completion(含 tool_calls)。 |
| message.content 为字符串(或空时 null),tool_calls 为 OpenAI 标准数组。 |
| """ |
| tool_calls: list[dict[str, Any]] = [] |
| for tc in tool_calls_list: |
| name = tc.get("name", "") |
| args = tc.get("arguments", {}) |
| if isinstance(args, dict): |
| args_str = json.dumps(args, ensure_ascii=False) |
| else: |
| try: |
| args_obj = json.loads(str(args)) if args else {} |
| args_str = json.dumps(args_obj, ensure_ascii=False) |
| except json.JSONDecodeError: |
| args_str = "{}" |
| call_id = f"call_{uuid.uuid4().hex[:24]}" |
| tool_calls.append( |
| { |
| "id": call_id, |
| "type": "function", |
| "function": {"name": name, "arguments": args_str}, |
| } |
| ) |
| message: dict[str, Any] = { |
| "role": "assistant", |
| "content": text_content if text_content else None, |
| "tool_calls": tool_calls, |
| } |
| return { |
| "id": chat_id, |
| "object": "chat.completion", |
| "created": created, |
| "model": model, |
| "choices": [ |
| { |
| "index": 0, |
| "message": message, |
| "finish_reason": "tool_calls", |
| } |
| ], |
| } |
|
|
|
|
| def _openai_sse_chunk( |
| chat_id: str, |
| model: str, |
| created: int, |
| delta: dict, |
| finish_reason: str | None = None, |
| ) -> str: |
| """构建 OpenAI 流式 SSE:data: <json>\\n\\n""" |
| choice: dict[str, Any] = {"index": 0, "delta": delta} |
| if finish_reason is not None: |
| choice["finish_reason"] = finish_reason |
| data = { |
| "id": chat_id, |
| "object": "chat.completion.chunk", |
| "created": created, |
| "model": model, |
| "choices": [choice], |
| } |
| return f"data: {json.dumps(data, ensure_ascii=False)}\n\n" |
|
|
|
|
| def build_openai_text_sse_events( |
| chat_id: str, |
| model: str, |
| created: int, |
| ) -> tuple[str, Callable[[str], str], Callable[[], str]]: |
| """返回 OpenAI 流式事件的工厂。 |
| 返回 (msg_start_sse, make_delta_sse, make_stop_sse)。 |
| msg_start 为带 role 的首 chunk。 |
| """ |
|
|
| def msg_start() -> str: |
| return _openai_sse_chunk( |
| chat_id, |
| model, |
| created, |
| delta={"role": "assistant", "content": ""}, |
| finish_reason=None, |
| ) |
|
|
| def make_delta_sse(text: str) -> str: |
| return _openai_sse_chunk( |
| chat_id, |
| model, |
| created, |
| delta={ |
| "content": text, |
| }, |
| finish_reason=None, |
| ) |
|
|
| def make_stop_sse() -> str: |
| return ( |
| _openai_sse_chunk( |
| chat_id, |
| model, |
| created, |
| delta={}, |
| finish_reason="stop", |
| ) |
| + "data: [DONE]\n\n" |
| ) |
|
|
| return msg_start(), make_delta_sse, make_stop_sse |
|
|
|
|
| def build_tool_calls_with_ids( |
| tool_calls_list: list[dict[str, Any]], |
| ) -> list[dict[str, Any]]: |
| """从 name+arguments 的 tool_calls_list 构建带 id 的 OpenAI 格式 tool_calls。 |
| 用于流式下发与 debug 保存共用同一批 id,保证下一轮 request 的 tool_call_id 一致。 |
| """ |
| tool_calls: list[dict[str, Any]] = [] |
| for i, tc in enumerate(tool_calls_list): |
| name = tc.get("name", "") |
| args = tc.get("arguments", {}) |
| if isinstance(args, dict): |
| args_str = json.dumps(args, ensure_ascii=False) |
| else: |
| try: |
| args_obj = json.loads(str(args)) if args else {} |
| args_str = json.dumps(args_obj, ensure_ascii=False) |
| except json.JSONDecodeError: |
| args_str = "{}" |
| tool_calls.append( |
| { |
| "index": i, |
| "id": f"call_{uuid.uuid4().hex[:24]}", |
| "type": "function", |
| "function": {"name": name, "arguments": args_str}, |
| } |
| ) |
| return tool_calls |
|
|
|
|
| def build_openai_tool_use_sse_events( |
| tool_calls_list: list[dict[str, Any]], |
| chat_id: str, |
| model: str, |
| created: int, |
| *, |
| text_content: str = "", |
| tool_calls_with_ids: list[dict[str, Any]] | None = None, |
| ) -> tuple[list[str], list[dict[str, Any]]]: |
| """构建 OpenAI 流式 SSE 事件,用于 tool_calls 场景。 |
| 有 text_content(如 thinking)时:先发 content chunk,再发 tool_calls chunk,便于客户端先展示思考再展示工具调用。 |
| 无 text_content 时:单 chunk 发 role + tool_calls。 |
| tool_calls 场景最后只发 finish_reason,不发 data: [DONE](think 之后不跟 [DONE])。 |
| """ |
| if tool_calls_with_ids is not None: |
| tool_calls = tool_calls_with_ids |
| else: |
| tool_calls = build_tool_calls_with_ids(tool_calls_list) |
| sse_list: list[str] = [] |
| if text_content: |
| |
| sse_list.append( |
| _openai_sse_chunk( |
| chat_id, |
| model, |
| created, |
| {"role": "assistant", "content": text_content}, |
| None, |
| ) |
| ) |
| sse_list.append( |
| _openai_sse_chunk(chat_id, model, created, {"tool_calls": tool_calls}, None) |
| ) |
| else: |
| sse_list.append( |
| _openai_sse_chunk( |
| chat_id, |
| model, |
| created, |
| { |
| "role": "assistant", |
| "content": "", |
| "tool_calls": tool_calls, |
| }, |
| None, |
| ) |
| ) |
| sse_list.append(_openai_sse_chunk(chat_id, model, created, {}, "tool_calls")) |
| return (sse_list, tool_calls) |
|
|
|
|
| def stream_openai_tool_use_sse_events( |
| tool_calls_list: list[dict[str, Any]], |
| chat_id: str, |
| model: str, |
| created: int, |
| *, |
| tool_calls_with_ids: list[dict[str, Any]] | None = None, |
| ) -> list[str]: |
| """ |
| 流式下发 tool_calls:先发每个 tool 的 id/name(arguments 为空), |
| 再逐个发 arguments 分片,最后发 finish_reason。便于客户端逐步展示。 |
| content(如 <think>)由调用方已通过 delta 流式发完,此处只发 tool_calls 相关 chunk。 |
| """ |
| if tool_calls_with_ids is not None: |
| tool_calls = tool_calls_with_ids |
| else: |
| tool_calls = build_tool_calls_with_ids(tool_calls_list) |
| sse_list: list[str] = [] |
| |
| tool_calls_heads: list[dict[str, Any]] = [] |
| for tc in tool_calls: |
| tool_calls_heads.append( |
| { |
| "index": tc["index"], |
| "id": tc["id"], |
| "type": "function", |
| "function": {"name": tc["function"]["name"], "arguments": ""}, |
| } |
| ) |
| sse_list.append( |
| _openai_sse_chunk( |
| chat_id, model, created, {"tool_calls": tool_calls_heads}, None |
| ) |
| ) |
| |
| for tc in tool_calls: |
| args = tc.get("function", {}).get("arguments", "") or "" |
| if not args: |
| continue |
| sse_list.append( |
| _openai_sse_chunk( |
| chat_id, |
| model, |
| created, |
| { |
| "tool_calls": [ |
| {"index": tc["index"], "function": {"arguments": args}} |
| ] |
| }, |
| None, |
| ) |
| ) |
| sse_list.append(_openai_sse_chunk(chat_id, model, created, {}, "tool_calls")) |
| return sse_list |
|
|