# web2api/core/api/function_call.py
# ohmyapi — feat: align hosted Space deployment with latest upstream
# 77169b4
"""
Function Call 层:解析模型输出的 <tool_call> 格式,转换为 OpenAI tool_calls;
将 tools 和 tool 结果拼入 prompt。对外统一使用 OpenAI 格式。
"""
import json
import re
import uuid
from collections.abc import Callable
from typing import Any
# Opening tag that marks the start of a tool-call block in model output.
TOOL_CALL_PREFIX = "<tool_call>"
# Length of the opening tag, computed once at import time.
TOOL_CALL_PREFIX_LEN = len(TOOL_CALL_PREFIX)
# Captures the payload between <tool_call> and </tool_call>, trimming the
# surrounding whitespace; DOTALL lets the payload span several lines.
TOOL_CALL_PATTERN = re.compile(r"<tool_call>\s*(.*?)\s*</tool_call>", re.DOTALL)
def parse_tool_calls(text: str) -> list[dict[str, Any]]:
    """Extract every ``<tool_call>...</tool_call>`` block found in *text*.

    Each block's payload must be a JSON object containing a ``"name"`` key;
    payloads that are not valid JSON, are not objects, or lack ``"name"`` are
    skipped. When ``"arguments"`` is a string it is decoded as JSON, falling
    back to an empty dict if decoding fails; any other value is passed through
    unchanged (defaulting to ``{}`` when the key is absent).

    Returns:
        A list of ``{"name": ..., "arguments": ...}`` dicts in order of
        appearance; empty when *text* is empty or whitespace only.
    """
    if not text or not text.strip():
        return []

    calls: list[dict[str, Any]] = []
    for match in TOOL_CALL_PATTERN.finditer(text):
        try:
            payload = json.loads(match.group(1).strip())
        except json.JSONDecodeError:
            # Best effort: a malformed block is ignored, not fatal.
            continue
        if not isinstance(payload, dict) or "name" not in payload:
            continue

        arguments = payload.get("arguments", {})
        if isinstance(arguments, str):
            # Arguments may arrive as a JSON-encoded string.
            try:
                arguments = json.loads(arguments)
            except json.JSONDecodeError:
                arguments = {}
        calls.append({"name": payload["name"], "arguments": arguments})
    return calls
def detect_tool_call_mode(buffer: str, *, strip_session_id: bool = True) -> bool | None:
    """Decide whether the text buffered so far begins a tool call.

    Args:
        buffer: Model output accumulated so far.
        strip_session_id: When true, the buffer is first passed through
            ``core.api.conv_parser.strip_session_id_suffix``.
            NOTE(review): the original docstring described this as removing a
            leading zero-width session_id prefix, while the helper's name says
            "suffix" — confirm which is accurate.

    Returns:
        ``True`` if the left-stripped content starts with ``TOOL_CALL_PREFIX``;
        ``False`` once at least ``TOOL_CALL_PREFIX_LEN`` characters are
        available and they are not the prefix; ``None`` while there is still
        too little text to decide.
    """
    content = buffer
    if strip_session_id:
        # Function-scope import kept from the original code.
        from core.api.conv_parser import strip_session_id_suffix

        content = strip_session_id_suffix(buffer)

    stripped = content.lstrip()
    if stripped.startswith(TOOL_CALL_PREFIX):
        return True
    # With as many characters as the tag itself already present, a mismatch is
    # final: appending more text cannot turn the start into the tag. The
    # original used ``>``, which left the exact-length case undecided.
    if len(stripped) >= TOOL_CALL_PREFIX_LEN:
        return False
    return None
def format_tools_for_prompt(tools: list[dict[str, Any]]) -> str:
    """Render tool definitions as a readable bullet list for a prompt.

    Accepts both the OpenAI shape ``{type, function: {name, description,
    parameters}}`` and the flat shape ``{name, description, input_schema}``.
    Tool definitions come from the client request, so malformed pieces are
    normalised instead of raising: a non-dict schema, ``properties`` or
    property entry is treated as empty / type ``any``, and a ``required``
    value that is not a list-like container is ignored.

    Args:
        tools: Tool definitions; entries that are not dicts or have no name
            are skipped.

    Returns:
        One line per tool in the form ``- name(arg: type, ...): description``,
        joined with newlines. Descriptions longer than 200 characters are cut
        and suffixed with ``...``. Returns ``""`` when nothing is rendered.
    """
    if not tools:
        return ""

    lines: list[str] = []
    for tool in tools:
        if not isinstance(tool, dict):
            continue
        fn = tool.get("function") if tool.get("type") == "function" else tool
        if not isinstance(fn, dict):
            fn = tool
        name = fn.get("name")
        if not name:
            continue

        desc = fn.get("description") or fn.get("summary") or ""
        if not isinstance(desc, str):
            # Slicing and len() below require a string.
            desc = str(desc)

        params = fn.get("parameters") or fn.get("input_schema") or {}
        if isinstance(params, str):
            try:
                params = json.loads(params)
            except json.JSONDecodeError:
                params = {}
        if not isinstance(params, dict):
            # e.g. a JSON string that decoded to a list or a number.
            params = {}

        props = params.get("properties") or {}
        if not isinstance(props, dict):
            props = {}
        required = params.get("required") or []
        if not isinstance(required, (list, tuple, set, frozenset)):
            # A bare ``true`` or a number would make ``in`` raise TypeError.
            required = []

        arg_parts: list[str] = []
        for key, schema in props.items():
            # A property schema may legally be a boolean; show it as "any".
            type_name = schema.get("type", "any") if isinstance(schema, dict) else "any"
            marker = " (必填)" if key in required else ""
            arg_parts.append(f"{key}: {type_name}{marker}")
        args_desc = ", ".join(arg_parts)

        ellipsis = "..." if len(desc) > 200 else ""
        lines.append(f"- {name}({args_desc}): {desc[:200]}{ellipsis}")
    return "\n".join(lines)
