Spaces:
Build error
Build error
File size: 10,669 Bytes
8a682b5 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 | """
Agent factory for creating different types of agents.
"""
import asyncio
import logging
from typing import Dict, Any, List, Optional
from uuid import uuid4
from src.core.entities.agent import AgentType
from src.infrastructure.agents.concrete_agents import (
FSMReactAgentImpl, NextGenAgentImpl, CrewAgentImpl, SpecializedAgentImpl
)
from src.application.agents.base_agent import BaseAgent
from src.utils.base_tool import get_enhanced_tools
logger = logging.getLogger(__name__)
class AgentFactory:
"""Factory for creating different types of agents"""
def __init__(self):
self.created_agents: Dict[str, BaseAgent] = {}
self.logger = logging.getLogger(__name__)
async def create_agent(
self,
agent_type: AgentType,
name: str,
config: Optional[Dict[str, Any]] = None
) -> BaseAgent:
"""
Create an agent of the specified type.
Args:
agent_type: Type of agent to create
name: Name for the agent
config: Configuration for the agent
Returns:
Created agent instance
"""
config = config or {}
agent_id = str(uuid4())
try:
if agent_type == AgentType.FSM_REACT:
# Get tools for FSM agent
tools = config.get("tools", [])
if not tools:
tools = get_enhanced_tools()
agent = FSMReactAgentImpl(
agent_id=agent_id,
name=name,
tools=tools
)
elif agent_type == AgentType.NEXT_GEN:
model_config = config.get("model_config", {})
agent = NextGenAgentImpl(
agent_id=agent_id,
name=name,
model_config=model_config
)
elif agent_type == AgentType.CREW:
role = config.get("role", "general")
agent = CrewAgentImpl(
agent_id=agent_id,
name=name,
role=role
)
elif agent_type == AgentType.SPECIALIZED:
specialization = config.get("specialization", "general")
agent = SpecializedAgentImpl(
agent_id=agent_id,
name=name,
specialization=specialization
)
else:
raise ValueError(f"Unknown agent type: {agent_type}")
# Initialize the agent
init_config = {
"agent_type": agent_type.value,
"model_config": config.get("model_config", {}),
"max_concurrent_tasks": config.get("max_concurrent_tasks", 5),
"task_timeout": config.get("task_timeout", 300),
"enable_learning": config.get("enable_learning", True),
"enable_collaboration": config.get("enable_collaboration", True),
"memory_size": config.get("memory_size", 1000),
**config
}
success = await agent.initialize(init_config)
if not success:
raise RuntimeError(f"Failed to initialize agent {name}")
# Store the agent
self.created_agents[agent_id] = agent
self.logger.info(f"Created {agent_type.value} agent: {name} ({agent_id})")
return agent
except Exception as e:
self.logger.error(f"Failed to create agent {name}: {e}")
raise
async def create_fsm_agent(
self,
name: str,
tools: Optional[List[Any]] = None,
config: Optional[Dict[str, Any]] = None
) -> FSMReactAgentImpl:
"""Create an FSM React agent with specified tools"""
agent_config = config or {}
if tools:
agent_config["tools"] = tools
return await self.create_agent(AgentType.FSM_REACT, name, agent_config)
async def create_next_gen_agent(
self,
name: str,
model_config: Optional[Dict[str, Any]] = None,
config: Optional[Dict[str, Any]] = None
) -> NextGenAgentImpl:
"""Create a Next Generation agent with learning capabilities"""
agent_config = config or {}
if model_config:
agent_config["model_config"] = model_config
return await self.create_agent(AgentType.NEXT_GEN, name, agent_config)
async def create_crew_agent(
self,
name: str,
role: str = "general",
config: Optional[Dict[str, Any]] = None
) -> CrewAgentImpl:
"""Create a Crew agent for team collaboration"""
agent_config = config or {}
agent_config["role"] = role
return await self.create_agent(AgentType.CREW, name, agent_config)
async def create_specialized_agent(
self,
name: str,
specialization: str,
config: Optional[Dict[str, Any]] = None
) -> SpecializedAgentImpl:
"""Create a specialized agent for domain-specific tasks"""
agent_config = config or {}
agent_config["specialization"] = specialization
return await self.create_agent(AgentType.SPECIALIZED, name, agent_config)
async def create_agent_team(
self,
team_config: List[Dict[str, Any]]
) -> List[BaseAgent]:
"""Create a team of agents based on configuration"""
agents = []
for member_config in team_config:
agent_type = AgentType(member_config["type"])
name = member_config["name"]
config = member_config.get("config", {})
agent = await self.create_agent(agent_type, name, config)
agents.append(agent)
self.logger.info(f"Created agent team with {len(agents)} members")
return agents
def get_agent(self, agent_id: str) -> Optional[BaseAgent]:
"""Get an agent by ID"""
return self.created_agents.get(agent_id)
def get_all_agents(self) -> List[BaseAgent]:
"""Get all created agents"""
return list(self.created_agents.values())
def get_agents_by_type(self, agent_type: AgentType) -> List[BaseAgent]:
"""Get all agents of a specific type"""
return [
agent for agent in self.created_agents.values()
if hasattr(agent, 'agent_type') and agent.agent_type == agent_type
]
async def shutdown_agent(self, agent_id: str) -> bool:
"""Shutdown a specific agent"""
agent = self.created_agents.get(agent_id)
if not agent:
self.logger.warning(f"Agent {agent_id} not found")
return False
try:
success = await agent.shutdown()
if success:
del self.created_agents[agent_id]
self.logger.info(f"Agent {agent_id} shut down successfully")
return success
except Exception as e:
self.logger.error(f"Failed to shutdown agent {agent_id}: {e}")
return False
async def shutdown_all(self):
"""Shutdown all created agents"""
self.logger.info(f"Shutting down {len(self.created_agents)} agents")
shutdown_tasks = []
for agent_id, agent in self.created_agents.items():
task = self.shutdown_agent(agent_id)
shutdown_tasks.append(task)
# Wait for all shutdowns to complete
results = await asyncio.gather(*shutdown_tasks, return_exceptions=True)
success_count = sum(1 for result in results if result is True)
self.logger.info(f"Successfully shut down {success_count}/{len(self.created_agents)} agents")
def get_agent_statistics(self) -> Dict[str, Any]:
"""Get statistics about created agents"""
stats = {
"total_agents": len(self.created_agents),
"agents_by_type": {},
"available_agents": 0,
"busy_agents": 0,
"error_agents": 0
}
for agent in self.created_agents.values():
# Count by type
agent_type = getattr(agent, 'agent_type', 'unknown')
if agent_type not in stats["agents_by_type"]:
stats["agents_by_type"][agent_type] = 0
stats["agents_by_type"][agent_type] += 1
# Count by status
status = getattr(agent, 'status', None)
if status:
if status.name in ['AVAILABLE', 'IDLE']:
stats["available_agents"] += 1
elif status.name == 'BUSY':
stats["busy_agents"] += 1
elif status.name == 'ERROR':
stats["error_agents"] += 1
return stats
# Example usage function
async def create_agent_team():
"""Example: Create a team of different agent types"""
factory = AgentFactory()
# Create FSM React Agent
fsm_agent = await factory.create_agent(
AgentType.FSM_REACT,
"FSM_Agent_1",
{"tools": ["web_search", "calculator", "python_repl"]}
)
# Create Next Gen Agent
next_gen_agent = await factory.create_agent(
AgentType.NEXT_GEN,
"NextGen_Agent_1",
{"model_config": {"model": "gpt-4", "temperature": 0.8}}
)
# Create Crew Agents
coordinator = await factory.create_agent(
AgentType.CREW,
"Crew_Coordinator",
{"role": "coordinator", "team_id": "alpha_team"}
)
researcher = await factory.create_agent(
AgentType.CREW,
"Crew_Researcher",
{"role": "researcher", "team_id": "alpha_team"}
)
# Create Specialized Agents
data_analyst = await factory.create_agent(
AgentType.SPECIALIZED,
"Data_Analyst_1",
{"specialization": "data_analysis", "expertise_level": "expert"}
)
code_generator = await factory.create_agent(
AgentType.SPECIALIZED,
"Code_Generator_1",
{"specialization": "code_generation", "expertise_level": "advanced"}
)
return {
"fsm_agent": fsm_agent,
"next_gen_agent": next_gen_agent,
"coordinator": coordinator,
"researcher": researcher,
"data_analyst": data_analyst,
"code_generator": code_generator
} |