import asyncio
import json
import logging
import re
from typing import List, Dict, Any

from app.utils.groq_client import get_groq_completion
from app.models.schemas import (
    Candidate,
    NormalizedCandidate,
    RerankResult,
    DeepReview,
    FinalShortlist,
    FinalRank,
    EvaluationResponse,
)
from app.services.matching_service import match_service
from app.prompts.templates import (
    STAGE1_NORMALIZATION_PROMPT,
    STAGE3_RERANK_PROMPT,
    STAGE4_DEEP_REVIEW_PROMPT,
    STAGE5_FINAL_SELECTION_PROMPT,
)

logger = logging.getLogger(__name__)

# Concurrency Throttling: at most 3 LLM requests are in flight at any time.
sem = asyncio.Semaphore(3)

# Stage cut-offs used by perform_hybrid_evaluation.
_RERANK_KEEP = 10
_DEEP_REVIEW_KEEP = 5

# Greedy match from the first "{" to the last "}" so that any text the model
# wraps around the JSON object is discarded before parsing.
_JSON_OBJECT_RE = re.compile(r'\{.*\}', re.DOTALL)


def _load_json_payload(resp: str) -> Any:
    """Parse the JSON object embedded in an LLM response.

    If no ``{...}`` span is found, the whole response is handed to
    ``json.loads``. Raises whatever ``re``/``json`` raise on bad input
    (e.g. ``TypeError`` for a non-string, ``json.JSONDecodeError``).
    """
    found = _JSON_OBJECT_RE.search(resp)
    return json.loads(found.group() if found else resp)


async def get_completion_with_sem(messages: List[Dict[str, str]]) -> str:
    """Call ``get_groq_completion`` while holding the module-level semaphore.

    Args:
        messages: Chat messages forwarded unchanged to ``get_groq_completion``.

    Returns:
        Whatever ``get_groq_completion`` returns (callers here treat it as
        the raw response text).
    """
    async with sem:
        return await get_groq_completion(messages)


async def normalize_candidate(jd: str, candidate: Candidate) -> NormalizedCandidate:
    """Stage 1: ask the LLM to normalize one candidate against the job description.

    Args:
        jd: Job description text inserted into the prompt.
        candidate: Raw candidate; serialized with ``model_dump_json``.

    Returns:
        The parsed ``NormalizedCandidate``. If the response cannot be parsed
        or validated, a zero-scored placeholder flagged ``"Parsing Error"``
        is returned instead. Errors raised by the LLM call itself propagate.
    """
    candidate_raw = candidate.model_dump_json()
    resp = await get_completion_with_sem([
        {"role": "system", "content": "You are a professional data normalizer. Output JSON ONLY."},
        {"role": "user", "content": STAGE1_NORMALIZATION_PROMPT.format(jd=jd, candidate_raw=candidate_raw)},
    ])
    try:
        data = _load_json_payload(resp)
        # Pin the id to the source candidate: perform_hybrid_evaluation looks
        # results up by Candidate.id, so a missing or altered id in the model
        # output must not leak through.
        data["candidate_id"] = candidate.id
        return NormalizedCandidate(**data)
    except Exception as exc:
        logger.warning("Failed to normalize %s: %s", candidate.name, exc)
        # Return a fallback object
        return NormalizedCandidate(
            candidate_id=candidate.id,
            name=candidate.name,
            normalized_title="Unknown",
            experience_years=0,
            primary_skills=[],
            secondary_skills=[],
            backend_score=0,
            frontend_score=0,
            cloud_score=0,
            database_score=0,
            notice_period_days=0,
            location="Unknown",
            employment_status="Unknown",
            salary_expectation="Unknown",
            flags=["Parsing Error"],
        )


async def rerank_candidate(jd: str, normalized: NormalizedCandidate) -> RerankResult:
    """Stage 3: ask the LLM to score one normalized candidate.

    Args:
        jd: Job description text inserted into the prompt.
        normalized: Stage 1 output for the candidate.

    Returns:
        The parsed ``RerankResult``. If the response cannot be parsed or
        validated, a ``final_score=0`` / ``decision="reject"`` result is
        returned. Errors raised by the LLM call itself propagate.
    """
    resp = await get_completion_with_sem([
        {"role": "system", "content": "You are a recruitment scoring engine. Output JSON ONLY."},
        {"role": "user", "content": STAGE3_RERANK_PROMPT.format(jd=jd, normalized_candidate=normalized.model_dump_json())},
    ])
    try:
        data = _load_json_payload(resp)
        # Later stages index candidates by this id, so never trust the
        # model's copy of it.
        data["candidate_id"] = normalized.candidate_id
        return RerankResult(**data)
    except Exception as exc:
        # `except Exception` (not a bare except) so task cancellation and
        # KeyboardInterrupt are not swallowed.
        logger.warning("Failed to rerank %s: %s", normalized.candidate_id, exc)
        return RerankResult(candidate_id=normalized.candidate_id, scores={}, final_score=0, decision="reject")


