Spaces:
Sleeping
Sleeping
| """ | |
| Multi-Agent Coordinator using LangChain | |
| Orchestrates Triage and Assignment agents in a coordinated workflow | |
| """ | |
| import sys | |
| import os | |
| sys.path.append(os.path.dirname(os.path.dirname(__file__))) | |
| from agents.triage.triage_agent import TriageAgent | |
| from agents.assignment.assignment_agent import AssignmentAgent | |
| from knowledge_graph.neo4j_client import Neo4jClient | |
| from typing import Dict, List, Optional | |
| import logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| class BugTriageCoordinator: | |
| """ | |
| Coordinates multiple agents to process bugs end-to-end: | |
| 1. Triage Agent: Classify and prioritize bug | |
| 2. Neo4j: Store bug in knowledge graph | |
| 3. Assignment Agent: Find and assign optimal developer | |
| """ | |
| def __init__(self): | |
| """Initialize all agents""" | |
| self.triage_agent = None | |
| self.assignment_agent = None | |
| self.neo4j = None | |
| self.initialized = False | |
| def initialize(self) -> bool: | |
| """Initialize and connect all agents""" | |
| try: | |
| logger.info("π Initializing Multi-Agent System...") | |
| # Initialize Triage Agent | |
| logger.info(" Loading Triage Agent...") | |
| self.triage_agent = TriageAgent() | |
| if not self.triage_agent.load_model(): | |
| logger.error(" β Triage Agent failed to load") | |
| return False | |
| logger.info(" β Triage Agent ready") | |
| # Initialize Neo4j Client | |
| logger.info(" Connecting to Neo4j...") | |
| self.neo4j = Neo4jClient() | |
| if not self.neo4j.connect(): | |
| logger.error(" β Neo4j connection failed") | |
| return False | |
| logger.info(" β Neo4j connected") | |
| # Initialize Assignment Agent | |
| logger.info(" Loading Assignment Agent...") | |
| self.assignment_agent = AssignmentAgent() | |
| if not self.assignment_agent.connect(): | |
| logger.error(" β Assignment Agent failed to connect") | |
| return False | |
| logger.info(" β Assignment Agent ready") | |
| self.initialized = True | |
| logger.info("π Multi-Agent System initialized successfully!\n") | |
| return True | |
| except Exception as e: | |
| logger.error(f"β Initialization failed: {e}") | |
| return False | |
| def shutdown(self): | |
| """Close all agent connections""" | |
| logger.info("Shutting down Multi-Agent System...") | |
| if self.assignment_agent: | |
| self.assignment_agent.close() | |
| if self.neo4j: | |
| self.neo4j.close() | |
| logger.info("β Shutdown complete") | |
| def process_new_bug(self, title: str, description: str, | |
| reporter: str = "user@example.com", | |
| component: Optional[str] = None) -> Dict: | |
| """ | |
| Complete end-to-end bug processing workflow | |
| Workflow: | |
| 1. Triage Agent classifies the bug | |
| 2. Create bug node in Neo4j knowledge graph | |
| 3. Link bug to component | |
| 4. Assignment Agent finds best developer | |
| 5. Assign bug to developer in graph | |
| Args: | |
| title: Bug title | |
| description: Bug description | |
| reporter: Email of person reporting bug | |
| component: Optional component name (or will be inferred) | |
| Returns: | |
| Dict with complete processing results | |
| """ | |
| if not self.initialized: | |
| return { | |
| 'success': False, | |
| 'error': 'Multi-Agent System not initialized' | |
| } | |
| try: | |
| logger.info(f"\n{'='*60}") | |
| logger.info(f"PROCESSING NEW BUG: {title}") | |
| logger.info(f"{'='*60}\n") | |
| # STEP 1: Triage Classification | |
| logger.info("STEP 1: Running Triage Agent...") | |
| triage_result = self.triage_agent.classify_bug(title, description) | |
| if not triage_result: | |
| return { | |
| 'success': False, | |
| 'error': 'Triage classification failed' | |
| } | |
| logger.info(f" Category: {triage_result['category']}") | |
| logger.info(f" Priority: {triage_result['priority']}") | |
| logger.info(f" Severity: {triage_result['severity']}") | |
| logger.info(f" Confidence: {triage_result['category_confidence']:.2%}\n") | |
| # STEP 2: Create Bug in Knowledge Graph | |
| logger.info("STEP 2: Creating bug in Knowledge Graph...") | |
| # Generate bug ID | |
| stats = self.neo4j.get_graph_statistics() | |
| bug_id = f"BUG-{str(stats['total_bugs'] + 1).zfill(3)}" | |
| # Determine component (use provided or map from category) | |
| if not component: | |
| component_map = { | |
| 'UI': 'User Dashboard', | |
| 'Backend': 'API Gateway', | |
| 'API': 'API Gateway', | |
| 'Database': 'Database Layer', | |
| 'Performance': 'API Gateway' | |
| } | |
| component = component_map.get(triage_result['category'], 'API Gateway') | |
| # Create bug data | |
| bug_data = { | |
| 'id': bug_id, | |
| 'title': title, | |
| 'description': description, | |
| 'priority': triage_result['priority'], | |
| 'severity': triage_result['severity'], | |
| 'category': triage_result['category'], | |
| 'component': component, | |
| 'reporter': reporter, | |
| 'estimated_hours': 8 # Default estimate | |
| } | |
| created_bug = self.neo4j.create_bug(bug_data) | |
| if not created_bug: | |
| return { | |
| 'success': False, | |
| 'error': 'Failed to create bug in knowledge graph' | |
| } | |
| logger.info(f" β Bug created: {bug_id}\n") | |
| # STEP 3: Link to Component | |
| logger.info("STEP 3: Linking bug to component...") | |
| linked = self.neo4j.link_bug_to_component(bug_id, component) | |
| if linked: | |
| logger.info(f" β Linked to {component}\n") | |
| # STEP 4: Assignment | |
| logger.info("STEP 4: Running Assignment Agent...") | |
| assignment_result = self.assignment_agent.assign_bug(bug_id) | |
| if not assignment_result: | |
| return { | |
| 'success': False, | |
| 'bug_id': bug_id, | |
| 'triage': triage_result, | |
| 'error': 'Assignment failed - no suitable developer found' | |
| } | |
| logger.info(f" β Assigned to: {assignment_result['assigned_to']}\n") | |
| # STEP 5: Complete | |
| logger.info("β BUG PROCESSING COMPLETE\n") | |
| return { | |
| 'success': True, | |
| 'bug_id': bug_id, | |
| 'triage': triage_result, | |
| 'assignment': assignment_result, | |
| 'workflow': { | |
| 'steps_completed': ['triage', 'graph_creation', 'component_linking', 'assignment'], | |
| 'processing_time': 'Complete' | |
| } | |
| } | |
| except Exception as e: | |
| logger.error(f"β Bug processing failed: {e}") | |
| return { | |
| 'success': False, | |
| 'error': str(e) | |
| } | |
| def batch_process_bugs(self, bugs: List[Dict]) -> List[Dict]: | |
| """Process multiple bugs through the complete workflow""" | |
| if not self.initialized: | |
| logger.error("Multi-Agent System not initialized") | |
| return [] | |
| logger.info(f"\nπ Batch Processing {len(bugs)} bugs...\n") | |
| results = [] | |
| for i, bug in enumerate(bugs, 1): | |
| logger.info(f"\n--- Processing Bug {i}/{len(bugs)} ---") | |
| result = self.process_new_bug( | |
| title=bug['title'], | |
| description=bug['description'], | |
| reporter=bug.get('reporter', 'user@example.com'), | |
| component=bug.get('component') | |
| ) | |
| results.append(result) | |
| # Summary | |
| successful = sum(1 for r in results if r['success']) | |
| logger.info(f"\n{'='*60}") | |
| logger.info(f"BATCH PROCESSING COMPLETE") | |
| logger.info(f" Successful: {successful}/{len(bugs)}") | |
| logger.info(f" Failed: {len(bugs) - successful}/{len(bugs)}") | |
| logger.info(f"{'='*60}\n") | |
| return results | |
| def get_system_status(self) -> Dict: | |
| """Get current system status and statistics""" | |
| if not self.initialized: | |
| return {'status': 'Not Initialized'} | |
| stats = self.neo4j.get_graph_statistics() | |
| workload = self.assignment_agent.get_workload_report() | |
| return { | |
| 'status': 'Online', | |
| 'agents': { | |
| 'triage': 'Active', | |
| 'assignment': 'Active', | |
| 'resolution': 'Not Implemented', | |
| 'analytics': 'Not Implemented' | |
| }, | |
| 'knowledge_graph': stats, | |
| 'team_workload': workload | |
| } | |
| # Example usage and testing | |
| if __name__ == "__main__": | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| coordinator = BugTriageCoordinator() | |
| if coordinator.initialize(): | |
| # Test: Process a new bug end-to-end | |
| print("\n" + "="*60) | |
| print("TEST: Complete Bug Processing Workflow") | |
| print("="*60 + "\n") | |
| result = coordinator.process_new_bug( | |
| title="Payment gateway returning 500 errors", | |
| description="Users cannot complete purchases. Payment API throwing internal server errors on checkout.", | |
| reporter="support@example.com", | |
| component="API Gateway" | |
| ) | |
| if result['success']: | |
| print("\nβ WORKFLOW SUCCESSFUL!\n") | |
| print(f"Bug ID: {result['bug_id']}") | |
| print(f"Category: {result['triage']['category']}") | |
| print(f"Priority: {result['triage']['priority']}") | |
| print(f"Assigned To: {result['assignment']['assigned_to']}") | |
| print(f"Developer Email: {result['assignment']['developer_email']}") | |
| else: | |
| print(f"\nβ WORKFLOW FAILED: {result.get('error')}\n") | |
| # Get system status | |
| print("\n" + "="*60) | |
| print("SYSTEM STATUS") | |
| print("="*60 + "\n") | |
| status = coordinator.get_system_status() | |
| print(f"Status: {status['status']}") | |
| print(f"\nActive Agents:") | |
| for agent, state in status['agents'].items(): | |
| print(f" - {agent.capitalize()}: {state}") | |
| print(f"\nKnowledge Graph:") | |
| for key, value in status['knowledge_graph'].items(): | |
| print(f" - {key}: {value}") | |
| coordinator.shutdown() | |