IsaacGHX
update
d12a6df
import ast
import importlib
import json
import os
import re
import signal
from datetime import datetime
from typing import Any, Dict, List, Optional

from agentflow.engine.factory import create_llm_engine
from agentflow.models.formatters import ToolCommand
# Static fallback mapping from the long (external) tool names to the internal
# class name and package directory that implement each tool.
TOOL_NAME_MAPPING_LONG = {
    "Generalist_Solution_Generator_Tool": dict(class_name="Base_Generator_Tool", dir_name="base_generator"),
    "Ground_Google_Search_Tool": dict(class_name="Google_Search_Tool", dir_name="google_search"),
    "Python_Code_Generator_Tool": dict(class_name="Python_Coder_Tool", dir_name="python_coder"),
    "Web_RAG_Search_Tool": dict(class_name="Web_Search_Tool", dir_name="web_search"),
    "Wikipedia_RAG_Search_Tool": dict(class_name="Wikipedia_Search_Tool", dir_name="wikipedia_search"),
}

# Short (internal class) name -> long (external) name, used as a fallback when
# a caller passes the short form. Built from the table above so the two
# mappings cannot drift apart.
TOOL_NAME_MAPPING_SHORT = {
    spec["class_name"]: long_name
    for long_name, spec in TOOL_NAME_MAPPING_LONG.items()
}
class TimeoutError(Exception):
    """Raised when an executed tool command exceeds its time budget.

    NOTE: this deliberately-named class shadows the builtin ``TimeoutError``
    inside this module; it derives directly from ``Exception``, not ``OSError``.
    """
def timeout_handler(signum, frame):
    """Signal handler that interrupts the running code by raising ``TimeoutError``.

    Args:
        signum: Number of the delivered signal (unused).
        frame: Stack frame that was executing when the signal arrived (unused).

    Raises:
        TimeoutError: Always.
    """
    message = "Function execution timed out"
    raise TimeoutError(message)
