Spaces:
Build error
Build error
| from typing import List, Dict, Any, Optional | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| import logging | |
| from pydantic import BaseModel, Field | |
| from crewai import Agent, Task, Crew, Process | |
| from langchain.tools import BaseTool | |
| from langchain_core.messages import BaseMessage | |
| # --- Multi-Agent System Architecture --- | |
| class AgentRole(str, Enum): | |
| """Enumeration of specialized agent roles""" | |
| PLANNER = "planner" # Strategic planning and task decomposition | |
| RESEARCHER = "researcher" # Information gathering and analysis | |
| EXECUTOR = "executor" # Tool execution and action taking | |
| VERIFIER = "verifier" # Fact checking and validation | |
| SYNTHESIZER = "synthesizer" # Answer synthesis and presentation | |
| class AgentCapability(BaseModel): | |
| """Schema for agent capabilities""" | |
| role: AgentRole | |
| description: str | |
| tools: List[str] = Field(default_factory=list) | |
| model_config: Dict[str, Any] = Field(default_factory=dict) | |
| class AgentState: | |
| """State shared between agents""" | |
| query: str | |
| plan: Optional[List[Dict[str, Any]]] = None | |
| findings: Dict[str, Any] = None | |
| verification_results: Dict[str, Any] = None | |
| final_answer: Optional[str] = None | |
| errors: List[str] = None | |
| class MultiAgentSystem: | |
| """Orchestrates a team of specialized agents with unified tool management""" | |
| def __init__(self, tools: List[BaseTool], model_config: Dict[str, Any] = None): | |
| self.tools = tools | |
| self.model_config = model_config or {} | |
| self.state = AgentState(query="", findings={}, errors=[]) | |
| # Initialize unified tool registry integration | |
| try: | |
| from src.integration_hub import get_unified_registry, get_tool_orchestrator | |
| self.unified_registry = get_unified_registry() | |
| self.tool_orchestrator = get_tool_orchestrator() | |
| # Register tools with unified registry | |
| for tool in tools: | |
| self.unified_registry.register(tool) | |
| logger.info(f"Multi-agent system registered {len(tools)} tools with unified registry") | |
| except ImportError: | |
| logger.warning("Unified tool registry not available, using local tools") | |
| self.unified_registry = None | |
| self.tool_orchestrator = None | |
| # Initialize tool introspection | |
| try: | |
| from src.tools_introspection import tool_introspector | |
| self.tool_introspector = tool_introspector | |
| # Register tools with introspector | |
| for tool in tools: | |
| if hasattr(tool, 'name'): | |
| self.tool_introspector.tool_registry[tool.name] = tool | |
| logger.info("Multi-agent system initialized with tool introspection") | |
| except ImportError: | |
| logger.warning("Tool introspection not available") | |
| self.tool_introspector = None | |
| # Initialize specialized agents | |
| self.agents = self._create_agent_team() | |
| def _create_agent_team(self) -> Dict[AgentRole, Agent]: | |
| """Create a team of specialized agents with tool introspection""" | |
| agents = {} | |
| # Define agent configurations with tool assignments | |
| agent_configs = { | |
| AgentRole.PLANNER: { | |
| "role": "Strategic Planner", | |
| "goal": "Create detailed, step-by-step plans for complex tasks", | |
| "backstory": "Expert at breaking down complex problems into manageable steps", | |
| "tool_categories": ["search", "calculator", "planning"], | |
| "verbose": True | |
| }, | |
| AgentRole.RESEARCHER: { | |
| "role": "Information Researcher", | |
| "goal": "Gather and analyze relevant information from multiple sources", | |
| "backstory": "Expert at finding and validating information from diverse sources", | |
| "tool_categories": ["search", "wikipedia", "web_research", "semantic_search"], | |
| "verbose": True | |
| }, | |
| AgentRole.EXECUTOR: { | |
| "role": "Task Executor", | |
| "goal": "Execute specific tasks using appropriate tools", | |
| "backstory": "Expert at using tools effectively to accomplish tasks", | |
| "tool_categories": ["execution", "calculation", "file_processing", "code_execution"], | |
| "verbose": True | |
| }, | |
| AgentRole.VERIFIER: { | |
| "role": "Fact Checker", | |
| "goal": "Verify information accuracy and consistency", | |
| "backstory": "Expert at validating facts and identifying inconsistencies", | |
| "tool_categories": ["search", "calculator", "verification", "fact_checking"], | |
| "verbose": True | |
| }, | |
| AgentRole.SYNTHESIZER: { | |
| "role": "Answer Synthesizer", | |
| "goal": "Create clear, accurate, and well-structured answers", | |
| "backstory": "Expert at synthesizing information into coherent responses", | |
| "tool_categories": [], # No tools needed for synthesis | |
| "verbose": True | |
| } | |
| } | |
| # Create agents with appropriate tools | |
| for role, config in agent_configs.items(): | |
| # Get tools for this agent role | |
| agent_tools = self._get_tools_for_role(role, config["tool_categories"]) | |
| agents[role] = Agent( | |
| role=config["role"], | |
| goal=config["goal"], | |
| backstory=config["backstory"], | |
| tools=agent_tools, | |
| verbose=config["verbose"] | |
| ) | |
| logger.info(f"Created {role} agent with {len(agent_tools)} tools") | |
| return agents | |
| def _get_tools_for_role(self, role: AgentRole, tool_categories: List[str]) -> List[BaseTool]: | |
| """Get tools suitable for a specific agent role using introspection and reliability""" | |
| if not tool_categories: | |
| return [] | |
| # Use unified registry if available | |
| if self.unified_registry: | |
| # Get reliable tools for this role | |
| reliable_tools = self.unified_registry.get_tools_by_reliability(min_success_rate=0.7) | |
| # Filter by role-specific categories | |
| role_tools = [] | |
| for tool in reliable_tools: | |
| if hasattr(tool, 'name'): | |
| # Check if tool matches any category for this role | |
| if any(category in tool.name.lower() for category in tool_categories): | |
| role_tools.append(tool) | |
| if role_tools: | |
| logger.info(f"Found {len(role_tools)} reliable tools for {role}") | |
| return role_tools | |
| # Fallback to tool introspection | |
| if self.tool_introspector: | |
| try: | |
| # Use introspection to find suitable tools | |
| suitable_tools = [] | |
| for tool in self.tools: | |
| if hasattr(tool, 'name'): | |
| # Analyze tool capabilities for this role | |
| tool_schema = self.tool_introspector.get_tool_schema(tool.name) | |
| if tool_schema and any(category in tool_schema.get("description", "").lower() | |
| for category in tool_categories): | |
| suitable_tools.append(tool) | |
| if suitable_tools: | |
| logger.info(f"Found {len(suitable_tools)} suitable tools for {role} via introspection") | |
| return suitable_tools | |
| except Exception as e: | |
| logger.warning(f"Tool introspection failed for {role}: {e}") | |
| # Final fallback: return tools that match category names | |
| fallback_tools = [] | |
| for tool in self.tools: | |
| if hasattr(tool, 'name'): | |
| if any(category in tool.name.lower() for category in tool_categories): | |
| fallback_tools.append(tool) | |
| logger.info(f"Using {len(fallback_tools)} fallback tools for {role}") | |
| return fallback_tools | |
| def _filter_by_reliability(self, tools: List[BaseTool]) -> List[BaseTool]: | |
| """Filter tools by reliability score""" | |
| if not self.unified_registry: | |
| return tools | |
| reliable_tools = [] | |
| for tool in tools: | |
| if hasattr(tool, 'name'): | |
| reliability = self.unified_registry.tool_reliability.get(tool.name, {}) | |
| total_calls = reliability.get("total_calls", 0) | |
| success_count = reliability.get("success_count", 0) | |
| # Include tools with good reliability or new tools | |
| if total_calls == 0 or (success_count / total_calls >= 0.7): | |
| reliable_tools.append(tool) | |
| return reliable_tools | |
| def _create_planning_task(self, query: str) -> Task: | |
| """Create a planning task""" | |
| return Task( | |
| description=f"Create a detailed plan to answer: {query}", | |
| agent=self.agents[AgentRole.PLANNER], | |
| expected_output="A list of specific steps to accomplish the task" | |
| ) | |
| def _create_research_task(self, query: str) -> Task: | |
| """Create a research task""" | |
| return Task( | |
| description=f"Research information relevant to: {query}", | |
| agent=self.agents[AgentRole.RESEARCHER], | |
| expected_output="Comprehensive research findings with sources" | |
| ) | |
| def _create_execution_task(self, step: Dict[str, Any]) -> Task: | |
| """Create an execution task for a specific step""" | |
| return Task( | |
| description=f"Execute step: {step['description']}", | |
| agent=self.agents[AgentRole.EXECUTOR], | |
| expected_output="Results of executing the step" | |
| ) | |
| def _create_verification_task(self, findings: Dict[str, Any]) -> Task: | |
| """Create a verification task""" | |
| return Task( | |
| description="Verify the accuracy and consistency of findings", | |
| agent=self.agents[AgentRole.VERIFIER], | |
| expected_output="Verification results and any identified issues" | |
| ) | |
| def _create_synthesis_task(self, query: str, findings: Dict[str, Any]) -> Task: | |
| """Create a synthesis task""" | |
| return Task( | |
| description=f"Synthesize findings into a clear answer for: {query}", | |
| agent=self.agents[AgentRole.SYNTHESIZER], | |
| expected_output="A clear, accurate, and well-structured answer" | |
| ) | |
| def process_query(self, query: str) -> str: | |
| """Process a user query using the multi-agent system with enhanced tool management""" | |
| try: | |
| # Update state | |
| self.state.query = query | |
| # Create the crew | |
| crew = Crew( | |
| agents=list(self.agents.values()), | |
| tasks=[], # Will be populated based on plan | |
| process=Process.sequential, | |
| verbose=True | |
| ) | |
| # Create initial planning task | |
| planning_task = self._create_planning_task(query) | |
| crew.tasks.append(planning_task) | |
| # Execute planning | |
| plan_result = crew.kickoff() | |
| self.state.plan = plan_result | |
| # Create and execute research task | |
| research_task = self._create_research_task(query) | |
| crew.tasks.append(research_task) | |
| research_result = crew.kickoff() | |
| self.state.findings = research_result | |
| # Create and execute verification task | |
| verification_task = self._create_verification_task(research_result) | |
| crew.tasks.append(verification_task) | |
| verification_result = crew.kickoff() | |
| self.state.verification_results = verification_result | |
| # Create and execute synthesis task | |
| synthesis_task = self._create_synthesis_task(query, research_result) | |
| crew.tasks.append(synthesis_task) | |
| final_answer = crew.kickoff() | |
| self.state.final_answer = final_answer | |
| # Update tool reliability metrics if orchestrator is available | |
| if self.tool_orchestrator: | |
| self._update_tool_metrics() | |
| return final_answer | |
| except Exception as e: | |
| error_msg = f"Error in multi-agent system: {str(e)}" | |
| self.state.errors.append(error_msg) | |
| logging.error(error_msg) | |
| raise | |
| def _update_tool_metrics(self): | |
| """Update tool reliability metrics based on execution results""" | |
| if not self.tool_orchestrator or not self.unified_registry: | |
| return | |
| try: | |
| # This would update metrics based on tool usage during execution | |
| # For now, this is a placeholder for the actual implementation | |
| logger.debug("Tool metrics would be updated here") | |
| except Exception as e: | |
| logger.warning(f"Failed to update tool metrics: {e}") | |
| def get_state(self) -> AgentState: | |
| """Get the current state of the multi-agent system""" | |
| return self.state | |
| def get_tool_usage_stats(self) -> Dict[str, Any]: | |
| """Get tool usage statistics for the multi-agent system""" | |
| if not self.unified_registry: | |
| return {} | |
| stats = {} | |
| for tool_name, reliability in self.unified_registry.tool_reliability.items(): | |
| total_calls = reliability.get("total_calls", 0) | |
| success_count = reliability.get("success_count", 0) | |
| if total_calls > 0: | |
| stats[tool_name] = { | |
| "total_calls": total_calls, | |
| "success_rate": success_count / total_calls, | |
| "avg_latency": reliability.get("avg_latency", 0.0), | |
| "last_used": reliability.get("last_used") | |
| } | |
| return stats |