| import re |
| import json |
| import uuid |
|
|
| def parse_arguments(json_value): |
| """ |
| Attempt to parse a string as JSON |
| |
| Args: |
| json_value: String to parse |
| |
| Returns: |
| tuple: (parsed_value, is_valid_json) |
| """ |
| try: |
| parsed_value = json.loads(json_value) |
| return parsed_value, True |
| except: |
| return json_value, False |
|
|
| def get_argument_type(func_name: str, arg_key: str, defined_tools: list): |
| """ |
| Get the type definition of a tool parameter |
| |
| Args: |
| func_name: Name of the function/tool |
| arg_key: Parameter key name |
| defined_tools: List of tool definitions |
| |
| Returns: |
| str or None: Type of the parameter ('string', 'object', 'array', 'integer', 'number', 'boolean') |
| """ |
| name2tool = {tool["name"]: tool for tool in defined_tools} |
| if func_name not in name2tool: |
| return None |
| tool = name2tool[func_name] |
| if "parameters" not in tool or "properties" not in tool["parameters"]: |
| return None |
| if arg_key not in tool["parameters"]["properties"]: |
| return None |
| return tool["parameters"]["properties"][arg_key].get("type") |
|
|
| def parse_model_response(response: str, defined_tools: list=[]): |
| """ |
| Parse model response to extract reasoning_content, content, and tool_calls |
| |
| Args: |
| response: Raw response text from the model |
| defined_tools: List of tool definitions |
| |
| Returns: |
| dict: Message containing role, reasoning_content (optional), content (optional), |
| and tool_calls (optional) |
| """ |
| text = response |
| reasoning_content = None |
| content = None |
| tool_calls = [] |
| |
| formatted_tools = [] |
| for tool in defined_tools: |
| if "function" in tool: |
| formatted_tools.append(tool['function']) |
| else: |
| formatted_tools.append(tool) |
| |
| if '</longcat_think>' in text: |
| text = text.replace('<longcat_think>', '') |
| thinking_end = text.find('</longcat_think>') |
| reasoning_content = text[: thinking_end].strip() |
| text = text[thinking_end + len('</longcat_think>'):].lstrip() |
| |
| assert '<longcat_think>' not in text, "Unclosed <longcat_think> tag found in remaining text" |
| assert '</longcat_think>' not in text, "Unexpected </longcat_think> tag found without opening tag" |
| |
| if '<longcat_tool_call>' in text: |
| index = text.find('<longcat_tool_call>') |
| content = text[:index] |
| text = text[index:].strip() |
| else: |
| content = text |
| text = "" |
| |
| open_tags = text.count('<longcat_tool_call>') |
| close_tags = text.count('</longcat_tool_call>') |
| assert open_tags == close_tags, \ |
| f"Mismatched tool_call tags: {open_tags} opening tags, {close_tags} closing tags" |
| |
| tool_call_strs = re.findall( |
| r'<longcat_tool_call>(.*?)</longcat_tool_call>', |
| text, |
| re.DOTALL |
| ) |
| |
| for call in tool_call_strs: |
| func_name_match = re.match(r'([^\n<]+)', call.strip()) |
| assert func_name_match, f"Missing function name in tool call: {call[:100]}" |
| |
| func_name = func_name_match.group(1).strip() |
| assert func_name, "Empty function name in tool call" |
| |
| |
| arg_key_count = call.count('<longcat_arg_key>') |
| arg_key_close_count = call.count('</longcat_arg_key>') |
| arg_value_count = call.count('<longcat_arg_value>') |
| arg_value_close_count = call.count('</longcat_arg_value>') |
| |
| assert arg_key_count == arg_key_close_count, \ |
| f"Mismatched arg_key tags in function {func_name}: {arg_key_count} opening, {arg_key_close_count} closing" |
| assert arg_value_count == arg_value_close_count, \ |
| f"Mismatched arg_value tags in function {func_name}: {arg_value_count} opening, {arg_value_close_count} closing" |
| assert arg_key_count == arg_value_count, \ |
| f"Mismatched arg_key and arg_value count in function {func_name}: {arg_key_count} keys, {arg_value_count} values" |
| |
| pairs = re.findall( |
| r'<longcat_arg_key>(.*?)</longcat_arg_key>\s*<longcat_arg_value>(.*?)</longcat_arg_value>', |
| call, |
| re.DOTALL |
| ) |
| |
| assert len(pairs) == arg_key_count, \ |
| f"Failed to parse all arguments in function {func_name}: expected {arg_key_count}, got {len(pairs)}" |
| |
| arguments = {} |
| for arg_key, arg_value in pairs: |
| arg_key = arg_key.strip() |
| arg_value = arg_value.strip() |
| |
| assert arg_key, f"Empty argument key in function {func_name}" |
| assert arg_key not in arguments, \ |
| f"Duplicate argument key '{arg_key}' in function {func_name}" |
| |
| arg_type = get_argument_type(func_name, arg_key, formatted_tools) |
| |
| if arg_type and arg_type != 'string': |
| parsed_value, is_good_json = parse_arguments(arg_value) |
| arg_value = parsed_value |
| |
| arguments[arg_key] = arg_value |
| |
| tool_calls.append({ |
| 'id': "tool-call-" + str(uuid.uuid4()), |
| 'type': "function", |
| 'function': { |
| 'name': func_name, |
| 'arguments': arguments |
| } |
| }) |
| |
| message = {'role': 'assistant'} |
| |
| if reasoning_content: |
| message['reasoning_content'] = reasoning_content |
| message['content'] = content |
| if tool_calls: |
| message['tool_calls'] = tool_calls |
| |
| return message |
|
|