# Spaces: Build error
# File size: 14,405 Bytes
# 8a682b5
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
import logging
from pydantic import BaseModel, Field
from crewai import Agent, Task, Crew, Process
from langchain.tools import BaseTool
from langchain_core.messages import BaseMessage
# --- Multi-Agent System Architecture ---
class AgentRole(str, Enum):
    """Enumeration of specialized agent roles.

    Members subclass ``str``, so each one compares equal to its plain string
    value (for example ``AgentRole.PLANNER == "planner"``).
    """

    # Strategic planning and task decomposition
    PLANNER = "planner"
    # Information gathering and analysis
    RESEARCHER = "researcher"
    # Tool execution and action taking
    EXECUTOR = "executor"
    # Fact checking and validation
    VERIFIER = "verifier"
    # Answer synthesis and presentation
    SYNTHESIZER = "synthesizer"
class AgentCapability(BaseModel):
    """Schema for agent capabilities.

    The model configuration is stored on ``llm_config`` but is still supplied
    (and can be serialized by alias) under the key ``model_config``.
    """

    # Role this capability record describes.
    role: AgentRole
    # Free-text description of the capability.
    description: str
    # Presumably tool names available to the role -- TODO confirm with callers.
    tools: List[str] = Field(default_factory=list)
    # ``model_config`` is reserved by Pydantic v2 for the model's own
    # configuration and cannot be used as a field name, so the value lives on
    # ``llm_config`` and keeps ``model_config`` only as its input alias.
    llm_config: Dict[str, Any] = Field(default_factory=dict, alias="model_config")
@dataclass
class AgentState:
    """State shared between agents.

    ``findings`` and ``errors`` are always containers after construction: a
    ``None`` value (the declared default) is replaced by a fresh empty
    ``dict`` / ``list`` for each instance, so they can be updated without a
    ``None`` check and are never shared between instances. The remaining
    optional fields stay ``None`` until they are assigned.
    """

    # The user query being processed.
    query: str
    # Result of the planning stage, once available.
    plan: Optional[List[Dict[str, Any]]] = None
    # Research findings; normalized to {} in __post_init__ when None.
    findings: Optional[Dict[str, Any]] = None
    # Result of the verification stage, once available.
    verification_results: Optional[Dict[str, Any]] = None
    # Final synthesized answer, once available.
    final_answer: Optional[str] = None
    # Collected error messages; normalized to [] in __post_init__ when None.
    errors: Optional[List[str]] = None

    def __post_init__(self) -> None:
        """Replace ``None`` containers with fresh, per-instance empty ones."""
        if self.findings is None:
            self.findings = {}
        if self.errors is None:
            self.errors = []
logger = logging.getLogger(__name__)


