"""
Main Pipeline Orchestrator
Coordinates the full evaluation flow:
Input Processing -> Feature Extraction -> Scoring -> Calibration -> Explanation,
with bias audits applied to the inputs, the extracted features, and the final output.
Supports two modes:
- "llm": Full LLM-powered pipeline (production)
- "deterministic": Rule-based scoring and explanation (testing/offline);
  the LLM is still used for feature extraction
"""
import json
import time
from typing import Optional
from dataclasses import dataclass
from .input_processor import InputProcessor, ProcessedInput
from .feature_extractor import FeatureExtractor, LLMClient
from .scoring_engine import ScoringEngine
from .calibrator import ProbabilityCalibrator
from .explainer import Explainer
from .bias_guard import BiasGuard
@dataclass
class EvaluationResult:
"""Final output of the recruitment intelligence pipeline."""
overall_hire_probability: float
shortlist_probability: float
offer_acceptance_probability: float
retention_6m_probability: float
confidence_level: str
positive_signals: list
risk_signals: list
missing_signals: list
reasoning_summary: str
# Extended fields (not in base contract but useful)
recommendation: str = ""
    calibration_details: Optional[dict] = None
    bias_audit: Optional[dict] = None
    evaluation_metadata: Optional[dict] = None
def to_contract_json(self) -> dict:
"""Return the strict JSON contract format."""
return {
"overall_hire_probability": self.overall_hire_probability,
"shortlist_probability": self.shortlist_probability,
"offer_acceptance_probability": self.offer_acceptance_probability,
"retention_6m_probability": self.retention_6m_probability,
"confidence_level": self.confidence_level,
"positive_signals": self.positive_signals,
"risk_signals": self.risk_signals,
"missing_signals": self.missing_signals,
"reasoning_summary": self.reasoning_summary,
}
def to_full_json(self) -> dict:
"""Return extended output including calibration details and bias audit."""
result = self.to_contract_json()
result["recommendation"] = self.recommendation
result["calibration_details"] = self.calibration_details
result["bias_audit"] = self.bias_audit
result["evaluation_metadata"] = self.evaluation_metadata
return result
class RecruitmentIntelligencePipeline:
"""
Main orchestrator for hire probability estimation.
Usage:
pipeline = RecruitmentIntelligencePipeline(mode="llm", provider="anthropic")
result = pipeline.evaluate(
job_description="...",
company_context={...},
resume_text="...",
)
print(json.dumps(result.to_contract_json(), indent=2))
"""
def __init__(
self,
mode: str = "llm",
provider: str = "anthropic",
model: Optional[str] = None,
):
        if mode not in ("llm", "deterministic"):
            raise ValueError(f"Unknown mode: {mode!r}; expected 'llm' or 'deterministic'")
        self.mode = mode
        self.input_processor = InputProcessor()
        self.calibrator = ProbabilityCalibrator()
        self.bias_guard = BiasGuard()
        # Both modes share the same components: the LLM is always required for
        # feature extraction, while in deterministic mode the scoring engine and
        # explainer take their rule-based code paths at call time.
        llm = LLMClient(provider=provider, model=model)
        self.feature_extractor = FeatureExtractor(llm_client=llm)
        self.scoring_engine = ScoringEngine(llm_client=llm)
        self.explainer = Explainer(llm_client=llm)
    def extract_role_features_once(self, job_description: str, company_context: dict) -> "tuple[ProcessedInput, dict]":
"""
Extract role features once for batch processing.
Returns (processed_input_template, role_features) to reuse across candidates.
"""
processed = self.input_processor.process(
job_description, company_context, "(placeholder)"
)
role_features = self.feature_extractor.extract_role_features(processed)
return processed, role_features
def evaluate_single_with_role_features(
self,
job_description: str,
company_context: dict,
resume_text: str,
role_features: dict,
) -> EvaluationResult:
"""
        Evaluate a single candidate, reusing role features pre-extracted via
        extract_role_features_once(). Saves one LLM call per candidate in batch mode.
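        Batch sketch (jd, company_ctx, and resumes are placeholder names):
            processed, role_features = pipeline.extract_role_features_once(jd, company_ctx)
            results = [
                pipeline.evaluate_single_with_role_features(jd, company_ctx, r, role_features)
                for r in resumes
            ]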
"""
start_time = time.time()
processed = self.input_processor.process(
job_description, company_context, resume_text
)
input_bias = self.bias_guard.audit_inputs(
processed.job_description, processed.resume_text
)
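        # Role features were extracted once by the caller; only the candidate
        # side needs an extraction call here, which is the per-candidate saving.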
candidate_features = self.feature_extractor.extract_candidate_features(processed)
feature_bias = self.bias_guard.audit_features(role_features, candidate_features)
match_analysis = self.feature_extractor.analyze_match(
role_features, candidate_features, processed
)
if self.mode == "llm":
raw_scores = self.scoring_engine.score(match_analysis)
else:
raw_scores = self.scoring_engine.score_deterministic(match_analysis)
calibrated = self.calibrator.calibrate(raw_scores, match_analysis)
if self.mode == "llm":
explanation = self.explainer.explain(calibrated, match_analysis)
else:
explanation = self.explainer.explain_deterministic(calibrated, match_analysis)
output_bias = self.bias_guard.audit_output(explanation)
elapsed = time.time() - start_time
# Try to extract candidate name
candidate_name = candidate_features.get("candidate_name") or "Unknown"
return EvaluationResult(
overall_hire_probability=calibrated["overall_hire_probability"],
shortlist_probability=calibrated["shortlist_probability"],
offer_acceptance_probability=calibrated["offer_acceptance_probability"],
retention_6m_probability=calibrated["retention_6m_probability"],
confidence_level=calibrated["confidence_level"],
positive_signals=explanation.get("positive_signals", []),
risk_signals=explanation.get("risk_signals", []),
missing_signals=explanation.get("missing_signals", []),
reasoning_summary=explanation.get("reasoning_summary", ""),
recommendation=explanation.get("recommendation", ""),
calibration_details=calibrated.get("calibration_details"),
bias_audit={
"input_audit": input_bias.summary,
"feature_audit": feature_bias.summary,
"output_audit": output_bias.summary,
"all_passed": input_bias.passed and feature_bias.passed and output_bias.passed,
"flags": [
{"category": f.category, "severity": f.severity, "description": f.description}
for f in input_bias.flags + feature_bias.flags + output_bias.flags
],
},
evaluation_metadata={
"mode": self.mode,
"elapsed_seconds": round(elapsed, 2),
"data_quality_score": processed.data_quality_score,
"input_warnings": processed.warnings,
"pipeline_version": "1.0.0",
"candidate_name": candidate_name,
},
)
def evaluate(
self,
job_description: str,
company_context: dict,
resume_text: str,
) -> EvaluationResult:
"""
Run full evaluation pipeline.
Args:
job_description: Raw job description text
company_context: Dict with stage, industry, compensation_band, location, remote_type
resume_text: Raw resume text
Returns:
EvaluationResult with probabilities and explanation
"""
start_time = time.time()
# Step 1: Input Processing
processed = self.input_processor.process(
job_description, company_context, resume_text
)
# Step 2: Bias audit on inputs
input_bias = self.bias_guard.audit_inputs(
processed.job_description, processed.resume_text
)
# Step 3: Feature Extraction (always LLM-powered in v1)
role_features = self.feature_extractor.extract_role_features(processed)
candidate_features = self.feature_extractor.extract_candidate_features(processed)
# Step 4: Bias audit on features
feature_bias = self.bias_guard.audit_features(role_features, candidate_features)
# Step 5: Match Analysis
match_analysis = self.feature_extractor.analyze_match(
role_features, candidate_features, processed
)
# Step 6: Scoring
if self.mode == "llm":
raw_scores = self.scoring_engine.score(match_analysis)
else:
raw_scores = self.scoring_engine.score_deterministic(match_analysis)
# Step 7: Calibration
calibrated = self.calibrator.calibrate(raw_scores, match_analysis)
# Step 8: Explanation
if self.mode == "llm":
explanation = self.explainer.explain(calibrated, match_analysis)
else:
explanation = self.explainer.explain_deterministic(calibrated, match_analysis)
# Step 9: Bias audit on output
output_bias = self.bias_guard.audit_output(explanation)
elapsed = time.time() - start_time
# Assemble result
return EvaluationResult(
overall_hire_probability=calibrated["overall_hire_probability"],
shortlist_probability=calibrated["shortlist_probability"],
offer_acceptance_probability=calibrated["offer_acceptance_probability"],
retention_6m_probability=calibrated["retention_6m_probability"],
confidence_level=calibrated["confidence_level"],
positive_signals=explanation.get("positive_signals", []),
risk_signals=explanation.get("risk_signals", []),
missing_signals=explanation.get("missing_signals", []),
reasoning_summary=explanation.get("reasoning_summary", ""),
recommendation=explanation.get("recommendation", ""),
calibration_details=calibrated.get("calibration_details"),
bias_audit={
"input_audit": input_bias.summary,
"feature_audit": feature_bias.summary,
"output_audit": output_bias.summary,
"all_passed": input_bias.passed and feature_bias.passed and output_bias.passed,
"flags": [
{"category": f.category, "severity": f.severity, "description": f.description}
for f in input_bias.flags + feature_bias.flags + output_bias.flags
],
},
evaluation_metadata={
"mode": self.mode,
"elapsed_seconds": round(elapsed, 2),
"data_quality_score": processed.data_quality_score,
"input_warnings": processed.warnings,
"pipeline_version": "1.0.0",
},
)
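
if __name__ == "__main__":
    # Minimal smoke-test sketch (run as `python -m src.pipeline`, since this
    # module uses relative imports). The job description, company context, and
    # resume below are illustrative placeholders, not real data. Note that even
    # "deterministic" mode constructs an LLMClient for feature extraction, so
    # provider credentials may still be required.
    demo_pipeline = RecruitmentIntelligencePipeline(mode="deterministic")
    demo_result = demo_pipeline.evaluate(
        job_description="Senior Backend Engineer: Python, distributed systems, 5+ years.",
        company_context={
            "stage": "Series B",
            "industry": "fintech",
            "compensation_band": "$160k-$200k",
            "location": "New York, NY",
            "remote_type": "hybrid",
        },
        resume_text="Jane Doe. Six years of backend experience with Python and Kafka.",
    )
    print(json.dumps(demo_result.to_contract_json(), indent=2))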