RobotPai / src /application /agents /agent_executor.py
atr0p05's picture
Upload 291 files
8a682b5 verified
"""
Agent executor implementation for executing AI agents.
"""
import asyncio
import logging
from typing import Dict, Any, Optional
from uuid import UUID, uuid4
from datetime import datetime
from src.core.interfaces.agent_executor import AgentExecutor
from src.core.entities.agent import Agent, AgentType
from src.core.entities.message import Message
from src.shared.exceptions import DomainException
class AgentExecutorImpl(AgentExecutor):
"""
Implementation of the agent executor interface.
This class handles the execution of different types of agents
and manages their lifecycle during processing.
"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self._active_executions: Dict[UUID, Dict[str, Any]] = {}
async def execute(self, agent: Agent, message: Message) -> Dict[str, Any]:
"""
Execute an agent with a given message.
Args:
agent: The agent to execute
message: The message to process
Returns:
Dictionary containing the execution result
"""
execution_id = uuid4()
start_time = datetime.now()
try:
# Register execution
self._active_executions[execution_id] = {
"agent_id": agent.id,
"message_id": message.id,
"start_time": start_time,
"status": "running"
}
self.logger.info(f"Starting execution {execution_id} for agent {agent.id}")
# Validate agent
validation_result = await self.validate_agent(agent)
if not validation_result.get("valid", False):
raise DomainException(f"Agent validation failed: {validation_result.get('errors', [])}")
# Execute based on agent type
if agent.agent_type == AgentType.FSM_REACT:
result = await self._execute_fsm_react_agent(agent, message)
elif agent.agent_type == AgentType.NEXT_GEN:
result = await self._execute_next_gen_agent(agent, message)
elif agent.agent_type == AgentType.CREW:
result = await self._execute_crew_agent(agent, message)
elif agent.agent_type == AgentType.SPECIALIZED:
result = await self._execute_specialized_agent(agent, message)
else:
raise DomainException(f"Unsupported agent type: {agent.agent_type}")
# Update execution status
execution_time = (datetime.now() - start_time).total_seconds()
self._active_executions[execution_id]["status"] = "completed"
self._active_executions[execution_id]["execution_time"] = execution_time
# Add execution metadata
result["execution_id"] = str(execution_id)
result["execution_time"] = execution_time
self.logger.info(f"Execution {execution_id} completed successfully in {execution_time:.2f}s")
return result
except Exception as e:
execution_time = (datetime.now() - start_time).total_seconds()
self._active_executions[execution_id]["status"] = "failed"
self._active_executions[execution_id]["error"] = str(e)
self._active_executions[execution_id]["execution_time"] = execution_time
self.logger.error(f"Execution {execution_id} failed: {str(e)}")
raise DomainException(f"Agent execution failed: {str(e)}")
async def validate_agent(self, agent: Agent) -> Dict[str, Any]:
"""
Validate an agent before execution.
Args:
agent: The agent to validate
Returns:
Dictionary containing validation result
"""
errors = []
warnings = []
# Check if agent is available
if not agent.is_available:
errors.append("Agent is not available for execution")
# Check agent configuration
if not agent.config:
warnings.append("Agent has no configuration")
# Check model configuration
if not agent.model_config:
warnings.append("Agent has no model configuration")
# Validate agent type
if agent.agent_type not in AgentType:
errors.append(f"Invalid agent type: {agent.agent_type}")
return {
"valid": len(errors) == 0,
"errors": errors,
"warnings": warnings
}
async def get_agent_capabilities(self, agent: Agent) -> Dict[str, Any]:
"""
Get agent capabilities and supported operations.
Args:
agent: The agent to query
Returns:
Dictionary containing agent capabilities
"""
capabilities = {
"agent_type": agent.agent_type.value,
"supported_operations": [],
"tools_available": [],
"model_info": {}
}
# Add capabilities based on agent type
if agent.agent_type == AgentType.FSM_REACT:
capabilities["supported_operations"] = [
"text_processing",
"tool_execution",
"state_management",
"reasoning"
]
elif agent.agent_type == AgentType.NEXT_GEN:
capabilities["supported_operations"] = [
"advanced_reasoning",
"parallel_processing",
"multi_modal_processing",
"learning"
]
elif agent.agent_type == AgentType.CREW:
capabilities["supported_operations"] = [
"multi_agent_coordination",
"task_delegation",
"collaborative_reasoning",
"workflow_management"
]
elif agent.agent_type == AgentType.SPECIALIZED:
capabilities["supported_operations"] = [
"domain_specific_processing",
"expert_knowledge",
"specialized_tools"
]
return capabilities
async def cancel_execution(self, execution_id: UUID) -> bool:
"""
Cancel a running execution.
Args:
execution_id: The execution to cancel
Returns:
True if cancellation was successful, False otherwise
"""
if execution_id not in self._active_executions:
return False
execution = self._active_executions[execution_id]
if execution["status"] == "running":
execution["status"] = "cancelled"
execution["end_time"] = datetime.now()
self.logger.info(f"Execution {execution_id} cancelled")
return True
return False
async def get_execution_status(self, execution_id: UUID) -> Dict[str, Any]:
"""
Get the status of an execution.
Args:
execution_id: The execution to query
Returns:
Dictionary containing execution status
"""
if execution_id not in self._active_executions:
return {"error": "Execution not found"}
execution = self._active_executions[execution_id]
status = {
"execution_id": str(execution_id),
"status": execution["status"],
"agent_id": str(execution["agent_id"]),
"message_id": str(execution["message_id"]),
"start_time": execution["start_time"].isoformat()
}
if "execution_time" in execution:
status["execution_time"] = execution["execution_time"]
if "error" in execution:
status["error"] = execution["error"]
return status
async def _execute_fsm_react_agent(self, agent: Agent, message: Message) -> Dict[str, Any]:
"""Execute an FSM React agent."""
# This would integrate with the existing FSM agent implementation
# For now, return a mock response
await asyncio.sleep(0.1) # Simulate processing time
return {
"response": f"FSM React agent processed: {message.content}",
"confidence": 0.85,
"tools_used": ["text_processor", "reasoning_engine"],
"metadata": {
"agent_type": "fsm_react",
"processing_steps": 3
}
}
async def _execute_next_gen_agent(self, agent: Agent, message: Message) -> Dict[str, Any]:
"""Execute a Next Gen agent."""
await asyncio.sleep(0.2) # Simulate processing time
return {
"response": f"Next Gen agent processed: {message.content}",
"confidence": 0.92,
"tools_used": ["advanced_reasoning", "parallel_processor"],
"metadata": {
"agent_type": "next_gen",
"processing_steps": 5
}
}
async def _execute_crew_agent(self, agent: Agent, message: Message) -> Dict[str, Any]:
"""Execute a Crew agent."""
await asyncio.sleep(0.3) # Simulate processing time
return {
"response": f"Crew agent processed: {message.content}",
"confidence": 0.88,
"tools_used": ["coordinator", "researcher", "executor"],
"metadata": {
"agent_type": "crew",
"crew_size": 3,
"processing_steps": 7
}
}
async def _execute_specialized_agent(self, agent: Agent, message: Message) -> Dict[str, Any]:
"""Execute a Specialized agent."""
await asyncio.sleep(0.15) # Simulate processing time
return {
"response": f"Specialized agent processed: {message.content}",
"confidence": 0.95,
"tools_used": ["domain_expert", "specialized_tool"],
"metadata": {
"agent_type": "specialized",
"domain": "expert",
"processing_steps": 2
}
}
# Alias for backward compatibility
AgentExecutor = AgentExecutorImpl