RobotPai / src /core /use_cases /manage_agent.py
atr0p05's picture
Upload 291 files
8a682b5 verified
"""
Use case for managing AI agents with factory support.
"""
from typing import Dict, Any, Optional, List
from uuid import UUID
import logging
from src.core.entities.agent import Agent, AgentType, AgentState
from src.core.interfaces.agent_repository import AgentRepository
from src.core.interfaces.logging_service import LoggingService
from src.application.agents.agent_factory import AgentFactory
from src.shared.exceptions import DomainException, ValidationException
class ManageAgentUseCase:
"""
Use case for managing AI agents with factory support.
This use case handles agent creation, updates, deletion,
and lifecycle management operations using the agent factory.
"""
def __init__(
self,
agent_repository: AgentRepository,
agent_factory: AgentFactory,
logging_service: LoggingService
):
self.agent_repository = agent_repository
self.agent_factory = agent_factory
self.logging_service = logging_service
self.logger = logging.getLogger(__name__)
async def create_agent(
self,
agent_type: AgentType,
name: str,
description: Optional[str] = None,
configuration: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Create a new agent using the factory.
Args:
agent_type: Type of agent to create
name: Agent name
description: Optional agent description
configuration: Optional agent configuration
Returns:
Dictionary containing the created agent information
"""
try:
# Validate input
if not name or not name.strip():
raise ValidationException("Agent name cannot be empty")
# Create agent using factory
agent = await self.agent_factory.create_agent(
agent_type=agent_type,
name=name,
config=configuration or {}
)
# Create core agent entity for repository
core_agent = Agent(
agent_type=agent_type,
name=name,
description=description or f"{agent_type.value} agent",
configuration=configuration or {},
state=AgentState.READY
)
# Save to repository
saved_agent = await self.agent_repository.save(core_agent)
# Log creation
await self.logging_service.log_info(
"agent_created",
f"Created {agent_type.value} agent: {name} (ID: {agent.agent_id})",
{"agent_id": str(saved_agent.id), "agent_type": agent_type.value}
)
return {
"success": True,
"agent_id": agent.agent_id,
"name": agent.name,
"type": agent_type.value,
"status": agent.status.name,
"factory_agent": True
}
except Exception as e:
self.logger.error(f"Failed to create agent: {str(e)}")
await self.logging_service.log_error(
"agent_creation_failed",
str(e),
{"agent_type": agent_type.value, "name": name}
)
return {"success": False, "error": str(e)}
async def update_agent(
self,
agent_id: UUID,
name: Optional[str] = None,
description: Optional[str] = None,
configuration: Optional[Dict[str, Any]] = None,
state: Optional[AgentState] = None
) -> Dict[str, Any]:
"""
Update an existing agent.
Args:
agent_id: ID of the agent to update
name: New agent name
description: New agent description
configuration: New agent configuration
state: New agent state
Returns:
Dictionary containing the update result
"""
try:
# Find agent
agent = await self.agent_repository.find_by_id(agent_id)
if not agent:
raise DomainException(f"Agent {agent_id} not found")
# Update fields
if name is not None:
if not name.strip():
raise ValidationException("Agent name cannot be empty")
agent.name = name
if description is not None:
agent.description = description
if configuration is not None:
agent.configuration = configuration
if state is not None:
agent.state = state
# Save updated agent
updated_agent = await self.agent_repository.save(agent)
# Log update
await self.logging_service.log_info(
"agent_updated",
f"Updated agent {agent_id}",
{"agent_id": str(agent_id)}
)
return {
"success": True,
"agent_id": str(updated_agent.id),
"name": updated_agent.name,
"state": updated_agent.state.value
}
except Exception as e:
self.logger.error(f"Failed to update agent {agent_id}: {str(e)}")
await self.logging_service.log_error(
"agent_update_failed",
str(e),
{"agent_id": str(agent_id)}
)
return {"success": False, "error": str(e)}
async def delete_agent(self, agent_id: UUID) -> Dict[str, Any]:
"""
Delete an agent.
Args:
agent_id: ID of the agent to delete
Returns:
Dictionary containing the deletion result
"""
try:
# Check if agent exists in repository
agent = await self.agent_repository.find_by_id(agent_id)
if not agent:
raise DomainException(f"Agent {agent_id} not found")
# Check if agent exists in factory cache
factory_agent = self.agent_factory.get_agent(str(agent_id))
if factory_agent:
# Shutdown factory agent
await factory_agent.shutdown()
# Delete from repository
success = await self.agent_repository.delete(agent_id)
if not success:
raise DomainException(f"Failed to delete agent {agent_id}")
# Log deletion
await self.logging_service.log_info(
"agent_deleted",
f"Deleted agent {agent_id}",
{"agent_id": str(agent_id)}
)
return {"success": True, "agent_id": str(agent_id)}
except Exception as e:
self.logger.error(f"Failed to delete agent {agent_id}: {str(e)}")
await self.logging_service.log_error(
"agent_deletion_failed",
str(e),
{"agent_id": str(agent_id)}
)
return {"success": False, "error": str(e)}
async def get_agent(self, agent_id: UUID) -> Optional[Dict[str, Any]]:
"""
Get agent information from factory or repository.
Args:
agent_id: ID of the agent to retrieve
Returns:
Dictionary containing the agent information
"""
try:
# Try factory first
factory_agent = self.agent_factory.get_agent(str(agent_id))
if factory_agent:
return {
"success": True,
"agent": {
"id": factory_agent.agent_id,
"name": factory_agent.name,
"type": type(factory_agent).__name__,
"status": factory_agent.status.name,
"source": "factory"
}
}
# Try repository
repo_agent = await self.agent_repository.find_by_id(agent_id)
if repo_agent:
return {
"success": True,
"agent": {
"id": str(repo_agent.id),
"name": repo_agent.name,
"description": repo_agent.description,
"agent_type": repo_agent.agent_type.value,
"state": repo_agent.state.value,
"configuration": repo_agent.configuration,
"performance_metrics": getattr(repo_agent, 'performance_metrics', {}),
"created_at": repo_agent.created_at.isoformat() if repo_agent.created_at else None,
"updated_at": repo_agent.updated_at.isoformat() if repo_agent.updated_at else None,
"source": "repository"
}
}
return {"success": False, "error": f"Agent {agent_id} not found"}
except Exception as e:
self.logger.error(f"Failed to get agent {agent_id}: {str(e)}")
return {"success": False, "error": str(e)}
async def list_agents(self, agent_type: Optional[AgentType] = None) -> Dict[str, Any]:
"""
List all agents from factory cache and repository.
Args:
agent_type: Optional agent type filter
Returns:
Dictionary containing the list of agents
"""
try:
# Get from factory cache
factory_agents = self.agent_factory.list_agents()
# Get from repository
if agent_type:
repo_agents = await self.agent_repository.find_by_type(agent_type)
else:
# Get all agents from repository
repo_agents = []
# Note: We need to add a find_all method to the repository interface
# For now, we'll use the statistics to get agent info
stats = await self.agent_repository.get_statistics()
repo_agents = stats.get("agents", [])
# Merge and deduplicate
all_agents = {}
# Add factory agents
for agent_info in factory_agents:
all_agents[agent_info["id"]] = {
**agent_info,
"source": "factory"
}
# Add repository agents
for agent in repo_agents:
agent_id = str(agent.id) if hasattr(agent, 'id') else agent.get('id', 'unknown')
if agent_id not in all_agents:
all_agents[agent_id] = {
"id": agent_id,
"name": agent.name if hasattr(agent, 'name') else agent.get('name', 'Unknown'),
"type": agent.agent_type.value if hasattr(agent, 'agent_type') else agent.get('type', 'unknown'),
"status": agent.state.value if hasattr(agent, 'state') else agent.get('status', 'unknown'),
"source": "repository"
}
return {
"success": True,
"agents": list(all_agents.values()),
"count": len(all_agents)
}
except Exception as e:
self.logger.error(f"Failed to list agents: {str(e)}")
return {"success": False, "error": str(e)}
async def get_agent_statistics(self) -> Dict[str, Any]:
"""
Get agent repository statistics.
Returns:
Dictionary containing agent statistics
"""
try:
stats = await self.agent_repository.get_statistics()
# Add factory statistics
factory_agents = self.agent_factory.list_agents()
stats["factory_agents"] = len(factory_agents)
stats["factory_agent_types"] = {}
for agent_info in factory_agents:
agent_type = agent_info.get("type", "unknown")
stats["factory_agent_types"][agent_type] = stats["factory_agent_types"].get(agent_type, 0) + 1
return {"success": True, "statistics": stats}
except Exception as e:
self.logger.error(f"Failed to get agent statistics: {str(e)}")
return {"success": False, "error": str(e)}
async def shutdown_all_agents(self) -> Dict[str, Any]:
"""
Shutdown all agents in the factory cache.
Returns:
Dictionary containing shutdown results
"""
try:
await self.agent_factory.shutdown_all()
await self.logging_service.log_info(
"agents_shutdown",
"All factory agents shut down successfully"
)
return {
"success": True,
"message": "All agents shut down successfully"
}
except Exception as e:
self.logger.error(f"Failed to shutdown all agents: {str(e)}")
await self.logging_service.log_error(
"agents_shutdown_failed",
str(e)
)
return {"success": False, "error": str(e)}