import json import logging from datetime import datetime from collections import defaultdict import re from typing import Dict, List, Tuple, Any logger = logging.getLogger(__name__) class EnhancedContextualScoringEngine: """ Moteur de scoring avancé qui analyse multiple dimensions pour évaluer le niveau de maîtrise d'une compétence : contexte, fréquence, profondeur, progression, et niveau d'expertise démontré. """ # Pondérations pour la formule de scoring (total = 1.0) ALPHA = 0.35 # Poids du contexte (où la compétence est mentionnée) BETA = 0.20 # Poids de la fréquence (combien de fois elle apparaît) GAMMA = 0.25 # Poids de la profondeur (durée d'expérience) DELTA = 0.15 # Poids de l'expertise (niveau démontré) EPSILON = 0.05 # Poids de la progression (évolution dans le temps) # Valeurs des contextes (révisées pour plus de granularité) CONTEXT_VALUES = { "formations": 0.3, "projets_personnels": 0.5, "projets_professionnels": 0.7, "experiences_professionnelles": 0.8, "responsabilités_leadership": 0.9, # Nouveau : si mentionné comme leader/expert } # Mots-clés indiquant différents niveaux d'expertise EXPERTISE_KEYWORDS = { "débutant": ["initiation", "découverte", "apprentissage", "formation"], "intermédiaire": ["utilisation", "application", "développement", "mise en œuvre"], "avancé": ["maîtrise", "expertise", "optimisation", "architecture", "conception"], "expert": ["lead", "senior", "expert", "mentor", "formateur", "référent", "spécialiste"] } # Scores d'expertise EXPERTISE_SCORES = { "débutant": 0.2, "intermédiaire": 0.5, "avancé": 0.8, "expert": 1.0 } def __init__(self, parsed_cv_data: dict): self.cv_data = parsed_cv_data.get("candidat", {}) if not self.cv_data: raise ValueError("Données du candidat non trouvées dans le CV parsé.") def _normalize_score(self, value: float, max_value: float = 10.0) -> float: """Normalise une valeur avec une fonction sigmoïde améliorée.""" if value <= 0: return 0.0 # Utilisation d'une courbe plus douce pour éviter la saturation trop rapide return min(1.0, value / (value + max_value)) def _parse_date(self, date_str: str) -> datetime | None: """Parse une date de manière robuste.""" if not date_str or not isinstance(date_str, str): return None date_str_clean = date_str.strip().lower() current_indicators = ["aujourd'hui", "maintenant", "en cours", "current", "présent"] if any(indicator in date_str_clean for indicator in current_indicators): return datetime.now() # Formats supportés étendus formats = ["%m/%Y", "%Y-%m", "%Y", "%m-%Y", "%B %Y", "%b %Y"] for fmt in formats: try: return datetime.strptime(date_str_clean, fmt) except ValueError: continue return None def _calculate_duration_in_years(self, start_date_str: str, end_date_str: str) -> float: """Calcule la durée avec gestion améliorée des cas limites.""" start_date = self._parse_date(start_date_str) end_date = self._parse_date(end_date_str) if start_date and end_date: if end_date < start_date: return 0.0 duration = (end_date - start_date).days / 365.25 return max(0.0, duration) elif start_date: # Si seule la date de début est connue current = datetime.now() duration = (current - start_date).days / 365.25 return max(0.0, min(duration, 10.0)) # Cap à 10 ans pour éviter les aberrations return 0.0 def _detect_expertise_level(self, text: str, skill: str) -> str: """ Détecte le niveau d'expertise d'une compétence basé sur le contexte. """ text_lower = text.lower() skill_lower = skill.lower() # Chercher des patterns spécifiques autour de la compétence skill_context = "" skill_positions = [m.start() for m in re.finditer(re.escape(skill_lower), text_lower)] for pos in skill_positions: # Extraire 100 caractères avant et après la mention de la compétence start = max(0, pos - 100) end = min(len(text_lower), pos + len(skill_lower) + 100) skill_context += " " + text_lower[start:end] # Analyser le contexte pour déterminer le niveau for level in ["expert", "avancé", "intermédiaire", "débutant"]: for keyword in self.EXPERTISE_KEYWORDS[level]: if keyword in skill_context: return level # Si aucun indicateur spécifique, analyser les verbes d'action advanced_verbs = ["concevoir", "architecturer", "optimiser", "diriger", "superviser"] intermediate_verbs = ["développer", "implémenter", "créer", "réaliser"] if any(verb in skill_context for verb in advanced_verbs): return "avancé" elif any(verb in skill_context for verb in intermediate_verbs): return "intermédiaire" return "intermédiaire" # Valeur par défaut def _calculate_progression_score(self, skill_timeline: List[Tuple[datetime, str]]) -> float: """ Calcule un score de progression basé sur l'évolution de la compétence dans le temps. """ if len(skill_timeline) < 2: return 0.0 # Trier par date skill_timeline.sort(key=lambda x: x[0]) # Analyser la progression des niveaux levels = ["débutant", "intermédiaire", "avancé", "expert"] level_values = {level: i for i, level in enumerate(levels)} progression = 0.0 for i in range(1, len(skill_timeline)): prev_level = level_values.get(skill_timeline[i-1][1], 1) curr_level = level_values.get(skill_timeline[i][1], 1) if curr_level > prev_level: progression += 0.3 # Bonus pour progression positive # Bonus pour utilisation récente (dans les 2 dernières années) recent_cutoff = datetime.now().replace(year=datetime.now().year - 2) if skill_timeline[-1][0] >= recent_cutoff: progression += 0.2 return min(1.0, progression) def _analyze_skill_in_projects(self, skill: str, projects_data: Dict) -> Tuple[int, float, List[str]]: """ Analyse spécifique des compétences dans les projets. Retourne: (fréquence, score_expertise_moyen, contextes) """ frequency = 0 expertise_scores = [] contexts = [] for project_type in ["professional", "personal"]: for project in projects_data.get(project_type, []): project_text = json.dumps(project, ensure_ascii=False).lower() if skill.lower() in project_text: contexts.append(f"projets_{project_type}") frequency += project_text.count(skill.lower()) # Analyser le rôle pour déterminer l'expertise role = project.get("role", "").lower() if any(expert_word in role for expert_word in ["lead", "chef", "responsable", "senior"]): expertise_scores.append(self.EXPERTISE_SCORES["expert"]) else: level = self._detect_expertise_level(project_text, skill) expertise_scores.append(self.EXPERTISE_SCORES[level]) avg_expertise = sum(expertise_scores) / len(expertise_scores) if expertise_scores else 0.5 return frequency, avg_expertise, contexts def calculate_scores(self) -> dict: """ Calcule les scores pondérés avec analyse multi-dimensionnelle. """ skills_list = self.cv_data.get("compétences", {}) all_skills = [] # Combiner hard et soft skills if isinstance(skills_list, dict): all_skills.extend(skills_list.get("hard_skills", [])) all_skills.extend(skills_list.get("soft_skills", [])) elif isinstance(skills_list, list): all_skills = [item.get("nom") for item in skills_list if item.get("nom")] if not all_skills: logger.warning("Aucune compétence à analyser dans le CV.") return {"analyse_competences": []} # Structure pour chaque compétence skill_metrics = { skill.lower(): { "original_name": skill, "contexts": set(), "frequency": 0, "max_duration": 0.0, "expertise_scores": [], "timeline": [], # Pour analyser la progression "project_involvement": {"count": 0, "leadership": False} } for skill in all_skills if skill } # 1. Analyse des expériences professionnelles for exp in self.cv_data.get("expériences", []): if not isinstance(exp, dict): continue exp_text = json.dumps(exp, ensure_ascii=False).lower() duration = self._calculate_duration_in_years( exp.get("start_date", ""), exp.get("end_date", "") ) # Déterminer la date pour la timeline exp_date = self._parse_date(exp.get("start_date", "")) for skill in skill_metrics: if skill in exp_text: skill_metrics[skill]["contexts"].add("experiences_professionnelles") skill_metrics[skill]["frequency"] += exp_text.count(skill) if duration > skill_metrics[skill]["max_duration"]: skill_metrics[skill]["max_duration"] = duration # Analyser le niveau d'expertise expertise_level = self._detect_expertise_level(exp_text, skill) skill_metrics[skill]["expertise_scores"].append( self.EXPERTISE_SCORES[expertise_level] ) # Ajouter à la timeline if exp_date: skill_metrics[skill]["timeline"].append((exp_date, expertise_level)) # 2. Analyse des projets (avec analyse approfondie) projects_data = self.cv_data.get("projets", {}) for skill in skill_metrics: proj_freq, proj_expertise, proj_contexts = self._analyze_skill_in_projects( skill, projects_data ) skill_metrics[skill]["frequency"] += proj_freq skill_metrics[skill]["contexts"].update(proj_contexts) if proj_expertise > 0: skill_metrics[skill]["expertise_scores"].append(proj_expertise) # 3. Analyse des formations for formation in self.cv_data.get("formations", []): if not isinstance(formation, dict): continue formation_text = json.dumps(formation, ensure_ascii=False).lower() formation_date = self._parse_date(formation.get("start_date", "")) for skill in skill_metrics: if skill in formation_text: skill_metrics[skill]["contexts"].add("formations") skill_metrics[skill]["frequency"] += formation_text.count(skill) skill_metrics[skill]["expertise_scores"].append( self.EXPERTISE_SCORES["débutant"] ) if formation_date: skill_metrics[skill]["timeline"].append((formation_date, "débutant")) # 4. Calcul final des scores final_scores = [] for skill, metrics in skill_metrics.items(): if metrics["frequency"] == 0: # Skip skills not found continue # Score de contexte (avec bonus multi-contexte) context_scores = [self.CONTEXT_VALUES.get(c, 0.1) for c in metrics["contexts"]] context_score = max(context_scores) if context_scores else 0.1 if len(metrics["contexts"]) > 2: context_score = min(1.0, context_score * 1.2) # Bonus multi-contexte # Score d'expertise (moyenne pondérée) avg_expertise = ( sum(metrics["expertise_scores"]) / len(metrics["expertise_scores"]) if metrics["expertise_scores"] else 0.5 ) # Score de progression progression_score = self._calculate_progression_score(metrics["timeline"]) # Normalisation normalized_frequency = self._normalize_score(metrics["frequency"], 5.0) normalized_depth = self._normalize_score(metrics["max_duration"], 3.0) # Formule finale améliorée final_score = ( self.ALPHA * context_score + self.BETA * normalized_frequency + self.GAMMA * normalized_depth + self.DELTA * avg_expertise + self.EPSILON * progression_score ) final_scores.append({ "skill": metrics["original_name"], "score": round(final_score, 3), "niveau_estime": self._estimate_skill_level(final_score), "details": { "context_score": round(context_score, 3), "contexts_found": list(metrics["contexts"]), "frequency": metrics["frequency"], "max_duration_years": round(metrics["max_duration"], 1), "expertise_level": round(avg_expertise, 3), "progression_score": round(progression_score, 3), "timeline_points": len(metrics["timeline"]) } }) # Trier par score décroissant final_scores.sort(key=lambda x: x["score"], reverse=True) logger.info(f"Scoring avancé terminé pour {len(final_scores)} compétences.") return {"analyse_competences": final_scores} def _estimate_skill_level(self, score: float) -> str: """ Convertit le score numérique en niveau de compétence lisible. """ if score >= 0.8: return "Expert" elif score >= 0.6: return "Avancé" elif score >= 0.4: return "Intermédiaire" elif score >= 0.2: return "Débutant" else: return "Notions" def get_top_skills(self, n: int = 10, skill_type: str = None) -> List[Dict]: """ Retourne les N meilleures compétences, optionnellement filtrées par type. """ results = self.calculate_scores() skills = results.get("analyse_competences", []) if skill_type: # Filtrer par type si spécifié (nécessiterait une classification préalable) pass return skills[:n]