intelligent-bug-triage / agents /coordinator.py
Spandan Roy
Phase 2 Complete: Multi-Agent System with Neo4j and LangChain
9d276dc
"""
Multi-Agent Coordinator using LangChain
Orchestrates Triage and Assignment agents in a coordinated workflow
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from agents.triage.triage_agent import TriageAgent
from agents.assignment.assignment_agent import AssignmentAgent
from knowledge_graph.neo4j_client import Neo4jClient
from typing import Dict, List, Optional
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class BugTriageCoordinator:
"""
Coordinates multiple agents to process bugs end-to-end:
1. Triage Agent: Classify and prioritize bug
2. Neo4j: Store bug in knowledge graph
3. Assignment Agent: Find and assign optimal developer
"""
def __init__(self):
"""Initialize all agents"""
self.triage_agent = None
self.assignment_agent = None
self.neo4j = None
self.initialized = False
def initialize(self) -> bool:
"""Initialize and connect all agents"""
try:
logger.info("πŸš€ Initializing Multi-Agent System...")
# Initialize Triage Agent
logger.info(" Loading Triage Agent...")
self.triage_agent = TriageAgent()
if not self.triage_agent.load_model():
logger.error(" ❌ Triage Agent failed to load")
return False
logger.info(" βœ… Triage Agent ready")
# Initialize Neo4j Client
logger.info(" Connecting to Neo4j...")
self.neo4j = Neo4jClient()
if not self.neo4j.connect():
logger.error(" ❌ Neo4j connection failed")
return False
logger.info(" βœ… Neo4j connected")
# Initialize Assignment Agent
logger.info(" Loading Assignment Agent...")
self.assignment_agent = AssignmentAgent()
if not self.assignment_agent.connect():
logger.error(" ❌ Assignment Agent failed to connect")
return False
logger.info(" βœ… Assignment Agent ready")
self.initialized = True
logger.info("πŸŽ‰ Multi-Agent System initialized successfully!\n")
return True
except Exception as e:
logger.error(f"❌ Initialization failed: {e}")
return False
def shutdown(self):
"""Close all agent connections"""
logger.info("Shutting down Multi-Agent System...")
if self.assignment_agent:
self.assignment_agent.close()
if self.neo4j:
self.neo4j.close()
logger.info("βœ… Shutdown complete")
def process_new_bug(self, title: str, description: str,
reporter: str = "user@example.com",
component: Optional[str] = None) -> Dict:
"""
Complete end-to-end bug processing workflow
Workflow:
1. Triage Agent classifies the bug
2. Create bug node in Neo4j knowledge graph
3. Link bug to component
4. Assignment Agent finds best developer
5. Assign bug to developer in graph
Args:
title: Bug title
description: Bug description
reporter: Email of person reporting bug
component: Optional component name (or will be inferred)
Returns:
Dict with complete processing results
"""
if not self.initialized:
return {
'success': False,
'error': 'Multi-Agent System not initialized'
}
try:
logger.info(f"\n{'='*60}")
logger.info(f"PROCESSING NEW BUG: {title}")
logger.info(f"{'='*60}\n")
# STEP 1: Triage Classification
logger.info("STEP 1: Running Triage Agent...")
triage_result = self.triage_agent.classify_bug(title, description)
if not triage_result:
return {
'success': False,
'error': 'Triage classification failed'
}
logger.info(f" Category: {triage_result['category']}")
logger.info(f" Priority: {triage_result['priority']}")
logger.info(f" Severity: {triage_result['severity']}")
logger.info(f" Confidence: {triage_result['category_confidence']:.2%}\n")
# STEP 2: Create Bug in Knowledge Graph
logger.info("STEP 2: Creating bug in Knowledge Graph...")
# Generate bug ID
stats = self.neo4j.get_graph_statistics()
bug_id = f"BUG-{str(stats['total_bugs'] + 1).zfill(3)}"
# Determine component (use provided or map from category)
if not component:
component_map = {
'UI': 'User Dashboard',
'Backend': 'API Gateway',
'API': 'API Gateway',
'Database': 'Database Layer',
'Performance': 'API Gateway'
}
component = component_map.get(triage_result['category'], 'API Gateway')
# Create bug data
bug_data = {
'id': bug_id,
'title': title,
'description': description,
'priority': triage_result['priority'],
'severity': triage_result['severity'],
'category': triage_result['category'],
'component': component,
'reporter': reporter,
'estimated_hours': 8 # Default estimate
}
created_bug = self.neo4j.create_bug(bug_data)
if not created_bug:
return {
'success': False,
'error': 'Failed to create bug in knowledge graph'
}
logger.info(f" βœ… Bug created: {bug_id}\n")
# STEP 3: Link to Component
logger.info("STEP 3: Linking bug to component...")
linked = self.neo4j.link_bug_to_component(bug_id, component)
if linked:
logger.info(f" βœ… Linked to {component}\n")
# STEP 4: Assignment
logger.info("STEP 4: Running Assignment Agent...")
assignment_result = self.assignment_agent.assign_bug(bug_id)
if not assignment_result:
return {
'success': False,
'bug_id': bug_id,
'triage': triage_result,
'error': 'Assignment failed - no suitable developer found'
}
logger.info(f" βœ… Assigned to: {assignment_result['assigned_to']}\n")
# STEP 5: Complete
logger.info("βœ… BUG PROCESSING COMPLETE\n")
return {
'success': True,
'bug_id': bug_id,
'triage': triage_result,
'assignment': assignment_result,
'workflow': {
'steps_completed': ['triage', 'graph_creation', 'component_linking', 'assignment'],
'processing_time': 'Complete'
}
}
except Exception as e:
logger.error(f"❌ Bug processing failed: {e}")
return {
'success': False,
'error': str(e)
}
def batch_process_bugs(self, bugs: List[Dict]) -> List[Dict]:
"""Process multiple bugs through the complete workflow"""
if not self.initialized:
logger.error("Multi-Agent System not initialized")
return []
logger.info(f"\nπŸ”„ Batch Processing {len(bugs)} bugs...\n")
results = []
for i, bug in enumerate(bugs, 1):
logger.info(f"\n--- Processing Bug {i}/{len(bugs)} ---")
result = self.process_new_bug(
title=bug['title'],
description=bug['description'],
reporter=bug.get('reporter', 'user@example.com'),
component=bug.get('component')
)
results.append(result)
# Summary
successful = sum(1 for r in results if r['success'])
logger.info(f"\n{'='*60}")
logger.info(f"BATCH PROCESSING COMPLETE")
logger.info(f" Successful: {successful}/{len(bugs)}")
logger.info(f" Failed: {len(bugs) - successful}/{len(bugs)}")
logger.info(f"{'='*60}\n")
return results
def get_system_status(self) -> Dict:
"""Get current system status and statistics"""
if not self.initialized:
return {'status': 'Not Initialized'}
stats = self.neo4j.get_graph_statistics()
workload = self.assignment_agent.get_workload_report()
return {
'status': 'Online',
'agents': {
'triage': 'Active',
'assignment': 'Active',
'resolution': 'Not Implemented',
'analytics': 'Not Implemented'
},
'knowledge_graph': stats,
'team_workload': workload
}
# Example usage and testing
if __name__ == "__main__":
from dotenv import load_dotenv
load_dotenv()
coordinator = BugTriageCoordinator()
if coordinator.initialize():
# Test: Process a new bug end-to-end
print("\n" + "="*60)
print("TEST: Complete Bug Processing Workflow")
print("="*60 + "\n")
result = coordinator.process_new_bug(
title="Payment gateway returning 500 errors",
description="Users cannot complete purchases. Payment API throwing internal server errors on checkout.",
reporter="support@example.com",
component="API Gateway"
)
if result['success']:
print("\nβœ… WORKFLOW SUCCESSFUL!\n")
print(f"Bug ID: {result['bug_id']}")
print(f"Category: {result['triage']['category']}")
print(f"Priority: {result['triage']['priority']}")
print(f"Assigned To: {result['assignment']['assigned_to']}")
print(f"Developer Email: {result['assignment']['developer_email']}")
else:
print(f"\n❌ WORKFLOW FAILED: {result.get('error')}\n")
# Get system status
print("\n" + "="*60)
print("SYSTEM STATUS")
print("="*60 + "\n")
status = coordinator.get_system_status()
print(f"Status: {status['status']}")
print(f"\nActive Agents:")
for agent, state in status['agents'].items():
print(f" - {agent.capitalize()}: {state}")
print(f"\nKnowledge Graph:")
for key, value in status['knowledge_graph'].items():
print(f" - {key}: {value}")
coordinator.shutdown()