def build_tool_calls_response(
    tool_calls_list: list[dict[str, Any]],
    chat_id: str,
    model: str,
    created: int,
    *,
    text_content: str = "",
) -> dict[str, Any]:
    """Build a non-streaming ``chat.completion`` payload that carries tool calls.

    Args:
        tool_calls_list: Items with ``name`` and ``arguments`` keys.
        chat_id: Value for the response ``id`` field.
        model: Value for the response ``model`` field.
        created: Value for the response ``created`` field.
        text_content: Assistant text; an empty value becomes ``None`` in
            ``message.content``.

    Returns:
        A dict with a single choice whose message holds the ``tool_calls``
        array and whose ``finish_reason`` is ``"tool_calls"``. Every call gets
        a fresh ``call_`` id built from 24 hex characters of a UUID4.
    """

    def _encode_arguments(raw: Any) -> str:
        # Dicts are dumped directly; anything else is round-tripped through
        # JSON, and values that cannot be decoded collapse to "{}".
        if isinstance(raw, dict):
            return json.dumps(raw, ensure_ascii=False)
        try:
            decoded = json.loads(str(raw)) if raw else {}
            return json.dumps(decoded, ensure_ascii=False)
        except json.JSONDecodeError:
            return "{}"

    openai_calls: list[dict[str, Any]] = []
    for item in tool_calls_list:
        function_part = {
            "name": item.get("name", ""),
            "arguments": _encode_arguments(item.get("arguments", {})),
        }
        openai_calls.append(
            {
                "id": f"call_{uuid.uuid4().hex[:24]}",
                "type": "function",
                "function": function_part,
            }
        )

    choice = {
        "index": 0,
        "message": {
            "role": "assistant",
            "content": text_content or None,
            "tool_calls": openai_calls,
        },
        "finish_reason": "tool_calls",
    }
    return {
        "id": chat_id,
        "object": "chat.completion",
        "created": created,
        "model": model,
        "choices": [choice],
    }
def _openai_sse_chunk(
chat_id: str,
model: str,
created: int,
delta: dict,
finish_reason: str | None = None,
) -> str:
"""构建 OpenAI 流式 SSE:data: <json>\\n\\n"""
choice: dict[str, Any] = {"index": 0, "delta": delta}
if finish_reason is not None:
choice["finish_reason"] = finish_reason
data = {
"id": chat_id,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": [choice],
}
return f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
def build_openai_text_sse_events(
    chat_id: str,
    model: str,
    created: int,
) -> tuple[str, Callable[[str], str], Callable[[], str]]:
    """Create the SSE building blocks for a plain-text streamed reply.

    Returns:
        A ``(msg_start_sse, make_delta_sse, make_stop_sse)`` tuple:

        * ``msg_start_sse`` - the first chunk, already rendered, carrying the
          assistant role and empty content;
        * ``make_delta_sse(text)`` - renders a content chunk for ``text``;
        * ``make_stop_sse()`` - renders the chunk with ``finish_reason`` set
          to ``"stop"`` followed by the ``data: [DONE]`` terminator.
    """

    def _chunk(delta: dict, finish_reason: str | None = None) -> str:
        # Every event shares the same id/model/created triple.
        return _openai_sse_chunk(
            chat_id,
            model,
            created,
            delta=delta,
            finish_reason=finish_reason,
        )

    def make_delta_sse(text: str) -> str:
        return _chunk({"content": text})

    def make_stop_sse() -> str:
        return _chunk({}, "stop") + "data: [DONE]\n\n"

    msg_start_sse = _chunk({"role": "assistant", "content": ""})
    return msg_start_sse, make_delta_sse, make_stop_sse
