File size: 14,405 Bytes
8a682b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
import logging
from pydantic import BaseModel, Field

from crewai import Agent, Task, Crew, Process
from langchain.tools import BaseTool
from langchain_core.messages import BaseMessage

# --- Multi-Agent System Architecture ---

class AgentRole(str, Enum):
    """Enumeration of specialized agent roles"""
    PLANNER = "planner"           # Strategic planning and task decomposition
    RESEARCHER = "researcher"     # Information gathering and analysis
    EXECUTOR = "executor"         # Tool execution and action taking
    VERIFIER = "verifier"         # Fact checking and validation
    SYNTHESIZER = "synthesizer"   # Answer synthesis and presentation

class AgentCapability(BaseModel):
    """Schema for agent capabilities"""
    role: AgentRole
    description: str
    tools: List[str] = Field(default_factory=list)
    model_config: Dict[str, Any] = Field(default_factory=dict)

@dataclass
class AgentState:
    """State shared between agents"""
    query: str
    plan: Optional[List[Dict[str, Any]]] = None
    findings: Dict[str, Any] = None
    verification_results: Dict[str, Any] = None
    final_answer: Optional[str] = None
    errors: List[str] = None

class MultiAgentSystem:
    """Orchestrates a team of specialized agents with unified tool management"""
    
    def __init__(self, tools: List[BaseTool], model_config: Dict[str, Any] = None):
        self.tools = tools
        self.model_config = model_config or {}
        self.state = AgentState(query="", findings={}, errors=[])
        
        # Initialize unified tool registry integration
        try:
            from src.integration_hub import get_unified_registry, get_tool_orchestrator
            self.unified_registry = get_unified_registry()
            self.tool_orchestrator = get_tool_orchestrator()
            
            # Register tools with unified registry
            for tool in tools:
                self.unified_registry.register(tool)
            
            logger.info(f"Multi-agent system registered {len(tools)} tools with unified registry")
            
        except ImportError:
            logger.warning("Unified tool registry not available, using local tools")
            self.unified_registry = None
            self.tool_orchestrator = None
        
        # Initialize tool introspection
        try:
            from src.tools_introspection import tool_introspector
            self.tool_introspector = tool_introspector
            
            # Register tools with introspector
            for tool in tools:
                if hasattr(tool, 'name'):
                    self.tool_introspector.tool_registry[tool.name] = tool
            
            logger.info("Multi-agent system initialized with tool introspection")
            
        except ImportError:
            logger.warning("Tool introspection not available")
            self.tool_introspector = None
        
        # Initialize specialized agents
        self.agents = self._create_agent_team()
        
    def _create_agent_team(self) -> Dict[AgentRole, Agent]:
        """Create a team of specialized agents with tool introspection"""
        agents = {}
        
        # Define agent configurations with tool assignments
        agent_configs = {
            AgentRole.PLANNER: {
                "role": "Strategic Planner",
                "goal": "Create detailed, step-by-step plans for complex tasks",
                "backstory": "Expert at breaking down complex problems into manageable steps",
                "tool_categories": ["search", "calculator", "planning"],
                "verbose": True
            },
            AgentRole.RESEARCHER: {
                "role": "Information Researcher",
                "goal": "Gather and analyze relevant information from multiple sources",
                "backstory": "Expert at finding and validating information from diverse sources",
                "tool_categories": ["search", "wikipedia", "web_research", "semantic_search"],
                "verbose": True
            },
            AgentRole.EXECUTOR: {
                "role": "Task Executor",
                "goal": "Execute specific tasks using appropriate tools",
                "backstory": "Expert at using tools effectively to accomplish tasks",
                "tool_categories": ["execution", "calculation", "file_processing", "code_execution"],
                "verbose": True
            },
            AgentRole.VERIFIER: {
                "role": "Fact Checker",
                "goal": "Verify information accuracy and consistency",
                "backstory": "Expert at validating facts and identifying inconsistencies",
                "tool_categories": ["search", "calculator", "verification", "fact_checking"],
                "verbose": True
            },
            AgentRole.SYNTHESIZER: {
                "role": "Answer Synthesizer",
                "goal": "Create clear, accurate, and well-structured answers",
                "backstory": "Expert at synthesizing information into coherent responses",
                "tool_categories": [],  # No tools needed for synthesis
                "verbose": True
            }
        }
        
        # Create agents with appropriate tools
        for role, config in agent_configs.items():
            # Get tools for this agent role
            agent_tools = self._get_tools_for_role(role, config["tool_categories"])
            
            agents[role] = Agent(
                role=config["role"],
                goal=config["goal"],
                backstory=config["backstory"],
                tools=agent_tools,
                verbose=config["verbose"]
            )
            
            logger.info(f"Created {role} agent with {len(agent_tools)} tools")
        
        return agents
    
    def _get_tools_for_role(self, role: AgentRole, tool_categories: List[str]) -> List[BaseTool]:
        """Get tools suitable for a specific agent role using introspection and reliability"""
        if not tool_categories:
            return []
        
        # Use unified registry if available
        if self.unified_registry:
            # Get reliable tools for this role
            reliable_tools = self.unified_registry.get_tools_by_reliability(min_success_rate=0.7)
            
            # Filter by role-specific categories
            role_tools = []
            for tool in reliable_tools:
                if hasattr(tool, 'name'):
                    # Check if tool matches any category for this role
                    if any(category in tool.name.lower() for category in tool_categories):
                        role_tools.append(tool)
            
            if role_tools:
                logger.info(f"Found {len(role_tools)} reliable tools for {role}")
                return role_tools
        
        # Fallback to tool introspection
        if self.tool_introspector:
            try:
                # Use introspection to find suitable tools
                suitable_tools = []
                for tool in self.tools:
                    if hasattr(tool, 'name'):
                        # Analyze tool capabilities for this role
                        tool_schema = self.tool_introspector.get_tool_schema(tool.name)
                        if tool_schema and any(category in tool_schema.get("description", "").lower() 
                                             for category in tool_categories):
                            suitable_tools.append(tool)
                
                if suitable_tools:
                    logger.info(f"Found {len(suitable_tools)} suitable tools for {role} via introspection")
                    return suitable_tools
                    
            except Exception as e:
                logger.warning(f"Tool introspection failed for {role}: {e}")
        
        # Final fallback: return tools that match category names
        fallback_tools = []
        for tool in self.tools:
            if hasattr(tool, 'name'):
                if any(category in tool.name.lower() for category in tool_categories):
                    fallback_tools.append(tool)
        
        logger.info(f"Using {len(fallback_tools)} fallback tools for {role}")
        return fallback_tools
    
    def _filter_by_reliability(self, tools: List[BaseTool]) -> List[BaseTool]:
        """Filter tools by reliability score"""
        if not self.unified_registry:
            return tools
        
        reliable_tools = []
        for tool in tools:
            if hasattr(tool, 'name'):
                reliability = self.unified_registry.tool_reliability.get(tool.name, {})
                total_calls = reliability.get("total_calls", 0)
                success_count = reliability.get("success_count", 0)
                
                # Include tools with good reliability or new tools
                if total_calls == 0 or (success_count / total_calls >= 0.7):
                    reliable_tools.append(tool)
        
        return reliable_tools
    
    def _create_planning_task(self, query: str) -> Task:
        """Create a planning task"""
        return Task(
            description=f"Create a detailed plan to answer: {query}",
            agent=self.agents[AgentRole.PLANNER],
            expected_output="A list of specific steps to accomplish the task"
        )
        
    def _create_research_task(self, query: str) -> Task:
        """Create a research task"""
        return Task(
            description=f"Research information relevant to: {query}",
            agent=self.agents[AgentRole.RESEARCHER],
            expected_output="Comprehensive research findings with sources"
        )
        
    def _create_execution_task(self, step: Dict[str, Any]) -> Task:
        """Create an execution task for a specific step"""
        return Task(
            description=f"Execute step: {step['description']}",
            agent=self.agents[AgentRole.EXECUTOR],
            expected_output="Results of executing the step"
        )
        
    def _create_verification_task(self, findings: Dict[str, Any]) -> Task:
        """Create a verification task"""
        return Task(
            description="Verify the accuracy and consistency of findings",
            agent=self.agents[AgentRole.VERIFIER],
            expected_output="Verification results and any identified issues"
        )
        
    def _create_synthesis_task(self, query: str, findings: Dict[str, Any]) -> Task:
        """Create a synthesis task"""
        return Task(
            description=f"Synthesize findings into a clear answer for: {query}",
            agent=self.agents[AgentRole.SYNTHESIZER],
            expected_output="A clear, accurate, and well-structured answer"
        )
        
    def process_query(self, query: str) -> str:
        """Process a user query using the multi-agent system with enhanced tool management"""
        try:
            # Update state
            self.state.query = query
            
            # Create the crew
            crew = Crew(
                agents=list(self.agents.values()),
                tasks=[],  # Will be populated based on plan
                process=Process.sequential,
                verbose=True
            )
            
            # Create initial planning task
            planning_task = self._create_planning_task(query)
            crew.tasks.append(planning_task)
            
            # Execute planning
            plan_result = crew.kickoff()
            self.state.plan = plan_result
            
            # Create and execute research task
            research_task = self._create_research_task(query)
            crew.tasks.append(research_task)
            research_result = crew.kickoff()
            self.state.findings = research_result
            
            # Create and execute verification task
            verification_task = self._create_verification_task(research_result)
            crew.tasks.append(verification_task)
            verification_result = crew.kickoff()
            self.state.verification_results = verification_result
            
            # Create and execute synthesis task
            synthesis_task = self._create_synthesis_task(query, research_result)
            crew.tasks.append(synthesis_task)
            final_answer = crew.kickoff()
            self.state.final_answer = final_answer
            
            # Update tool reliability metrics if orchestrator is available
            if self.tool_orchestrator:
                self._update_tool_metrics()
            
            return final_answer
            
        except Exception as e:
            error_msg = f"Error in multi-agent system: {str(e)}"
            self.state.errors.append(error_msg)
            logging.error(error_msg)
            raise
    
    def _update_tool_metrics(self):
        """Update tool reliability metrics based on execution results"""
        if not self.tool_orchestrator or not self.unified_registry:
            return
        
        try:
            # This would update metrics based on tool usage during execution
            # For now, this is a placeholder for the actual implementation
            logger.debug("Tool metrics would be updated here")
            
        except Exception as e:
            logger.warning(f"Failed to update tool metrics: {e}")
            
    def get_state(self) -> AgentState:
        """Get the current state of the multi-agent system"""
        return self.state
    
    def get_tool_usage_stats(self) -> Dict[str, Any]:
        """Get tool usage statistics for the multi-agent system"""
        if not self.unified_registry:
            return {}
        
        stats = {}
        for tool_name, reliability in self.unified_registry.tool_reliability.items():
            total_calls = reliability.get("total_calls", 0)
            success_count = reliability.get("success_count", 0)
            
            if total_calls > 0:
                stats[tool_name] = {
                    "total_calls": total_calls,
                    "success_rate": success_count / total_calls,
                    "avg_latency": reliability.get("avg_latency", 0.0),
                    "last_used": reliability.get("last_used")
                }
        
        return stats