class Executor:
    """Ask an LLM for a tool command, parse the answer, and run it.

    The workflow is: ``generate_tool_command`` prompts the engine for Python
    code that calls ``tool.execute(...)``, ``extract_explanation_and_command``
    pulls the code out of the answer, and ``execute_tool_command`` imports the
    tool and runs the code.

    SECURITY NOTE: ``execute_tool_command`` passes LLM-generated text to
    ``exec`` with access to this module's globals and no sandboxing. Only use
    it with engines and inputs you trust.
    """

    # Legacy splitter: everything up to and including a *single-line*
    # ``execution = tool.execute(...)`` call. Kept as a fallback for commands
    # that are not parseable as Python.
    _SINGLE_LINE_EXECUTE_PATTERN = r'.*?execution\s*=\s*tool\.execute\([^\n]*\)\s*(?:\n|$)'

    def __init__(self, llm_engine_name: str, root_cache_dir: str = "solver_cache", num_threads: int = 1, max_time: int = 120,
                 max_output_length: int = 100000, verbose: bool = False, base_url: Optional[str] = None, check_model: bool = True,
                 temperature: float = .0, enable_signal: bool = False):
        """Store the configuration and create the command-generation engine.

        Args:
            llm_engine_name: Model string handed to ``create_llm_engine``.
            root_cache_dir: Parent directory for auto-created query cache dirs.
            num_threads: Stored on the instance; not used by this class itself.
            max_time: Per-block timeout in seconds (only enforced when
                ``enable_signal`` is true).
            max_output_length: Stored on the instance; not used by this class itself.
            verbose: Stored on the instance; not used by this class itself.
            base_url: Optional endpoint forwarded to ``create_llm_engine``;
                omitted from the call when ``None``.
            check_model: Stored on the instance; not used by this class itself.
            temperature: Sampling temperature forwarded to the engine.
            enable_signal: When true, guard each executed block with a
                ``SIGALRM`` alarm of ``max_time`` seconds.
        """
        self.llm_engine_name = llm_engine_name
        self.root_cache_dir = root_cache_dir
        self.num_threads = num_threads
        self.max_time = max_time
        self.max_output_length = max_output_length
        self.verbose = verbose
        self.base_url = base_url
        self.check_model = check_model
        self.temperature = temperature
        self.enable_signal = enable_signal

        # Only pass base_url when the caller supplied one, so the factory's
        # own default applies otherwise.
        engine_kwargs = {
            "model_string": self.llm_engine_name,
            "is_multimodal": False,
            "temperature": self.temperature,
        }
        if base_url is not None:
            engine_kwargs["base_url"] = self.base_url
        self.llm_generate_tool_command = create_llm_engine(**engine_kwargs)

    def set_query_cache_dir(self, query_cache_dir):
        """Select (and create) the directory that tools write their output to.

        Args:
            query_cache_dir: Directory to use. When falsy, a directory named
                after the current local time (``YYYYmmdd_HHMMSS``) is used
                under ``root_cache_dir``.
        """
        if query_cache_dir:
            self.query_cache_dir = query_cache_dir
        else:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            self.query_cache_dir = os.path.join(self.root_cache_dir, timestamp)
        os.makedirs(self.query_cache_dir, exist_ok=True)

    def generate_tool_command(self, question: str, image: str, context: str, sub_goal: str, tool_name: str, tool_metadata: Dict[str, Any], step_count: int = 0, json_data: Any = None) -> Any:
        """Prompt the engine for a ``tool.execute(...)`` command.

        Args:
            question: The user query.
            image: Accepted for interface compatibility; not used in the prompt.
            context: Relevant data inserted into the prompt.
            sub_goal: The sub-goal the command should address.
            tool_name: Name of the selected tool.
            tool_metadata: Metadata describing the tool's parameters.
            step_count: Step index used to build the ``json_data`` keys.
            json_data: Optional dict; when given, the prompt and the
                stringified response are recorded in it.

        Returns:
            Whatever the engine returns for ``response_format=ToolCommand``.
        """
        # NOTE(review): Example1 below is not valid Python (unescaped nested
        # double quotes) and contains the typo "porblom". It is left untouched
        # because changing the prompt changes model behaviour -- confirm with
        # the prompt owner before fixing.
        prompt_generate_tool_command = f"""
Task: Generate a precise command to execute the selected tool.
Context:
- **Query:** {question}
- **Sub-Goal:** {sub_goal}
- **Tool Name:** {tool_name}
- **Tool Metadata:** {tool_metadata}
- **Relevant Data:** {context}
Instructions:
1. Analyze the tool's required parameters from its metadata.
2. Construct valid Python code that addresses the sub-goal using the provided context and data.
3. The command must include at least one call to `tool.execute()`.
4. Each `tool.execute()` call must be assigned to a variable named **`execution`**.
5. Please give the exact numbers and parameters should be used in the `tool.execute()` call.
Output Format:
Present your response in the following structured format. Do not include any extra text or explanations.
Generated Command:
```python
<command>
```
Example1:
Generated Command:
```python
execution = tool.execute(query="Summarize the following porblom:"Isaac has 100 toys, masa gets ...., how much are their together?")
```
Example2:
Generated Command:
```python
execution = tool.execute(query=["Methanol", "function of hyperbola", "Fermat's Last Theorem"])
```
"""
        tool_command = self.llm_generate_tool_command(prompt_generate_tool_command, response_format=ToolCommand)
        if json_data is not None:
            json_data[f"tool_commander_{step_count}_prompt"] = prompt_generate_tool_command
            json_data[f"tool_commander_{step_count}_response"] = str(tool_command)
        return tool_command

    @staticmethod
    def _strip_code_fence(code: str) -> str:
        """Remove one leading ```` ```python ```` fence and one trailing ```` ``` ```` fence."""
        code = re.sub(r'^```python\s*', '', code)
        trimmed = code.rstrip()
        # str.rstrip('```') would treat its argument as a character set and
        # eat every trailing backtick; drop exactly one closing fence instead.
        if trimmed.endswith('```'):
            code = trimmed[:-3]
        return code.strip()

    @staticmethod
    def _parse_free_text(response: str) -> tuple:
        """Regex-parse an unstructured answer into (analysis, explanation, command)."""
        try:
            flags = re.DOTALL | re.IGNORECASE

            analysis_match = re.search(r"Analysis:(.*?)Command Explanation", response, flags)
            analysis = analysis_match.group(1).strip() if analysis_match else "No analysis found."

            explanation_match = re.search(r"Command Explanation:(.*?)Generated Command", response, flags)
            explanation = explanation_match.group(1).strip() if explanation_match else "No explanation found."

            # Preferred form: a python fence right after "Generated Command:".
            command_match = re.search(r"Generated Command:.*?```python\n(.*?)```", response, flags)
            if command_match:
                return analysis, explanation, command_match.group(1).strip()

            # Fallback: any python fence; the longest one is assumed to be
            # the most complete command.
            fenced_blocks = re.findall(r"```python\s*\n(.*?)```", response, flags)
            if fenced_blocks:
                longest = max(fenced_blocks, key=lambda x: len(x.strip()))
                return analysis, explanation, longest.strip()

            return analysis, explanation, "No command found."
        except Exception as e:
            print(f"Error during regex parsing: {str(e)}")
            return "Parsing error.", "Parsing error.", "No command found."

    def extract_explanation_and_command(self, response: Any) -> tuple:
        """Extract ``(analysis, explanation, command)`` from an engine response.

        Args:
            response: Either a ``ToolCommand`` instance or a string. A string
                is first tried as JSON for ``ToolCommand``; if that fails it
                is parsed with regular expressions.

        Returns:
            A 3-tuple of strings. Missing parts are replaced by
            ``"No analysis found."``, ``"No explanation found."`` and
            ``"No command found."``; any other response type yields the
            command ``"Invalid response type."``. Surrounding code fences are
            stripped from the command.
        """
        analysis = "No analysis found."
        explanation = "No explanation found."
        command = "No command found."

        # The str check must come first: structured parsing is only
        # attempted on strings, and falls back to regex on any failure.
        if isinstance(response, str):
            try:
                response_dict = json.loads(response)
                response_obj = ToolCommand(**response_dict)
                analysis = response_obj.analysis.strip()
                explanation = response_obj.explanation.strip()
                command = response_obj.command.strip()
            except Exception as e:
                print(f"Failed to parse response as JSON: {str(e)}")
                analysis, explanation, command = self._parse_free_text(response)
        elif isinstance(response, ToolCommand):
            analysis = response.analysis.strip()
            explanation = response.explanation.strip()
            command = response.command.strip()
        else:
            command = "Invalid response type."

        return analysis, explanation, self._strip_code_fence(command)

    @staticmethod
    def _resolve_tool_location(tool_name: str) -> tuple:
        """Map a long or short tool name to ``(dir_name, class_name)``."""
        entry = TOOL_NAME_MAPPING_LONG.get(tool_name)
        if entry is None:
            # Maybe a short name: translate to the long one and retry.
            entry = TOOL_NAME_MAPPING_LONG.get(TOOL_NAME_MAPPING_SHORT.get(tool_name))
        if entry is not None:
            return entry["dir_name"], entry["class_name"]
        # Unmapped tool: derive the directory from the name itself.
        return tool_name.lower().replace('_tool', ''), tool_name

    @staticmethod
    def _is_tool_execute_call(node) -> bool:
        """Return True when *node* is the call expression ``tool.execute(...)``."""
        return (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr == "execute"
            and isinstance(node.func.value, ast.Name)
            and node.func.value.id == "tool"
        )

    @staticmethod
    def _captures_execution(statement) -> bool:
        """Return True when *statement* assigns a ``tool.execute`` result to ``execution``."""
        for node in ast.walk(statement):
            if not isinstance(node, ast.Assign):
                continue
            assigns_execution = any(
                isinstance(target, ast.Name) and target.id == "execution"
                for target in node.targets
            )
            if assigns_execution and any(
                Executor._is_tool_execute_call(child) for child in ast.walk(node.value)
            ):
                return True
        return False

    @staticmethod
    def _split_commands(command: str) -> List[str]:
        """Split *command* into blocks that each end with one ``tool.execute`` assignment.

        The command is parsed as Python so that calls spanning several lines
        stay intact. Each block holds the statements since the previous block
        up to and including a top-level statement that assigns
        ``execution = tool.execute(...)``. Code after the last such statement
        is dropped. If parsing fails or finds nothing, the legacy single-line
        regex is used instead.
        """
        normalized = command.replace("\r\n", "\n").replace("\r", "\n")
        try:
            statements = ast.parse(normalized).body
        except (SyntaxError, ValueError):
            statements = []

        source_lines = normalized.split("\n")
        blocks = []
        start = 0
        for statement in statements:
            if not Executor._captures_execution(statement):
                continue
            end = statement.end_lineno
            block = "\n".join(source_lines[start:end]).strip()
            if block:
                blocks.append(block)
            start = end
        if blocks:
            return blocks

        legacy_blocks = re.findall(Executor._SINGLE_LINE_EXECUTE_PATTERN, command, re.DOTALL)
        return [block.strip() for block in legacy_blocks if block.strip()]

    def _run_block(self, block: str, namespace: dict) -> Optional[Any]:
        """Execute one block in *namespace* and return its ``execution`` value.

        When ``enable_signal`` is set, a ``SIGALRM`` alarm of ``max_time``
        seconds guards the block, and the previously installed handler is
        restored afterwards. On timeout a message string is returned instead.
        """
        previous_handler = None
        if self.enable_signal:
            previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
            signal.alarm(self.max_time)
        try:
            # A single namespace (not separate globals/locals) lets functions,
            # lambdas and comprehensions in the generated code see the names
            # that the command itself defined.
            exec(block, namespace)
            return namespace.get('execution')
        except TimeoutError:
            return f"Execution timed out after {self.max_time} seconds"
        finally:
            if self.enable_signal:
                signal.alarm(0)  # Always cancel a pending alarm
                # signal.signal() reports None for handlers not installed
                # from Python; those cannot be re-installed.
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)

    def execute_tool_command(self, tool_name: str, command: str) -> Any:
        """
        Execute a tool command, optionally with timeout protection.

        The tool module ``tools.<dir_name>.tool`` is imported, the tool class
        is instantiated with no arguments, and its output directory is set to
        ``self.query_cache_dir``. The command is then split into blocks, each
        ending with one ``execution = tool.execute(...)`` assignment, and the
        blocks are run in order. All blocks share one namespace, so variables
        and imports defined earlier in the command stay available.

        Args:
            tool_name (str): Long or short name of the tool to execute
            command (str): Command string containing tool.execute() calls

        Returns:
            Any: List with one entry per block (the ``execution`` value, a
            timeout message, or a "No execution captured" message), or an
            error message string if anything raised.
        """
        dir_name, class_name = self._resolve_tool_location(tool_name)
        module_name = f"tools.{dir_name}.tool"
        try:
            module = importlib.import_module(module_name)
            tool_class = getattr(module, class_name)
            tool = tool_class()
            tool.set_custom_output_dir(self.query_cache_dir)

            # Work on a copy of the module globals so generated code sees
            # the same names as before without being able to rebind them.
            namespace = dict(globals())
            executions = []
            for block in self._split_commands(command):
                namespace['tool'] = tool
                # Forget the previous result so a block that assigns nothing
                # is reported as such instead of repeating a stale value.
                namespace.pop('execution', None)
                result = self._run_block(block, namespace)
                if result is not None:
                    executions.append(result)
                else:
                    executions.append(f"No execution captured from block: {block}")
            return executions
        except Exception as e:
            return f"Error in execute_tool_command: {str(e)}"