| | """ |
| | Probability Calibration Layer |
| | |
| | Applies conservative calibration rules to prevent score inflation. |
| | Ensures outputs align with real-world hiring base rates. |
| | |
| | v1: Rule-based calibration with sigmoid mapping |
| | v2: Will use Platt scaling trained on real outcome data |
| | """ |
| |
|
| | import json |
| | import math |
| | from pathlib import Path |
| | from dataclasses import dataclass |
| |
|
| |
|
@dataclass
class CalibrationResult:
    """Outcome of calibrating one probability dimension.

    Instances are mutable: the calibrator scales ``calibrated_value`` in
    place when a low-confidence penalty applies.
    """

    # Score exactly as it was handed to the calibrator.
    original_value: float
    # Score after the sigmoid mapping, rule-based caps and final clamping.
    calibrated_value: float
    # Human-readable description of each cap that actually lowered the score.
    caps_applied: list
    # Additional adjustment notes; the calibrators in this module pass [].
    adjustments: list
| |
|
| |
|
class ProbabilityCalibrator:
    """Calibrates raw scores into realistic probability estimates.

    Each raw score is treated as a percentage: it is divided by 100, passed
    through a configurable sigmoid, scaled back to a percentage, and then
    limited by rule-based caps taken from the calibration config.
    """

    # Fixed bounds (in percent) for every per-dimension result.
    # NOTE(review): these are independent of the "floor"/"ceiling" values in
    # the sigmoid config, which bound only the overall probability -- confirm
    # that the two sets of limits are meant to differ.
    _MIN_PCT = 5
    _MAX_PCT = 92

    def __init__(self, config_path: str = None):
        """Load the calibration config.

        Args:
            config_path: Path to the calibration JSON file. When omitted,
                ``config/calibration.json`` located one directory above this
                module's directory is used.

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If the file is not valid JSON.
            KeyError: If a required config section is missing.
        """
        if config_path is None:
            config_path = Path(__file__).parent.parent / "config" / "calibration.json"
        # Read as UTF-8 explicitly instead of relying on the platform's
        # default text encoding.
        with open(config_path, encoding="utf-8") as f:
            self.config = json.load(f)

        self.sigmoid_params = self.config["score_to_probability_mapping"]["parameters"]
        self.dimension_caps = self.config["dimension_caps"]
        self.base_rates = self.config["base_rates"]
        self.confidence_adj = self.config["confidence_adjustments"]

    def calibrate(self, raw_scores: dict, match_analysis: dict) -> dict:
        """Apply calibration to all probability dimensions.

        Args:
            raw_scores: Mapping with a ``{"value": <number>}`` entry under
                ``shortlist_probability``, ``offer_acceptance_probability``,
                ``retention_6m_probability`` and ``interview_pass_estimate``,
                plus an optional ``confidence_level`` (default ``"medium"``).
            match_analysis: Mapping with optional ``risk_flags``,
                ``skill_match_analysis``, ``seniority_alignment`` and
                ``context_fit`` entries. Missing or null entries are treated
                as empty.

        Returns:
            A dict with the calibrated percentages (rounded to one decimal),
            the confidence level, and a ``calibration_details`` dict listing
            the caps applied and the original values per dimension.

        Raises:
            KeyError: If a required raw score entry is missing.
        """
        # "or" also covers keys that are present but null.
        risks = match_analysis.get("risk_flags") or []
        skill_match = match_analysis.get("skill_match_analysis") or {}
        seniority = match_analysis.get("seniority_alignment") or {}
        context = match_analysis.get("context_fit") or {}

        shortlist = self._calibrate_shortlist(
            raw_scores["shortlist_probability"]["value"],
            skill_match,
            seniority,
        )
        offer_accept = self._calibrate_offer_acceptance(
            raw_scores["offer_acceptance_probability"]["value"],
            context,
            risks,
        )
        retention = self._calibrate_retention(
            raw_scores["retention_6m_probability"]["value"],
            match_analysis,
            seniority,
        )

        # The interview estimate only goes through the sigmoid; no caps.
        interview_raw = raw_scores["interview_pass_estimate"]["value"]
        interview_calibrated = self._apply_sigmoid(interview_raw / 100) * 100

        # Weighted blend; retention is reported separately and is not part
        # of the overall probability.
        overall = (
            shortlist.calibrated_value * 0.45
            + interview_calibrated * 0.25
            + offer_accept.calibrated_value * 0.30
        )
        overall = max(
            self.sigmoid_params["floor"] * 100,
            min(self.sigmoid_params["ceiling"] * 100, overall),
        )

        confidence = raw_scores.get("confidence_level", "medium")
        if confidence == "low":
            # The penalty is applied after clamping, so penalised values may
            # end up below the floor. The interview estimate is not scaled.
            penalty = self.confidence_adj["low_confidence_penalty"]
            for result in (shortlist, offer_accept, retention):
                result.calibrated_value *= penalty
            overall *= penalty

        return {
            "shortlist_probability": round(shortlist.calibrated_value, 1),
            "offer_acceptance_probability": round(offer_accept.calibrated_value, 1),
            "retention_6m_probability": round(retention.calibrated_value, 1),
            "interview_pass_estimate": round(interview_calibrated, 1),
            "overall_hire_probability": round(overall, 1),
            "confidence_level": confidence,
            "calibration_details": {
                "shortlist_caps": shortlist.caps_applied,
                "offer_caps": offer_accept.caps_applied,
                "retention_caps": retention.caps_applied,
                "shortlist_original": shortlist.original_value,
                "offer_original": offer_accept.original_value,
                "retention_original": retention.original_value,
            },
        }

    def _apply_sigmoid(self, x: float) -> float:
        """Map ``x`` through the configured logistic curve.

        Args:
            x: Raw score as a fraction (callers pass ``percentage / 100``).

        Returns:
            ``floor + (ceiling - floor) * logistic(steepness * (x - midpoint
            + conservative_shift))`` using the configured parameters.
        """
        midpoint = self.sigmoid_params["midpoint"]
        steepness = self.sigmoid_params["steepness"]
        shift = self.sigmoid_params["conservative_shift"]
        floor = self.sigmoid_params["floor"]
        ceiling = self.sigmoid_params["ceiling"]

        z = steepness * (x - midpoint + shift)
        try:
            raw = 1 / (1 + math.exp(-z))
        except OverflowError:
            # exp(-z) leaves the float range only for strongly negative z,
            # where the logistic value is indistinguishable from 0.
            raw = 0.0
        return floor + (ceiling - floor) * raw

    @staticmethod
    def _apply_cap(value: float, cap_fraction: float, reason: str, caps_applied: list) -> float:
        """Limit ``value`` (percent) to ``cap_fraction * 100`` and log the cap.

        A message is appended to ``caps_applied`` only when the cap actually
        lowers the value.
        """
        cap_pct = cap_fraction * 100
        if value > cap_pct:
            # Round only the displayed number so that products such as
            # 0.55 * 100 do not render as 55.00000000000001.
            caps_applied.append(f"{reason} -> cap {round(cap_pct, 6)}%")
            return cap_pct
        return value

    def _finalize(self, raw: float, calibrated: float, caps_applied: list) -> "CalibrationResult":
        """Clamp ``calibrated`` to the fixed per-dimension bounds and wrap it."""
        return CalibrationResult(
            original_value=raw,
            calibrated_value=max(self._MIN_PCT, min(self._MAX_PCT, calibrated)),
            caps_applied=caps_applied,
            adjustments=[],
        )

    def _calibrate_shortlist(self, raw: float, skill_match: dict, seniority: dict) -> "CalibrationResult":
        """Calibrate the shortlist probability.

        Caps are applied for missing must-have skills (one tier, chosen by
        the number missing) and for a seniority gap of two or more bands.

        Args:
            raw: Raw shortlist score as a percentage.
            skill_match: Mapping that may hold ``missing_must_haves``.
            seniority: Mapping that may hold ``band_difference``.
        """
        caps_applied = []
        calibrated = self._apply_sigmoid(raw / 100) * 100
        caps = self.dimension_caps["shortlist"]

        missing = len((skill_match or {}).get("missing_must_haves") or [])
        if missing >= 3:
            calibrated = self._apply_cap(
                calibrated, caps["missing_3_plus_critical_skills"],
                "3+ critical skills missing", caps_applied,
            )
        elif missing >= 2:
            calibrated = self._apply_cap(
                calibrated, caps["missing_2_critical_skills"],
                "2 critical skills missing", caps_applied,
            )
        elif missing >= 1:
            calibrated = self._apply_cap(
                calibrated, caps["missing_1_critical_skill"],
                "1 critical skill missing", caps_applied,
            )

        # abs(): a gap in either direction counts as a mismatch.
        band_diff = abs((seniority or {}).get("band_difference") or 0)
        if band_diff >= 2:
            calibrated = self._apply_cap(
                calibrated, caps["seniority_mismatch_2_plus_bands"],
                "Seniority mismatch >=2 bands", caps_applied,
            )

        return self._finalize(raw, calibrated, caps_applied)

    def _calibrate_offer_acceptance(self, raw: float, context: dict, risks: list) -> "CalibrationResult":
        """Calibrate the offer-acceptance probability.

        Caps are applied for a compensation mismatch, an incompatible remote
        setup, and high/medium risks in the ``offer_acceptance`` category.

        Args:
            raw: Raw offer-acceptance score as a percentage.
            context: Mapping that may hold
                ``compensation_alignment_estimate`` and ``remote_fit``.
            risks: Iterable of risk mappings with ``category`` and
                ``severity`` entries; ``None`` is treated as empty.
        """
        caps_applied = []
        calibrated = self._apply_sigmoid(raw / 100) * 100
        caps = self.dimension_caps["offer_acceptance"]
        context = context or {}

        if context.get("compensation_alignment_estimate", "unclear") == "likely_above":
            calibrated = self._apply_cap(
                calibrated, caps["comp_significantly_below"],
                "Comp mismatch", caps_applied,
            )

        if context.get("remote_fit", "possible") == "incompatible":
            calibrated = self._apply_cap(
                calibrated, caps["location_hard_mismatch"],
                "Location incompatible", caps_applied,
            )

        has_career_risk = any(
            r.get("category") == "offer_acceptance"
            and r.get("severity") in ("high", "medium")
            for r in (risks or [])
        )
        if has_career_risk:
            calibrated = self._apply_cap(
                calibrated, caps["career_direction_mismatch"],
                "Career direction risks", caps_applied,
            )

        return self._finalize(raw, calibrated, caps_applied)

    def _calibrate_retention(self, raw: float, match_analysis: dict, seniority: dict) -> "CalibrationResult":
        """Calibrate the six-month retention probability.

        Caps are applied when any risk text mentions "tenure" or "hopping"
        (case-insensitive) and when the seniority alignment is
        ``"overqualified"``.

        Args:
            raw: Raw retention score as a percentage.
            match_analysis: Mapping that may hold ``risk_flags``.
            seniority: Mapping that may hold ``alignment``.
        """
        caps_applied = []
        calibrated = self._apply_sigmoid(raw / 100) * 100
        caps = self.dimension_caps["retention_6m"]

        risks = (match_analysis or {}).get("risk_flags") or []
        # A missing or null "risk" text counts as an empty string.
        risk_texts = (str(r.get("risk") or "").lower() for r in risks)
        if any("tenure" in text or "hopping" in text for text in risk_texts):
            calibrated = self._apply_cap(
                calibrated, caps["avg_tenure_under_12m"],
                "Tenure risk flagged", caps_applied,
            )

        # NOTE(review): the cap key mentions "2_plus_bands", but the band
        # difference is not checked here -- any "overqualified" alignment
        # triggers the cap. Confirm this is intended.
        if (seniority or {}).get("alignment", "aligned") == "overqualified":
            calibrated = self._apply_cap(
                calibrated, caps["overqualified_2_plus_bands"],
                "Overqualified", caps_applied,
            )

        return self._finalize(raw, calibrated, caps_applied)
| |
|