""" Function Call 层:解析模型输出的 格式,转换为 OpenAI tool_calls; 将 tools 和 tool 结果拼入 prompt。对外统一使用 OpenAI 格式。 """ import json import re import uuid from collections.abc import Callable from typing import Any TOOL_CALL_PREFIX = "" TOOL_CALL_PREFIX_LEN = len(TOOL_CALL_PREFIX) TOOL_CALL_PATTERN = re.compile( r"\s*(.*?)\s*", re.DOTALL, ) def parse_tool_calls(text: str) -> list[dict[str, Any]]: """ 从文本中解析所有 ... 块。 返回 [{"name": str, "arguments": dict | str}, ...] """ if not text or not text.strip(): return [] matches = TOOL_CALL_PATTERN.findall(text) result: list[dict[str, Any]] = [] for m in matches: try: obj = json.loads(m.strip()) if isinstance(obj, dict) and "name" in obj: args = obj.get("arguments", {}) if isinstance(args, str): try: args = json.loads(args) except json.JSONDecodeError: args = {} result.append({"name": obj["name"], "arguments": args}) except json.JSONDecodeError: pass return result def detect_tool_call_mode(buffer: str, *, strip_session_id: bool = True) -> bool | None: """ 根据 buffer 内容判断是否为 tool_call 模式。 None=尚未确定,True=tool_call,False=普通文本。 strip_session_id: 若 True,先去掉开头的零宽 session_id 前缀再判断。 """ content = buffer if strip_session_id: from core.api.conv_parser import strip_session_id_suffix content = strip_session_id_suffix(buffer) stripped = content.lstrip() if stripped.startswith(TOOL_CALL_PREFIX): return True if len(stripped) > TOOL_CALL_PREFIX_LEN: return False return None def format_tools_for_prompt(tools: list[dict[str, Any]]) -> str: """ 将 OpenAI 格式的 tools 转为可读文本,用于 prompt。 兼容 OpenAI 格式 {type, function: {name, description, parameters}} 和 Cursor 格式 {name, description, input_schema}。 """ if not tools: return "" lines: list[str] = [] for t in tools: if not isinstance(t, dict): continue fn = t.get("function") if t.get("type") == "function" else t if not isinstance(fn, dict): fn = t name = fn.get("name") if not name: continue desc = fn.get("description") or fn.get("summary") or "" params = fn.get("parameters") or fn.get("input_schema") or {} if isinstance(params, str): try: params = json.loads(params) except json.JSONDecodeError: params = {} props = params.get("properties") or {} required = params.get("required") or [] args_desc = ", ".join( f"{k}: {v.get('type', 'any')}" + (" (必填)" if k in required else "") for k, v in props.items() ) lines.append( f"- {name}({args_desc}): {desc[:200]}" + ("..." if len(desc) > 200 else "") ) return "\n".join(lines) if lines else "" def build_tool_calls_response( tool_calls_list: list[dict[str, Any]], chat_id: str, model: str, created: int, *, text_content: str = "", ) -> dict[str, Any]: """返回 OpenAI 格式的 chat.completion(含 tool_calls)。 message.content 为字符串(或空时 null),tool_calls 为 OpenAI 标准数组。 """ tool_calls: list[dict[str, Any]] = [] for tc in tool_calls_list: name = tc.get("name", "") args = tc.get("arguments", {}) if isinstance(args, dict): args_str = json.dumps(args, ensure_ascii=False) else: try: args_obj = json.loads(str(args)) if args else {} args_str = json.dumps(args_obj, ensure_ascii=False) except json.JSONDecodeError: args_str = "{}" call_id = f"call_{uuid.uuid4().hex[:24]}" tool_calls.append( { "id": call_id, "type": "function", "function": {"name": name, "arguments": args_str}, } ) message: dict[str, Any] = { "role": "assistant", "content": text_content if text_content else None, "tool_calls": tool_calls, } return { "id": chat_id, "object": "chat.completion", "created": created, "model": model, "choices": [ { "index": 0, "message": message, "finish_reason": "tool_calls", } ], } def _openai_sse_chunk( chat_id: str, model: str, created: int, delta: dict, finish_reason: str | None = None, ) -> str: """构建 OpenAI 流式 SSE:data: \\n\\n""" choice: dict[str, Any] = {"index": 0, "delta": delta} if finish_reason is not None: choice["finish_reason"] = finish_reason data = { "id": chat_id, "object": "chat.completion.chunk", "created": created, "model": model, "choices": [choice], } return f"data: {json.dumps(data, ensure_ascii=False)}\n\n" def build_openai_text_sse_events( chat_id: str, model: str, created: int, ) -> tuple[str, Callable[[str], str], Callable[[], str]]: """返回 OpenAI 流式事件的工厂。 返回 (msg_start_sse, make_delta_sse, make_stop_sse)。 msg_start 为带 role 的首 chunk。 """ def msg_start() -> str: return _openai_sse_chunk( chat_id, model, created, delta={"role": "assistant", "content": ""}, finish_reason=None, ) def make_delta_sse(text: str) -> str: return _openai_sse_chunk( chat_id, model, created, delta={ "content": text, }, finish_reason=None, ) def make_stop_sse() -> str: return ( _openai_sse_chunk( chat_id, model, created, delta={}, finish_reason="stop", ) + "data: [DONE]\n\n" ) return msg_start(), make_delta_sse, make_stop_sse def build_tool_calls_with_ids( tool_calls_list: list[dict[str, Any]], ) -> list[dict[str, Any]]: """从 name+arguments 的 tool_calls_list 构建带 id 的 OpenAI 格式 tool_calls。 用于流式下发与 debug 保存共用同一批 id,保证下一轮 request 的 tool_call_id 一致。 """ tool_calls: list[dict[str, Any]] = [] for i, tc in enumerate(tool_calls_list): name = tc.get("name", "") args = tc.get("arguments", {}) if isinstance(args, dict): args_str = json.dumps(args, ensure_ascii=False) else: try: args_obj = json.loads(str(args)) if args else {} args_str = json.dumps(args_obj, ensure_ascii=False) except json.JSONDecodeError: args_str = "{}" tool_calls.append( { "index": i, "id": f"call_{uuid.uuid4().hex[:24]}", "type": "function", "function": {"name": name, "arguments": args_str}, } ) return tool_calls def build_openai_tool_use_sse_events( tool_calls_list: list[dict[str, Any]], chat_id: str, model: str, created: int, *, text_content: str = "", tool_calls_with_ids: list[dict[str, Any]] | None = None, ) -> tuple[list[str], list[dict[str, Any]]]: """构建 OpenAI 流式 SSE 事件,用于 tool_calls 场景。 有 text_content(如 thinking)时:先发 content chunk,再发 tool_calls chunk,便于客户端先展示思考再展示工具调用。 无 text_content 时:单 chunk 发 role + tool_calls。 tool_calls 场景最后只发 finish_reason,不发 data: [DONE](think 之后不跟 [DONE])。 """ if tool_calls_with_ids is not None: tool_calls = tool_calls_with_ids else: tool_calls = build_tool_calls_with_ids(tool_calls_list) sse_list: list[str] = [] if text_content: # 先发 content(thinking),再发 tool_calls,同一条消息内顺序展示 sse_list.append( _openai_sse_chunk( chat_id, model, created, {"role": "assistant", "content": text_content}, None, ) ) sse_list.append( _openai_sse_chunk(chat_id, model, created, {"tool_calls": tool_calls}, None) ) else: sse_list.append( _openai_sse_chunk( chat_id, model, created, { "role": "assistant", "content": "", "tool_calls": tool_calls, }, None, ) ) sse_list.append(_openai_sse_chunk(chat_id, model, created, {}, "tool_calls")) return (sse_list, tool_calls) def stream_openai_tool_use_sse_events( tool_calls_list: list[dict[str, Any]], chat_id: str, model: str, created: int, *, tool_calls_with_ids: list[dict[str, Any]] | None = None, ) -> list[str]: """ 流式下发 tool_calls:先发每个 tool 的 id/name(arguments 为空), 再逐个发 arguments 分片,最后发 finish_reason。便于客户端逐步展示。 content(如 )由调用方已通过 delta 流式发完,此处只发 tool_calls 相关 chunk。 """ if tool_calls_with_ids is not None: tool_calls = tool_calls_with_ids else: tool_calls = build_tool_calls_with_ids(tool_calls_list) sse_list: list[str] = [] # 第一块:仅 id + type + name,arguments 为空,让客户端先展示“正在调用 xxx” tool_calls_heads: list[dict[str, Any]] = [] for tc in tool_calls: tool_calls_heads.append( { "index": tc["index"], "id": tc["id"], "type": "function", "function": {"name": tc["function"]["name"], "arguments": ""}, } ) sse_list.append( _openai_sse_chunk( chat_id, model, created, {"tool_calls": tool_calls_heads}, None ) ) # 后续每块:只带 index + function.arguments,可整段发或分片发,这里按 tool 逐个发 for tc in tool_calls: args = tc.get("function", {}).get("arguments", "") or "" if not args: continue sse_list.append( _openai_sse_chunk( chat_id, model, created, { "tool_calls": [ {"index": tc["index"], "function": {"arguments": args}} ] }, None, ) ) sse_list.append(_openai_sse_chunk(chat_id, model, created, {}, "tool_calls")) return sse_list