Spaces:
Sleeping
Sleeping
| """ | |
| Skill-Based Workforce Optimization Algorithm | |
| This module optimizes team composition based on: | |
| 1. Maintaining required skill levels | |
| 2. Minimizing skill gap after reduction | |
| 3. Preserving critical capabilities | |
| """ | |
| import json | |
| from typing import Dict, List, Tuple, Any | |
| from itertools import combinations | |
class SkillOptimizer:
    """Plan a head-count reduction while tracking its effect on team skills.

    Employee records are plain dicts loaded from JSON. The methods of this
    class read the keys ``skills`` (skill name -> numeric level), ``role_id``,
    ``salary``, ``name`` and ``role`` from each record.
    """

    def __init__(self, team_data_path='team_data.json', employees_data_path='employees_data.json'):
        """Load team and employee data from JSON files.

        Args:
            team_data_path: Path of the team JSON file; its parsed content is
                stored unchanged on ``self.team_data``.
            employees_data_path: Path of a JSON file whose top-level
                ``employees`` key holds the list of employee records.
        """
        with open(team_data_path, 'r', encoding='utf-8') as f:
            self.team_data = json.load(f)
        with open(employees_data_path, 'r', encoding='utf-8') as f:
            self.employees_data = json.load(f)
        self.employees = self.employees_data['employees']
        self.all_skills = self._get_all_skills()

    def _get_all_skills(self) -> List[str]:
        """Return the sorted names of every skill held by any employee."""
        skills = set()
        for emp in self.employees:
            skills.update(emp['skills'])
        return sorted(skills)

    def calculate_team_skill_averages(self, employee_list=None) -> Dict[str, float]:
        """Calculate the average level of each skill across a group.

        The average of a skill is taken over the employees that list that
        skill; employees without the skill do not pull the average down.

        Args:
            employee_list: Employees to average over (default: the full roster).

        Returns:
            Dictionary of skill -> average level. Every skill in
            ``self.all_skills`` is present; a skill nobody in the group lists
            maps to 0.0.
        """
        if employee_list is None:
            employee_list = self.employees
        levels_by_skill = {skill: [] for skill in self.all_skills}
        for emp in employee_list:
            for skill, level in emp['skills'].items():
                # setdefault tolerates a skill that is not part of the roster
                # vocabulary instead of failing with KeyError.
                levels_by_skill.setdefault(skill, []).append(level)
        return {
            skill: (sum(levels) / len(levels) if levels else 0.0)
            for skill, levels in levels_by_skill.items()
        }

    def calculate_skill_coverage(self, employee_list=None, threshold=3.0) -> Dict[str, Dict]:
        """Calculate coverage metrics for every skill in ``self.all_skills``.

        Unlike ``calculate_team_skill_averages``, an employee who does not
        list a skill counts as level 0 here.

        Args:
            employee_list: Employees to inspect (default: the full roster).
            threshold: Minimum level for an employee to count as qualified.

        Returns:
            Dictionary of skill -> metrics dict with the keys:
            - average: mean level over all inspected employees
            - max: highest level
            - qualified_count: number of people with level >= threshold
            - expert_count: number of people with level >= 4.0
            - total_with_skill: number of people with level > 1.0
        """
        if employee_list is None:
            employee_list = self.employees
        coverage = {}
        for skill in self.all_skills:
            levels = [emp['skills'].get(skill, 0) for emp in employee_list]
            coverage[skill] = {
                'average': sum(levels) / len(levels) if levels else 0,
                'max': max(levels) if levels else 0,
                'qualified_count': sum(1 for level in levels if level >= threshold),
                'expert_count': sum(1 for level in levels if level >= 4.0),
                'total_with_skill': sum(1 for level in levels if level > 1.0),
            }
        return coverage

    def calculate_employee_value(self, employee: Dict, required_skills: Dict[str, float],
                                 coverage=None) -> float:
        """Calculate a value score for one employee (higher = more valuable).

        The score adds up four factors:
        1. How well the employee meets each required skill
        2. Expert-level skills that few people on the roster have
        3. The employee's average skill level
        4. A bonus for leadership / senior specialist roles

        Args:
            employee: Employee record.
            required_skills: Skill -> required level.
            coverage: Optional result of ``calculate_skill_coverage()`` for the
                full roster. Callers scoring many employees can compute it
                once and pass it in; when omitted it is computed here.

        Returns:
            The value score.
        """
        if coverage is None:
            coverage = self.calculate_skill_coverage()
        skills = employee['skills']
        value = 0.0

        # Factor 1: Meeting required skills
        for skill, required_level in required_skills.items():
            emp_level = skills.get(skill, 0)
            if emp_level >= required_level:
                value += (emp_level - required_level + 1) * 2  # Bonus for exceeding
            else:
                value -= (required_level - emp_level) * 3  # Penalty for not meeting

        # Factor 2: Unique skills (check rarity)
        for skill, level in skills.items():
            if level >= 4.0:  # Expert level
                expert_count = coverage.get(skill, {}).get('expert_count', 0)
                # The employee may not be part of the roster the coverage was
                # built from; they are then the only known expert, and the
                # floor of 1 also avoids a division by zero.
                expert_count = max(expert_count, 1)
                # More valuable if few experts
                if expert_count <= 2:
                    value += 10 / expert_count
                elif expert_count <= 4:
                    value += 5 / expert_count

        # Factor 3: Overall competency (an empty skill set contributes nothing)
        if skills:
            avg_skill = sum(skills.values()) / len(skills)
            value += avg_skill * 2

        # Factor 4: Role criticality
        if employee['role_id'] in ['tech_lead', 'engineering_manager']:
            value += 20  # Leadership bonus
        elif employee['role_id'] in ['ml_engineer', 'senior_backend', 'senior_frontend']:
            value += 10  # Senior specialist bonus
        return value

    def identify_critical_employees(self, required_skills: Dict[str, float], top_n=10) -> List[Dict]:
        """Return the ``top_n`` roster employees with the highest value score.

        Args:
            required_skills: Skill -> required level used for scoring.
            top_n: Maximum number of employees to return.

        Returns:
            Employee records ordered from most to least valuable.
        """
        # Coverage is identical for every employee, so compute it only once.
        coverage = self.calculate_skill_coverage()
        employee_values = [
            (self.calculate_employee_value(emp, required_skills, coverage), emp)
            for emp in self.employees
        ]
        # Sort by value (highest first); the sort is stable for equal scores.
        employee_values.sort(key=lambda x: x[0], reverse=True)
        return [emp for _, emp in employee_values[:top_n]]

    def calculate_skill_gap(self, remaining_employees: List[Dict],
                            required_skills: Dict[str, float]) -> float:
        """Calculate the total skill gap of a team configuration.

        For each required skill whose team average is below the requirement,
        the shortfall is added; shortfalls on skills with a required level of
        4.0 or more count double.

        Args:
            remaining_employees: The team to evaluate.
            required_skills: Skill -> required average level.

        Returns:
            The weighted gap. Lower is better (0 = all requirements met).
        """
        team_skills = self.calculate_team_skill_averages(remaining_employees)
        total_gap = 0.0
        for skill, required_level in required_skills.items():
            actual_level = team_skills.get(skill, 0)
            if actual_level < required_level:
                # Weight critical gaps more heavily
                weight = 2.0 if required_level >= 4.0 else 1.0
                total_gap += (required_level - actual_level) * weight
        return total_gap

    def greedy_optimization(self, reduction_count: int,
                            required_skills: Dict[str, float]) -> Dict[str, Any]:
        """Greedy algorithm: repeatedly remove the employee whose removal
        leaves the smallest skill gap.

        Args:
            reduction_count: Number of employees to remove. Fewer are removed
                when the removable employees run out.
            required_skills: Skill -> required average level.

        Returns:
            Dict with ``remaining_employees``, ``removed_employees``,
            ``final_gap`` and ``algorithm`` ('greedy').
        """
        remaining = list(self.employees)
        removed = []
        for _ in range(reduction_count):
            best_removal = None
            best_gap = float('inf')
            for i, emp in enumerate(remaining):
                # Skip critical roles
                # NOTE(review): only engineering_manager is exempt here, while
                # balanced_optimization also protects tech_lead - confirm that
                # the difference is intended.
                if emp['role_id'] in ['engineering_manager']:
                    continue
                # Test removing this employee
                test_remaining = remaining[:i] + remaining[i + 1:]
                gap = self.calculate_skill_gap(test_remaining, required_skills)
                if gap < best_gap:
                    best_gap = gap
                    best_removal = i
            if best_removal is None:
                # Nobody removable is left; further rounds cannot change anything.
                break
            removed.append(remaining.pop(best_removal))
        return {
            'remaining_employees': remaining,
            'removed_employees': removed,
            'final_gap': self.calculate_skill_gap(remaining, required_skills),
            'algorithm': 'greedy'
        }

    def balanced_optimization(self, reduction_count: int,
                              required_skills: Dict[str, float]) -> Dict[str, Any]:
        """Balanced algorithm: rank employees by a combined score and remove
        the lowest-ranked ones.

        The score is ``value - 2 * redundancy + salary / 100000``. Employees
        with role ``engineering_manager`` or ``tech_lead`` are never removed.

        Args:
            reduction_count: Number of employees to remove. The effective
                number is limited to the employees that are not protected.
            required_skills: Skill -> required level.

        Returns:
            Dict with ``remaining_employees``, ``removed_employees``,
            ``final_gap`` and ``algorithm`` ('balanced').
        """
        # Coverage does not depend on the employee being scored; compute once.
        coverage = self.calculate_skill_coverage()

        # Calculate employee scores
        scores = []
        protected_count = 0
        for emp in self.employees:
            # Protect critical roles
            if emp['role_id'] in ['engineering_manager', 'tech_lead']:
                score = float('inf')  # Never remove
                protected_count += 1
            else:
                value = self.calculate_employee_value(emp, required_skills, coverage)
                redundancy = self._calculate_redundancy(emp, coverage)
                salary_factor = emp['salary'] / 100000  # Normalize salary
                # Combined score (lower is more likely to be removed)
                score = value - redundancy * 2 + salary_factor
            scores.append((score, emp))

        # Sort by score (keep high scores); protected employees sort first.
        scores.sort(key=lambda x: x[0], reverse=True)

        # Select employees to keep. Keeping at least the protected employees
        # makes "never remove" hold for any reduction_count and prevents a
        # negative keep_count from turning the slices below upside down.
        keep_count = len(scores) - reduction_count
        keep_count = min(max(keep_count, protected_count), len(scores))
        remaining = [emp for _, emp in scores[:keep_count]]
        removed = [emp for _, emp in scores[keep_count:]]
        return {
            'remaining_employees': remaining,
            'removed_employees': removed,
            'final_gap': self.calculate_skill_gap(remaining, required_skills),
            'algorithm': 'balanced'
        }

    def _calculate_redundancy(self, employee: Dict, coverage=None) -> float:
        """Calculate how redundant an employee's skills are.

        Each skill the employee holds at level 3.0 or above adds 2.0 when more
        than five roster employees are qualified in it, or 1.0 when more than
        three are.

        Args:
            employee: Employee record.
            coverage: Optional result of ``calculate_skill_coverage()`` for the
                full roster; computed here when omitted.

        Returns:
            The redundancy score (0.0 = no redundant skills).
        """
        if coverage is None:
            coverage = self.calculate_skill_coverage()
        redundancy = 0.0
        for skill, level in employee['skills'].items():
            if level >= 3.0:
                # More redundant if many people have this skill
                count = coverage.get(skill, {}).get('qualified_count', 0)
                if count > 5:
                    redundancy += 2.0
                elif count > 3:
                    redundancy += 1.0
        return redundancy

    def optimize(self, reduction_percentage: float,
                 required_skills: Dict[str, float],
                 algorithm='balanced') -> Dict[str, Any]:
        """Main optimization function.

        Prints the run parameters, runs the selected algorithm and attaches a
        before/after analysis to its result.

        Args:
            reduction_percentage: Percentage to reduce (0-100). The resulting
                head-count is rounded down and limited to the team size.
            required_skills: Skill requirements (adjusted from intent parser)
            algorithm: 'greedy' or 'balanced'; any other value runs the
                balanced algorithm.

        Returns:
            The algorithm result extended with an ``analysis`` dict holding
            ``team_size``, ``cost``, ``skill_changes`` and ``critical_impacts``.
        """
        team_size = len(self.employees)
        # Multiply before dividing: team_size * (pct / 100) can land just
        # below a whole number (100 * 0.29 == 28.999999999999996), and int()
        # would then drop one person from the target.
        reduction_count = int(team_size * reduction_percentage / 100)
        reduction_count = max(0, min(reduction_count, team_size))

        print(f"\n{'='*60}")
        print(f"OPTIMIZATION PARAMETERS")
        print(f"{'='*60}")
        print(f"Current team size: {team_size}")
        print(f"Reduction target: {reduction_percentage}% ({reduction_count} employees)")
        print(f"Algorithm: {algorithm}")

        # Calculate current state
        current_skills = self.calculate_team_skill_averages()

        # Run optimization
        if algorithm == 'greedy':
            result = self.greedy_optimization(reduction_count, required_skills)
        else:
            result = self.balanced_optimization(reduction_count, required_skills)

        # Calculate new state
        new_skills = self.calculate_team_skill_averages(result['remaining_employees'])

        # Add analysis to result
        result['algorithm'] = algorithm
        result['analysis'] = {
            'team_size': {
                'before': team_size,
                'after': len(result['remaining_employees']),
                'reduced': len(result['removed_employees'])
            },
            'cost': {
                'before': sum(e['salary'] for e in self.employees),
                'after': sum(e['salary'] for e in result['remaining_employees']),
                'saved': sum(e['salary'] for e in result['removed_employees'])
            },
            'skill_changes': {},
            'critical_impacts': []
        }

        # Analyze skill impacts
        for skill in self.all_skills:
            before = current_skills.get(skill, 0)
            after = new_skills.get(skill, 0)
            required = required_skills.get(skill, 0)
            change = after - before
            # Only changes above 0.1 are listed as significant.
            if abs(change) > 0.1:
                result['analysis']['skill_changes'][skill] = {
                    'before': round(before, 2),
                    'after': round(after, 2),
                    'change': round(change, 2),
                    'required': required,
                    'meets_requirement': after >= required
                }
            # A requirement that was met before and is missed now is reported
            # regardless of how small the change was.
            if after < required and before >= required:
                result['analysis']['critical_impacts'].append(
                    f"{skill}: Dropped below required level ({after:.1f} < {required:.1f})"
                )
        return result

    def print_optimization_results(self, result: Dict[str, Any]):
        """Pretty print optimization results.

        Args:
            result: A result dict as returned by ``optimize`` (it must contain
                the ``analysis`` entry).
        """
        print(f"\n{'='*60}")
        print(f"OPTIMIZATION RESULTS")
        print(f"{'='*60}")
        analysis = result['analysis']

        # Team changes
        print(f"\nTeam Size: {analysis['team_size']['before']} → {analysis['team_size']['after']} "
              f"(-{analysis['team_size']['reduced']})")
        print(f"Cost Savings: ${analysis['cost']['saved']:,.0f}/year")
        print(f"Skill Gap Score: {result['final_gap']:.2f}")

        # Removed employees, highest salary first
        print(f"\n{'='*60}")
        print("REMOVED EMPLOYEES:")
        print(f"{'='*60}")
        for emp in sorted(result['removed_employees'], key=lambda x: x['salary'], reverse=True):
            print(f" - {emp['name']:<20} ({emp['role']:<25}) ${emp['salary']:,}")

        # Skill impacts
        print(f"\n{'='*60}")
        print("SKILL IMPACTS:")
        print(f"{'='*60}")
        if analysis['critical_impacts']:
            print("\n🚨 CRITICAL IMPACTS:")
            for impact in analysis['critical_impacts']:
                print(f" - {impact}")

        # The ten largest changes by absolute size
        print("\nSIGNIFICANT SKILL CHANGES:")
        for skill, data in sorted(analysis['skill_changes'].items(),
                                  key=lambda x: abs(x[1]['change']), reverse=True)[:10]:
            symbol = "✓" if data['meets_requirement'] else "✗"
            arrow = "↑" if data['change'] > 0 else "↓"
            print(f" {symbol} {skill:<25} {data['before']:.1f} → {data['after']:.1f} "
                  f"({arrow}{abs(data['change']):.1f}) Required: {data['required']:.1f}")
