|
|
import ast |
|
|
import json |
|
|
import regex as re |
|
|
from collections.abc import Sequence |
|
|
from typing import List, Any |
|
|
|
|
|
from transformers import PreTrainedTokenizerBase |
|
|
from vllm.entrypoints.openai.protocol import ( |
|
|
ChatCompletionRequest, |
|
|
ChatCompletionToolsParam, |
|
|
DeltaFunctionCall, |
|
|
DeltaMessage, |
|
|
DeltaToolCall, |
|
|
ExtractedToolCallInformation, |
|
|
FunctionCall, |
|
|
ToolCall, |
|
|
) |
|
|
from vllm.entrypoints.openai.tool_parsers.abstract_tool_parser import ( |
|
|
ToolParser, |
|
|
ToolParserManager, |
|
|
) |
|
|
|
|
|
from vllm.logger import init_logger |
|
|
|
|
|
logger = init_logger(__name__) |
|
|
|
|
|
|
|
|
def _is_string_type( |
|
|
tool_name: str, arg_name: str, tools: List[ChatCompletionToolsParam] | None |
|
|
): |
|
|
if tools is None: |
|
|
return False |
|
|
for tool in tools: |
|
|
if tool.function.name == tool_name: |
|
|
if tool.function.parameters is None: |
|
|
return False |
|
|
arg_type = ( |
|
|
tool.function.parameters.get("properties", {}) |
|
|
.get(arg_name, {}) |
|
|
.get("type", None) |
|
|
) |
|
|
return arg_type == "string" |
|
|
logger.debug("No tool named '%s'.", tool_name) |
|
|
return False |
|
|
|
|
|
|
|
|
def _deserialize(value: str) -> Any: |
|
|
try: |
|
|
return json.loads(value) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
try: |
|
|
return ast.literal_eval(value) |
|
|
except Exception: |
|
|
pass |
|
|
return value |
|
|
|
|
|
|
|
|
@ToolParserManager.register_module("telechat3") |
|
|
class TeleChat3ModelToolParser(ToolParser): |
|
|
""" |
|
|
Tool call parser for TeleChat3-36B models. |
|
|
Used when --enable-auto-tool-choice --tool-call-parser telechat3 |
|
|
""" |
|
|
|
|
|
def __init__(self, tokenizer: PreTrainedTokenizerBase): |
|
|
super().__init__(tokenizer) |
|
|
|
|
|
|
|
|
|
|
|
self.current_tool_id: int = -1 |
|
|
|
|
|
self.tool_start_token = "<tool_call>" |
|
|
self.tool_end_token = "</tool_call>" |
|
|
|
|
|
self.func_detail_regex = re.compile( |
|
|
r"<tool_call>(.*?)(<param_key>.*?)?</tool_call>", re.DOTALL |
|
|
) |
|
|
self.func_arg_regex = re.compile( |
|
|
r"<param_key>(.*?)</param_key>(?:\\n|\s)*<param_value>(.*?)</param_value>", |
|
|
re.DOTALL, |
|
|
) |
|
|
self._buffer = "" |
|
|
|
|
|
def extract_tool_calls(self, model_output: str, request: ChatCompletionRequest): |
|
|
|
|
|
matched_tool_calls = self.func_detail_regex.findall(model_output) |
|
|
logger.debug("model_output: %s", model_output) |
|
|
|
|
|
tool_calls = [] |
|
|
try: |
|
|
for match in matched_tool_calls: |
|
|
tc_name = match[0].strip() |
|
|
arg_dict = {} |
|
|
if len(match) > 1: |
|
|
for key, value in self.func_arg_regex.findall(match[1]): |
|
|
arg_key = key.strip() |
|
|
arg_val = value.strip() |
|
|
if not _is_string_type(tc_name, key, request.tools): |
|
|
arg_val = _deserialize(arg_val) |
|
|
logger.debug("arg_key = %s, arg_val = %s", arg_key, arg_val) |
|
|
arg_dict[arg_key] = arg_val |
|
|
tool_calls.append( |
|
|
ToolCall( |
|
|
type="function", |
|
|
function=FunctionCall( |
|
|
name=tc_name, |
|
|
arguments=json.dumps(arg_dict, ensure_ascii=False), |
|
|
), |
|
|
) |
|
|
) |
|
|
except Exception: |
|
|
logger.exception("Failed to extract tool call spec") |
|
|
return ExtractedToolCallInformation( |
|
|
tools_called=False, tool_calls=[], content=model_output |
|
|
) |
|
|
else: |
|
|
if len(tool_calls) > 0: |
|
|
content = model_output[: model_output.find(self.tool_start_token)] |
|
|
return ExtractedToolCallInformation( |
|
|
tools_called=True, tool_calls=tool_calls, content=content |
|
|
) |
|
|
return ExtractedToolCallInformation( |
|
|
tools_called=False, tool_calls=[], content=model_output |
|
|
) |
|
|
|
|
|
def extract_tool_calls_streaming( |
|
|
self, |
|
|
previous_text: str, |
|
|
current_text: str, |
|
|
delta_text: str, |
|
|
previous_token_ids: Sequence[int], |
|
|
current_token_ids: Sequence[int], |
|
|
delta_token_ids: Sequence[int], |
|
|
request: ChatCompletionRequest, |
|
|
) -> DeltaMessage | None: |
|
|
self._buffer += delta_text |
|
|
cur_text = self._buffer |
|
|
start_idx = cur_text.find(self.tool_start_token) |
|
|
if start_idx == -1: |
|
|
self._buffer = "" |
|
|
return DeltaMessage(content=cur_text) |
|
|
logger.debug("cur_text = %s", cur_text) |
|
|
end_idx = cur_text.find(self.tool_end_token) |
|
|
if end_idx != -1: |
|
|
extracted_tool_calls = self.extract_tool_calls( |
|
|
cur_text[: end_idx + len(self.tool_end_token)], request |
|
|
) |
|
|
if len(extracted_tool_calls.tool_calls) == 0: |
|
|
logger.warning("Failed to extract any tool calls.") |
|
|
return None |
|
|
self.current_tool_id += 1 |
|
|
tool_call = extracted_tool_calls.tool_calls[0] |
|
|
delta = DeltaMessage( |
|
|
content=extracted_tool_calls.content, |
|
|
tool_calls=[ |
|
|
DeltaToolCall( |
|
|
index=self.current_tool_id, |
|
|
id=tool_call.id, |
|
|
type=tool_call.type, |
|
|
function=DeltaFunctionCall( |
|
|
name=tool_call.function.name, |
|
|
arguments=tool_call.function.arguments, |
|
|
), |
|
|
) |
|
|
], |
|
|
) |
|
|
self._buffer = cur_text[end_idx + len(self.tool_end_token) :] |
|
|
return delta |
|
|
self._buffer = cur_text[start_idx:] |
|
|
return DeltaMessage(content=cur_text[:start_idx]) |
|
|
|
|
|
|
|
|
def register_tool_parser(): ... |
|
|
|