# Progression-POC / skill_optimizer.py
# RediM's picture
# Upload 5 files
# 6d2c000 verified
# Raw
# History Blame Contribute Delete
# 16.4 kB
"""
Skill-Based Workforce Optimization Algorithm
This module optimizes team composition based on:
1. Maintaining required skill levels
2. Minimizing skill gap after reduction
3. Preserving critical capabilities
"""
import json
from typing import Dict, List, Tuple, Any
from itertools import combinations
class SkillOptimizer:
    """Select which employees to keep when a team is reduced.

    Employees are plain dicts. The methods read ``emp['skills']`` (a mapping of
    skill name -> numeric level), ``emp['role_id']`` and ``emp['salary']``;
    ``print_optimization_results`` additionally reads ``emp['name']`` and
    ``emp['role']``.
    """

    # Level at or above which an employee counts as an expert in a skill.
    EXPERT_LEVEL = 4.0
    # Roles the greedy algorithm refuses to remove.
    _GREEDY_PROTECTED_ROLES = ('engineering_manager',)
    # Roles the balanced algorithm refuses to remove.
    _BALANCED_PROTECTED_ROLES = ('engineering_manager', 'tech_lead')

    def __init__(self, team_data_path='team_data.json', employees_data_path='employees_data.json'):
        """Load team and employee data from two JSON files.

        Args:
            team_data_path: Path of the team JSON file (stored as ``team_data``).
            employees_data_path: Path of the employee JSON file; its
                ``'employees'`` entry becomes ``self.employees``.
        """
        # JSON text is UTF-8; do not depend on the platform default encoding.
        with open(team_data_path, 'r', encoding='utf-8') as f:
            self.team_data = json.load(f)
        with open(employees_data_path, 'r', encoding='utf-8') as f:
            self.employees_data = json.load(f)
        self.employees = self.employees_data['employees']
        self.all_skills = self._get_all_skills()

    def _get_all_skills(self) -> List[str]:
        """Return the sorted list of every skill name held by any employee."""
        skills = set()
        for emp in self.employees:
            skills.update(emp['skills'].keys())
        return sorted(skills)

    def calculate_team_skill_averages(self, employee_list=None) -> Dict[str, float]:
        """Calculate the average level of each skill.

        The average is taken over the employees that list the skill, not over
        the whole team; a skill nobody lists averages to 0.0.

        Args:
            employee_list: Employees to calculate for (default: all).

        Returns:
            Dictionary of skill -> average level.
        """
        if employee_list is None:
            employee_list = self.employees
        levels_by_skill = {skill: [] for skill in self.all_skills}
        for emp in employee_list:
            for skill, level in emp['skills'].items():
                # setdefault tolerates a skill that is absent from all_skills
                # (e.g. an employee that is not part of self.employees).
                levels_by_skill.setdefault(skill, []).append(level)
        return {
            skill: (sum(levels) / len(levels) if levels else 0.0)
            for skill, levels in levels_by_skill.items()
        }

    def calculate_skill_coverage(self, employee_list=None, threshold=3.0) -> Dict[str, Dict]:
        """Calculate per-skill coverage metrics.

        An employee that does not list a skill counts as level 0 for it.

        Args:
            employee_list: Employees to calculate for (default: all).
            threshold: Minimum level for an employee to count as qualified.

        Returns:
            Dictionary of skill -> dict with:
            - average: average level across ``employee_list``
            - max: highest level
            - qualified_count: number of people with level >= threshold
            - expert_count: number of people with level >= 4.0
            - total_with_skill: number of people with level > 1.0
        """
        if employee_list is None:
            employee_list = self.employees
        coverage = {}
        for skill in self.all_skills:
            levels = [emp['skills'].get(skill, 0) for emp in employee_list]
            coverage[skill] = {
                'average': sum(levels) / len(levels) if levels else 0,
                'max': max(levels) if levels else 0,
                'qualified_count': sum(1 for lvl in levels if lvl >= threshold),
                'expert_count': sum(1 for lvl in levels if lvl >= self.EXPERT_LEVEL),
                'total_with_skill': sum(1 for lvl in levels if lvl > 1.0),
            }
        return coverage

    def calculate_employee_value(self, employee: Dict, required_skills: Dict[str, float],
                                 coverage=None) -> float:
        """Calculate a value score for an employee (higher = more valuable).

        The score adds up:
        1. How well they meet the required skills
        2. A rarity bonus for expert-level skills that few people have
        3. Their overall (average) skill level
        4. A bonus for leadership / senior specialist roles

        Args:
            employee: Employee dict.
            required_skills: Skill -> required level.
            coverage: Optional precomputed ``calculate_skill_coverage()`` result
                for the full team; computed on demand when omitted. Callers
                scoring many employees should pass it to avoid recomputing it
                for every employee.

        Returns:
            The value score.
        """
        skills = employee['skills']
        value = 0.0
        # Factor 1: Meeting required skills
        for skill, required_level in required_skills.items():
            emp_level = skills.get(skill, 0)
            if emp_level >= required_level:
                value += (emp_level - required_level + 1) * 2  # Bonus for exceeding
            else:
                value -= (required_level - emp_level) * 3  # Penalty for not meeting
        # Factor 2: Unique skills (check rarity)
        if coverage is None:
            coverage = self.calculate_skill_coverage()
        for skill, level in skills.items():
            if level >= self.EXPERT_LEVEL:
                # An employee outside the team (or a skill missing from the
                # coverage table) would give a count of 0; they are then the
                # only expert, so count at least one to avoid dividing by zero.
                expert_count = max(coverage.get(skill, {}).get('expert_count', 0), 1)
                # More valuable if few experts
                if expert_count <= 2:
                    value += 10 / expert_count
                elif expert_count <= 4:
                    value += 5 / expert_count
        # Factor 3: Overall competency (skipped for an employee with no skills)
        if skills:
            avg_skill = sum(skills.values()) / len(skills)
            value += avg_skill * 2
        # Factor 4: Role criticality
        role_id = employee['role_id']
        if role_id in ['tech_lead', 'engineering_manager']:
            value += 20  # Leadership bonus
        elif role_id in ['ml_engineer', 'senior_backend', 'senior_frontend']:
            value += 10  # Senior specialist bonus
        return value

    def identify_critical_employees(self, required_skills: Dict[str, float], top_n=10) -> List[Dict]:
        """Return the ``top_n`` employees with the highest value score.

        Args:
            required_skills: Skill -> required level.
            top_n: Maximum number of employees to return.

        Returns:
            Employee dicts, most valuable first.
        """
        # Coverage is the same for every employee, so compute it once.
        coverage = self.calculate_skill_coverage()
        employee_values = [
            (self.calculate_employee_value(emp, required_skills, coverage), emp)
            for emp in self.employees
        ]
        # Sort by value (highest first)
        employee_values.sort(key=lambda x: x[0], reverse=True)
        return [emp for _, emp in employee_values[:top_n]]

    def calculate_skill_gap(self, remaining_employees: List[Dict],
                            required_skills: Dict[str, float]) -> float:
        """Calculate the total skill gap for a team configuration.

        For every required skill whose team average is below the requirement,
        the shortfall is added; shortfalls on requirements >= 4.0 count double.
        Lower is better (0 = all requirements met).
        """
        team_skills = self.calculate_team_skill_averages(remaining_employees)
        total_gap = 0.0
        for skill, required_level in required_skills.items():
            actual_level = team_skills.get(skill, 0)
            if actual_level < required_level:
                # Weight critical gaps more heavily
                weight = 2.0 if required_level >= 4.0 else 1.0
                total_gap += (required_level - actual_level) * weight
        return total_gap

    def greedy_optimization(self, reduction_count: int,
                            required_skills: Dict[str, float]) -> Dict[str, Any]:
        """Greedy algorithm: repeatedly remove the employee whose removal
        leaves the smallest skill gap.

        Employees with the ``engineering_manager`` role are never removed, so
        fewer than ``reduction_count`` employees may be removed.

        Returns:
            Dict with 'remaining_employees', 'removed_employees', 'final_gap'
            and 'algorithm'.
        """
        remaining = list(self.employees)
        removed = []
        for _ in range(reduction_count):
            best_removal = None
            best_gap = float('inf')
            for i, emp in enumerate(remaining):
                # Skip critical roles
                if emp['role_id'] in self._GREEDY_PROTECTED_ROLES:
                    continue
                # Test removing this employee
                test_remaining = remaining[:i] + remaining[i + 1:]
                gap = self.calculate_skill_gap(test_remaining, required_skills)
                if gap < best_gap:
                    best_gap = gap
                    best_removal = i
            if best_removal is None:
                # Nobody left that may be removed; further rounds cannot help.
                break
            removed.append(remaining.pop(best_removal))
        return {
            'remaining_employees': remaining,
            'removed_employees': removed,
            'final_gap': self.calculate_skill_gap(remaining, required_skills),
            'algorithm': 'greedy'
        }

    def balanced_optimization(self, reduction_count: int,
                              required_skills: Dict[str, float]) -> Dict[str, Any]:
        """Balanced algorithm: rank employees by a combined score and remove
        the lowest-ranked ones.

        The score is ``value - 2 * redundancy + salary / 100000``. Employees
        with the ``engineering_manager`` or ``tech_lead`` role are never
        removed; ``reduction_count`` is clamped to the number of removable
        employees (and to zero from below).

        Returns:
            Dict with 'remaining_employees', 'removed_employees', 'final_gap'
            and 'algorithm'.
        """
        # Coverage is the same for every employee, so compute it once.
        coverage = self.calculate_skill_coverage()
        # Calculate employee scores
        scores = []
        protected_count = 0
        for emp in self.employees:
            # Protect critical roles
            if emp['role_id'] in self._BALANCED_PROTECTED_ROLES:
                score = float('inf')  # Never remove
                protected_count += 1
            else:
                value = self.calculate_employee_value(emp, required_skills, coverage)
                redundancy = self._calculate_redundancy(emp, coverage)
                salary_factor = emp['salary'] / 100000  # Normalize salary
                # Combined score (lower is more likely to be removed)
                score = value - redundancy * 2 + salary_factor
            scores.append((score, emp))
        # Sort by score (keep high scores); protected employees sort first.
        scores.sort(key=lambda x: x[0], reverse=True)
        # Never cut into the protected group, and never use a negative count
        # (a negative slice index would silently split the list wrongly).
        removable = len(scores) - protected_count
        reduction_count = max(0, min(reduction_count, removable))
        # Select employees to keep
        keep_count = len(scores) - reduction_count
        remaining = [emp for _, emp in scores[:keep_count]]
        removed = [emp for _, emp in scores[keep_count:]]
        return {
            'remaining_employees': remaining,
            'removed_employees': removed,
            'final_gap': self.calculate_skill_gap(remaining, required_skills),
            'algorithm': 'balanced'
        }

    def _calculate_redundancy(self, employee: Dict, coverage=None) -> float:
        """Calculate how redundant an employee's skills are.

        For each skill the employee holds at level >= 3.0, adds 2.0 when more
        than 5 team members are qualified in it, or 1.0 when more than 3 are.

        Args:
            employee: Employee dict.
            coverage: Optional precomputed ``calculate_skill_coverage()``
                result; computed on demand when omitted.
        """
        if coverage is None:
            coverage = self.calculate_skill_coverage()
        redundancy = 0.0
        for skill, level in employee['skills'].items():
            if level >= 3.0:
                # More redundant if many people have this skill
                count = coverage.get(skill, {}).get('qualified_count', 0)
                if count > 5:
                    redundancy += 2.0
                elif count > 3:
                    redundancy += 1.0
        return redundancy

    def optimize(self, reduction_percentage: float,
                 required_skills: Dict[str, float],
                 algorithm='balanced') -> Dict[str, Any]:
        """Main optimization function.

        Prints the run parameters, runs the selected algorithm and attaches an
        ``'analysis'`` entry (team size, cost, skill changes, critical impacts)
        to its result.

        Args:
            reduction_percentage: Percentage to reduce (0-100).
            required_skills: Skill requirements (adjusted from intent parser).
            algorithm: 'greedy' or 'balanced'; any other value runs the
                balanced algorithm.

        Returns:
            Optimization results. ``result['algorithm']`` names the algorithm
            that actually ran.

        Raises:
            ValueError: If ``reduction_percentage`` is outside 0-100.
        """
        if not 0 <= reduction_percentage <= 100:
            raise ValueError(
                f"reduction_percentage must be between 0 and 100, got {reduction_percentage!r}"
            )
        team_size = len(self.employees)
        # Multiply before dividing and round away float noise: 100 * (29 / 100)
        # evaluates to 28.999999999999996, which int() would truncate to 28.
        reduction_count = int(round(team_size * reduction_percentage / 100, 9))
        print(f"\n{'='*60}")
        print("OPTIMIZATION PARAMETERS")
        print(f"{'='*60}")
        print(f"Current team size: {team_size}")
        print(f"Reduction target: {reduction_percentage}% ({reduction_count} employees)")
        print(f"Algorithm: {algorithm}")
        # Calculate current state
        current_skills = self.calculate_team_skill_averages()
        # Run optimization
        if algorithm == 'greedy':
            result = self.greedy_optimization(reduction_count, required_skills)
        else:
            result = self.balanced_optimization(reduction_count, required_skills)
        remaining = result['remaining_employees']
        removed = result['removed_employees']
        # Calculate new state
        new_skills = self.calculate_team_skill_averages(remaining)
        # Add analysis to result
        result['analysis'] = {
            'team_size': {
                'before': team_size,
                'after': len(remaining),
                'reduced': len(removed)
            },
            'cost': {
                'before': sum(e['salary'] for e in self.employees),
                'after': sum(e['salary'] for e in remaining),
                'saved': sum(e['salary'] for e in removed)
            },
            'skill_changes': {},
            'critical_impacts': []
        }
        # Analyze skill impacts
        for skill in self.all_skills:
            before = current_skills.get(skill, 0)
            after = new_skills.get(skill, 0)
            required = required_skills.get(skill, 0)
            change = after - before
            # Only changes larger than 0.1 are listed as significant.
            if abs(change) > 0.1:
                result['analysis']['skill_changes'][skill] = {
                    'before': round(before, 2),
                    'after': round(after, 2),
                    'change': round(change, 2),
                    'required': required,
                    'meets_requirement': after >= required
                }
            # Checked independently of the 0.1 threshold: even a small drop
            # can take a skill below its required level.
            if after < required and before >= required:
                result['analysis']['critical_impacts'].append(
                    f"{skill}: Dropped below required level ({after:.1f} < {required:.1f})"
                )
        return result

    def print_optimization_results(self, result: Dict[str, Any]):
        """Pretty print optimization results.

        Args:
            result: A result dict as returned by ``optimize`` (must contain
                'analysis', 'final_gap' and 'removed_employees').
        """
        print(f"\n{'='*60}")
        print("OPTIMIZATION RESULTS")
        print(f"{'='*60}")
        analysis = result['analysis']
        # Team changes
        print(f"\nTeam Size: {analysis['team_size']['before']} → {analysis['team_size']['after']} "
              f"(-{analysis['team_size']['reduced']})")
        print(f"Cost Savings: ${analysis['cost']['saved']:,.0f}/year")
        print(f"Skill Gap Score: {result['final_gap']:.2f}")
        # Removed employees, highest salary first
        print(f"\n{'='*60}")
        print("REMOVED EMPLOYEES:")
        print(f"{'='*60}")
        for emp in sorted(result['removed_employees'], key=lambda x: x['salary'], reverse=True):
            print(f" - {emp['name']:<20} ({emp['role']:<25}) ${emp['salary']:,}")
        # Skill impacts
        print(f"\n{'='*60}")
        print("SKILL IMPACTS:")
        print(f"{'='*60}")
        if analysis['critical_impacts']:
            print("\n🚨 CRITICAL IMPACTS:")
            for impact in analysis['critical_impacts']:
                print(f" - {impact}")
        print("\nSIGNIFICANT SKILL CHANGES:")
        # Show at most the ten largest changes by magnitude.
        for skill, data in sorted(analysis['skill_changes'].items(),
                                  key=lambda x: abs(x[1]['change']), reverse=True)[:10]:
            symbol = "✓" if data['meets_requirement'] else "✗"
            arrow = "↑" if data['change'] > 0 else "↓"
            print(f" {symbol} {skill:<25} {data['before']:.1f} → {data['after']:.1f} "
                  f"({arrow}{abs(data['change']):.1f}) Required: {data['required']:.1f}")
