RobotPai / src /agents /multi_agent_system.py
atr0p05's picture
Upload 291 files
8a682b5 verified
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
import logging
from pydantic import BaseModel, Field
from crewai import Agent, Task, Crew, Process
from langchain.tools import BaseTool
from langchain_core.messages import BaseMessage
# --- Multi-Agent System Architecture ---
class AgentRole(str, Enum):
"""Enumeration of specialized agent roles"""
PLANNER = "planner" # Strategic planning and task decomposition
RESEARCHER = "researcher" # Information gathering and analysis
EXECUTOR = "executor" # Tool execution and action taking
VERIFIER = "verifier" # Fact checking and validation
SYNTHESIZER = "synthesizer" # Answer synthesis and presentation
class AgentCapability(BaseModel):
"""Schema for agent capabilities"""
role: AgentRole
description: str
tools: List[str] = Field(default_factory=list)
model_config: Dict[str, Any] = Field(default_factory=dict)
@dataclass
class AgentState:
"""State shared between agents"""
query: str
plan: Optional[List[Dict[str, Any]]] = None
findings: Dict[str, Any] = None
verification_results: Dict[str, Any] = None
final_answer: Optional[str] = None
errors: List[str] = None
class MultiAgentSystem:
"""Orchestrates a team of specialized agents with unified tool management"""
def __init__(self, tools: List[BaseTool], model_config: Dict[str, Any] = None):
self.tools = tools
self.model_config = model_config or {}
self.state = AgentState(query="", findings={}, errors=[])
# Initialize unified tool registry integration
try:
from src.integration_hub import get_unified_registry, get_tool_orchestrator
self.unified_registry = get_unified_registry()
self.tool_orchestrator = get_tool_orchestrator()
# Register tools with unified registry
for tool in tools:
self.unified_registry.register(tool)
logger.info(f"Multi-agent system registered {len(tools)} tools with unified registry")
except ImportError:
logger.warning("Unified tool registry not available, using local tools")
self.unified_registry = None
self.tool_orchestrator = None
# Initialize tool introspection
try:
from src.tools_introspection import tool_introspector
self.tool_introspector = tool_introspector
# Register tools with introspector
for tool in tools:
if hasattr(tool, 'name'):
self.tool_introspector.tool_registry[tool.name] = tool
logger.info("Multi-agent system initialized with tool introspection")
except ImportError:
logger.warning("Tool introspection not available")
self.tool_introspector = None
# Initialize specialized agents
self.agents = self._create_agent_team()
def _create_agent_team(self) -> Dict[AgentRole, Agent]:
"""Create a team of specialized agents with tool introspection"""
agents = {}
# Define agent configurations with tool assignments
agent_configs = {
AgentRole.PLANNER: {
"role": "Strategic Planner",
"goal": "Create detailed, step-by-step plans for complex tasks",
"backstory": "Expert at breaking down complex problems into manageable steps",
"tool_categories": ["search", "calculator", "planning"],
"verbose": True
},
AgentRole.RESEARCHER: {
"role": "Information Researcher",
"goal": "Gather and analyze relevant information from multiple sources",
"backstory": "Expert at finding and validating information from diverse sources",
"tool_categories": ["search", "wikipedia", "web_research", "semantic_search"],
"verbose": True
},
AgentRole.EXECUTOR: {
"role": "Task Executor",
"goal": "Execute specific tasks using appropriate tools",
"backstory": "Expert at using tools effectively to accomplish tasks",
"tool_categories": ["execution", "calculation", "file_processing", "code_execution"],
"verbose": True
},
AgentRole.VERIFIER: {
"role": "Fact Checker",
"goal": "Verify information accuracy and consistency",
"backstory": "Expert at validating facts and identifying inconsistencies",
"tool_categories": ["search", "calculator", "verification", "fact_checking"],
"verbose": True
},
AgentRole.SYNTHESIZER: {
"role": "Answer Synthesizer",
"goal": "Create clear, accurate, and well-structured answers",
"backstory": "Expert at synthesizing information into coherent responses",
"tool_categories": [], # No tools needed for synthesis
"verbose": True
}
}
# Create agents with appropriate tools
for role, config in agent_configs.items():
# Get tools for this agent role
agent_tools = self._get_tools_for_role(role, config["tool_categories"])
agents[role] = Agent(
role=config["role"],
goal=config["goal"],
backstory=config["backstory"],
tools=agent_tools,
verbose=config["verbose"]
)
logger.info(f"Created {role} agent with {len(agent_tools)} tools")
return agents
def _get_tools_for_role(self, role: AgentRole, tool_categories: List[str]) -> List[BaseTool]:
"""Get tools suitable for a specific agent role using introspection and reliability"""
if not tool_categories:
return []
# Use unified registry if available
if self.unified_registry:
# Get reliable tools for this role
reliable_tools = self.unified_registry.get_tools_by_reliability(min_success_rate=0.7)
# Filter by role-specific categories
role_tools = []
for tool in reliable_tools:
if hasattr(tool, 'name'):
# Check if tool matches any category for this role
if any(category in tool.name.lower() for category in tool_categories):
role_tools.append(tool)
if role_tools:
logger.info(f"Found {len(role_tools)} reliable tools for {role}")
return role_tools
# Fallback to tool introspection
if self.tool_introspector:
try:
# Use introspection to find suitable tools
suitable_tools = []
for tool in self.tools:
if hasattr(tool, 'name'):
# Analyze tool capabilities for this role
tool_schema = self.tool_introspector.get_tool_schema(tool.name)
if tool_schema and any(category in tool_schema.get("description", "").lower()
for category in tool_categories):
suitable_tools.append(tool)
if suitable_tools:
logger.info(f"Found {len(suitable_tools)} suitable tools for {role} via introspection")
return suitable_tools
except Exception as e:
logger.warning(f"Tool introspection failed for {role}: {e}")
# Final fallback: return tools that match category names
fallback_tools = []
for tool in self.tools:
if hasattr(tool, 'name'):
if any(category in tool.name.lower() for category in tool_categories):
fallback_tools.append(tool)
logger.info(f"Using {len(fallback_tools)} fallback tools for {role}")
return fallback_tools
def _filter_by_reliability(self, tools: List[BaseTool]) -> List[BaseTool]:
"""Filter tools by reliability score"""
if not self.unified_registry:
return tools
reliable_tools = []
for tool in tools:
if hasattr(tool, 'name'):
reliability = self.unified_registry.tool_reliability.get(tool.name, {})
total_calls = reliability.get("total_calls", 0)
success_count = reliability.get("success_count", 0)
# Include tools with good reliability or new tools
if total_calls == 0 or (success_count / total_calls >= 0.7):
reliable_tools.append(tool)
return reliable_tools
def _create_planning_task(self, query: str) -> Task:
"""Create a planning task"""
return Task(
description=f"Create a detailed plan to answer: {query}",
agent=self.agents[AgentRole.PLANNER],
expected_output="A list of specific steps to accomplish the task"
)
def _create_research_task(self, query: str) -> Task:
"""Create a research task"""
return Task(
description=f"Research information relevant to: {query}",
agent=self.agents[AgentRole.RESEARCHER],
expected_output="Comprehensive research findings with sources"
)
def _create_execution_task(self, step: Dict[str, Any]) -> Task:
"""Create an execution task for a specific step"""
return Task(
description=f"Execute step: {step['description']}",
agent=self.agents[AgentRole.EXECUTOR],
expected_output="Results of executing the step"
)
def _create_verification_task(self, findings: Dict[str, Any]) -> Task:
"""Create a verification task"""
return Task(
description="Verify the accuracy and consistency of findings",
agent=self.agents[AgentRole.VERIFIER],
expected_output="Verification results and any identified issues"
)
def _create_synthesis_task(self, query: str, findings: Dict[str, Any]) -> Task:
"""Create a synthesis task"""
return Task(
description=f"Synthesize findings into a clear answer for: {query}",
agent=self.agents[AgentRole.SYNTHESIZER],
expected_output="A clear, accurate, and well-structured answer"
)
def process_query(self, query: str) -> str:
"""Process a user query using the multi-agent system with enhanced tool management"""
try:
# Update state
self.state.query = query
# Create the crew
crew = Crew(
agents=list(self.agents.values()),
tasks=[], # Will be populated based on plan
process=Process.sequential,
verbose=True
)
# Create initial planning task
planning_task = self._create_planning_task(query)
crew.tasks.append(planning_task)
# Execute planning
plan_result = crew.kickoff()
self.state.plan = plan_result
# Create and execute research task
research_task = self._create_research_task(query)
crew.tasks.append(research_task)
research_result = crew.kickoff()
self.state.findings = research_result
# Create and execute verification task
verification_task = self._create_verification_task(research_result)
crew.tasks.append(verification_task)
verification_result = crew.kickoff()
self.state.verification_results = verification_result
# Create and execute synthesis task
synthesis_task = self._create_synthesis_task(query, research_result)
crew.tasks.append(synthesis_task)
final_answer = crew.kickoff()
self.state.final_answer = final_answer
# Update tool reliability metrics if orchestrator is available
if self.tool_orchestrator:
self._update_tool_metrics()
return final_answer
except Exception as e:
error_msg = f"Error in multi-agent system: {str(e)}"
self.state.errors.append(error_msg)
logging.error(error_msg)
raise
def _update_tool_metrics(self):
"""Update tool reliability metrics based on execution results"""
if not self.tool_orchestrator or not self.unified_registry:
return
try:
# This would update metrics based on tool usage during execution
# For now, this is a placeholder for the actual implementation
logger.debug("Tool metrics would be updated here")
except Exception as e:
logger.warning(f"Failed to update tool metrics: {e}")
def get_state(self) -> AgentState:
"""Get the current state of the multi-agent system"""
return self.state
def get_tool_usage_stats(self) -> Dict[str, Any]:
"""Get tool usage statistics for the multi-agent system"""
if not self.unified_registry:
return {}
stats = {}
for tool_name, reliability in self.unified_registry.tool_reliability.items():
total_calls = reliability.get("total_calls", 0)
success_count = reliability.get("success_count", 0)
if total_calls > 0:
stats[tool_name] = {
"total_calls": total_calls,
"success_rate": success_count / total_calls,
"avg_latency": reliability.get("avg_latency", 0.0),
"last_used": reliability.get("last_used")
}
return stats