"""
Skill-Based Workforce Optimization Algorithm

This module optimizes team composition based on:
1. Maintaining required skill levels
2. Minimizing skill gap after reduction
3. Preserving critical capabilities
"""

import json
from typing import Dict, List, Optional, Tuple, Any
from itertools import combinations


class SkillOptimizer:
    """Select which employees to keep when a team has to shrink.

    Employees are plain dicts loaded from JSON. The methods below read the
    keys ``skills`` (mapping of skill name -> numeric level), ``role_id``,
    ``role``, ``name`` and ``salary``.
    """

    # Skill level at or above which an employee counts as an expert.
    EXPERT_LEVEL = 4.0
    # Roles that receive a value bonus in calculate_employee_value().
    LEADERSHIP_ROLES = ('tech_lead', 'engineering_manager')
    SENIOR_SPECIALIST_ROLES = ('ml_engineer', 'senior_backend', 'senior_frontend')

    def __init__(self, team_data_path='team_data.json',
                 employees_data_path='employees_data.json'):
        """Initialize the optimizer with team and employee data.

        Args:
            team_data_path: Path of the JSON file holding team data.
            employees_data_path: Path of the JSON file whose top-level
                ``employees`` key holds the list of employee dicts.

        Raises:
            OSError: If either file cannot be opened.
            json.JSONDecodeError: If either file is not valid JSON.
            KeyError: If the employees file has no ``employees`` key.
        """
        # Explicit encoding: the platform default is not UTF-8 everywhere.
        with open(team_data_path, 'r', encoding='utf-8') as f:
            self.team_data = json.load(f)

        with open(employees_data_path, 'r', encoding='utf-8') as f:
            self.employees_data = json.load(f)

        self.employees = self.employees_data['employees']
        self.all_skills = self._get_all_skills()

    def _get_all_skills(self) -> List[str]:
        """Get the sorted list of all unique skill names in the team."""
        skills = set()
        for emp in self.employees:
            skills.update(emp['skills'])
        return sorted(skills)

    def calculate_team_skill_averages(
            self, employee_list: Optional[List[Dict]] = None) -> Dict[str, float]:
        """Calculate average skill levels across the team.

        Only employees that list a skill contribute to its average; a skill
        nobody in ``employee_list`` lists averages to 0.0.

        Args:
            employee_list: Specific list of employees to calculate for
                (default: all employees).

        Returns:
            Dictionary of skill -> average level. Contains every skill in
            ``self.all_skills`` plus any extra skill found in
            ``employee_list``.
        """
        if employee_list is None:
            employee_list = self.employees

        skill_levels = {skill: [] for skill in self.all_skills}
        for emp in employee_list:
            for skill, level in emp['skills'].items():
                # setdefault: an employee from outside the team may carry a
                # skill that is not in all_skills.
                skill_levels.setdefault(skill, []).append(level)

        return {
            skill: (sum(levels) / len(levels) if levels else 0.0)
            for skill, levels in skill_levels.items()
        }

    def calculate_skill_coverage(self, employee_list: Optional[List[Dict]] = None,
                                 threshold=3.0) -> Dict[str, Dict]:
        """Calculate skill coverage metrics.

        A skill missing from an employee counts as level 0, so ``average``
        here is taken over ALL employees in ``employee_list`` (unlike
        calculate_team_skill_averages()).

        Args:
            employee_list: Employees to analyse (default: all employees).
            threshold: Minimum level for an employee to count as qualified.

        Returns:
            Dict of skill -> metrics dict with the keys:
            - average: average level over all employees
            - max: highest level
            - qualified_count: number of people with level >= threshold
            - expert_count: number of people with level >= 4.0
            - total_with_skill: number of people with level > 1.0
        """
        if employee_list is None:
            employee_list = self.employees

        coverage = {}
        for skill in self.all_skills:
            levels = [emp['skills'].get(skill, 0) for emp in employee_list]
            coverage[skill] = {
                'average': sum(levels) / len(levels) if levels else 0,
                'max': max(levels) if levels else 0,
                'qualified_count': sum(1 for lv in levels if lv >= threshold),
                'expert_count': sum(1 for lv in levels if lv >= self.EXPERT_LEVEL),
                'total_with_skill': sum(1 for lv in levels if lv > 1.0),
            }
        return coverage

    def calculate_employee_value(self, employee: Dict,
                                 required_skills: Dict[str, float],
                                 coverage: Optional[Dict[str, Dict]] = None) -> float:
        """Calculate a value score for an employee.

        The score is the sum of:
        1. How well they meet required skills (bonus for meeting/exceeding,
           penalty for falling short)
        2. A rarity bonus for expert-level skills that few people have
        3. Twice their average skill level
        4. A role bonus for leadership / senior specialist roles

        Args:
            employee: Employee dict (reads ``skills`` and ``role_id``).
            required_skills: Mapping of skill -> required level.
            coverage: Precomputed result of calculate_skill_coverage() for
                the whole team. Computed on demand when omitted; pass it in
                when scoring many employees to avoid recomputing it per call.

        Returns:
            The value score (higher means more valuable).
        """
        skills = employee['skills']
        value = 0.0

        # Factor 1: Meeting required skills
        for skill, required_level in required_skills.items():
            emp_level = skills.get(skill, 0)
            if emp_level >= required_level:
                value += (emp_level - required_level + 1) * 2  # Bonus for exceeding
            else:
                value -= (required_level - emp_level) * 3  # Penalty for not meeting

        # Factor 2: Unique skills (check rarity)
        if coverage is None:
            coverage = self.calculate_skill_coverage()
        for skill, level in skills.items():
            if level < self.EXPERT_LEVEL:
                continue
            # An employee who is not part of the team (or whose skill the
            # team does not track) yields a count of 0; treat them as the
            # sole expert instead of dividing by zero.
            expert_count = max(coverage.get(skill, {}).get('expert_count', 0), 1)
            if expert_count <= 2:
                value += 10 / expert_count
            elif expert_count <= 4:
                value += 5 / expert_count

        # Factor 3: Overall competency (an empty skill map contributes nothing)
        if skills:
            avg_skill = sum(skills.values()) / len(skills)
            value += avg_skill * 2

        # Factor 4: Role criticality
        role_id = employee['role_id']
        if role_id in self.LEADERSHIP_ROLES:
            value += 20  # Leadership bonus
        elif role_id in self.SENIOR_SPECIALIST_ROLES:
            value += 10  # Senior specialist bonus

        return value

    def identify_critical_employees(self, required_skills: Dict[str, float],
                                    top_n=10) -> List[Dict]:
        """Identify employees critical for maintaining skill requirements.

        Args:
            required_skills: Mapping of skill -> required level.
            top_n: Number of employees to return.

        Returns:
            The ``top_n`` employees with the highest value score, best first.
        """
        # Coverage is the same for every employee; compute it once.
        coverage = self.calculate_skill_coverage()
        employee_values = [
            (self.calculate_employee_value(emp, required_skills, coverage), emp)
            for emp in self.employees
        ]

        # Sort by value (highest first)
        employee_values.sort(key=lambda pair: pair[0], reverse=True)
        return [emp for _, emp in employee_values[:top_n]]

    def calculate_skill_gap(self, remaining_employees: List[Dict],
                            required_skills: Dict[str, float]) -> float:
        """Calculate total skill gap for a team configuration.

        Compares the team's average level per skill with the required level.
        Lower is better (0 = all requirements met).

        Args:
            remaining_employees: The team configuration to evaluate.
            required_skills: Mapping of skill -> required level.

        Returns:
            Sum of the shortfalls; shortfalls on skills whose required level
            is >= 4.0 count double.
        """
        team_skills = self.calculate_team_skill_averages(remaining_employees)

        total_gap = 0.0
        for skill, required_level in required_skills.items():
            actual_level = team_skills.get(skill, 0)
            if actual_level < required_level:
                # Weight critical gaps more heavily
                weight = 2.0 if required_level >= 4.0 else 1.0
                total_gap += (required_level - actual_level) * weight

        return total_gap

    def greedy_optimization(self, reduction_count: int,
                            required_skills: Dict[str, float]) -> Dict[str, Any]:
        """Greedy algorithm: remove employees with least impact on skill gaps.

        On each round the employee whose removal leaves the smallest skill
        gap is removed (first one wins on ties). Employees with role
        ``engineering_manager`` are never candidates, so fewer than
        ``reduction_count`` employees may end up removed.

        Args:
            reduction_count: Number of employees to remove.
            required_skills: Mapping of skill -> required level.

        Returns:
            Dict with ``remaining_employees``, ``removed_employees``,
            ``final_gap`` and ``algorithm``.
        """
        remaining = list(self.employees)
        removed = []

        for _ in range(reduction_count):
            best_removal = None
            best_gap = float('inf')

            for i, emp in enumerate(remaining):
                # Skip critical roles
                if emp['role_id'] in ['engineering_manager']:
                    continue

                # Test removing this employee
                test_remaining = remaining[:i] + remaining[i + 1:]
                gap = self.calculate_skill_gap(test_remaining, required_skills)

                if gap < best_gap:
                    best_gap = gap
                    best_removal = i

            if best_removal is None:
                # Nobody removable is left; further rounds cannot change that.
                break
            removed.append(remaining.pop(best_removal))

        return {
            'remaining_employees': remaining,
            'removed_employees': removed,
            'final_gap': self.calculate_skill_gap(remaining, required_skills),
            'algorithm': 'greedy'
        }

    def balanced_optimization(self, reduction_count: int,
                              required_skills: Dict[str, float]) -> Dict[str, Any]:
        """Balanced algorithm: considers multiple factors including skill diversity.

        Every employee gets a combined score (value - 2 * redundancy +
        salary / 100000); the lowest-scoring employees are removed.
        Employees with role ``engineering_manager`` or ``tech_lead`` are
        never removed, so fewer than ``reduction_count`` employees may end
        up removed.

        Args:
            reduction_count: Number of employees to remove. Values outside
                ``[0, team size]`` are clamped.
            required_skills: Mapping of skill -> required level.

        Returns:
            Dict with ``remaining_employees``, ``removed_employees``,
            ``final_gap`` and ``algorithm``.
        """
        # Team-wide coverage does not depend on the employee being scored.
        coverage = self.calculate_skill_coverage()

        # Calculate employee scores
        scores = []
        protected_count = 0
        for emp in self.employees:
            # Protect critical roles
            if emp['role_id'] in ['engineering_manager', 'tech_lead']:
                score = float('inf')  # Never remove
                protected_count += 1
            else:
                value = self.calculate_employee_value(emp, required_skills, coverage)
                redundancy = self._calculate_redundancy(emp, coverage)
                salary_factor = emp['salary'] / 100000  # Normalize salary

                # Combined score (lower is more likely to be removed)
                score = value - redundancy * 2 + salary_factor

            scores.append((score, emp))

        # Sort by score (keep high scores); protected employees sort first.
        scores.sort(key=lambda pair: pair[0], reverse=True)

        # Select employees to keep. Clamp so that an oversized or negative
        # reduction cannot produce a negative slice index, and never cut
        # into the protected employees at the head of the list.
        total = len(scores)
        keep_count = total - max(0, min(reduction_count, total))
        keep_count = max(keep_count, protected_count)

        remaining = [emp for _, emp in scores[:keep_count]]
        removed = [emp for _, emp in scores[keep_count:]]

        return {
            'remaining_employees': remaining,
            'removed_employees': removed,
            'final_gap': self.calculate_skill_gap(remaining, required_skills),
            'algorithm': 'balanced'
        }

    def _calculate_redundancy(self, employee: Dict,
                              coverage: Optional[Dict[str, Dict]] = None) -> float:
        """Calculate how redundant an employee's skills are.

        Args:
            employee: Employee dict (reads ``skills``).
            coverage: Precomputed result of calculate_skill_coverage() for
                the whole team; computed on demand when omitted.

        Returns:
            Redundancy score: for each skill the employee holds at level
            >= 3.0, +2.0 if more than 5 people are qualified in it, +1.0 if
            more than 3 are.
        """
        if coverage is None:
            coverage = self.calculate_skill_coverage()

        redundancy = 0.0
        for skill, level in employee['skills'].items():
            if level < 3.0:
                continue
            # More redundant if many people have this skill. A skill the
            # team does not track has nobody qualified.
            count = coverage.get(skill, {}).get('qualified_count', 0)
            if count > 5:
                redundancy += 2.0
            elif count > 3:
                redundancy += 1.0

        return redundancy

    def optimize(self, reduction_percentage: float,
                 required_skills: Dict[str, float],
                 algorithm='balanced') -> Dict[str, Any]:
        """Main optimization function.

        Prints the run parameters, runs the chosen algorithm and attaches a
        before/after analysis to its result.

        Args:
            reduction_percentage: Percentage to reduce (0-100). The derived
                head count is rounded down and clamped to the team size.
            required_skills: Skill requirements (adjusted from intent parser)
            algorithm: 'greedy' or 'balanced'. Any value other than
                'greedy' runs the balanced algorithm.

        Returns:
            The algorithm's result dict with an added ``analysis`` entry
            (``team_size``, ``cost``, ``skill_changes``, ``critical_impacts``).
        """
        team_size = len(self.employees)
        # Multiply before dividing: 100 * (29 / 100) is 28.999... in binary
        # floating point and would truncate to 28 instead of 29.
        reduction_count = int(team_size * reduction_percentage / 100)
        reduction_count = max(0, min(reduction_count, team_size))

        print(f"\n{'='*60}")
        print(f"OPTIMIZATION PARAMETERS")
        print(f"{'='*60}")
        print(f"Current team size: {team_size}")
        print(f"Reduction target: {reduction_percentage}% ({reduction_count} employees)")
        print(f"Algorithm: {algorithm}")

        # Calculate current state
        current_skills = self.calculate_team_skill_averages()

        # Run optimization
        if algorithm == 'greedy':
            result = self.greedy_optimization(reduction_count, required_skills)
        else:
            result = self.balanced_optimization(reduction_count, required_skills)

        remaining = result['remaining_employees']
        removed = result['removed_employees']

        # Calculate new state
        new_skills = self.calculate_team_skill_averages(remaining)

        # Add analysis to result
        result['algorithm'] = algorithm
        result['analysis'] = {
            'team_size': {
                'before': team_size,
                'after': len(remaining),
                'reduced': len(removed)
            },
            'cost': {
                'before': sum(e['salary'] for e in self.employees),
                'after': sum(e['salary'] for e in remaining),
                'saved': sum(e['salary'] for e in removed)
            },
            'skill_changes': {},
            'critical_impacts': []
        }

        # Analyze skill impacts
        for skill in self.all_skills:
            before = current_skills.get(skill, 0)
            after = new_skills.get(skill, 0)
            required = required_skills.get(skill, 0)

            change = after - before
            # Only report changes large enough to matter.
            if abs(change) > 0.1:
                result['analysis']['skill_changes'][skill] = {
                    'before': round(before, 2),
                    'after': round(after, 2),
                    'change': round(change, 2),
                    'required': required,
                    'meets_requirement': after >= required
                }

            # A requirement that was met before and is no longer met.
            if after < required and before >= required:
                result['analysis']['critical_impacts'].append(
                    f"{skill}: Dropped below required level ({after:.1f} < {required:.1f})"
                )

        return result

    def print_optimization_results(self, result: Dict[str, Any]):
        """Pretty print optimization results.

        Args:
            result: A result dict as returned by optimize().
        """
        print(f"\n{'='*60}")
        print(f"OPTIMIZATION RESULTS")
        print(f"{'='*60}")

        analysis = result['analysis']

        # Team changes
        print(f"\nTeam Size: {analysis['team_size']['before']} → {analysis['team_size']['after']} "
              f"(-{analysis['team_size']['reduced']})")
        print(f"Cost Savings: ${analysis['cost']['saved']:,.0f}/year")
        print(f"Skill Gap Score: {result['final_gap']:.2f}")

        # Removed employees (highest salary first)
        print(f"\n{'='*60}")
        print("REMOVED EMPLOYEES:")
        print(f"{'='*60}")
        for emp in sorted(result['removed_employees'],
                          key=lambda x: x['salary'], reverse=True):
            print(f" - {emp['name']:<20} ({emp['role']:<25}) ${emp['salary']:,}")

        # Skill impacts
        print(f"\n{'='*60}")
        print("SKILL IMPACTS:")
        print(f"{'='*60}")

        if analysis['critical_impacts']:
            print("\n🚨 CRITICAL IMPACTS:")
            for impact in analysis['critical_impacts']:
                print(f" - {impact}")

        # Ten largest changes by magnitude.
        print("\nSIGNIFICANT SKILL CHANGES:")
        for skill, data in sorted(analysis['skill_changes'].items(),
                                  key=lambda x: abs(x[1]['change']),
                                  reverse=True)[:10]:
            symbol = "✓" if data['meets_requirement'] else "✗"
            arrow = "↑" if data['change'] > 0 else "↓"
            print(f" {symbol} {skill:<25} {data['before']:.1f} → {data['after']:.1f} "
                  f"({arrow}{abs(data['change']):.1f}) Required: {data['required']:.1f}")


