Niketjain2002's picture
Fix score calibration: use weighted blend instead of multiplicative formula
1d60b95 verified
"""
Probability Calibration Layer
Applies conservative calibration rules to prevent score inflation.
Ensures outputs align with real-world hiring base rates.
v1: Rule-based calibration with sigmoid mapping
v2: Will use Platt scaling trained on real outcome data
"""
import json
import math
from pathlib import Path
from dataclasses import dataclass
@dataclass
class CalibrationResult:
original_value: float
calibrated_value: float
caps_applied: list
adjustments: list
class ProbabilityCalibrator:
"""Calibrates raw scores into realistic probability estimates."""
def __init__(self, config_path: str = None):
if config_path is None:
config_path = Path(__file__).parent.parent / "config" / "calibration.json"
with open(config_path) as f:
self.config = json.load(f)
self.sigmoid_params = self.config["score_to_probability_mapping"]["parameters"]
self.dimension_caps = self.config["dimension_caps"]
self.base_rates = self.config["base_rates"]
self.confidence_adj = self.config["confidence_adjustments"]
def calibrate(self, raw_scores: dict, match_analysis: dict) -> dict:
"""Apply calibration to all probability dimensions."""
# Extract risk data for cap decisions
risks = match_analysis.get("risk_flags", [])
skill_match = match_analysis.get("skill_match_analysis", {})
seniority = match_analysis.get("seniority_alignment", {})
experience = match_analysis.get("experience_depth", {})
context = match_analysis.get("context_fit", {})
# Calibrate each dimension
shortlist = self._calibrate_shortlist(
raw_scores["shortlist_probability"]["value"],
skill_match,
seniority,
)
offer_accept = self._calibrate_offer_acceptance(
raw_scores["offer_acceptance_probability"]["value"],
context,
risks,
)
retention = self._calibrate_retention(
raw_scores["retention_6m_probability"]["value"],
match_analysis,
seniority,
)
# Interview pass estimate (apply sigmoid only)
interview_raw = raw_scores["interview_pass_estimate"]["value"]
interview_calibrated = self._apply_sigmoid(interview_raw / 100) * 100
# Overall: weighted blend of components (pure multiplication crushes scores)
# Shortlist is the primary gate, so weight it most heavily
overall = (
shortlist.calibrated_value * 0.45
+ interview_calibrated * 0.25
+ offer_accept.calibrated_value * 0.30
)
overall = max(self.sigmoid_params["floor"] * 100,
min(self.sigmoid_params["ceiling"] * 100, overall))
# Confidence adjustment
confidence = raw_scores.get("confidence_level", "medium")
if confidence == "low":
penalty = self.confidence_adj["low_confidence_penalty"]
shortlist.calibrated_value *= penalty
offer_accept.calibrated_value *= penalty
retention.calibrated_value *= penalty
overall *= penalty
return {
"shortlist_probability": round(shortlist.calibrated_value, 1),
"offer_acceptance_probability": round(offer_accept.calibrated_value, 1),
"retention_6m_probability": round(retention.calibrated_value, 1),
"interview_pass_estimate": round(interview_calibrated, 1),
"overall_hire_probability": round(overall, 1),
"confidence_level": confidence,
"calibration_details": {
"shortlist_caps": shortlist.caps_applied,
"offer_caps": offer_accept.caps_applied,
"retention_caps": retention.caps_applied,
"shortlist_original": shortlist.original_value,
"offer_original": offer_accept.original_value,
"retention_original": retention.original_value,
},
}
def _apply_sigmoid(self, x: float) -> float:
"""Conservative sigmoid mapping."""
midpoint = self.sigmoid_params["midpoint"]
steepness = self.sigmoid_params["steepness"]
shift = self.sigmoid_params["conservative_shift"]
floor = self.sigmoid_params["floor"]
ceiling = self.sigmoid_params["ceiling"]
raw = 1 / (1 + math.exp(-steepness * (x - midpoint + shift)))
return floor + (ceiling - floor) * raw
def _calibrate_shortlist(self, raw: float, skill_match: dict, seniority: dict) -> CalibrationResult:
caps_applied = []
calibrated = self._apply_sigmoid(raw / 100) * 100
# Apply dimension-specific caps
missing = len(skill_match.get("missing_must_haves", []))
caps = self.dimension_caps["shortlist"]
if missing >= 3:
cap = caps["missing_3_plus_critical_skills"]
if calibrated > cap * 100:
calibrated = cap * 100
caps_applied.append(f"3+ critical skills missing -> cap {cap*100}%")
elif missing >= 2:
cap = caps["missing_2_critical_skills"]
if calibrated > cap * 100:
calibrated = cap * 100
caps_applied.append(f"2 critical skills missing -> cap {cap*100}%")
elif missing >= 1:
cap = caps["missing_1_critical_skill"]
if calibrated > cap * 100:
calibrated = cap * 100
caps_applied.append(f"1 critical skill missing -> cap {cap*100}%")
band_diff = abs(seniority.get("band_difference", 0))
if band_diff >= 2:
cap = caps["seniority_mismatch_2_plus_bands"]
if calibrated > cap * 100:
calibrated = cap * 100
caps_applied.append(f"Seniority mismatch >=2 bands -> cap {cap*100}%")
return CalibrationResult(
original_value=raw,
calibrated_value=max(5, min(92, calibrated)),
caps_applied=caps_applied,
adjustments=[],
)
def _calibrate_offer_acceptance(self, raw: float, context: dict, risks: list) -> CalibrationResult:
caps_applied = []
calibrated = self._apply_sigmoid(raw / 100) * 100
caps = self.dimension_caps["offer_acceptance"]
comp = context.get("compensation_alignment_estimate", "unclear")
if comp == "likely_above":
cap = caps["comp_significantly_below"] * 100 # reuse cap value
if calibrated > cap:
calibrated = cap
caps_applied.append(f"Comp mismatch -> cap {cap}%")
remote = context.get("remote_fit", "possible")
if remote == "incompatible":
cap = caps["location_hard_mismatch"] * 100
if calibrated > cap:
calibrated = cap
caps_applied.append(f"Location incompatible -> cap {cap}%")
career_risks = [r for r in risks if r.get("category") == "offer_acceptance"
and r.get("severity") in ("high", "medium")]
if career_risks:
cap = caps["career_direction_mismatch"] * 100
if calibrated > cap:
calibrated = cap
caps_applied.append(f"Career direction risks -> cap {cap}%")
return CalibrationResult(
original_value=raw,
calibrated_value=max(5, min(92, calibrated)),
caps_applied=caps_applied,
adjustments=[],
)
def _calibrate_retention(self, raw: float, match_analysis: dict, seniority: dict) -> CalibrationResult:
caps_applied = []
calibrated = self._apply_sigmoid(raw / 100) * 100
caps = self.dimension_caps["retention_6m"]
# Check tenure from candidate features if available
risks = match_analysis.get("risk_flags", [])
tenure_risks = [r for r in risks if "tenure" in r.get("risk", "").lower()
or "hopping" in r.get("risk", "").lower()]
if tenure_risks:
cap = caps["avg_tenure_under_12m"] * 100
if calibrated > cap:
calibrated = cap
caps_applied.append(f"Tenure risk flagged -> cap {cap}%")
alignment = seniority.get("alignment", "aligned")
if alignment == "overqualified":
cap = caps["overqualified_2_plus_bands"] * 100
if calibrated > cap:
calibrated = cap
caps_applied.append(f"Overqualified -> cap {cap}%")
return CalibrationResult(
original_value=raw,
calibrated_value=max(5, min(92, calibrated)),
caps_applied=caps_applied,
adjustments=[],
)