import asyncio
import json
import re
import logging
from typing import List, Dict, Any, Callable, Optional

from app.utils.groq_client import get_groq_completion
from app.models.schemas import (
    Candidate,
    NormalizedCandidate,
    RerankResult,
    DeepReview,
    FinalShortlist,
    FinalRank,
    EvaluationResponse,
)
from app.services.matching_service import match_service
from app.prompts.templates import (
    STAGE1_NORMALIZATION_PROMPT,
    STAGE3_RERANK_PROMPT,
    STAGE4_DEEP_REVIEW_PROMPT,
    STAGE5_FINAL_SELECTION_PROMPT,
)

logger = logging.getLogger(__name__)

# Concurrency throttle — max 3 parallel Groq calls.
# NOTE(review): created at import time. On Python < 3.10 an asyncio.Semaphore
# binds to the event loop current at construction; confirm the runtime is
# 3.10+ (or that only one loop is ever used) before relying on this.
sem = asyncio.Semaphore(3)


async def _llm(messages: list) -> str:
    """Send chat *messages* to Groq while holding the module-wide semaphore.

    Returns whatever ``get_groq_completion`` returns; its exceptions propagate.
    """
    async with sem:
        return await get_groq_completion(messages)


def _parse_json(raw: str) -> dict:
    """Extract the first JSON object from an LLM response.

    Handles responses that wrap the object in prose or markdown fences and
    responses that contain more than one brace-delimited span: decoding starts
    at each ``{`` in turn and stops at the end of the first complete object,
    rather than greedily spanning from the first ``{`` to the last ``}``.

    Falls back to ``json.loads(raw)`` when no embedded object decodes, so a
    response with no valid JSON raises ``json.JSONDecodeError`` as before.
    """
    text = raw.strip()
    try:
        whole = json.loads(text)
    except json.JSONDecodeError:
        pass
    else:
        if isinstance(whole, dict):
            return whole

    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            obj, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            start = text.find("{", start + 1)
            continue
        return obj  # raw_decode at a "{" can only yield a dict
    return json.loads(raw)


# ────────────────────────────────────────────────
# Stage 1 — Normalize
# ────────────────────────────────────────────────

async def normalize_candidate(jd: str, candidate: Candidate) -> NormalizedCandidate:
    """Stage 1: have the LLM normalize one candidate's raw profile against *jd*.

    Any failure — the LLM call itself, JSON extraction, or model validation —
    yields a zeroed placeholder flagged ``"Normalization Error"`` instead of
    raising, so one bad candidate cannot abort the whole ``asyncio.gather``.
    """
    prompt = STAGE1_NORMALIZATION_PROMPT.format(
        jd=jd,
        candidate_raw=candidate.model_dump_json(),
        candidate_id=candidate.id,
    )
    try:
        resp = await _llm([
            {"role": "system", "content": "You are a professional data normalizer. Output JSON ONLY. No markdown."},
            {"role": "user", "content": prompt},
        ])
        data = _parse_json(resp)
        data["candidate_id"] = candidate.id  # Ensure ID is always correct
        return NormalizedCandidate(**data)
    except Exception as e:
        logger.warning("[Stage1] Failed to normalize %s: %s", candidate.name, e)
        return NormalizedCandidate(
            candidate_id=candidate.id,
            name=candidate.name,
            normalized_title="Unknown",
            experience_years=0,
            primary_skills=[],
            secondary_skills=[],
            backend_score=0,
            frontend_score=0,
            cloud_score=0,
            database_score=0,
            notice_period_days=90,
            location="Unknown",
            employment_status="Unknown",
            salary_expectation="Unknown",
            flags=["Normalization Error"],
        )


# ────────────────────────────────────────────────
# Stage 3 — Rerank
# ────────────────────────────────────────────────

