""" Batch Aggregation Service - College-level macro analysis Aggregates individual student scores into batch-level reports """ import logging import numpy as np from typing import Dict, List, Any, Optional from dataclasses import dataclass, asdict from datetime import datetime from collections import Counter logger = logging.getLogger(__name__) @dataclass class AggregateMetrics: """Batch-level aggregate metrics""" total_students: int avg_employability_score: float median_score: float std_dev: float placement_ready_pct: float # % with score >= 0.6 skill_diversity_index: int # Unique skills count avg_cgpa: float avg_internship_months: float @dataclass class AspectDistribution: """Distribution stats for an aspect""" aspect: str avg: float std: float min: float max: float top_10_pct_avg: float # Avg of top 10% bottom_10_pct_avg: float @dataclass class DomainBreakdown: """Domain-wise student distribution""" domain_id: str display_name: str count: int percentage: float avg_score: float @dataclass class SkillGap: """Skill gap analysis result""" skill: str demand_score: float students_with_skill: int students_pct: float gap_severity: str # 'critical', 'moderate', 'low' @dataclass class BatchRecommendation: """Recommendation for batch improvement""" category: str # 'curriculum', 'training', 'industry' priority: str # 'high', 'medium', 'low' recommendation: str impact: str class BatchAggregationService: """ Aggregates individual student data into college-level macro reports """ # Thresholds PLACEMENT_READY_THRESHOLD = 0.60 CRITICAL_GAP_THRESHOLD = 0.30 # < 30% students have skill MODERATE_GAP_THRESHOLD = 0.50 def __init__(self): # Industry demand mapping (can be loaded from external source) self.industry_demands = { 'python': 0.90, 'sql': 0.85, 'java': 0.80, 'javascript': 0.75, 'machine_learning': 0.70, 'cloud': 0.85, 'devops': 0.75, 'data_analysis': 0.70, 'system_design': 0.65, 'communication': 0.80, 'leadership': 0.60, 'teamwork': 0.75 } def aggregate_batch(self, students: List[Dict[str, Any]], college_name: str = "Unknown College", batch_year: int = None) -> Dict[str, Any]: """ Generate comprehensive batch report from student data Args: students: List of student score packets (from scoring endpoint) college_name: Name of the college batch_year: Graduation year Returns: Complete macro analysis report """ if not students: return self._empty_report(college_name, batch_year) batch_year = batch_year or datetime.now().year # Extract scores and features scores = [] cgpas = [] internship_months = [] all_skills = [] domain_counts = Counter() aspect_scores = { 'technical_skills': [], 'problem_solving': [], 'leadership': [], 'communication': [], 'teamwork': [], 'learning_agility': [] } for student in students: # Final score final_score = student.get('final_score', 0) scores.append(final_score) # Features features = student.get('detailed_features', {}) universal = features.get('universal', {}) text = features.get('text', {}) cgpas.append(universal.get('cgpa_norm', 0) * 10) # Denormalize internship_months.append(universal.get('internship_exposure', 0) * 12) # Domain domain = student.get('domain_type') or student.get('detected_domain', 'general') domain_counts[domain] += 1 # Aspect scores for aspect in aspect_scores: if aspect in text: aspect_scores[aspect].append(text[aspect]) elif aspect in universal: aspect_scores[aspect].append(universal[aspect]) # Skills (from raw student data if available) if 'skills' in student: skills = student['skills'] if isinstance(skills, str): skills = [s.strip().lower() for s in skills.split(',')] all_skills.extend(skills) # Compute aggregates aggregate = self._compute_aggregate_metrics( scores, cgpas, internship_months, all_skills ) # Aspect distributions aspects = self._compute_aspect_distributions(aspect_scores) # Domain breakdown domains = self._compute_domain_breakdown(domain_counts, students) # Skill gaps skill_gaps = self._analyze_skill_gaps(all_skills, len(students)) # Recommendations recommendations = self._generate_recommendations( aggregate, aspects, skill_gaps ) # Build report report = { 'report_id': f"BATCH_{batch_year}_{college_name[:3].upper()}", 'college_name': college_name, 'batch_year': batch_year, 'generated_at': datetime.utcnow().isoformat() + 'Z', 'total_students': len(students), 'aggregate_metrics': asdict(aggregate), 'score_distribution': self._compute_score_distribution(scores), 'aspect_analysis': [asdict(a) for a in aspects], 'domain_breakdown': [asdict(d) for d in domains], 'skill_gap_analysis': [asdict(g) for g in skill_gaps], 'recommendations': [asdict(r) for r in recommendations], 'percentile_bands': self._compute_percentile_bands(scores) } return report def _compute_aggregate_metrics(self, scores, cgpas, internship_months, skills) -> AggregateMetrics: """Compute high-level aggregate metrics""" scores_arr = np.array(scores) placement_ready = sum(1 for s in scores if s >= self.PLACEMENT_READY_THRESHOLD) placement_pct = (placement_ready / len(scores)) * 100 if scores else 0 return AggregateMetrics( total_students=len(scores), avg_employability_score=round(float(np.mean(scores_arr)), 3), median_score=round(float(np.median(scores_arr)), 3), std_dev=round(float(np.std(scores_arr)), 3), placement_ready_pct=round(placement_pct, 1), skill_diversity_index=len(set(skills)), avg_cgpa=round(float(np.mean(cgpas)) if cgpas else 0, 2), avg_internship_months=round(float(np.mean(internship_months)) if internship_months else 0, 1) ) def _compute_aspect_distributions(self, aspect_scores) -> List[AspectDistribution]: """Compute distribution stats for each aspect""" distributions = [] for aspect, scores in aspect_scores.items(): if not scores: continue arr = np.array(scores) top_10_idx = int(len(arr) * 0.1) or 1 bottom_10_idx = int(len(arr) * 0.1) or 1 sorted_arr = np.sort(arr) distributions.append(AspectDistribution( aspect=aspect, avg=round(float(np.mean(arr)), 3), std=round(float(np.std(arr)), 3), min=round(float(np.min(arr)), 3), max=round(float(np.max(arr)), 3), top_10_pct_avg=round(float(np.mean(sorted_arr[-top_10_idx:])), 3), bottom_10_pct_avg=round(float(np.mean(sorted_arr[:bottom_10_idx])), 3) )) return distributions def _compute_domain_breakdown(self, domain_counts, students) -> List[DomainBreakdown]: """Compute domain-wise breakdown""" breakdowns = [] total = len(students) for domain, count in domain_counts.most_common(): # Calculate avg score for this domain domain_scores = [ s.get('final_score', 0) for s in students if (s.get('domain_type') or s.get('detected_domain', 'general')) == domain ] avg_score = np.mean(domain_scores) if domain_scores else 0 breakdowns.append(DomainBreakdown( domain_id=domain, display_name=domain.replace('_', ' ').title(), count=count, percentage=round((count / total) * 100, 1), avg_score=round(float(avg_score), 3) )) return breakdowns def _analyze_skill_gaps(self, all_skills, total_students) -> List[SkillGap]: """Analyze skill gaps against industry demand""" skill_counts = Counter(all_skills) gaps = [] for skill, demand in self.industry_demands.items(): count = skill_counts.get(skill, 0) pct = (count / total_students) * 100 if total_students else 0 # Determine severity if pct < self.CRITICAL_GAP_THRESHOLD * 100: severity = 'critical' elif pct < self.MODERATE_GAP_THRESHOLD * 100: severity = 'moderate' else: severity = 'low' gaps.append(SkillGap( skill=skill, demand_score=demand, students_with_skill=count, students_pct=round(pct, 1), gap_severity=severity )) # Sort by demand * (1 - coverage) gaps.sort(key=lambda g: g.demand_score * (1 - g.students_pct/100), reverse=True) return gaps[:10] # Top 10 gaps def _generate_recommendations(self, aggregate, aspects, skill_gaps) -> List[BatchRecommendation]: """Generate actionable recommendations""" recommendations = [] # Critical skill gaps critical_gaps = [g for g in skill_gaps if g.gap_severity == 'critical'] for gap in critical_gaps[:3]: recommendations.append(BatchRecommendation( category='curriculum', priority='high', recommendation=f"Add {gap.skill.replace('_', ' ').title()} training to curriculum", impact=f"Only {gap.students_pct}% students have this in-demand skill" )) # Low placement readiness if aggregate.placement_ready_pct < 60: recommendations.append(BatchRecommendation( category='training', priority='high', recommendation="Implement intensive placement preparation program", impact=f"Only {aggregate.placement_ready_pct}% students are placement-ready" )) # Low internship exposure if aggregate.avg_internship_months < 3: recommendations.append(BatchRecommendation( category='industry', priority='medium', recommendation="Establish mandatory internship partnerships with industry", impact=f"Average internship exposure is only {aggregate.avg_internship_months} months" )) # Weak aspects for aspect in aspects: if aspect.avg < 0.5: recommendations.append(BatchRecommendation( category='training', priority='medium', recommendation=f"Conduct workshops on {aspect.aspect.replace('_', ' ').title()}", impact=f"Average score is {aspect.avg:.0%}, below acceptable threshold" )) return recommendations[:8] # Limit to 8 recommendations def _compute_score_distribution(self, scores) -> Dict[str, int]: """Compute score distribution by grade bands""" distribution = { 'A+ (90-100%)': 0, 'A (80-90%)': 0, 'B+ (70-80%)': 0, 'B (60-70%)': 0, 'C (50-60%)': 0, 'D (<50%)': 0 } for score in scores: pct = score * 100 if pct >= 90: distribution['A+ (90-100%)'] += 1 elif pct >= 80: distribution['A (80-90%)'] += 1 elif pct >= 70: distribution['B+ (70-80%)'] += 1 elif pct >= 60: distribution['B (60-70%)'] += 1 elif pct >= 50: distribution['C (50-60%)'] += 1 else: distribution['D (<50%)'] += 1 return distribution def _compute_percentile_bands(self, scores) -> Dict[str, float]: """Compute percentile thresholds""" if not scores: return {} arr = np.array(scores) return { 'p10': round(float(np.percentile(arr, 10)), 3), 'p25': round(float(np.percentile(arr, 25)), 3), 'p50': round(float(np.percentile(arr, 50)), 3), 'p75': round(float(np.percentile(arr, 75)), 3), 'p90': round(float(np.percentile(arr, 90)), 3) } def _empty_report(self, college_name: str, batch_year: int) -> Dict[str, Any]: """Generate empty report for no data""" return { 'report_id': f"BATCH_{batch_year or 'UNKNOWN'}_{college_name[:3].upper()}", 'college_name': college_name, 'batch_year': batch_year, 'generated_at': datetime.utcnow().isoformat() + 'Z', 'total_students': 0, 'error': 'No student data provided', 'aggregate_metrics': None, 'recommendations': [] } # Singleton _batch_service: Optional[BatchAggregationService] = None def get_batch_aggregation_service() -> BatchAggregationService: """Get singleton batch aggregation service""" global _batch_service if _batch_service is None: _batch_service = BatchAggregationService() return _batch_service