Spaces:
Build error
Build error
| """ | |
| Tool executor implementation for executing tools. | |
| """ | |
| import asyncio | |
| import logging | |
| from typing import Dict, Any, Optional, List | |
| from uuid import UUID, uuid4 | |
| from datetime import datetime | |
| from src.core.interfaces.tool_executor import ToolExecutor | |
| from src.core.entities.tool import Tool, ToolType | |
| from src.shared.exceptions import DomainException | |
| class ToolExecutorImpl(ToolExecutor): | |
| """ | |
| Implementation of the tool executor interface. | |
| This class handles the execution of different types of tools | |
| and manages their lifecycle during processing. | |
| """ | |
| def __init__(self): | |
| self.logger = logging.getLogger(__name__) | |
| self._active_executions: Dict[UUID, Dict[str, Any]] = {} | |
| async def execute(self, tool: Tool, parameters: Dict[str, Any], context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: | |
| """ | |
| Execute a tool with given parameters. | |
| Args: | |
| tool: The tool to execute | |
| parameters: Tool execution parameters | |
| context: Optional execution context | |
| Returns: | |
| Dictionary containing the execution result | |
| """ | |
| execution_id = uuid4() | |
| start_time = datetime.now() | |
| try: | |
| # Register execution | |
| self._active_executions[execution_id] = { | |
| "tool_id": tool.id, | |
| "parameters": parameters, | |
| "start_time": start_time, | |
| "status": "running" | |
| } | |
| self.logger.info(f"Starting tool execution {execution_id} for tool {tool.name}") | |
| # Validate parameters | |
| validation_result = await self.validate_parameters(tool, parameters) | |
| if not validation_result.get("valid", False): | |
| raise DomainException(f"Parameter validation failed: {validation_result.get('errors', [])}") | |
| # Execute based on tool type | |
| if tool.tool_type == ToolType.SEARCH: | |
| result = await self._execute_search_tool(tool, parameters, context) | |
| elif tool.tool_type == ToolType.CALCULATOR: | |
| result = await self._execute_calculator_tool(tool, parameters, context) | |
| elif tool.tool_type == ToolType.FILE_PROCESSOR: | |
| result = await self._execute_file_processor_tool(tool, parameters, context) | |
| elif tool.tool_type == ToolType.API_CALLER: | |
| result = await self._execute_api_caller_tool(tool, parameters, context) | |
| elif tool.tool_type == ToolType.CUSTOM: | |
| result = await self._execute_custom_tool(tool, parameters, context) | |
| else: | |
| raise DomainException(f"Unsupported tool type: {tool.tool_type}") | |
| # Update execution status | |
| execution_time = (datetime.now() - start_time).total_seconds() | |
| self._active_executions[execution_id]["status"] = "completed" | |
| self._active_executions[execution_id]["execution_time"] = execution_time | |
| # Add execution metadata | |
| result["execution_id"] = str(execution_id) | |
| result["execution_time"] = execution_time | |
| self.logger.info(f"Tool execution {execution_id} completed successfully in {execution_time:.2f}s") | |
| return result | |
| except Exception as e: | |
| execution_time = (datetime.now() - start_time).total_seconds() | |
| self._active_executions[execution_id]["status"] = "failed" | |
| self._active_executions[execution_id]["error"] = str(e) | |
| self._active_executions[execution_id]["execution_time"] = execution_time | |
| self.logger.error(f"Tool execution {execution_id} failed: {str(e)}") | |
| raise DomainException(f"Tool execution failed: {str(e)}") | |
| async def validate_parameters(self, tool: Tool, parameters: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Validate tool parameters before execution. | |
| Args: | |
| tool: The tool to validate parameters for | |
| parameters: Parameters to validate | |
| Returns: | |
| Dictionary containing validation result | |
| """ | |
| errors = [] | |
| warnings = [] | |
| # Check if tool is available | |
| if not tool.is_available: | |
| errors.append("Tool is not available for execution") | |
| # Check required parameters | |
| if hasattr(tool, 'parameters_schema') and tool.parameters_schema: | |
| required_params = tool.parameters_schema.get("required", []) | |
| for param in required_params: | |
| if param not in parameters: | |
| errors.append(f"Required parameter '{param}' is missing") | |
| # Check parameter types (basic validation) | |
| if hasattr(tool, 'parameters_schema') and tool.parameters_schema: | |
| param_types = tool.parameters_schema.get("properties", {}) | |
| for param_name, param_value in parameters.items(): | |
| if param_name in param_types: | |
| expected_type = param_types[param_name].get("type") | |
| if expected_type and not self._validate_type(param_value, expected_type): | |
| warnings.append(f"Parameter '{param_name}' may have incorrect type") | |
| return { | |
| "valid": len(errors) == 0, | |
| "errors": errors, | |
| "warnings": warnings | |
| } | |
| async def get_tool_info(self, tool: Tool) -> Dict[str, Any]: | |
| """ | |
| Get tool information and capabilities. | |
| Args: | |
| tool: The tool to query | |
| Returns: | |
| Dictionary containing tool information | |
| """ | |
| info = { | |
| "id": str(tool.id), | |
| "name": tool.name, | |
| "description": tool.description, | |
| "tool_type": tool.tool_type.value, | |
| "is_available": tool.is_available, | |
| "parameters_schema": getattr(tool, 'parameters_schema', {}), | |
| "capabilities": [] | |
| } | |
| # Add capabilities based on tool type | |
| if tool.tool_type == ToolType.SEARCH: | |
| info["capabilities"] = ["web_search", "information_retrieval", "query_processing"] | |
| elif tool.tool_type == ToolType.CALCULATOR: | |
| info["capabilities"] = ["mathematical_computation", "formula_evaluation", "unit_conversion"] | |
| elif tool.tool_type == ToolType.FILE_PROCESSOR: | |
| info["capabilities"] = ["file_reading", "file_writing", "format_conversion", "data_extraction"] | |
| elif tool.tool_type == ToolType.API_CALLER: | |
| info["capabilities"] = ["api_integration", "data_fetching", "service_interaction"] | |
| elif tool.tool_type == ToolType.CUSTOM: | |
| info["capabilities"] = ["custom_processing", "domain_specific_operations"] | |
| return info | |
| async def execute_batch(self, tools: List[Tool], parameters_list: List[Dict[str, Any]], context: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]: | |
| """ | |
| Execute multiple tools in batch. | |
| Args: | |
| tools: List of tools to execute | |
| parameters_list: List of parameter dictionaries for each tool | |
| context: Optional execution context | |
| Returns: | |
| List of execution results | |
| """ | |
| if len(tools) != len(parameters_list): | |
| raise DomainException("Number of tools must match number of parameter sets") | |
| # Execute tools in parallel | |
| tasks = [] | |
| for tool, parameters in zip(tools, parameters_list): | |
| task = self.execute(tool, parameters, context) | |
| tasks.append(task) | |
| results = await asyncio.gather(*tasks, return_exceptions=True) | |
| # Process results | |
| processed_results = [] | |
| for i, result in enumerate(results): | |
| if isinstance(result, Exception): | |
| processed_results.append({ | |
| "success": False, | |
| "error": str(result), | |
| "tool_id": str(tools[i].id), | |
| "tool_name": tools[i].name | |
| }) | |
| else: | |
| processed_results.append(result) | |
| return processed_results | |
| async def cancel_execution(self, execution_id: UUID) -> bool: | |
| """ | |
| Cancel a running tool execution. | |
| Args: | |
| execution_id: The execution to cancel | |
| Returns: | |
| True if cancellation was successful, False otherwise | |
| """ | |
| if execution_id not in self._active_executions: | |
| return False | |
| execution = self._active_executions[execution_id] | |
| if execution["status"] == "running": | |
| execution["status"] = "cancelled" | |
| execution["end_time"] = datetime.now() | |
| self.logger.info(f"Tool execution {execution_id} cancelled") | |
| return True | |
| return False | |
| def _validate_type(self, value: Any, expected_type: str) -> bool: | |
| """Basic type validation.""" | |
| if expected_type == "string": | |
| return isinstance(value, str) | |
| elif expected_type == "number": | |
| return isinstance(value, (int, float)) | |
| elif expected_type == "boolean": | |
| return isinstance(value, bool) | |
| elif expected_type == "array": | |
| return isinstance(value, list) | |
| elif expected_type == "object": | |
| return isinstance(value, dict) | |
| return True # Unknown type, assume valid | |
| async def _execute_search_tool(self, tool: Tool, parameters: Dict[str, Any], context: Optional[Dict[str, Any]]) -> Dict[str, Any]: | |
| """Execute a search tool.""" | |
| await asyncio.sleep(0.1) # Simulate processing time | |
| query = parameters.get("query", "") | |
| return { | |
| "success": True, | |
| "result": f"Search results for: {query}", | |
| "metadata": { | |
| "tool_type": "search", | |
| "query": query, | |
| "results_count": 10 | |
| } | |
| } | |
| async def _execute_calculator_tool(self, tool: Tool, parameters: Dict[str, Any], context: Optional[Dict[str, Any]]) -> Dict[str, Any]: | |
| """Execute a calculator tool.""" | |
| await asyncio.sleep(0.05) # Simulate processing time | |
| expression = parameters.get("expression", "") | |
| # Basic evaluation (in production, use safe_eval or similar) | |
| try: | |
| result = eval(expression) # Note: This is unsafe, use safe_eval in production | |
| return { | |
| "success": True, | |
| "result": result, | |
| "metadata": { | |
| "tool_type": "calculator", | |
| "expression": expression | |
| } | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "error": f"Calculation failed: {str(e)}", | |
| "metadata": { | |
| "tool_type": "calculator", | |
| "expression": expression | |
| } | |
| } | |
| async def _execute_file_processor_tool(self, tool: Tool, parameters: Dict[str, Any], context: Optional[Dict[str, Any]]) -> Dict[str, Any]: | |
| """Execute a file processor tool.""" | |
| await asyncio.sleep(0.2) # Simulate processing time | |
| file_path = parameters.get("file_path", "") | |
| operation = parameters.get("operation", "read") | |
| return { | |
| "success": True, | |
| "result": f"File {operation} completed for: {file_path}", | |
| "metadata": { | |
| "tool_type": "file_processor", | |
| "file_path": file_path, | |
| "operation": operation | |
| } | |
| } | |
| async def _execute_api_caller_tool(self, tool: Tool, parameters: Dict[str, Any], context: Optional[Dict[str, Any]]) -> Dict[str, Any]: | |
| """Execute an API caller tool.""" | |
| await asyncio.sleep(0.3) # Simulate processing time | |
| url = parameters.get("url", "") | |
| method = parameters.get("method", "GET") | |
| return { | |
| "success": True, | |
| "result": f"API call {method} to {url} completed", | |
| "metadata": { | |
| "tool_type": "api_caller", | |
| "url": url, | |
| "method": method, | |
| "status_code": 200 | |
| } | |
| } | |
| async def _execute_custom_tool(self, tool: Tool, parameters: Dict[str, Any], context: Optional[Dict[str, Any]]) -> Dict[str, Any]: | |
| """Execute a custom tool.""" | |
| await asyncio.sleep(0.15) # Simulate processing time | |
| return { | |
| "success": True, | |
| "result": f"Custom tool '{tool.name}' executed with parameters: {parameters}", | |
| "metadata": { | |
| "tool_type": "custom", | |
| "tool_name": tool.name, | |
| "parameters": parameters | |
| } | |
| } | |
| # Alias for backward compatibility | |
| ToolExecutor = ToolExecutorImpl |