async def rerank_candidate(jd: str, normalized: NormalizedCandidate) -> RerankResult:
    """Stage 3: have the LLM score one normalized candidate against *jd*.

    On any failure (LLM call, parsing, validation) returns a result with empty
    scores, ``final_score=0`` and ``decision="reject"``.
    """
    try:
        resp = await _llm([
            {"role": "system", "content": "You are a recruitment scoring engine. Output JSON ONLY. No markdown."},
            {"role": "user", "content": STAGE3_RERANK_PROMPT.format(
                jd=jd,
                normalized_candidate=normalized.model_dump_json(),
            )},
        ])
        data = _parse_json(resp)
        data["candidate_id"] = normalized.candidate_id
        return RerankResult(**data)
    except Exception as e:
        logger.warning("[Stage3] Rerank failed for %s: %s", normalized.candidate_id, e)
        return RerankResult(
            candidate_id=normalized.candidate_id,
            scores={},
            final_score=0,
            decision="reject",
        )


# ────────────────────────────────────────────────
# Stage 4 — Deep Review
# ────────────────────────────────────────────────

async def review_candidate(
    jd: str, candidate: Candidate, score: float
) -> DeepReview:
    """Stage 4: have the LLM write a detailed review of one shortlisted candidate.

    *score* is the Stage 3 ``final_score``, passed to the prompt rounded to one
    decimal. On any failure returns a ``verdict="reject"`` review with
    ``confidence=0.0`` and an "Evaluation error" risk.
    """
    try:
        resp = await _llm([
            {"role": "system", "content": "You are a senior hiring evaluator. Output JSON ONLY. No markdown."},
            {"role": "user", "content": STAGE4_DEEP_REVIEW_PROMPT.format(
                jd=jd,
                candidate_data=candidate.model_dump_json(),
                score=round(score, 1),
            )},
        ])
        data = _parse_json(resp)
        data["candidate_id"] = candidate.id
        return DeepReview(**data)
    except Exception as e:
        logger.warning("[Stage4] Deep review failed for %s: %s", candidate.id, e)
        return DeepReview(
            candidate_id=candidate.id,
            verdict="reject",
            why="Evaluation error — could not parse LLM response.",
            strengths=[],
            risks=["Evaluation error"],
            hidden_signal="",
            confidence=0.0,
        )


# ────────────────────────────────────────────────
# Main Pipeline
# ────────────────────────────────────────────────

def _auto_shortlist(
    ranked: List[RerankResult],
    candidate_map: Dict[Any, Candidate],
    review_map: Dict[Any, DeepReview],
) -> FinalShortlist:
    """Build a shortlist in rerank order when Stage 5 synthesis is unavailable.

    Names come from *candidate_map* ("Unknown" if absent); decisions are the
    Stage 4 verdict ("consider" if the candidate has no review). Lookups are
    lazy so no placeholder model is constructed for candidates that exist.
    """
    final_ranking = []
    for position, result in enumerate(ranked, start=1):
        candidate = candidate_map.get(result.candidate_id)
        review = review_map.get(result.candidate_id)
        final_ranking.append(
            FinalRank(
                rank=position,
                candidate_id=result.candidate_id,
                name=candidate.name if candidate is not None else "Unknown",
                decision=review.verdict if review is not None else "consider",
                reason="Auto-ranked by rerank score.",
            )
        )
    return FinalShortlist(final_ranking=final_ranking)