def build_tool_calls_with_ids(
    tool_calls_list: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Turn ``name`` + ``arguments`` items into OpenAI-format tool calls with ids.

    Each entry receives its position as ``index`` and a freshly generated
    ``call_`` id. Per the original note, building the list once lets the
    streamed output and the saved debug record share the same ids, so the
    ``tool_call_id`` in the next request matches.

    ``arguments`` is always emitted as a JSON string: dicts are dumped as-is,
    other values are round-tripped through JSON, and anything that cannot be
    decoded becomes ``"{}"``.
    """
    built: list[dict[str, Any]] = []
    for position, item in enumerate(tool_calls_list):
        raw_args = item.get("arguments", {})
        if isinstance(raw_args, dict):
            encoded = json.dumps(raw_args, ensure_ascii=False)
        else:
            try:
                decoded = json.loads(str(raw_args)) if raw_args else {}
                encoded = json.dumps(decoded, ensure_ascii=False)
            except json.JSONDecodeError:
                encoded = "{}"

        entry: dict[str, Any] = {
            "index": position,
            "id": f"call_{uuid.uuid4().hex[:24]}",
            "type": "function",
            "function": {"name": item.get("name", ""), "arguments": encoded},
        }
        built.append(entry)
    return built
def build_openai_tool_use_sse_events(
    tool_calls_list: list[dict[str, Any]],
    chat_id: str,
    model: str,
    created: int,
    *,
    text_content: str = "",
    tool_calls_with_ids: list[dict[str, Any]] | None = None,
) -> tuple[list[str], list[dict[str, Any]]]:
    """Build the SSE events for a reply that ends in tool calls.

    With ``text_content`` (for example thinking text) two chunks are produced:
    first the content together with the assistant role, then the tool calls,
    so a client can show the text before the calls. Without it, a single chunk
    carries role, empty content and the tool calls. In both cases a final
    chunk with ``finish_reason`` set to ``"tool_calls"`` follows; no
    ``data: [DONE]`` line is appended here.

    Args:
        tool_calls_list: ``name`` + ``arguments`` items, used only when
            ``tool_calls_with_ids`` is ``None``.
        chat_id: Value for each chunk's ``id`` field.
        model: Value for each chunk's ``model`` field.
        created: Value for each chunk's ``created`` field.
        text_content: Optional assistant text to send before the tool calls.
        tool_calls_with_ids: Pre-built tool calls to send unchanged.

    Returns:
        ``(sse_events, tool_calls)`` where ``tool_calls`` is the list that was
        actually sent.
    """
    if tool_calls_with_ids is None:
        resolved_calls = build_tool_calls_with_ids(tool_calls_list)
    else:
        resolved_calls = tool_calls_with_ids

    if text_content:
        # Content (thinking) first, then the tool calls, within the same message.
        deltas: list[dict[str, Any]] = [
            {"role": "assistant", "content": text_content},
            {"tool_calls": resolved_calls},
        ]
    else:
        deltas = [
            {"role": "assistant", "content": "", "tool_calls": resolved_calls},
        ]

    events = [
        _openai_sse_chunk(chat_id, model, created, delta, None) for delta in deltas
    ]
    events.append(_openai_sse_chunk(chat_id, model, created, {}, "tool_calls"))
    return (events, resolved_calls)
def stream_openai_tool_use_sse_events(
    tool_calls_list: list[dict[str, Any]],
    chat_id: str,
    model: str,
    created: int,
    *,
    tool_calls_with_ids: list[dict[str, Any]] | None = None,
) -> list[str]:
    """Build the incremental SSE sequence for streaming tool calls.

    The sequence is: one chunk announcing every call's index, id and name with
    empty arguments; then one chunk per call that has non-empty arguments,
    carrying only its index and the argument string; finally a chunk with
    ``finish_reason`` set to ``"tool_calls"``. Per the original note, any text
    content is expected to have been streamed by the caller already, so only
    tool-call chunks are produced here.

    Args:
        tool_calls_list: ``name`` + ``arguments`` items, used only when
            ``tool_calls_with_ids`` is ``None``.
        chat_id: Value for each chunk's ``id`` field.
        model: Value for each chunk's ``model`` field.
        created: Value for each chunk's ``created`` field.
        tool_calls_with_ids: Pre-built tool calls; each entry must provide
            ``index``, ``id`` and ``function.name``.

    Returns:
        The SSE event strings in sending order.
    """
    if tool_calls_with_ids is None:
        resolved_calls = build_tool_calls_with_ids(tool_calls_list)
    else:
        resolved_calls = tool_calls_with_ids

    # Opening chunk: id + type + name only, so a client can show which tools
    # are being invoked before any arguments arrive.
    heads: list[dict[str, Any]] = [
        {
            "index": call["index"],
            "id": call["id"],
            "type": "function",
            "function": {"name": call["function"]["name"], "arguments": ""},
        }
        for call in resolved_calls
    ]
    events: list[str] = [
        _openai_sse_chunk(chat_id, model, created, {"tool_calls": heads}, None)
    ]

    # Follow-up chunks: index + function.arguments only, one per tool call.
    for call in resolved_calls:
        arguments = call.get("function", {}).get("arguments", "") or ""
        if arguments:
            fragment = {"index": call["index"], "function": {"arguments": arguments}}
            events.append(
                _openai_sse_chunk(
                    chat_id, model, created, {"tool_calls": [fragment]}, None
                )
            )

    events.append(_openai_sse_chunk(chat_id, model, created, {}, "tool_calls"))
    return events