"""Tool processing utilities.

Helpers for injecting tool (function-calling) definitions into a chat prompt,
extracting tool invocations from a model's text response, and stripping the
tool-call JSON from that response.
"""

import json
import re
import time
from typing import Dict, List, Optional, Any

from app.core.config import settings


def content_to_string(content: Any) -> str:
    """Convert message content in one of several formats to a plain string.

    Args:
        content: A string, a list whose items are strings or dicts of the
            form ``{"type": "text", "text": ...}``, or any other value.

    Returns:
        The string unchanged; for a list, the text parts joined by single
        spaces (items of any other shape are ignored); ``""`` for every
        other type.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for p in content:
            if isinstance(p, dict) and p.get("type") == "text":
                parts.append(p.get("text", ""))
            elif isinstance(p, str):
                parts.append(p)
        return " ".join(parts)
    return ""


def generate_tool_prompt(tools: List[Dict[str, Any]]) -> str:
    """Build the prompt section that describes the available functions.

    Only entries whose ``"type"`` is ``"function"`` are rendered. Each one
    becomes a Markdown section with its name, purpose and parameter list,
    followed by usage instructions telling the model to answer with a
    ``tool_calls`` JSON object.

    Args:
        tools: Tool definitions (dicts with ``"type"`` and ``"function"``).

    Returns:
        The prompt text, or ``""`` when there is no function tool to describe.
    """
    if not tools:
        return ""

    tool_definitions = []
    for tool in tools:
        if tool.get("type") != "function":
            continue

        function_spec = tool.get("function", {}) or {}
        function_name = function_spec.get("name", "unknown")
        function_description = function_spec.get("description", "")
        parameters = function_spec.get("parameters", {}) or {}

        # Create structured tool definition
        tool_info = [f"## {function_name}", f"**Purpose**: {function_description}"]

        # Add parameter details
        parameter_properties = parameters.get("properties", {}) or {}
        required_parameters = set(parameters.get("required", []) or [])

        if parameter_properties:
            tool_info.append("**Parameters**:")
            for param_name, param_details in parameter_properties.items():
                param_details = param_details or {}
                param_type = param_details.get("type", "any")
                param_desc = param_details.get("description", "")
                requirement_flag = "**Required**" if param_name in required_parameters else "*Optional*"
                tool_info.append(f"- `{param_name}` ({param_type}) - {requirement_flag}: {param_desc}")

        tool_definitions.append("\n".join(tool_info))

    if not tool_definitions:
        return ""

    # Build comprehensive tool prompt
    prompt_template = (
        "\n\n# AVAILABLE FUNCTIONS\n" + "\n\n---\n".join(tool_definitions) + "\n\n# USAGE INSTRUCTIONS\n"
        "When you need to execute a function, respond ONLY with a JSON object containing tool_calls:\n"
        "```json\n"
        "{\n"
        ' "tool_calls": [\n'
        " {\n"
        ' "id": "call_xxx",\n'
        ' "type": "function",\n'
        ' "function": {\n'
        ' "name": "function_name",\n'
        ' "arguments": "{\\"param1\\": \\"value1\\"}"\n'
        " }\n"
        " }\n"
        " ]\n"
        "}\n"
        "```\n"
        "Important: No explanatory text before or after the JSON. The 'arguments' field must be a JSON string, not an object.\n"
    )

    return prompt_template


def process_messages_with_tools(
    messages: List[Dict[str, Any]],
    tools: Optional[List[Dict[str, Any]]] = None,
    tool_choice: Optional[Any] = None
) -> List[Dict[str, Any]]:
    """Inject the tool prompt into ``messages`` and flatten tool results.

    When ``tools`` is non-empty, ``settings.TOOL_SUPPORT`` is truthy and
    ``tool_choice`` is not ``"none"``, the generated tool prompt is appended
    to every system message (or a new system message is prepended when none
    exists), and a hint is appended to the last message if it is a user
    message and ``tool_choice`` is ``"required"``, ``"auto"`` or names a
    specific function.

    Messages with role ``"tool"`` or ``"function"`` are rewritten as assistant
    messages carrying the tool result; all other messages get their content
    converted to a plain string. The input dicts are not mutated.

    Args:
        messages: Chat messages (dicts with ``"role"`` and ``"content"``).
        tools: Optional tool definitions.
        tool_choice: Optional tool-choice value (string or dict).

    Returns:
        A new list of processed message dicts.
    """
    processed: List[Dict[str, Any]] = []

    if tools and settings.TOOL_SUPPORT and (tool_choice != "none"):
        tools_prompt = generate_tool_prompt(tools)
        has_system = any(m.get("role") == "system" for m in messages)

        if has_system:
            for m in messages:
                if m.get("role") == "system":
                    mm = dict(m)
                    content = content_to_string(mm.get("content", ""))
                    mm["content"] = content + tools_prompt
                    processed.append(mm)
                else:
                    processed.append(m)
        else:
            processed = [{"role": "system", "content": "你是一个有用的助手。" + tools_prompt}] + messages

        # Add tool choice hints
        if tool_choice in ("required", "auto"):
            if processed and processed[-1].get("role") == "user":
                last = dict(processed[-1])
                content = content_to_string(last.get("content", ""))
                last["content"] = content + "\n\n请根据需要使用提供的工具函数。"
                processed[-1] = last
        elif isinstance(tool_choice, dict) and tool_choice.get("type") == "function":
            fname = (tool_choice.get("function") or {}).get("name")
            if fname and processed and processed[-1].get("role") == "user":
                last = dict(processed[-1])
                content = content_to_string(last.get("content", ""))
                last["content"] = content + f"\n\n请使用 {fname} 函数来处理这个请求。"
                processed[-1] = last
    else:
        processed = list(messages)

    # Handle tool/function messages
    final_msgs: List[Dict[str, Any]] = []
    for m in processed:
        role = m.get("role")
        if role in ("tool", "function"):
            # Fall back to "unknown" for a missing, None or empty name so the
            # rendered text never contains "None".
            tool_name = m.get("name") or "unknown"
            raw_content = m.get("content", "")
            # Serialize dict payloads BEFORE string conversion:
            # content_to_string() returns "" for a dict, which would drop the
            # tool result entirely.
            if isinstance(raw_content, dict):
                tool_content = json.dumps(raw_content, ensure_ascii=False)
            else:
                tool_content = content_to_string(raw_content)

            # Make sure the message is never an empty result block.
            if tool_content.strip():
                content = f"工具 {tool_name} 返回结果:\n```json\n{tool_content}\n```"
            else:
                content = f"工具 {tool_name} 执行完成"

            final_msgs.append(
                {
                    "role": "assistant",
                    "content": content,
                }
            )
        else:
            # For regular messages, ensure content is string format
            final_msg = dict(m)
            content = content_to_string(final_msg.get("content", ""))
            final_msg["content"] = content
            final_msgs.append(final_msg)

    return final_msgs


# Tool Extraction Patterns
TOOL_CALL_FENCE_PATTERN = re.compile(r"```json\s*(\{.*?\})\s*```", re.DOTALL)
# NOTE: TOOL_CALL_INLINE_PATTERN was removed because it over-matched.
# Inline tool JSON is now located with a brace-balancing scan instead.
FUNCTION_CALL_PATTERN = re.compile(r"调用函数\s*[::]\s*([\w\-\.]+)\s*(?:参数|arguments)[::]\s*(\{.*?\})", re.DOTALL)


def _find_json_object_end(text: str, start: int) -> Optional[int]:
    """Return the index just past the ``}`` that closes ``text[start]``.

    ``text[start]`` must be ``{``. Braces inside double-quoted strings are
    ignored and a backslash skips the following character. Returns ``None``
    when the braces never balance before the end of ``text``.
    """
    depth = 1
    in_string = False
    escape_next = False
    for pos in range(start + 1, len(text)):
        ch = text[pos]
        if escape_next:
            escape_next = False
        elif ch == "\\":
            escape_next = True
        elif ch == '"':
            in_string = not in_string
        elif not in_string:
            if ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    return pos + 1
    return None


def _load_json_object(candidate: str) -> Optional[Dict[str, Any]]:
    """Parse ``candidate`` and return it only if it is a JSON object (dict)."""
    try:
        parsed = json.loads(candidate)
    except (json.JSONDecodeError, RecursionError):
        # RecursionError: pathologically deep nesting; treat as "not JSON".
        return None
    return parsed if isinstance(parsed, dict) else None


def _normalize_tool_calls(tool_calls: List[Any]) -> List[Any]:
    """Ensure every ``function.arguments`` value is a JSON string (in place)."""
    for call in tool_calls:
        if not isinstance(call, dict):
            continue
        func = call.get("function")
        if not isinstance(func, dict) or "arguments" not in func:
            continue
        if not isinstance(func["arguments"], str):
            func["arguments"] = json.dumps(func["arguments"], ensure_ascii=False)
    return tool_calls


def _tool_calls_from_json(candidate: str) -> Optional[List[Any]]:
    """Return the normalized, non-empty ``tool_calls`` list of a JSON object."""
    parsed = _load_json_object(candidate)
    if parsed is None:
        return None
    tool_calls = parsed.get("tool_calls")
    if tool_calls and isinstance(tool_calls, list):
        return _normalize_tool_calls(tool_calls)
    return None


def extract_tool_invocations(text: str) -> Optional[List[Dict[str, Any]]]:
    """Extract tool invocations from response text.

    Only the first ``settings.SCAN_LIMIT`` characters are examined. Three
    strategies are tried in order, and the first hit wins:

    1. fenced ``json`` code blocks containing a ``tool_calls`` list;
    2. inline JSON objects (found by brace balancing) containing a
       ``tool_calls`` list;
    3. the natural-language form matched by ``FUNCTION_CALL_PATTERN``.

    In every returned call, ``function.arguments`` is a JSON string.

    Args:
        text: The model's response text.

    Returns:
        The list of tool calls, or ``None`` when none is found.
    """
    if not text:
        return None

    # Limit scan size for performance
    scannable_text = text[: settings.SCAN_LIMIT]

    # Attempt 1: Extract from JSON code blocks
    for json_block in TOOL_CALL_FENCE_PATTERN.findall(scannable_text):
        tool_calls = _tool_calls_from_json(json_block)
        if tool_calls is not None:
            return tool_calls

    # Attempt 2: Extract inline JSON objects using bracket balance method.
    # Every "{" is tried as a possible start, because an unbalanced or
    # irrelevant outer object may still contain a valid tool-call object.
    brace = scannable_text.find("{")
    while brace != -1:
        end = _find_json_object_end(scannable_text, brace)
        if end is not None:
            tool_calls = _tool_calls_from_json(scannable_text[brace:end])
            if tool_calls is not None:
                return tool_calls
        brace = scannable_text.find("{", brace + 1)

    # Attempt 3: Parse natural language function calls
    natural_lang_match = FUNCTION_CALL_PATTERN.search(scannable_text)
    if natural_lang_match:
        function_name = natural_lang_match.group(1).strip()
        # The regex stops at the first "}", which truncates nested arguments;
        # prefer the balanced object starting at the same position.
        args_start = natural_lang_match.start(2)
        args_end = _find_json_object_end(scannable_text, args_start)
        if args_end is not None:
            arguments_str = scannable_text[args_start:args_end].strip()
        else:
            arguments_str = natural_lang_match.group(2).strip()

        try:
            # Validate JSON format
            json.loads(arguments_str)
        except (json.JSONDecodeError, RecursionError):
            return None
        return [
            {
                "id": f"call_{int(time.time() * 1000000)}",
                "type": "function",
                "function": {"name": function_name, "arguments": arguments_str},
            }
        ]

    return None


def remove_tool_json_content(text: str) -> str:
    """Remove tool-call JSON from response text.

    Fenced ``json`` blocks and inline JSON objects are removed when they parse
    as a JSON object with a ``"tool_calls"`` key; everything else is kept.

    Args:
        text: The model's response text.

    Returns:
        The remaining text with leading and trailing whitespace stripped
        (``""`` for empty input).
    """
    if not text:
        return ""

    def remove_tool_call_block(match: re.Match) -> str:
        parsed_data = _load_json_object(match.group(1))
        if parsed_data is not None and "tool_calls" in parsed_data:
            return ""
        return match.group(0)

    # Step 1: Remove fenced tool JSON blocks
    cleaned_text = TOOL_CALL_FENCE_PATTERN.sub(remove_tool_call_block, text)

    # Step 2: Remove inline tool JSON with a brace-balancing scan: locate each
    # candidate JSON object and drop exactly those that contain "tool_calls".
    pieces: List[str] = []
    pos = 0
    length = len(cleaned_text)
    while pos < length:
        brace = cleaned_text.find("{", pos)
        if brace == -1:
            pieces.append(cleaned_text[pos:])
            break
        pieces.append(cleaned_text[pos:brace])

        end = _find_json_object_end(cleaned_text, brace)
        parsed = _load_json_object(cleaned_text[brace:end]) if end is not None else None
        if parsed is not None and "tool_calls" in parsed:
            # This is a tool call: skip the whole object.
            pos = end
        else:
            # Not a tool call (or not parseable): keep this brace and rescan
            # from the next character so nested objects are still examined.
            pieces.append("{")
            pos = brace + 1

    return "".join(pieces).strip()