import importlib import json import os import re import signal from datetime import datetime from typing import Any, Dict, List, Optional from agentflow.engine.factory import create_llm_engine from agentflow.models.formatters import ToolCommand # Tool name mapping: Static fallback mapping (long external names to internal) TOOL_NAME_MAPPING_LONG = { "Generalist_Solution_Generator_Tool": { "class_name": "Base_Generator_Tool", "dir_name": "base_generator" }, "Ground_Google_Search_Tool": { "class_name": "Google_Search_Tool", "dir_name": "google_search" }, "Python_Code_Generator_Tool": { "class_name": "Python_Coder_Tool", "dir_name": "python_coder" }, "Web_RAG_Search_Tool": { "class_name": "Web_Search_Tool", "dir_name": "web_search" }, "Wikipedia_RAG_Search_Tool": { "class_name": "Wikipedia_Search_Tool", "dir_name": "wikipedia_search" } } # Short to long mapping for fallback TOOL_NAME_MAPPING_SHORT = { "Base_Generator_Tool": "Generalist_Solution_Generator_Tool", "Google_Search_Tool": "Ground_Google_Search_Tool", "Python_Coder_Tool": "Python_Code_Generator_Tool", "Web_Search_Tool": "Web_RAG_Search_Tool", "Wikipedia_Search_Tool": "Wikipedia_RAG_Search_Tool" } class TimeoutError(Exception): pass def timeout_handler(signum, frame): raise TimeoutError("Function execution timed out") class Executor: def __init__(self, llm_engine_name: str, root_cache_dir: str = "solver_cache", num_threads: int = 1, max_time: int = 120, max_output_length: int = 100000, verbose: bool = False, base_url: str = None, check_model: bool = True, temperature: float = .0, enable_signal: bool = False): self.llm_engine_name = llm_engine_name self.root_cache_dir = root_cache_dir self.num_threads = num_threads self.max_time = max_time self.max_output_length = max_output_length self.verbose = verbose self.base_url = base_url self.check_model = check_model self.temperature = temperature self.enable_signal = enable_signal if base_url is not None: self.llm_generate_tool_command = create_llm_engine(model_string=self.llm_engine_name, is_multimodal=False, base_url=self.base_url, temperature = self.temperature) else: self.llm_generate_tool_command = create_llm_engine(model_string=self.llm_engine_name, is_multimodal=False, temperature = self.temperature) def set_query_cache_dir(self, query_cache_dir): if query_cache_dir: self.query_cache_dir = query_cache_dir else: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") self.query_cache_dir = os.path.join(self.root_cache_dir, timestamp) os.makedirs(self.query_cache_dir, exist_ok=True) def generate_tool_command(self, question: str, image: str, context: str, sub_goal: str, tool_name: str, tool_metadata: Dict[str, Any], step_count: int = 0, json_data: Any = None) -> Any: prompt_generate_tool_command = f""" Task: Generate a precise command to execute the selected tool. Context: - **Query:** {question} - **Sub-Goal:** {sub_goal} - **Tool Name:** {tool_name} - **Tool Metadata:** {tool_metadata} - **Relevant Data:** {context} Instructions: 1. Analyze the tool's required parameters from its metadata. 2. Construct valid Python code that addresses the sub-goal using the provided context and data. 3. The command must include at least one call to `tool.execute()`. 4. Each `tool.execute()` call must be assigned to a variable named **`execution`**. 5. Please give the exact numbers and parameters should be used in the `tool.execute()` call. Output Format: Present your response in the following structured format. Do not include any extra text or explanations. Generated Command: ```python ``` Example1: Generated Command: ```python execution = tool.execute(query="Summarize the following porblom:"Isaac has 100 toys, masa gets ...., how much are their together?") ``` Example2: Generated Command: ```python execution = tool.execute(query=["Methanol", "function of hyperbola", "Fermat's Last Theorem"]) ``` """ tool_command = self.llm_generate_tool_command(prompt_generate_tool_command, response_format=ToolCommand) if json_data is not None: json_data[f"tool_commander_{step_count}_prompt"] = prompt_generate_tool_command json_data[f"tool_commander_{step_count}_response"] = str(tool_command) return tool_command def extract_explanation_and_command(self, response: Any) -> tuple: def normalize_code(code: str) -> str: # Remove leading/trailing whitespace and triple backticks if present return re.sub(r'^```python\s*', '', code).rstrip('```').strip() analysis = "No analysis found." explanation = "No explanation found." command = "No command found." if isinstance(response, str): # Attempt to parse as JSON first try: response_dict = json.loads(response) response_obj = ToolCommand(**response_dict) analysis = response_obj.analysis.strip() explanation = response_obj.explanation.strip() command = response_obj.command.strip() except Exception as e: print(f"Failed to parse response as JSON: {str(e)}") # Fall back to regex parsing on string try: # Extract analysis analysis_pattern = r"Analysis:(.*?)Command Explanation" analysis_match = re.search(analysis_pattern, response, re.DOTALL | re.IGNORECASE) analysis = analysis_match.group(1).strip() if analysis_match else "No analysis found." # Extract explanation explanation_pattern = r"Command Explanation:(.*?)Generated Command" explanation_match = re.search(explanation_pattern, response, re.DOTALL | re.IGNORECASE) explanation = explanation_match.group(1).strip() if explanation_match else "No explanation found." # Extract command using "Generated Command:" prefix command_pattern = r"Generated Command:.*?```python\n(.*?)```" command_match = re.search(command_pattern, response, re.DOTALL | re.IGNORECASE) if command_match: command = command_match.group(1).strip() else: # Fallback: Extract ANY ```python ... ``` block (even without prefix) loose_command_pattern = r"```python\s*\n(.*?)```" loose_match = re.findall(loose_command_pattern, response, re.DOTALL | re.IGNORECASE) if loose_match: # Take the last or most complete one? Or first meaningful? # Here we take the longest one as heuristic command = max(loose_match, key=lambda x: len(x.strip())).strip() else: command = "No command found." except Exception as e: print(f"Error during regex parsing: {str(e)}") analysis = "Parsing error." explanation = "Parsing error." command = "No command found." elif isinstance(response, ToolCommand): analysis = response.analysis.strip() explanation = response.explanation.strip() command = response.command.strip() else: command = "Invalid response type." # Final normalization command = normalize_code(command) return analysis, explanation, command def execute_tool_command(self, tool_name: str, command: str) -> Any: # TODO: check here """ Execute a tool command with timeout protection. If execution exceeds max_time seconds, the function will be interrupted and return a timeout message. Args: tool_name (str): Name of the tool to execute command (str): Command string containing tool.execute() calls Returns: Any: List of execution results or error message """ def split_commands(command: str) -> List[str]: # Use regex to find all tool.execute() commands and their surrounding code pattern = r'.*?execution\s*=\s*tool\.execute\([^\n]*\)\s*(?:\n|$)' blocks = re.findall(pattern, command, re.DOTALL) return [block.strip() for block in blocks if block.strip()] def execute_with_timeout(block: str, local_context: dict) -> Optional[str]: if self.enable_signal: # Set up the timeout handler signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(self.max_time) try: # Execute the block in the local context exec(block, globals(), local_context) result = local_context.get('execution') if self.enable_signal: signal.alarm(0) # Disable the alarm return result except TimeoutError: return f"Execution timed out after {self.max_time} seconds" finally: if self.enable_signal: signal.alarm(0) # Ensure alarm is disabled even if other exceptions occur # Import the tool module and instantiate it # tool_name could be either short or long name # First check if it's a long name if tool_name in TOOL_NAME_MAPPING_LONG: dir_name = TOOL_NAME_MAPPING_LONG[tool_name]["dir_name"] class_name = TOOL_NAME_MAPPING_LONG[tool_name]["class_name"] # Then check if it's a short name (convert to long, then get internal) elif tool_name in TOOL_NAME_MAPPING_SHORT: long_name = TOOL_NAME_MAPPING_SHORT[tool_name] if long_name in TOOL_NAME_MAPPING_LONG: dir_name = TOOL_NAME_MAPPING_LONG[long_name]["dir_name"] class_name = TOOL_NAME_MAPPING_LONG[long_name]["class_name"] else: # Shouldn't happen, but fallback dir_name = tool_name.lower().replace('_tool', '') class_name = tool_name else: # Fallback to original behavior for unmapped tools dir_name = tool_name.lower().replace('_tool', '') class_name = tool_name module_name = f"tools.{dir_name}.tool" try: # Dynamically import the module module = importlib.import_module(module_name) # Get the tool class tool_class = getattr(module, class_name) tool = tool_class() # Set the custom output directory tool.set_custom_output_dir(self.query_cache_dir) # Split the command into blocks, execute each one and store execution results command_blocks = split_commands(command) executions = [] for block in command_blocks: # Create a local context to safely execute the block local_context = {'tool': tool} # Execute the block with timeout protection result = execute_with_timeout(block, local_context) if result is not None: executions.append(result) else: executions.append(f"No execution captured from block: {block}") # Return all the execution results return executions except Exception as e: return f"Error in execute_tool_command: {str(e)}"