""" Probability Calibration Layer Applies conservative calibration rules to prevent score inflation. Ensures outputs align with real-world hiring base rates. v1: Rule-based calibration with sigmoid mapping v2: Will use Platt scaling trained on real outcome data """ import json import math from pathlib import Path from dataclasses import dataclass @dataclass class CalibrationResult: original_value: float calibrated_value: float caps_applied: list adjustments: list class ProbabilityCalibrator: """Calibrates raw scores into realistic probability estimates.""" def __init__(self, config_path: str = None): if config_path is None: config_path = Path(__file__).parent.parent / "config" / "calibration.json" with open(config_path) as f: self.config = json.load(f) self.sigmoid_params = self.config["score_to_probability_mapping"]["parameters"] self.dimension_caps = self.config["dimension_caps"] self.base_rates = self.config["base_rates"] self.confidence_adj = self.config["confidence_adjustments"] def calibrate(self, raw_scores: dict, match_analysis: dict) -> dict: """Apply calibration to all probability dimensions.""" # Extract risk data for cap decisions risks = match_analysis.get("risk_flags", []) skill_match = match_analysis.get("skill_match_analysis", {}) seniority = match_analysis.get("seniority_alignment", {}) experience = match_analysis.get("experience_depth", {}) context = match_analysis.get("context_fit", {}) # Calibrate each dimension shortlist = self._calibrate_shortlist( raw_scores["shortlist_probability"]["value"], skill_match, seniority, ) offer_accept = self._calibrate_offer_acceptance( raw_scores["offer_acceptance_probability"]["value"], context, risks, ) retention = self._calibrate_retention( raw_scores["retention_6m_probability"]["value"], match_analysis, seniority, ) # Interview pass estimate (apply sigmoid only) interview_raw = raw_scores["interview_pass_estimate"]["value"] interview_calibrated = self._apply_sigmoid(interview_raw / 100) * 100 # Overall: weighted blend of components (pure multiplication crushes scores) # Shortlist is the primary gate, so weight it most heavily overall = ( shortlist.calibrated_value * 0.45 + interview_calibrated * 0.25 + offer_accept.calibrated_value * 0.30 ) overall = max(self.sigmoid_params["floor"] * 100, min(self.sigmoid_params["ceiling"] * 100, overall)) # Confidence adjustment confidence = raw_scores.get("confidence_level", "medium") if confidence == "low": penalty = self.confidence_adj["low_confidence_penalty"] shortlist.calibrated_value *= penalty offer_accept.calibrated_value *= penalty retention.calibrated_value *= penalty overall *= penalty return { "shortlist_probability": round(shortlist.calibrated_value, 1), "offer_acceptance_probability": round(offer_accept.calibrated_value, 1), "retention_6m_probability": round(retention.calibrated_value, 1), "interview_pass_estimate": round(interview_calibrated, 1), "overall_hire_probability": round(overall, 1), "confidence_level": confidence, "calibration_details": { "shortlist_caps": shortlist.caps_applied, "offer_caps": offer_accept.caps_applied, "retention_caps": retention.caps_applied, "shortlist_original": shortlist.original_value, "offer_original": offer_accept.original_value, "retention_original": retention.original_value, }, } def _apply_sigmoid(self, x: float) -> float: """Conservative sigmoid mapping.""" midpoint = self.sigmoid_params["midpoint"] steepness = self.sigmoid_params["steepness"] shift = self.sigmoid_params["conservative_shift"] floor = self.sigmoid_params["floor"] ceiling = self.sigmoid_params["ceiling"] raw = 1 / (1 + math.exp(-steepness * (x - midpoint + shift))) return floor + (ceiling - floor) * raw def _calibrate_shortlist(self, raw: float, skill_match: dict, seniority: dict) -> CalibrationResult: caps_applied = [] calibrated = self._apply_sigmoid(raw / 100) * 100 # Apply dimension-specific caps missing = len(skill_match.get("missing_must_haves", [])) caps = self.dimension_caps["shortlist"] if missing >= 3: cap = caps["missing_3_plus_critical_skills"] if calibrated > cap * 100: calibrated = cap * 100 caps_applied.append(f"3+ critical skills missing -> cap {cap*100}%") elif missing >= 2: cap = caps["missing_2_critical_skills"] if calibrated > cap * 100: calibrated = cap * 100 caps_applied.append(f"2 critical skills missing -> cap {cap*100}%") elif missing >= 1: cap = caps["missing_1_critical_skill"] if calibrated > cap * 100: calibrated = cap * 100 caps_applied.append(f"1 critical skill missing -> cap {cap*100}%") band_diff = abs(seniority.get("band_difference", 0)) if band_diff >= 2: cap = caps["seniority_mismatch_2_plus_bands"] if calibrated > cap * 100: calibrated = cap * 100 caps_applied.append(f"Seniority mismatch >=2 bands -> cap {cap*100}%") return CalibrationResult( original_value=raw, calibrated_value=max(5, min(92, calibrated)), caps_applied=caps_applied, adjustments=[], ) def _calibrate_offer_acceptance(self, raw: float, context: dict, risks: list) -> CalibrationResult: caps_applied = [] calibrated = self._apply_sigmoid(raw / 100) * 100 caps = self.dimension_caps["offer_acceptance"] comp = context.get("compensation_alignment_estimate", "unclear") if comp == "likely_above": cap = caps["comp_significantly_below"] * 100 # reuse cap value if calibrated > cap: calibrated = cap caps_applied.append(f"Comp mismatch -> cap {cap}%") remote = context.get("remote_fit", "possible") if remote == "incompatible": cap = caps["location_hard_mismatch"] * 100 if calibrated > cap: calibrated = cap caps_applied.append(f"Location incompatible -> cap {cap}%") career_risks = [r for r in risks if r.get("category") == "offer_acceptance" and r.get("severity") in ("high", "medium")] if career_risks: cap = caps["career_direction_mismatch"] * 100 if calibrated > cap: calibrated = cap caps_applied.append(f"Career direction risks -> cap {cap}%") return CalibrationResult( original_value=raw, calibrated_value=max(5, min(92, calibrated)), caps_applied=caps_applied, adjustments=[], ) def _calibrate_retention(self, raw: float, match_analysis: dict, seniority: dict) -> CalibrationResult: caps_applied = [] calibrated = self._apply_sigmoid(raw / 100) * 100 caps = self.dimension_caps["retention_6m"] # Check tenure from candidate features if available risks = match_analysis.get("risk_flags", []) tenure_risks = [r for r in risks if "tenure" in r.get("risk", "").lower() or "hopping" in r.get("risk", "").lower()] if tenure_risks: cap = caps["avg_tenure_under_12m"] * 100 if calibrated > cap: calibrated = cap caps_applied.append(f"Tenure risk flagged -> cap {cap}%") alignment = seniority.get("alignment", "aligned") if alignment == "overqualified": cap = caps["overqualified_2_plus_bands"] * 100 if calibrated > cap: calibrated = cap caps_applied.append(f"Overqualified -> cap {cap}%") return CalibrationResult( original_value=raw, calibrated_value=max(5, min(92, calibrated)), caps_applied=caps_applied, adjustments=[], )