def main():
    """Run a sample optimization and save a summary of the result.

    Builds a ``SkillOptimizer`` from its default JSON files, runs a 20%
    balanced reduction against an example skill profile, prints the results
    and writes a summary to ``optimization_result.json``.
    """
    # Single source of truth for the run parameters, so the saved summary
    # cannot drift from what was actually executed.
    reduction_percentage = 20
    algorithm = 'balanced'
    optimizer = SkillOptimizer()
    # Example: Focus on backend and AI, reduce frontend
    required_skills = {
        "python": 3.5,
        "javascript": 2.0,
        "react": 2.0,
        "backend_design": 3.5,
        "system_architecture": 3.5,
        "api_development": 3.5,
        "database_management": 3.0,
        "devops": 3.0,
        "cloud_infrastructure": 3.0,
        "security": 2.5,
        "machine_learning": 3.0,
        "pytorch": 2.5,
        "tensorflow": 2.0,
        "data_engineering": 3.0,
        "data_analysis": 3.0,
        "communication": 3.0,
        "problem_solving": 3.5,
        "team_collaboration": 3.0,
        "leadership": 2.0,
        "project_management": 2.0,
        "frontend_optimization": 1.5,
        "mobile_development": 1.0,
        "testing": 2.5,
        "documentation": 2.5,
        "code_review": 3.0
    }
    # Run optimization
    result = optimizer.optimize(
        reduction_percentage=reduction_percentage,
        required_skills=required_skills,
        algorithm=algorithm
    )
    optimizer.print_optimization_results(result)
    # Convert employee objects to serializable format. Built before the file
    # is opened so a failure here does not leave a truncated output file.
    output = {
        'parameters': {
            'reduction_percentage': reduction_percentage,
            'algorithm': algorithm
        },
        'results': {
            'removed_employees': [e['name'] for e in result['removed_employees']],
            'remaining_count': len(result['remaining_employees']),
            'skill_gap': result['final_gap'],
            'cost_saved': result['analysis']['cost']['saved']
        }
    }
    # Save results
    with open('optimization_result.json', 'w', encoding='utf-8') as f:
        json.dump(output, f, indent=2)
    print("\n✅ Results saved to optimization_result.json")
# Run the sample optimization only when executed as a script, not on import.
if __name__ == "__main__":
    main()