async def review_candidate(jd: str, candidate_data: str, score: float, cand_id: str) -> DeepReview:
    """Stage 4: ask the LLM for a detailed review of one candidate.

    Args:
        jd: Job description text inserted into the prompt.
        candidate_data: Candidate serialized as a JSON string.
        score: Stage 3 score, inserted into the prompt.
        cand_id: Id stamped onto the returned review.

    Returns:
        The parsed ``DeepReview`` with ``candidate_id`` forced to *cand_id*.
        If the response cannot be parsed or validated, a ``"reject"`` review
        with ``why="Error in evaluation"`` is returned. Errors raised by the
        LLM call itself propagate.
    """
    resp = await get_completion_with_sem([
        {"role": "system", "content": "You are a senior hiring evaluator. Output JSON ONLY."},
        {"role": "user", "content": STAGE4_DEEP_REVIEW_PROMPT.format(jd=jd, candidate_data=candidate_data, score=score)},
    ])
    try:
        data = _load_json_payload(resp)
        data["candidate_id"] = cand_id
        return DeepReview(**data)
    except Exception as exc:
        # `except Exception` (not a bare except) so task cancellation and
        # KeyboardInterrupt are not swallowed.
        logger.warning("Failed to review %s: %s", cand_id, exc)
        return DeepReview(
            candidate_id=cand_id,
            verdict="reject",
            why="Error in evaluation",
            strengths=[],
            risks=[],
            hidden_signal="",
            confidence=0,
        )


async def perform_hybrid_evaluation(jd: str, candidates: List[Candidate]) -> EvaluationResponse:
    """Run the five-stage candidate evaluation pipeline.

    Stages: (1) LLM normalization of every candidate, (2) retrieval through
    ``match_service.get_top_candidates``, (3) LLM reranking of the retrieved
    candidates, (4) LLM deep review of the five best-scored ones, and
    (5) an LLM-produced final ranking built from those reviews.

    Args:
        jd: Job description text.
        candidates: Candidates to evaluate.

    Returns:
        An ``EvaluationResponse`` whose ``shortlist`` is the final ranking and
        whose ``details`` maps candidate id to the dumped deep review. When no
        candidate reaches the review stage, both are empty and the stage 5 LLM
        call is skipped.
    """
    # 1. Normalization (Stage 1) - All candidates
    normalized_candidates = await asyncio.gather(
        *(normalize_candidate(jd, c) for c in candidates)
    )

    # Map for easy lookup
    normalized_map = {n.candidate_id: n for n in normalized_candidates}
    candidate_map = {c.id: c for c in candidates}

    # 2. Embedding Matching (Stage 2)
    # NOTE(review): the raw candidates are what is passed to the matcher, not
    # the normalized records; the result is presumably capped at 20 inside the
    # service - confirm against match_service.
    top_20 = await match_service.get_top_candidates(jd, candidates)

    # 3. Deterministic Reranking (Stage 3) - keep the 10 best scores
    rerank_results = await asyncio.gather(
        *(rerank_candidate(jd, normalized_map[c.id]) for c in top_20)
    )
    ranked = sorted(rerank_results, key=lambda r: r.final_score, reverse=True)
    top_10_results = ranked[:_RERANK_KEEP]

    # 4. LLM Deep Review (Stage 4) - Top 5 Only
    top_5_for_review = top_10_results[:_DEEP_REVIEW_KEEP]
    review_results = await asyncio.gather(
        *(
            review_candidate(
                jd,
                candidate_map[r.candidate_id].model_dump_json(),
                r.final_score,
                r.candidate_id,
            )
            for r in top_5_for_review
        )
    )

    # Nothing was reviewed: asking the model to rank an empty list can only
    # produce invented entries, so return an empty result directly.
    if not review_results:
        return EvaluationResponse(shortlist=[], details={})

    # 5. Final Selection (Stage 5)
    reviews_json = json.dumps([rev.model_dump() for rev in review_results])
    final_resp = await get_completion_with_sem([
        {"role": "system", "content": "You are the final hiring decision officer. Output JSON ONLY."},
        {"role": "user", "content": STAGE5_FINAL_SELECTION_PROMPT.format(all_top_5_results=reviews_json)},
    ])
    try:
        shortlist = FinalShortlist(**_load_json_payload(final_resp))
    except Exception as exc:
        # Fallback ranking if synthesis fails: keep the Stage 3 order.
        logger.warning("Final selection failed, using rerank order: %s", exc)
        shortlist = FinalShortlist(final_ranking=[
            FinalRank(
                rank=position,
                candidate_id=r.candidate_id,
                name=candidate_map[r.candidate_id].name,
                decision=r.decision,
                reason="Automatic ranking",
            )
            for position, r in enumerate(top_5_for_review, start=1)
        ])

    return EvaluationResponse(
        shortlist=shortlist.final_ranking,
        details={rev.candidate_id: rev.model_dump() for rev in review_results},
    )