Spaces:
Build error
Build error
File size: 13,223 Bytes
8a682b5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 |
"""
Tool executor implementation for executing tools.
"""
import asyncio
import logging
from typing import Dict, Any, Optional, List
from uuid import UUID, uuid4
from datetime import datetime
from src.core.interfaces.tool_executor import ToolExecutor
from src.core.entities.tool import Tool, ToolType
from src.shared.exceptions import DomainException
class ToolExecutorImpl(ToolExecutor):
"""
Implementation of the tool executor interface.
This class handles the execution of different types of tools
and manages their lifecycle during processing.
"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self._active_executions: Dict[UUID, Dict[str, Any]] = {}
async def execute(self, tool: Tool, parameters: Dict[str, Any], context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Execute a tool with given parameters.
Args:
tool: The tool to execute
parameters: Tool execution parameters
context: Optional execution context
Returns:
Dictionary containing the execution result
"""
execution_id = uuid4()
start_time = datetime.now()
try:
# Register execution
self._active_executions[execution_id] = {
"tool_id": tool.id,
"parameters": parameters,
"start_time": start_time,
"status": "running"
}
self.logger.info(f"Starting tool execution {execution_id} for tool {tool.name}")
# Validate parameters
validation_result = await self.validate_parameters(tool, parameters)
if not validation_result.get("valid", False):
raise DomainException(f"Parameter validation failed: {validation_result.get('errors', [])}")
# Execute based on tool type
if tool.tool_type == ToolType.SEARCH:
result = await self._execute_search_tool(tool, parameters, context)
elif tool.tool_type == ToolType.CALCULATOR:
result = await self._execute_calculator_tool(tool, parameters, context)
elif tool.tool_type == ToolType.FILE_PROCESSOR:
result = await self._execute_file_processor_tool(tool, parameters, context)
elif tool.tool_type == ToolType.API_CALLER:
result = await self._execute_api_caller_tool(tool, parameters, context)
elif tool.tool_type == ToolType.CUSTOM:
result = await self._execute_custom_tool(tool, parameters, context)
else:
raise DomainException(f"Unsupported tool type: {tool.tool_type}")
# Update execution status
execution_time = (datetime.now() - start_time).total_seconds()
self._active_executions[execution_id]["status"] = "completed"
self._active_executions[execution_id]["execution_time"] = execution_time
# Add execution metadata
result["execution_id"] = str(execution_id)
result["execution_time"] = execution_time
self.logger.info(f"Tool execution {execution_id} completed successfully in {execution_time:.2f}s")
return result
except Exception as e:
execution_time = (datetime.now() - start_time).total_seconds()
self._active_executions[execution_id]["status"] = "failed"
self._active_executions[execution_id]["error"] = str(e)
self._active_executions[execution_id]["execution_time"] = execution_time
self.logger.error(f"Tool execution {execution_id} failed: {str(e)}")
raise DomainException(f"Tool execution failed: {str(e)}")
async def validate_parameters(self, tool: Tool, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""
Validate tool parameters before execution.
Args:
tool: The tool to validate parameters for
parameters: Parameters to validate
Returns:
Dictionary containing validation result
"""
errors = []
warnings = []
# Check if tool is available
if not tool.is_available:
errors.append("Tool is not available for execution")
# Check required parameters
if hasattr(tool, 'parameters_schema') and tool.parameters_schema:
required_params = tool.parameters_schema.get("required", [])
for param in required_params:
if param not in parameters:
errors.append(f"Required parameter '{param}' is missing")
# Check parameter types (basic validation)
if hasattr(tool, 'parameters_schema') and tool.parameters_schema:
param_types = tool.parameters_schema.get("properties", {})
for param_name, param_value in parameters.items():
if param_name in param_types:
expected_type = param_types[param_name].get("type")
if expected_type and not self._validate_type(param_value, expected_type):
warnings.append(f"Parameter '{param_name}' may have incorrect type")
return {
"valid": len(errors) == 0,
"errors": errors,
"warnings": warnings
}
async def get_tool_info(self, tool: Tool) -> Dict[str, Any]:
"""
Get tool information and capabilities.
Args:
tool: The tool to query
Returns:
Dictionary containing tool information
"""
info = {
"id": str(tool.id),
"name": tool.name,
"description": tool.description,
"tool_type": tool.tool_type.value,
"is_available": tool.is_available,
"parameters_schema": getattr(tool, 'parameters_schema', {}),
"capabilities": []
}
# Add capabilities based on tool type
if tool.tool_type == ToolType.SEARCH:
info["capabilities"] = ["web_search", "information_retrieval", "query_processing"]
elif tool.tool_type == ToolType.CALCULATOR:
info["capabilities"] = ["mathematical_computation", "formula_evaluation", "unit_conversion"]
elif tool.tool_type == ToolType.FILE_PROCESSOR:
info["capabilities"] = ["file_reading", "file_writing", "format_conversion", "data_extraction"]
elif tool.tool_type == ToolType.API_CALLER:
info["capabilities"] = ["api_integration", "data_fetching", "service_interaction"]
elif tool.tool_type == ToolType.CUSTOM:
info["capabilities"] = ["custom_processing", "domain_specific_operations"]
return info
async def execute_batch(self, tools: List[Tool], parameters_list: List[Dict[str, Any]], context: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
"""
Execute multiple tools in batch.
Args:
tools: List of tools to execute
parameters_list: List of parameter dictionaries for each tool
context: Optional execution context
Returns:
List of execution results
"""
if len(tools) != len(parameters_list):
raise DomainException("Number of tools must match number of parameter sets")
# Execute tools in parallel
tasks = []
for tool, parameters in zip(tools, parameters_list):
task = self.execute(tool, parameters, context)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
processed_results.append({
"success": False,
"error": str(result),
"tool_id": str(tools[i].id),
"tool_name": tools[i].name
})
else:
processed_results.append(result)
return processed_results
async def cancel_execution(self, execution_id: UUID) -> bool:
"""
Cancel a running tool execution.
Args:
execution_id: The execution to cancel
Returns:
True if cancellation was successful, False otherwise
"""
if execution_id not in self._active_executions:
return False
execution = self._active_executions[execution_id]
if execution["status"] == "running":
execution["status"] = "cancelled"
execution["end_time"] = datetime.now()
self.logger.info(f"Tool execution {execution_id} cancelled")
return True
return False
def _validate_type(self, value: Any, expected_type: str) -> bool:
"""Basic type validation."""
if expected_type == "string":
return isinstance(value, str)
elif expected_type == "number":
return isinstance(value, (int, float))
elif expected_type == "boolean":
return isinstance(value, bool)
elif expected_type == "array":
return isinstance(value, list)
elif expected_type == "object":
return isinstance(value, dict)
return True # Unknown type, assume valid
async def _execute_search_tool(self, tool: Tool, parameters: Dict[str, Any], context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
"""Execute a search tool."""
await asyncio.sleep(0.1) # Simulate processing time
query = parameters.get("query", "")
return {
"success": True,
"result": f"Search results for: {query}",
"metadata": {
"tool_type": "search",
"query": query,
"results_count": 10
}
}
async def _execute_calculator_tool(self, tool: Tool, parameters: Dict[str, Any], context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
"""Execute a calculator tool."""
await asyncio.sleep(0.05) # Simulate processing time
expression = parameters.get("expression", "")
# Basic evaluation (in production, use safe_eval or similar)
try:
result = eval(expression) # Note: This is unsafe, use safe_eval in production
return {
"success": True,
"result": result,
"metadata": {
"tool_type": "calculator",
"expression": expression
}
}
except Exception as e:
return {
"success": False,
"error": f"Calculation failed: {str(e)}",
"metadata": {
"tool_type": "calculator",
"expression": expression
}
}
async def _execute_file_processor_tool(self, tool: Tool, parameters: Dict[str, Any], context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
"""Execute a file processor tool."""
await asyncio.sleep(0.2) # Simulate processing time
file_path = parameters.get("file_path", "")
operation = parameters.get("operation", "read")
return {
"success": True,
"result": f"File {operation} completed for: {file_path}",
"metadata": {
"tool_type": "file_processor",
"file_path": file_path,
"operation": operation
}
}
async def _execute_api_caller_tool(self, tool: Tool, parameters: Dict[str, Any], context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
"""Execute an API caller tool."""
await asyncio.sleep(0.3) # Simulate processing time
url = parameters.get("url", "")
method = parameters.get("method", "GET")
return {
"success": True,
"result": f"API call {method} to {url} completed",
"metadata": {
"tool_type": "api_caller",
"url": url,
"method": method,
"status_code": 200
}
}
async def _execute_custom_tool(self, tool: Tool, parameters: Dict[str, Any], context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
"""Execute a custom tool."""
await asyncio.sleep(0.15) # Simulate processing time
return {
"success": True,
"result": f"Custom tool '{tool.name}' executed with parameters: {parameters}",
"metadata": {
"tool_type": "custom",
"tool_name": tool.name,
"parameters": parameters
}
}
# Alias for backward compatibility
ToolExecutor = ToolExecutorImpl |