Spaces:
Build error
Build error
| """ | |
| Agent executor implementation for executing AI agents. | |
| """ | |
| import asyncio | |
| import logging | |
| from typing import Dict, Any, Optional | |
| from uuid import UUID, uuid4 | |
| from datetime import datetime | |
| from src.core.interfaces.agent_executor import AgentExecutor | |
| from src.core.entities.agent import Agent, AgentType | |
| from src.core.entities.message import Message | |
| from src.shared.exceptions import DomainException | |
class AgentExecutorImpl(AgentExecutor):
    """
    Implementation of the agent executor interface.

    This class handles the execution of different types of agents
    and manages their lifecycle during processing.

    Every call to ``execute`` is tracked in ``self._active_executions``
    under a fresh UUID. Records are kept after the execution finishes so
    that ``get_execution_status`` can still report on them; nothing in this
    class removes them, so the mapping grows with every execution.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # execution_id -> bookkeeping record (agent_id, message_id,
        # start_time, status and, once known, execution_time / error /
        # end_time).
        self._active_executions: Dict[UUID, Dict[str, Any]] = {}

    async def execute(self, agent: Agent, message: Message) -> Dict[str, Any]:
        """
        Execute an agent with a given message.

        Args:
            agent: The agent to execute
            message: The message to process

        Returns:
            Dictionary containing the execution result, extended with the
            ``execution_id`` (as a string) and ``execution_time`` (seconds).

        Raises:
            DomainException: If validation fails, the agent type is not
                supported, or the agent raises during execution. The
                original exception is attached as ``__cause__``.
            asyncio.CancelledError: If the surrounding task is cancelled;
                the execution record is marked ``"cancelled"`` first.
        """
        execution_id = uuid4()
        start_time = datetime.now()

        try:
            # Register execution
            self._active_executions[execution_id] = {
                "agent_id": agent.id,
                "message_id": message.id,
                "start_time": start_time,
                "status": "running",
            }
            self.logger.info(
                "Starting execution %s for agent %s", execution_id, agent.id
            )

            # Validate agent
            validation_result = await self.validate_agent(agent)
            if not validation_result.get("valid", False):
                raise DomainException(
                    f"Agent validation failed: {validation_result.get('errors', [])}"
                )

            # Execute based on agent type
            if agent.agent_type == AgentType.FSM_REACT:
                result = await self._execute_fsm_react_agent(agent, message)
            elif agent.agent_type == AgentType.NEXT_GEN:
                result = await self._execute_next_gen_agent(agent, message)
            elif agent.agent_type == AgentType.CREW:
                result = await self._execute_crew_agent(agent, message)
            elif agent.agent_type == AgentType.SPECIALIZED:
                result = await self._execute_specialized_agent(agent, message)
            else:
                raise DomainException(f"Unsupported agent type: {agent.agent_type}")

            # Update execution status
            execution_time = self._record_outcome(
                execution_id, start_time, "completed"
            )

            # Add execution metadata
            result["execution_id"] = str(execution_id)
            result["execution_time"] = execution_time
            self.logger.info(
                "Execution %s completed successfully in %.2fs",
                execution_id,
                execution_time,
            )
            return result
        except asyncio.CancelledError:
            # Handled before the generic clause: on Python 3.8+ this is a
            # BaseException and would otherwise leave the record stuck in
            # "running"; on 3.7 it would be wrapped in a DomainException.
            self._record_outcome(execution_id, start_time, "cancelled")
            self.logger.info("Execution %s cancelled", execution_id)
            raise
        except Exception as e:
            self._record_outcome(execution_id, start_time, "failed", error=str(e))
            self.logger.error("Execution %s failed: %s", execution_id, e)
            # Chain the cause so the original traceback is not lost.
            raise DomainException(f"Agent execution failed: {str(e)}") from e

    def _record_outcome(
        self,
        execution_id: UUID,
        start_time: datetime,
        status: str,
        error: Optional[str] = None,
    ) -> float:
        """Store the final status and duration of an execution; return the duration in seconds."""
        execution_time = (datetime.now() - start_time).total_seconds()
        # The record can be missing when registration itself failed; in that
        # case there is nothing to update and the original error must not be
        # masked by a KeyError here.
        record = self._active_executions.get(execution_id)
        if record is not None:
            # A cancellation requested through cancel_execution() stays
            # visible instead of being overwritten by a later outcome.
            if record["status"] != "cancelled":
                record["status"] = status
            if error is not None:
                record["error"] = error
            record["execution_time"] = execution_time
        return execution_time

    async def validate_agent(self, agent: Agent) -> Dict[str, Any]:
        """
        Validate an agent before execution.

        Args:
            agent: The agent to validate

        Returns:
            Dictionary with ``valid`` (True when there are no errors),
            ``errors`` and ``warnings`` (lists of messages).
        """
        errors = []
        warnings = []

        # Check if agent is available
        if not agent.is_available:
            errors.append("Agent is not available for execution")

        # Check agent configuration
        if not agent.config:
            warnings.append("Agent has no configuration")

        # Check model configuration
        if not agent.model_config:
            warnings.append("Agent has no model configuration")

        # Validate agent type. A lookup through the enum constructor is used
        # instead of `in AgentType`, because before Python 3.12 a containment
        # check with a non-member raises TypeError rather than returning False.
        try:
            AgentType(agent.agent_type)
        except (ValueError, TypeError):
            errors.append(f"Invalid agent type: {agent.agent_type}")

        return {
            "valid": len(errors) == 0,
            "errors": errors,
            "warnings": warnings,
        }

    async def get_agent_capabilities(self, agent: Agent) -> Dict[str, Any]:
        """
        Get agent capabilities and supported operations.

        Args:
            agent: The agent to query

        Returns:
            Dictionary containing agent capabilities. Only ``agent_type``
            and ``supported_operations`` are filled in here;
            ``tools_available`` and ``model_info`` are returned empty.
        """
        capabilities = {
            "agent_type": agent.agent_type.value,
            "supported_operations": [],
            "tools_available": [],
            "model_info": {},
        }

        # Add capabilities based on agent type
        if agent.agent_type == AgentType.FSM_REACT:
            capabilities["supported_operations"] = [
                "text_processing",
                "tool_execution",
                "state_management",
                "reasoning",
            ]
        elif agent.agent_type == AgentType.NEXT_GEN:
            capabilities["supported_operations"] = [
                "advanced_reasoning",
                "parallel_processing",
                "multi_modal_processing",
                "learning",
            ]
        elif agent.agent_type == AgentType.CREW:
            capabilities["supported_operations"] = [
                "multi_agent_coordination",
                "task_delegation",
                "collaborative_reasoning",
                "workflow_management",
            ]
        elif agent.agent_type == AgentType.SPECIALIZED:
            capabilities["supported_operations"] = [
                "domain_specific_processing",
                "expert_knowledge",
                "specialized_tools",
            ]

        return capabilities

    async def cancel_execution(self, execution_id: UUID) -> bool:
        """
        Cancel a running execution.

        This only marks the tracked record as cancelled and stamps an
        ``end_time``; it does not interrupt the coroutine that is running
        the agent.

        Args:
            execution_id: The execution to cancel

        Returns:
            True if cancellation was successful, False otherwise
            (unknown id, or the execution is no longer running).
        """
        if execution_id not in self._active_executions:
            return False

        execution = self._active_executions[execution_id]
        if execution["status"] == "running":
            execution["status"] = "cancelled"
            execution["end_time"] = datetime.now()
            self.logger.info("Execution %s cancelled", execution_id)
            return True

        return False

    async def get_execution_status(self, execution_id: UUID) -> Dict[str, Any]:
        """
        Get the status of an execution.

        Args:
            execution_id: The execution to query

        Returns:
            Dictionary containing execution status, or
            ``{"error": "Execution not found"}`` for an unknown id.
            ``execution_time`` and ``error`` are included only once they
            have been recorded.
        """
        if execution_id not in self._active_executions:
            return {"error": "Execution not found"}

        execution = self._active_executions[execution_id]
        status = {
            "execution_id": str(execution_id),
            "status": execution["status"],
            "agent_id": str(execution["agent_id"]),
            "message_id": str(execution["message_id"]),
            "start_time": execution["start_time"].isoformat(),
        }
        if "execution_time" in execution:
            status["execution_time"] = execution["execution_time"]
        if "error" in execution:
            status["error"] = execution["error"]
        return status

    async def _execute_fsm_react_agent(self, agent: Agent, message: Message) -> Dict[str, Any]:
        """Execute an FSM React agent."""
        # This would integrate with the existing FSM agent implementation
        # For now, return a mock response
        await asyncio.sleep(0.1)  # Simulate processing time
        return {
            "response": f"FSM React agent processed: {message.content}",
            "confidence": 0.85,
            "tools_used": ["text_processor", "reasoning_engine"],
            "metadata": {
                "agent_type": "fsm_react",
                "processing_steps": 3,
            },
        }

    async def _execute_next_gen_agent(self, agent: Agent, message: Message) -> Dict[str, Any]:
        """Execute a Next Gen agent (currently returns a canned response)."""
        await asyncio.sleep(0.2)  # Simulate processing time
        return {
            "response": f"Next Gen agent processed: {message.content}",
            "confidence": 0.92,
            "tools_used": ["advanced_reasoning", "parallel_processor"],
            "metadata": {
                "agent_type": "next_gen",
                "processing_steps": 5,
            },
        }

    async def _execute_crew_agent(self, agent: Agent, message: Message) -> Dict[str, Any]:
        """Execute a Crew agent (currently returns a canned response)."""
        await asyncio.sleep(0.3)  # Simulate processing time
        return {
            "response": f"Crew agent processed: {message.content}",
            "confidence": 0.88,
            "tools_used": ["coordinator", "researcher", "executor"],
            "metadata": {
                "agent_type": "crew",
                "crew_size": 3,
                "processing_steps": 7,
            },
        }

    async def _execute_specialized_agent(self, agent: Agent, message: Message) -> Dict[str, Any]:
        """Execute a Specialized agent (currently returns a canned response)."""
        await asyncio.sleep(0.15)  # Simulate processing time
        return {
            "response": f"Specialized agent processed: {message.content}",
            "confidence": 0.95,
            "tools_used": ["domain_expert", "specialized_tool"],
            "metadata": {
                "agent_type": "specialized",
                "domain": "expert",
                "processing_steps": 2,
            },
        }
# Alias for backward compatibility.
# NOTE: this rebinds the module-level name ``AgentExecutor`` (imported at the
# top of this file from src.core.interfaces.agent_executor) to the concrete
# implementation, so importing ``AgentExecutor`` from this module yields
# ``AgentExecutorImpl`` rather than the interface.
AgentExecutor = AgentExecutorImpl