Spandan Roy
Phase 2 Complete: Multi-Agent System with Neo4j and LangChain
9d276dc
"""
Assignment Agent: Intelligent Developer Routing with Workload Balancing
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
from knowledge_graph.neo4j_client import Neo4jClient
from typing import Dict, List, Optional, Tuple
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AssignmentAgent:
"""
Agent responsible for:
1. Analyzing bug requirements and affected components
2. Finding developers with relevant expertise
3. Balancing workload across team members
4. Considering developer success rates and resolution times
5. Providing assignment recommendations with reasoning
"""
def __init__(self):
"""Initialize assignment agent with Neo4j client"""
self.neo4j = Neo4jClient()
self.connected = False
# Scoring weights for assignment algorithm
self.weights = {
'expertise': 0.40, # 40% weight on technical expertise
'workload': 0.30, # 30% weight on current workload
'success_rate': 0.20, # 20% weight on past success
'resolution_time': 0.10 # 10% weight on speed
}
def connect(self):
"""Connect to Neo4j knowledge graph"""
self.connected = self.neo4j.connect()
return self.connected
def close(self):
"""Close connections"""
if self.connected:
self.neo4j.close()
def calculate_assignment_score(self, developer: Dict, bug_priority: str) -> float:
"""
Calculate assignment score for a developer based on multiple factors
Scoring Formula:
score = (expertise * 0.4) + (workload_factor * 0.3) +
(success_rate * 0.2) + (speed_factor * 0.1)
Args:
developer: Developer data from knowledge graph
bug_priority: Bug priority (P0-P4)
Returns:
Float score between 0 and 1 (higher is better)
"""
# Expertise score (0-1) based on proficiency
expertise_score = developer.get('proficiency', 0.5)
# Workload score (0-1) - lower workload is better
current = developer.get('current_workload', 0)
capacity = developer.get('max_capacity', 5)
if capacity > 0:
workload_utilization = current / capacity
workload_score = 1.0 - workload_utilization
else:
workload_score = 0.0
# Success rate score (0-1)
success_score = developer.get('success_rate', 0.5)
# Speed score (0-1) - faster resolution is better
# Normalize: 8 hrs = 1.0, 24 hrs = 0.5, 48+ hrs = 0.0
avg_time = developer.get('avg_resolution_time', 24)
if avg_time <= 8:
speed_score = 1.0
elif avg_time <= 24:
speed_score = 0.5 + (24 - avg_time) / 32 # Linear interpolation
else:
speed_score = max(0.0, 0.5 - (avg_time - 24) / 48)
# Priority multipliers (urgent bugs favor speed and success)
if bug_priority == 'P0':
# Critical bugs: prioritize success rate and speed
self.weights['expertise'] = 0.35
self.weights['workload'] = 0.20
self.weights['success_rate'] = 0.30
self.weights['resolution_time'] = 0.15
elif bug_priority == 'P1':
# High priority: balanced approach
self.weights['expertise'] = 0.40
self.weights['workload'] = 0.25
self.weights['success_rate'] = 0.25
self.weights['resolution_time'] = 0.10
else:
# Normal/low priority: favor workload balance
self.weights['expertise'] = 0.35
self.weights['workload'] = 0.40
self.weights['success_rate'] = 0.15
self.weights['resolution_time'] = 0.10
# Calculate weighted score
final_score = (
expertise_score * self.weights['expertise'] +
workload_score * self.weights['workload'] +
success_score * self.weights['success_rate'] +
speed_score * self.weights['resolution_time']
)
return final_score
def find_best_developer(self, bug: Dict) -> Optional[Tuple[Dict, str]]:
"""
Find the best developer to assign a bug to
Args:
bug: Bug data including component, priority, category
Returns:
Tuple of (developer_data, reasoning) or None if no match found
"""
if not self.connected:
logger.error("Not connected to Neo4j")
return None
component_name = bug.get('component_name') or bug.get('component')
if not component_name:
logger.warning(f"Bug {bug.get('bug_id')} has no component specified")
return None
# Get developers with expertise in the affected component
developers = self.neo4j.get_developers_by_component(component_name)
if not developers:
logger.warning(f"No developers found for component: {component_name}")
return None
logger.info(f"Evaluating {len(developers)} developers for {bug.get('bug_id')}")
# Calculate scores for each developer
scored_developers = []
for dev in developers:
score = self.calculate_assignment_score(dev, bug.get('priority', 'P2'))
scored_developers.append((dev, score))
# Sort by score (descending)
scored_developers.sort(key=lambda x: x[1], reverse=True)
# Get best developer
best_dev, best_score = scored_developers[0]
# Generate reasoning
reasoning = self._generate_reasoning(
best_dev,
best_score,
bug.get('priority'),
component_name
)
logger.info(f"Best match: {best_dev['name']} (score: {best_score:.3f})")
return (best_dev, reasoning)
def _generate_reasoning(self, developer: Dict, score: float,
priority: str, component: str) -> str:
"""Generate human-readable reasoning for assignment"""
name = developer['name']
proficiency = developer.get('proficiency', 0)
workload = developer.get('current_workload', 0)
capacity = developer.get('max_capacity', 5)
success_rate = developer.get('success_rate', 0)
reasoning = f"""
**Assignment Recommendation: {name}** (Score: {score:.2f}/1.00)
**Key Factors:**
- **Expertise**: {proficiency:.0%} proficiency in {component} component
- **Availability**: Currently handling {workload}/{capacity} bugs ({(workload/capacity)*100:.0f}% capacity)
- **Track Record**: {success_rate:.0%} success rate on past assignments
- **Priority Match**: Selected for {priority} priority based on optimal skill-workload balance
**Why This Developer?**
{name} has demonstrated strong expertise in the {component} component and currently has
available capacity to take on this {priority} bug. Their proven track record and technical
proficiency make them the optimal choice for timely resolution.
"""
return reasoning.strip()
def assign_bug(self, bug_id: str) -> Optional[Dict]:
"""
Complete assignment workflow for a bug
Args:
bug_id: ID of the bug to assign
Returns:
Dict with assignment details or None if assignment failed
"""
if not self.connected:
logger.error("Not connected to Neo4j")
return None
# Get bug details
bugs = self.neo4j.get_all_open_bugs()
bug = next((b for b in bugs if b['bug_id'] == bug_id), None)
if not bug:
logger.error(f"Bug {bug_id} not found or already assigned")
return None
# Find best developer
result = self.find_best_developer(bug)
if not result:
logger.error(f"Could not find suitable developer for {bug_id}")
return None
developer, reasoning = result
# Assign in knowledge graph
success = self.neo4j.assign_bug_to_developer(
bug_id=bug_id,
dev_id=developer['dev_id'],
assignment_reason=reasoning
)
if success:
assignment_result = {
'bug_id': bug_id,
'bug_title': bug['title'],
'assigned_to': developer['name'],
'developer_id': developer['dev_id'],
'developer_email': developer['email'],
'reasoning': reasoning,
'priority': bug.get('priority'),
'component': bug.get('component_name') or bug.get('component')
}
logger.info(f"βœ… Successfully assigned {bug_id} to {developer['name']}")
return assignment_result
else:
logger.error(f"❌ Failed to assign {bug_id} in knowledge graph")
return None
def batch_assign_bugs(self) -> List[Dict]:
"""Assign all open bugs to appropriate developers"""
if not self.connected:
logger.error("Not connected to Neo4j")
return []
open_bugs = self.neo4j.get_all_open_bugs()
logger.info(f"Processing {len(open_bugs)} open bugs for assignment")
assignments = []
for bug in open_bugs:
result = self.assign_bug(bug['bug_id'])
if result:
assignments.append(result)
logger.info(f"βœ… Successfully assigned {len(assignments)} bugs")
return assignments
def get_workload_report(self) -> Dict:
"""Generate team workload distribution report"""
if not self.connected:
return {}
stats = self.neo4j.get_graph_statistics()
# Get detailed developer workloads
query = """
MATCH (d:Developer)
OPTIONAL MATCH (d)-[:ASSIGNED_TO]->(b:Bug {status: 'Assigned'})
WITH d, COUNT(b) AS active_bugs
RETURN d.name AS name,
d.current_workload AS workload,
d.max_capacity AS capacity,
active_bugs,
d.skill_level AS skill_level
ORDER BY d.current_workload DESC
"""
developers = self.neo4j.execute_query(query)
return {
'statistics': stats,
'developers': developers or []
}
# Example usage and testing
if __name__ == "__main__":
from dotenv import load_dotenv
load_dotenv()
agent = AssignmentAgent()
if agent.connect():
print("\n🎯 Assignment Agent Initialized\n")
# Test 1: Get workload report
print("πŸ“Š Team Workload Report:")
report = agent.get_workload_report()
print(f" Total Developers: {report['statistics']['total_developers']}")
print(f" Available: {report['statistics']['available_developers']}")
print(f" Open Bugs: {report['statistics']['open_bugs']}\n")
print(" Developer Capacity:")
for dev in report['developers']:
print(f" - {dev['name']}: {dev['workload']}/{dev['capacity']} "
f"({dev['skill_level']})")
# Test 2: Assign a specific bug
print("\n\nπŸ” Assigning BUG-001...")
result = agent.assign_bug('BUG-001')
if result:
print(f"\nβœ… Assignment Successful!")
print(f" Bug: {result['bug_title']}")
print(f" Assigned To: {result['assigned_to']}")
print(f" Email: {result['developer_email']}")
print(f"\n{result['reasoning']}")
agent.close()