async def perform_hybrid_evaluation(
    jd: str,
    candidates: List[Candidate],
    progress_cb: Optional[Callable[[str], None]] = None,
) -> EvaluationResponse:
    """
    Full 5-stage hybrid evaluation pipeline.

    1. LLM-normalize every candidate.
    2. Embedding match via ``match_service`` → at most 20 (falls back to the
       first 20 input candidates if the match service raises).
    3. LLM rerank → top 10 by ``final_score``.
    4. LLM deep review of the top 5.
    5. LLM synthesis of the final shortlist (falls back to rerank order if the
       call or its parsing fails; skipped entirely when nothing was reviewed).

    progress_cb: optional callable for streaming progress logs to UI. It is
    called synchronously; exceptions it raises propagate.

    Returns an EvaluationResponse with the final ranking and each Stage 4
    review keyed by candidate id.
    """

    def log(msg: str) -> None:
        logger.info(msg)
        if progress_cb:
            progress_cb(msg)

    candidate_map = {c.id: c for c in candidates}

    # ── Stage 1: Normalize all candidates ──────────────────────
    # NOTE(review): only the normalizations of Stage 2 survivors are used
    # below; normalizing after Stage 2 would save LLM calls on large inputs.
    log(f"[Stage 1] Normalizing {len(candidates)} candidates...")
    normalized_list: List[NormalizedCandidate] = await asyncio.gather(
        *(normalize_candidate(jd, c) for c in candidates)
    )
    normalized_map = {n.candidate_id: n for n in normalized_list}
    log("[Stage 1] ✓ Normalization complete.")

    # ── Stage 2: Embedding matching → Top 20 ───────────────────
    log("[Stage 2] Running embedding match against Pinecone...")
    try:
        top_20 = await match_service.get_top_candidates(jd, candidates)
    except Exception as e:
        log(f"[Stage 2] ⚠ Pinecone unavailable ({e}). Falling back to the first 20 candidates.")
        top_20 = candidates
    top_20 = top_20[:20]  # Clamp whichever source we used
    log(f"[Stage 2] ✓ Retrieved {len(top_20)} candidates.")

    # ── Stage 3: LLM rerank → Top 10 ───────────────────────────
    log(f"[Stage 3] Reranking {len(top_20)} candidates...")
    rerank_results: List[RerankResult] = list(
        await asyncio.gather(
            *(
                rerank_candidate(jd, normalized_map[c.id])
                for c in top_20
                if c.id in normalized_map
            )
        )
    )
    rerank_results.sort(key=lambda r: r.final_score, reverse=True)
    top_10 = rerank_results[:10]
    log(f"[Stage 3] ✓ Top 10 selected. Scores: {[round(r.final_score, 1) for r in top_10]}")

    # ── Stage 4: LLM deep review → Top 5 ──────────────────────
    top_5_results = top_10[:5]
    log(f"[Stage 4] Deep reviewing top {len(top_5_results)} candidates...")
    reviews: List[DeepReview] = await asyncio.gather(
        *(
            review_candidate(jd, candidate_map[r.candidate_id], r.final_score)
            for r in top_5_results
            if r.candidate_id in candidate_map
        )
    )
    review_map = {rev.candidate_id: rev for rev in reviews}
    log("[Stage 4] ✓ Deep reviews complete.")

    # ── Stage 5: Final synthesis ───────────────────────────────
    log("[Stage 5] Synthesizing final shortlist...")
    if not reviews:
        # Nothing to synthesize — don't spend an LLM call on an empty list.
        log("[Stage 5] ⚠ No reviewed candidates; returning an empty shortlist.")
        shortlist = FinalShortlist(final_ranking=[])
    else:
        # mode="json" guarantees every field is JSON-serializable for json.dumps.
        reviews_json = json.dumps([r.model_dump(mode="json") for r in reviews])
        try:
            final_resp = await _llm([
                {"role": "system", "content": "You are the final hiring decision officer. Output JSON ONLY. No markdown."},
                {"role": "user", "content": STAGE5_FINAL_SELECTION_PROMPT.format(
                    all_top_5_results=reviews_json
                )},
            ])
            shortlist = FinalShortlist(**_parse_json(final_resp))
        except Exception as e:
            # Covers API errors as well as parse/validation errors.
            log(f"[Stage 5] ⚠ Synthesis failed ({e}). Using automatic ranking.")
            shortlist = _auto_shortlist(top_5_results, candidate_map, review_map)

    log(f"[Stage 5] ✓ Pipeline complete. {len(shortlist.final_ranking)} candidates shortlisted.")

    return EvaluationResponse(
        shortlist=shortlist.final_ranking,
        details={rev.candidate_id: rev.model_dump() for rev in reviews},
    )