def main():
    """Test the optimization algorithm.

    Loads the default data files, runs a 20% balanced reduction, prints the
    results and writes a summary to ``optimization_result.json``.
    """
    optimizer = SkillOptimizer()

    # Single source of truth for the run parameters, so the saved summary
    # cannot drift from what was actually run.
    reduction_percentage = 20
    algorithm = 'balanced'

    # Example: Focus on backend and AI, reduce frontend
    required_skills = {
        "python": 3.5,
        "javascript": 2.0,
        "react": 2.0,
        "backend_design": 3.5,
        "system_architecture": 3.5,
        "api_development": 3.5,
        "database_management": 3.0,
        "devops": 3.0,
        "cloud_infrastructure": 3.0,
        "security": 2.5,
        "machine_learning": 3.0,
        "pytorch": 2.5,
        "tensorflow": 2.0,
        "data_engineering": 3.0,
        "data_analysis": 3.0,
        "communication": 3.0,
        "problem_solving": 3.5,
        "team_collaboration": 3.0,
        "leadership": 2.0,
        "project_management": 2.0,
        "frontend_optimization": 1.5,
        "mobile_development": 1.0,
        "testing": 2.5,
        "documentation": 2.5,
        "code_review": 3.0
    }

    # Run optimization
    result = optimizer.optimize(
        reduction_percentage=reduction_percentage,
        required_skills=required_skills,
        algorithm=algorithm
    )

    optimizer.print_optimization_results(result)

    # Save results
    with open('optimization_result.json', 'w', encoding='utf-8') as f:
        # Convert employee objects to serializable format
        output = {
            'parameters': {
                'reduction_percentage': reduction_percentage,
                'algorithm': algorithm
            },
            'results': {
                'removed_employees': [e['name'] for e in result['removed_employees']],
                'remaining_count': len(result['remaining_employees']),
                'skill_gap': result['final_gap'],
                'cost_saved': result['analysis']['cost']['saved']
            }
        }
        json.dump(output, f, indent=2)

    print("\n✅ Results saved to optimization_result.json")


if __name__ == "__main__":
    main()