def main():
    """Run a sample optimization, print the results and save a JSON summary.

    Uses the default data file paths of ``SkillOptimizer`` and writes the
    summary to ``optimization_result.json`` in the current directory.
    """
    optimizer = SkillOptimizer()

    # Defined once so the parameters recorded in the output file cannot drift
    # from the ones actually passed to the optimizer.
    reduction_percentage = 20
    algorithm = 'balanced'

    # Example: Focus on backend and AI, reduce frontend
    required_skills = {
        "python": 3.5,
        "javascript": 2.0,
        "react": 2.0,
        "backend_design": 3.5,
        "system_architecture": 3.5,
        "api_development": 3.5,
        "database_management": 3.0,
        "devops": 3.0,
        "cloud_infrastructure": 3.0,
        "security": 2.5,
        "machine_learning": 3.0,
        "pytorch": 2.5,
        "tensorflow": 2.0,
        "data_engineering": 3.0,
        "data_analysis": 3.0,
        "communication": 3.0,
        "problem_solving": 3.5,
        "team_collaboration": 3.0,
        "leadership": 2.0,
        "project_management": 2.0,
        "frontend_optimization": 1.5,
        "mobile_development": 1.0,
        "testing": 2.5,
        "documentation": 2.5,
        "code_review": 3.0
    }

    # Run optimization
    result = optimizer.optimize(
        reduction_percentage=reduction_percentage,
        required_skills=required_skills,
        algorithm=algorithm
    )
    optimizer.print_optimization_results(result)

    # Convert employee objects to serializable format. The summary is built
    # before the file is opened so a failure here cannot leave behind an
    # empty or truncated result file.
    output = {
        'parameters': {
            'reduction_percentage': reduction_percentage,
            'algorithm': algorithm
        },
        'results': {
            'removed_employees': [e['name'] for e in result['removed_employees']],
            'remaining_count': len(result['remaining_employees']),
            'skill_gap': result['final_gap'],
            'cost_saved': result['analysis']['cost']['saved']
        }
    }

    # Save results
    with open('optimization_result.json', 'w', encoding='utf-8') as f:
        json.dump(output, f, indent=2)
    print("\n✓ Results saved to optimization_result.json")
# Run the sample optimization only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()