"""Business/Finance Domain Plugin Scores business competency based on: - Resume content (ATS-style keyword matching) - Case study submission analysis - Excel/analytical test scores - Internship experience in business domains """ import re import time import logging from typing import Dict, List from .base_plugin import BaseDomainPlugin, DomainScore from .plugin_factory import register_plugin logger = logging.getLogger(__name__) @register_plugin('business') class BusinessPlugin(BaseDomainPlugin): """Business/Finance domain scoring plugin""" def __init__(self): super().__init__() # Business-relevant keywords self.business_keywords = { 'consulting': ['consulting', 'consultant', 'advisory', 'strategy', 'mckinsey', 'bain', 'bcg'], 'finance': ['finance', 'banking', 'investment', 'equity', 'portfolio', 'analyst', 'goldman', 'morgan'], 'analytics': ['data analysis', 'business intelligence', 'tableau', 'power bi', 'sql', 'excel'], 'management': ['project management', 'product management', 'stakeholder', 'agile', 'scrum'], 'sales': ['sales', 'business development', 'client acquisition', 'revenue', 'crm'], 'operations': ['operations', 'supply chain', 'logistics', 'process improvement', 'lean', 'six sigma'] } def _get_domain_type(self) -> str: return 'business' def _get_feature_weights(self) -> Dict[str, float]: return { 'resume_keyword_score': 0.30, 'internship_relevance': 0.25, 'case_study_score': 0.20, 'excel_test_score': 0.15, 'business_depth': 0.10 } def get_required_fields(self) -> List[str]: return ['resume_text'] # Resume text (extracted from PDF) def get_optional_fields(self) -> List[str]: return ['case_study_text', 'excel_test_score', 'internship_descriptions'] def score(self, evidence_data: Dict) -> DomainScore: """Calculate business domain score""" start_time = time.time() features = {} # Resume keyword analysis resume_text = evidence_data.get('resume_text', '') if resume_text: features['resume_keyword_score'] = self._analyze_resume_keywords(resume_text) features['internship_relevance'] = self._extract_internship_relevance(resume_text) features['business_depth'] = self._assess_business_depth(resume_text) else: features['resume_keyword_score'] = 0.0 features['internship_relevance'] = 0.0 features['business_depth'] = 0.0 # Case study analysis case_study = evidence_data.get('case_study_text', '') if case_study: features['case_study_score'] = self._analyze_case_study(case_study) else: features['case_study_score'] = 0.0 # Excel test score (normalized 0-100 to 0-1) excel_score = evidence_data.get('excel_test_score', 0) features['excel_test_score'] = min(excel_score / 100, 1.0) if excel_score else 0.0 # Calculate weighted score score = sum(features[k] * self.feature_weights[k] for k in features.keys()) # Calculate confidence confidence = self.calculate_confidence(evidence_data) processing_time = (time.time() - start_time) * 1000 return DomainScore( domain_type='business', score=min(score, 1.0), confidence=confidence, raw_features=features, processing_time_ms=processing_time ) def _analyze_resume_keywords(self, resume_text: str) -> float: """ ATS-style keyword matching for business roles Returns: 0-1 score based on keyword density and relevance """ text_lower = resume_text.lower() # Count keywords in each category category_scores = {} for category, keywords in self.business_keywords.items(): matches = sum(1 for kw in keywords if kw in text_lower) category_scores[category] = min(matches / len(keywords), 1.0) # Average across categories with some categories weighted more weights = { 'consulting': 0.20, 'finance': 0.20, 'analytics': 0.20, 'management': 0.15, 'sales': 0.15, 'operations': 0.10 } score = sum(category_scores.get(cat, 0) * weight for cat, weight in weights.items()) logger.info(f"Resume keyword score: {score:.2f} (categories: {category_scores})") return score def _extract_internship_relevance(self, resume_text: str) -> float: """ Extract and score internship relevance to business Returns: 0-1 score based on business-related internships """ text_lower = resume_text.lower() # Internship indicators internship_patterns = [ r'intern(?:ship)?\s+at\s+([^\n]+)', r'(?:summer|winter)\s+intern', r'([a-z\s]+)\s+intern' ] internship_mentions = [] for pattern in internship_patterns: matches = re.findall(pattern, text_lower) internship_mentions.extend(matches) if not internship_mentions: return 0.0 # Score based on business keyword overlap in internship context business_internship_score = 0.0 for mention in internship_mentions[:5]: # Top 5 internships mention_text = mention if isinstance(mention, str) else ' '.join(mention) for category, keywords in self.business_keywords.items(): if any(kw in mention_text for kw in keywords): business_internship_score += 0.2 score = min(business_internship_score, 1.0) logger.info(f"Internship relevance: {score:.2f}") return score def _assess_business_depth(self, resume_text: str) -> float: """ Assess overall business knowledge depth Returns: 0-1 score based on technical business terms """ text_lower = resume_text.lower() # Advanced business terms advanced_terms = [ 'financial modeling', 'valuation', 'dcf', 'market research', 'competitive analysis', 'business plan', 'roi', 'kpi', 'p&l', 'balance sheet', 'cash flow', 'stakeholder management', 'go-to-market', 'pricing strategy', 'market segmentation' ] term_count = sum(1 for term in advanced_terms if term in text_lower) score = min(term_count / 10, 1.0) # 10+ terms = max logger.info(f"Business depth score: {score:.2f} ({term_count} advanced terms)") return score def _analyze_case_study(self, case_study_text: str) -> float: """ Analyze case study submission quality Returns: 0-1 score based on structure and depth """ if not case_study_text or len(case_study_text) < 100: return 0.0 score = 0.0 text_lower = case_study_text.lower() # Structure indicators structure_keywords = ['problem', 'analysis', 'solution', 'recommendation', 'conclusion'] structure_score = sum(0.1 for kw in structure_keywords if kw in text_lower) score += min(structure_score, 0.4) # Analytical depth analytical_terms = ['data', 'metric', 'assumption', 'framework', 'hypothesis', 'evidence'] analytical_score = sum(0.05 for term in analytical_terms if term in text_lower) score += min(analytical_score, 0.3) # Length (quality proxy) length_score = min(len(case_study_text) / 2000, 0.3) # 2000+ chars = max score += length_score logger.info(f"Case study score: {score:.2f}") return min(score, 1.0)