| import asyncio |
| import json |
| import re |
| from typing import List, Dict, Any |
| from app.utils.groq_client import get_groq_completion |
| from app.models.schemas import ( |
| Candidate, NormalizedCandidate, RerankResult, |
| DeepReview, FinalShortlist, FinalRank, EvaluationResponse |
| ) |
| from app.services.matching_service import match_service |
| from app.prompts.templates import ( |
| STAGE1_NORMALIZATION_PROMPT, |
| STAGE3_RERANK_PROMPT, |
| STAGE4_DEEP_REVIEW_PROMPT, |
| STAGE5_FINAL_SELECTION_PROMPT |
| ) |
|
|
| |
| sem = asyncio.Semaphore(3) |
|
|
| async def get_completion_with_sem(messages): |
| async with sem: |
| return await get_groq_completion(messages) |
|
|
| async def normalize_candidate(jd: str, candidate: Candidate) -> NormalizedCandidate: |
| candidate_raw = candidate.model_dump_json() |
| resp = await get_completion_with_sem([ |
| {"role": "system", "content": "You are a professional data normalizer. Output JSON ONLY."}, |
| {"role": "user", "content": STAGE1_NORMALIZATION_PROMPT.format(jd=jd, candidate_raw=candidate_raw)} |
| ]) |
| try: |
| match = re.search(r'\{.*\}', resp, re.DOTALL) |
| data = json.loads(match.group() if match else resp) |
| return NormalizedCandidate(**data) |
| except Exception as e: |
| print(f"Failed to normalize {candidate.name}: {e}") |
| |
| return NormalizedCandidate( |
| candidate_id=candidate.id, name=candidate.name, normalized_title="Unknown", |
| experience_years=0, primary_skills=[], secondary_skills=[], |
| backend_score=0, frontend_score=0, cloud_score=0, database_score=0, |
| notice_period_days=0, location="Unknown", employment_status="Unknown", |
| salary_expectation="Unknown", flags=["Parsing Error"] |
| ) |
|
|
| async def rerank_candidate(jd: str, normalized: NormalizedCandidate) -> RerankResult: |
| resp = await get_completion_with_sem([ |
| {"role": "system", "content": "You are a recruitment scoring engine. Output JSON ONLY."}, |
| {"role": "user", "content": STAGE3_RERANK_PROMPT.format(jd=jd, normalized_candidate=normalized.model_dump_json())} |
| ]) |
| try: |
| match = re.search(r'\{.*\}', resp, re.DOTALL) |
| data = json.loads(match.group() if match else resp) |
| return RerankResult(**data) |
| except: |
| return RerankResult(candidate_id=normalized.candidate_id, scores={}, final_score=0, decision="reject") |
|
|
| async def review_candidate(jd: str, candidate_data: str, score: float, cand_id: str) -> DeepReview: |
| resp = await get_completion_with_sem([ |
| {"role": "system", "content": "You are a senior hiring evaluator. Output JSON ONLY."}, |
| {"role": "user", "content": STAGE4_DEEP_REVIEW_PROMPT.format(jd=jd, candidate_data=candidate_data, score=score)} |
| ]) |
| try: |
| match = re.search(r'\{.*\}', resp, re.DOTALL) |
| data = json.loads(match.group() if match else resp) |
| data["candidate_id"] = cand_id |
| return DeepReview(**data) |
| except: |
| return DeepReview(candidate_id=cand_id, verdict="reject", why="Error in evaluation", strengths=[], risks=[], hidden_signal="", confidence=0) |
|
|
| async def perform_hybrid_evaluation(jd: str, candidates: List[Candidate]) -> EvaluationResponse: |
| |
| normalization_tasks = [normalize_candidate(jd, c) for c in candidates] |
| normalized_candidates = await asyncio.gather(*normalization_tasks) |
| |
| |
| normalized_map = {n.candidate_id: n for n in normalized_candidates} |
| candidate_map = {c.id: c for c in candidates} |
| |
| |
| |
| top_20 = await match_service.get_top_candidates(jd, candidates) |
| |
| |
| rerank_tasks = [rerank_candidate(jd, normalized_map[c.id]) for c in top_20] |
| rerank_results = await asyncio.gather(*rerank_tasks) |
| rerank_results.sort(key=lambda x: x.final_score, reverse=True) |
| top_10_results = rerank_results[:10] |
| |
| |
| top_5_for_review = top_10_results[:5] |
| review_tasks = [ |
| review_candidate( |
| jd, |
| candidate_map[r.candidate_id].model_dump_json(), |
| r.final_score, |
| r.candidate_id |
| ) for r in top_5_for_review |
| ] |
| review_results = await asyncio.gather(*review_tasks) |
| review_map = {rev.candidate_id: rev for rev in review_results} |
| |
| |
| reviews_json = json.dumps([r.model_dump() for r in review_results]) |
| final_resp = await get_completion_with_sem([ |
| {"role": "system", "content": "You are the final hiring decision officer. Output JSON ONLY."}, |
| {"role": "user", "content": STAGE5_FINAL_SELECTION_PROMPT.format(all_top_5_results=reviews_json)} |
| ]) |
| |
| try: |
| match = re.search(r'\{.*\}', final_resp, re.DOTALL) |
| final_data = json.loads(match.group() if match else final_resp) |
| shortlist = FinalShortlist(**final_data) |
| except: |
| |
| shortlist = FinalShortlist(final_ranking=[ |
| FinalRank(rank=i+1, candidate_id=r.candidate_id, name=candidate_map[r.candidate_id].name, decision=r.decision, reason="Automatic ranking") |
| for i, r in enumerate(top_5_for_review) |
| ]) |
|
|
| return EvaluationResponse( |
| shortlist=shortlist.final_ranking, |
| details={rev.candidate_id: rev.model_dump() for rev in review_results} |
| ) |
|
|