class MultiAgentSystem:
    """Orchestrates a team of specialized agents with unified tool management.

    One CrewAI ``Agent`` is created per ``AgentRole``. Each agent receives the
    tools whose name (or, through introspection, whose description) contains
    one of the role's category keywords. The ``src.integration_hub`` and
    ``src.tools_introspection`` modules are used when they can be imported;
    otherwise the system works with the tools passed to the constructor only.
    """

    # Success-rate threshold a tool must reach to count as reliable.
    _MIN_SUCCESS_RATE = 0.7

    def __init__(self, tools: List[BaseTool], model_config: Optional[Dict[str, Any]] = None):
        """Set up shared state, optional integrations and the agent team.

        Args:
            tools: Tools that may be handed to the agents.
            model_config: Optional configuration mapping; stored on the
                instance as ``self.model_config`` (``{}`` when omitted).
        """
        self.tools = tools
        self.model_config = model_config or {}
        self.state = AgentState(query="", findings={}, errors=[])

        # Initialize unified tool registry integration
        try:
            from src.integration_hub import get_unified_registry, get_tool_orchestrator

            self.unified_registry = get_unified_registry()
            self.tool_orchestrator = get_tool_orchestrator()
            # Register tools with unified registry
            for tool in tools:
                self.unified_registry.register(tool)
            logger.info(f"Multi-agent system registered {len(tools)} tools with unified registry")
        except ImportError:
            logger.warning("Unified tool registry not available, using local tools")
            self.unified_registry = None
            self.tool_orchestrator = None

        # Initialize tool introspection
        try:
            from src.tools_introspection import tool_introspector

            self.tool_introspector = tool_introspector
            # Register tools with introspector (only tools exposing a name)
            for tool in tools:
                if hasattr(tool, "name"):
                    self.tool_introspector.tool_registry[tool.name] = tool
            logger.info("Multi-agent system initialized with tool introspection")
        except ImportError:
            logger.warning("Tool introspection not available")
            self.tool_introspector = None

        # Initialize specialized agents
        self.agents = self._create_agent_team()

    def _create_agent_team(self) -> Dict[AgentRole, Agent]:
        """Create one agent per role, each with its role-specific tools.

        Returns:
            Mapping from ``AgentRole`` to the ``Agent`` created for it.
        """
        # Agent configurations; "tool_categories" are the keywords used to
        # select tools for the role (see _get_tools_for_role).
        agent_configs = {
            AgentRole.PLANNER: {
                "role": "Strategic Planner",
                "goal": "Create detailed, step-by-step plans for complex tasks",
                "backstory": "Expert at breaking down complex problems into manageable steps",
                "tool_categories": ["search", "calculator", "planning"],
                "verbose": True,
            },
            AgentRole.RESEARCHER: {
                "role": "Information Researcher",
                "goal": "Gather and analyze relevant information from multiple sources",
                "backstory": "Expert at finding and validating information from diverse sources",
                "tool_categories": ["search", "wikipedia", "web_research", "semantic_search"],
                "verbose": True,
            },
            AgentRole.EXECUTOR: {
                "role": "Task Executor",
                "goal": "Execute specific tasks using appropriate tools",
                "backstory": "Expert at using tools effectively to accomplish tasks",
                "tool_categories": ["execution", "calculation", "file_processing", "code_execution"],
                "verbose": True,
            },
            AgentRole.VERIFIER: {
                "role": "Fact Checker",
                "goal": "Verify information accuracy and consistency",
                "backstory": "Expert at validating facts and identifying inconsistencies",
                "tool_categories": ["search", "calculator", "verification", "fact_checking"],
                "verbose": True,
            },
            AgentRole.SYNTHESIZER: {
                "role": "Answer Synthesizer",
                "goal": "Create clear, accurate, and well-structured answers",
                "backstory": "Expert at synthesizing information into coherent responses",
                "tool_categories": [],  # No tools needed for synthesis
                "verbose": True,
            },
        }

        agents = {}
        for role, config in agent_configs.items():
            agent_tools = self._get_tools_for_role(role, config["tool_categories"])
            agents[role] = Agent(
                role=config["role"],
                goal=config["goal"],
                backstory=config["backstory"],
                tools=agent_tools,
                verbose=config["verbose"],
            )
            logger.info(f"Created {role} agent with {len(agent_tools)} tools")
        return agents

    @staticmethod
    def _matches_categories(text: Any, tool_categories: List[str]) -> bool:
        """Return True if ``text`` contains any category, case-insensitively.

        Non-string values (for example a missing name or description) never
        match.
        """
        if not isinstance(text, str):
            return False
        lowered = text.lower()
        return any(category in lowered for category in tool_categories)

    def _get_tools_for_role(self, role: AgentRole, tool_categories: List[str]) -> List[BaseTool]:
        """Select the tools for a role by matching category keywords.

        Strategies are tried in order and the first non-empty result wins:

        1. tools the unified registry reports as reliable, matched by name;
        2. the constructor tools, matched by the description found in the
           introspector's schema for the tool;
        3. the constructor tools, matched by name (may be empty).

        Args:
            role: Role the tools are selected for (used for logging only).
            tool_categories: Keywords to look for; an empty list yields ``[]``.

        Returns:
            The matching tools, possibly empty.
        """
        if not tool_categories:
            return []

        # 1) Unified registry, restricted to tools it considers reliable.
        if self.unified_registry:
            reliable_tools = self.unified_registry.get_tools_by_reliability(
                min_success_rate=self._MIN_SUCCESS_RATE
            )
            role_tools = [
                tool
                for tool in reliable_tools
                if self._matches_categories(getattr(tool, "name", None), tool_categories)
            ]
            if role_tools:
                logger.info(f"Found {len(role_tools)} reliable tools for {role}")
                return role_tools

        # 2) Introspection: match against the schema description instead.
        if self.tool_introspector:
            try:
                suitable_tools = []
                for tool in self.tools:
                    if not hasattr(tool, "name"):
                        continue
                    tool_schema = self.tool_introspector.get_tool_schema(tool.name)
                    if tool_schema and self._matches_categories(
                        tool_schema.get("description", ""), tool_categories
                    ):
                        suitable_tools.append(tool)
                if suitable_tools:
                    logger.info(f"Found {len(suitable_tools)} suitable tools for {role} via introspection")
                    return suitable_tools
            except Exception as e:
                # Best effort: an introspection failure must not prevent the
                # name-based fallback below.
                logger.warning(f"Tool introspection failed for {role}: {e}")

        # 3) Final fallback: match the constructor tools by name.
        fallback_tools = [
            tool
            for tool in self.tools
            if self._matches_categories(getattr(tool, "name", None), tool_categories)
        ]
        logger.info(f"Using {len(fallback_tools)} fallback tools for {role}")
        return fallback_tools

    def _filter_by_reliability(self, tools: List[BaseTool]) -> List[BaseTool]:
        """Keep tools that are unused so far or meet the success-rate threshold.

        Without a unified registry the input is returned unchanged. With a
        registry, tools that have no ``name`` attribute are dropped because
        their reliability record cannot be looked up.
        """
        if not self.unified_registry:
            return tools

        reliable_tools = []
        for tool in tools:
            if not hasattr(tool, "name"):
                continue
            reliability = self.unified_registry.tool_reliability.get(tool.name, {})
            total_calls = reliability.get("total_calls", 0)
            success_count = reliability.get("success_count", 0)
            # New tools (no calls yet) are given the benefit of the doubt.
            if total_calls == 0 or success_count / total_calls >= self._MIN_SUCCESS_RATE:
                reliable_tools.append(tool)
        return reliable_tools

    def _create_planning_task(self, query: str) -> Task:
        """Create the planning task for ``query``, assigned to the planner."""
        return Task(
            description=f"Create a detailed plan to answer: {query}",
            agent=self.agents[AgentRole.PLANNER],
            expected_output="A list of specific steps to accomplish the task",
        )

    def _create_research_task(self, query: str) -> Task:
        """Create the research task for ``query``, assigned to the researcher."""
        return Task(
            description=f"Research information relevant to: {query}",
            agent=self.agents[AgentRole.RESEARCHER],
            expected_output="Comprehensive research findings with sources",
        )

    def _create_execution_task(self, step: Dict[str, Any]) -> Task:
        """Create an execution task for one plan step.

        Args:
            step: Mapping that must contain a ``"description"`` key.
        """
        return Task(
            description=f"Execute step: {step['description']}",
            agent=self.agents[AgentRole.EXECUTOR],
            expected_output="Results of executing the step",
        )

    def _create_verification_task(self, findings: Dict[str, Any]) -> Task:
        """Create the verification task, assigned to the verifier.

        ``findings`` is accepted for interface compatibility; it is not
        embedded in the task description.
        """
        return Task(
            description="Verify the accuracy and consistency of findings",
            agent=self.agents[AgentRole.VERIFIER],
            expected_output="Verification results and any identified issues",
        )

    def _create_synthesis_task(self, query: str, findings: Dict[str, Any]) -> Task:
        """Create the synthesis task for ``query``, assigned to the synthesizer.

        ``findings`` is accepted for interface compatibility; it is not
        embedded in the task description.
        """
        return Task(
            description=f"Synthesize findings into a clear answer for: {query}",
            agent=self.agents[AgentRole.SYNTHESIZER],
            expected_output="A clear, accurate, and well-structured answer",
        )

    def process_query(self, query: str) -> str:
        """Run planning, research, verification and synthesis for ``query``.

        The four tasks are placed in one sequential crew and executed by a
        single ``kickoff()``, so every stage runs exactly once. Appending a
        task to the same crew and kicking it off after each append would
        re-run all earlier stages on every call.

        Args:
            query: The user query to answer.

        Returns:
            The value returned by ``Crew.kickoff()`` for the full pipeline.

        Raises:
            Exception: Any error raised while building or running the crew is
                recorded in ``self.state.errors``, logged, and re-raised.
        """
        try:
            self.state.query = query

            # Findings do not exist yet when the tasks are built, and the
            # task factories do not embed them, so an empty mapping is passed.
            planning_task = self._create_planning_task(query)
            research_task = self._create_research_task(query)
            verification_task = self._create_verification_task({})
            synthesis_task = self._create_synthesis_task(query, {})

            crew = Crew(
                agents=list(self.agents.values()),
                tasks=[planning_task, research_task, verification_task, synthesis_task],
                process=Process.sequential,
                verbose=True,
            )
            final_answer = crew.kickoff()

            # Per-stage results are read from each task after the run.
            # NOTE(review): relies on CrewAI setting ``Task.output`` once a
            # task has executed -- confirm for the pinned CrewAI version.
            self.state.plan = getattr(planning_task, "output", None)
            self.state.findings = getattr(research_task, "output", None)
            self.state.verification_results = getattr(verification_task, "output", None)
            self.state.final_answer = final_answer

            # Update tool reliability metrics if orchestrator is available
            if self.tool_orchestrator:
                self._update_tool_metrics()
            return final_answer
        except Exception as e:
            error_msg = f"Error in multi-agent system: {str(e)}"
            self.state.errors.append(error_msg)
            logger.exception(error_msg)
            raise

    def _update_tool_metrics(self):
        """Placeholder hook for updating tool reliability metrics.

        Does nothing unless both the tool orchestrator and the unified
        registry are available; currently it only emits a debug message.
        """
        if not self.tool_orchestrator or not self.unified_registry:
            return
        # This would update metrics based on tool usage during execution.
        logger.debug("Tool metrics would be updated here")

    def get_state(self) -> AgentState:
        """Return the current shared state object (not a copy)."""
        return self.state

    def get_tool_usage_stats(self) -> Dict[str, Any]:
        """Summarize registry reliability data for tools that have been called.

        Returns:
            ``{}`` without a unified registry. Otherwise a mapping from tool
            name to ``total_calls``, ``success_rate``, ``avg_latency`` and
            ``last_used``; tools with zero recorded calls are omitted.
        """
        if not self.unified_registry:
            return {}

        stats = {}
        for tool_name, reliability in self.unified_registry.tool_reliability.items():
            total_calls = reliability.get("total_calls", 0)
            if total_calls <= 0:
                continue
            stats[tool_name] = {
                "total_calls": total_calls,
                "success_rate": reliability.get("success_count", 0) / total_calls,
                "avg_latency": reliability.get("avg_latency", 0.0),
                "last_used": reliability.get("last_used"),
            }
        return stats