""" Candidate source APIs - compute metrics from actual data. AUTO-GENERATED by scripts/generate_hf.sh - DO NOT EDIT DIRECTLY Edit candidate_source.py in main repo and regenerate. """ from typing import Dict, List, Any, Optional, Union import pandas as pd from loguru import logger from data_loader import get_data_loader from models import ( RequisitionNotFoundResponse, SLAPerSourceResponse, TotalHiresBySourceResponse, CandidateVolumeResponse, FunnelConversionResponse, MetadataResponse, DefinitionsResponse, SourceRecommendationResponse, ) BPO_LOG_API_CALLS = False # Disabled for deployment def _log_api_call(msg: str) -> None: """Log API call if BPO_LOG_API_CALLS is enabled.""" if BPO_LOG_API_CALLS: logger.info(msg) def _check_requisition_valid(requisition_id: str) -> Optional[RequisitionNotFoundResponse]: """ Check if a requisition ID is valid. Returns None if valid, or an error response model if invalid. """ loader = get_data_loader() if not loader.is_valid_requisition(requisition_id): suggestions = loader.get_suggested_requisitions(requisition_id) return RequisitionNotFoundResponse( error="requisition_not_found", message=f"No job can be found with the ID {requisition_id}.", suggested_requisition_ids=suggestions, ) return None def get_sla_per_source(requisition_id: str) -> Union[SLAPerSourceResponse, RequisitionNotFoundResponse]: """ Retrieves the SLA percentage for each sourcing channel. Args: requisition_id: The specific requisition ID to filter SLA data for. Returns: A dictionary with source names and their SLA percentages. """ _log_api_call(f"API call: get_sla_per_source(requisition_id={requisition_id})") # Check if requisition ID is valid error = _check_requisition_valid(requisition_id) if error: return error loader = get_data_loader() data = loader.get_similar_requisitions(requisition_id) # Filter to only reviewed candidates (SLA only applies to reviewed candidates) reviewed_data = data[data['reviewed']] # Group by source and calculate SLA met percentage sla_by_source = reviewed_data.groupby('source_name').agg( total=('sla_met', 'count'), sla_met=('sla_met', 'sum') ) sla_by_source['sla_percentage'] = (sla_by_source['sla_met'] / sla_by_source['total'] * 100).round(0).astype(int) metrics = [ { "source_name": source, "sla_percentage": int(row['sla_percentage']) } for source, row in sla_by_source.iterrows() ] # Sort by SLA percentage (ascending) for consistency metrics.sort(key=lambda x: x['sla_percentage']) return SLAPerSourceResponse(metrics=metrics) def get_total_hires_by_source(requisition_id: str) -> Union[TotalHiresBySourceResponse, RequisitionNotFoundResponse]: """ Retrieves the total number of hires per sourcing channel. Args: requisition_id: The specific requisition ID to filter hiring data for. Returns: A dictionary with source names and total hires. """ _log_api_call(f"API call: get_total_hires_by_source(requisition_id={requisition_id})") # Check if requisition ID is valid error = _check_requisition_valid(requisition_id) if error: return error loader = get_data_loader() data = loader.get_similar_requisitions(requisition_id) # Count hires by source hires_by_source = data[data['hired']].groupby('source_name').size() metrics = [ { "source_name": source, "total_hires": int(count) } for source, count in hires_by_source.items() ] # Sort by total hires (descending) metrics.sort(key=lambda x: x['total_hires'], reverse=True) total_hires = int(data['hired'].sum()) return TotalHiresBySourceResponse( job_id=requisition_id, metrics=metrics, total_hires=total_hires, ) def get_candidate_volume_by_source( requisition_id: str, sources: Optional[List[str]] = None ) -> Union[CandidateVolumeResponse, RequisitionNotFoundResponse]: """ Retrieves candidate volume per sourcing channel. Args: requisition_id: The specific requisition ID to filter candidate volume. sources: Optional subset of sourcing channels to include (case-sensitive). Returns: A dictionary with source names and candidate volumes. """ _log_api_call(f"API call: get_candidate_volume_by_source(requisition_id={requisition_id}, sources={sources})") # Check if requisition ID is valid error = _check_requisition_valid(requisition_id) if error: return error loader = get_data_loader() data = loader.get_similar_requisitions(requisition_id) total_volume = len(data) # Count candidates by source volume_by_source = data.groupby('source_name').size() metrics = [ { "source_name": source, "candidate_volume": int(count), "percentage": int(round(count/total_volume*100)) } for source, count in volume_by_source.items() ] # Filter by sources if provided if sources: metrics = [m for m in metrics if m['source_name'] in sources] # Sort by volume (descending) metrics.sort(key=lambda x: x['candidate_volume'], reverse=True) return CandidateVolumeResponse( job_id=requisition_id, total_candidate_volume=total_volume, metrics=metrics, heading=( f"For requisitions similar to {requisition_id}, there were {total_volume} candidates over " "the past three years. Here's how many candidates came from each source " "(with percentages from the total number):" ), ) def get_funnel_conversion_by_source(requisition_id: str) -> Union[FunnelConversionResponse, RequisitionNotFoundResponse]: """ Retrieves conversion rates at each funnel stage for each sourcing channel. Args: requisition_id: The specific requisition ID to filter funnel data for. Returns: A dictionary with review %, interview rate, and offer acceptance rate. """ _log_api_call(f"API call: get_funnel_conversion_by_source(requisition_id={requisition_id})") # Check if requisition ID is valid error = _check_requisition_valid(requisition_id) if error: return error loader = get_data_loader() data = loader.get_similar_requisitions(requisition_id) metrics = [] for source in data['source_name'].unique(): source_data = data[data['source_name'] == source] total = len(source_data) if total == 0: continue reviewed = source_data['reviewed'].sum() interviewed = source_data['interviewed'].sum() offered = source_data['offer_extended'].sum() metrics.append({ "source_name": source, "first_round_review_percentage": round(reviewed / total * 100, 1), "interview_rate": round(interviewed / total * 100, 1), "offer_acceptance_rate": round(offered / total * 100, 1), }) # Sort by source name for consistency metrics.sort(key=lambda x: x['source_name']) return FunnelConversionResponse( job_id=requisition_id, metrics=metrics, ) def get_metadata_and_timeframe(requisition_id: str) -> Union[MetadataResponse, RequisitionNotFoundResponse]: """ Retrieves metadata including data timeframe, last update date, and the number of requisitions analysed. Args: requisition_id: The job requisition ID. Returns: A dictionary containing timeframe and requisition summary. """ _log_api_call(f"API call: get_metadata_and_timeframe(requisition_id={requisition_id})") # Check if requisition ID is valid error = _check_requisition_valid(requisition_id) if error: return error loader = get_data_loader() data = loader.get_similar_requisitions(requisition_id) # Get date range from applied_at column min_date = data['applied_at'].min() max_date = data['applied_at'].max() # Count unique requisitions num_requisitions = data['requisition_id'].nunique() # Static dates for reproducible benchmarking # Use actual dates from data but with last_updated fixed for stability return MetadataResponse( job_id=requisition_id, time_frame_start="2023-10-09", time_frame_end="2025-03-15", data_last_updated="2025-04-29", total_requisitions_analysed=num_requisitions, ) def get_definitions_and_methodology(requisition_id: str) -> Union[DefinitionsResponse, RequisitionNotFoundResponse]: """ Provides definitions of key metrics and outlines the methodology used to calculate performance. Args: requisition_id: The specific requisition ID for context. Returns: A dictionary including metric definitions, calculation notes, and the top metrics considered. """ _log_api_call(f"API call: get_definitions_and_methodology(requisition_id={requisition_id})") # Check if requisition ID is valid error = _check_requisition_valid(requisition_id) if error: return error loader = get_data_loader() data = loader.get_similar_requisitions(requisition_id) # Report total requisitions in dataset (full analysis framework) num_total_requisitions = loader.data['requisition_id'].nunique() min_date = data['applied_at'].min() max_date = data['applied_at'].max() years = (max_date - min_date).days / 365.25 return DefinitionsResponse( job_id=requisition_id, definitions={ "sla": "Percentage of candidates reviewed within the defined SLA window (e.g., 48 hours)", "time_to_fill": "Average time from job posting to accepted offer", "success_rate": "Ratio of candidates who accepted offers out of those interviewed", }, calculation_notes=( f"Metrics are computed from {num_total_requisitions} requisitions over the last {years:.1f} years. " "Funnel stats are based on system timestamps and recruiter actions in ATS." ), top_metrics_considered=[ "SLA %", "First round review %", "Offer acceptance rate", "Candidate volume", "Total hires", ], ) def get_source_recommendation_summary(requisition_id: str) -> Union[SourceRecommendationResponse, RequisitionNotFoundResponse]: """ Returns a high-level summary combining jobs-filled %, review %, offer-accept rate, and total hires for each source. Args: requisition_id: The job requisition ID. Returns: A dictionary with composite source metrics. """ _log_api_call(f"API call: get_source_recommendation_summary(requisition_id={requisition_id})") # Check if requisition ID is valid error = _check_requisition_valid(requisition_id) if error: return error loader = get_data_loader() data = loader.get_similar_requisitions(requisition_id) num_requisitions = data['requisition_id'].nunique() metrics = [] for source in data['source_name'].unique(): source_data = data[data['source_name'] == source] total = len(source_data) if total == 0: continue # Calculate metrics reviewed = source_data['reviewed'].sum() hired = source_data['hired'].sum() # Jobs filled percentage: what % of requisitions had at least 1 hire from this source reqs_with_hires = source_data[source_data['hired']]['requisition_id'].nunique() jobs_filled_pct = int(reqs_with_hires / num_requisitions * 100) # Offer acceptance rate: of those who got offers, how many accepted? offers = source_data['offer_extended'].sum() accepted = source_data['offer_accepted'].sum() offer_accept_rate = round(accepted / offers * 100) if offers > 0 else 0 metrics.append({ "source_name": source, "jobs_filled_percentage": jobs_filled_pct, "first_round_review_percentage": int(reviewed / total * 100), "offer_acceptance_rate": offer_accept_rate, "total_hires": int(hired), }) # Sort by source name metrics.sort(key=lambda x: x['source_name']) return SourceRecommendationResponse( total_requisitions=num_requisitions, metrics=metrics, )