Spaces:
No application file
No application file
| import asyncio | |
| import json | |
| import re | |
| import logging | |
| from typing import List, Dict, Any, Callable, Optional | |
| from app.utils.groq_client import get_groq_completion | |
| from app.models.schemas import ( | |
| Candidate, NormalizedCandidate, RerankResult, | |
| DeepReview, FinalShortlist, FinalRank, EvaluationResponse, | |
| ) | |
| from app.services.matching_service import match_service | |
| from app.prompts.templates import ( | |
| STAGE1_NORMALIZATION_PROMPT, | |
| STAGE3_RERANK_PROMPT, | |
| STAGE4_DEEP_REVIEW_PROMPT, | |
| STAGE5_FINAL_SELECTION_PROMPT, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # Concurrency throttle — at most 3 Groq calls in flight at once (acquired by every _llm call) | |
| sem = asyncio.Semaphore(3) | |
async def _llm(messages: list) -> str:
    """Run one Groq completion while holding a slot of the module-level semaphore.

    Args:
        messages: Chat messages passed through unchanged to ``get_groq_completion``.

    Returns:
        Whatever ``get_groq_completion`` returns for ``messages``.
    """
    async with sem:
        completion = await get_groq_completion(messages)
    return completion
| def _parse_json(raw: str) -> dict: | |
| """Extract first JSON object from LLM response.""" | |
| match = re.search(r'\{.*\}', raw, re.DOTALL) | |
| if match: | |
| return json.loads(match.group()) | |
| return json.loads(raw) | |
| # ──────────────────────────────────────────────── | |
| # Stage 1 — Normalize | |
| # ──────────────────────────────────────────────── | |
async def normalize_candidate(jd: str, candidate: Candidate) -> NormalizedCandidate:
    """Stage 1: have the LLM normalize one raw candidate against the job description.

    Args:
        jd: Job description text inserted into the Stage 1 prompt.
        candidate: Raw candidate; its JSON dump and id are inserted into the prompt.

    Returns:
        The ``NormalizedCandidate`` built from the LLM's JSON reply, with
        ``candidate_id`` forced to ``candidate.id``. If the LLM call, JSON
        extraction, or model construction raises, a zero-scored placeholder
        flagged ``"Normalization Error"`` is returned instead.
    """
    prompt = STAGE1_NORMALIZATION_PROMPT.format(
        jd=jd,
        candidate_raw=candidate.model_dump_json(),
        candidate_id=candidate.id,
    )
    messages = [
        {"role": "system", "content": "You are a professional data normalizer. Output JSON ONLY. No markdown."},
        {"role": "user", "content": prompt},
    ]
    try:
        # The LLM call is inside the try so that one failed request degrades to
        # the placeholder below instead of aborting the caller's whole gather().
        resp = await _llm(messages)
        data = _parse_json(resp)
        data["candidate_id"] = candidate.id  # Ensure ID is always correct
        return NormalizedCandidate(**data)
    except Exception as e:
        logger.warning("[Stage1] Failed to normalize %s: %s", candidate.name, e)
        return NormalizedCandidate(
            candidate_id=candidate.id,
            name=candidate.name,
            normalized_title="Unknown",
            experience_years=0,
            primary_skills=[],
            secondary_skills=[],
            backend_score=0,
            frontend_score=0,
            cloud_score=0,
            database_score=0,
            notice_period_days=90,
            location="Unknown",
            employment_status="Unknown",
            salary_expectation="Unknown",
            flags=["Normalization Error"],
        )
| # ──────────────────────────────────────────────── | |
| # Stage 3 — Rerank | |
| # ──────────────────────────────────────────────── | |
async def rerank_candidate(jd: str, normalized: NormalizedCandidate) -> RerankResult:
    """Stage 3: have the LLM score one normalized candidate against the job description.

    Args:
        jd: Job description text inserted into the Stage 3 prompt.
        normalized: Stage 1 output; its JSON dump is inserted into the prompt.

    Returns:
        The ``RerankResult`` built from the LLM's JSON reply, with
        ``candidate_id`` forced to ``normalized.candidate_id``. If the LLM
        call, JSON extraction, or model construction raises, a ``"reject"``
        result with empty scores and ``final_score=0`` is returned instead.
    """
    messages = [
        {"role": "system", "content": "You are a recruitment scoring engine. Output JSON ONLY. No markdown."},
        {"role": "user", "content": STAGE3_RERANK_PROMPT.format(
            jd=jd,
            normalized_candidate=normalized.model_dump_json(),
        )},
    ]
    try:
        # The LLM call is inside the try so that one failed request degrades to
        # the reject result below instead of aborting the caller's whole gather().
        resp = await _llm(messages)
        data = _parse_json(resp)
        data["candidate_id"] = normalized.candidate_id
        return RerankResult(**data)
    except Exception as e:
        logger.warning("[Stage3] Rerank failed for %s: %s", normalized.candidate_id, e)
        return RerankResult(
            candidate_id=normalized.candidate_id,
            scores={},
            final_score=0,
            decision="reject",
        )
| # ──────────────────────────────────────────────── | |
| # Stage 4 — Deep Review | |
| # ──────────────────────────────────────────────── | |
async def review_candidate(
    jd: str, candidate: Candidate, score: float
) -> DeepReview:
    """Stage 4: have the LLM write a deep review of one shortlisted candidate.

    Args:
        jd: Job description text inserted into the Stage 4 prompt.
        candidate: Raw candidate; its JSON dump is inserted into the prompt.
        score: Rerank score for the candidate; rounded to one decimal in the prompt.

    Returns:
        The ``DeepReview`` built from the LLM's JSON reply, with
        ``candidate_id`` forced to ``candidate.id``. If the LLM call, JSON
        extraction, or model construction raises, a ``"reject"`` review with
        ``confidence=0.0`` and an "Evaluation error" risk is returned instead.
    """
    messages = [
        {"role": "system", "content": "You are a senior hiring evaluator. Output JSON ONLY. No markdown."},
        {"role": "user", "content": STAGE4_DEEP_REVIEW_PROMPT.format(
            jd=jd,
            candidate_data=candidate.model_dump_json(),
            score=round(score, 1),
        )},
    ]
    try:
        # The LLM call is inside the try so that one failed request degrades to
        # the reject review below instead of aborting the caller's whole gather().
        resp = await _llm(messages)
        data = _parse_json(resp)
        data["candidate_id"] = candidate.id
        return DeepReview(**data)
    except Exception as e:
        logger.warning("[Stage4] Deep review failed for %s: %s", candidate.id, e)
        return DeepReview(
            candidate_id=candidate.id,
            verdict="reject",
            # NOTE(review): the "β" below looks like a mis-decoded dash; kept
            # byte-for-byte as found — confirm against the original file.
            why="Evaluation error β could not parse LLM response.",
            strengths=[],
            risks=["Evaluation error"],
            hidden_signal="",
            confidence=0.0,
        )
| # ──────────────────────────────────────────────── | |
| # Main Pipeline | |
| # ──────────────────────────────────────────────── | |
def _auto_ranked_shortlist(
    ranked: List[RerankResult],
    candidate_map: Dict[Any, Candidate],
    review_map: Dict[Any, DeepReview],
) -> FinalShortlist:
    """Build a shortlist directly from rerank order, without the Stage 5 LLM."""
    ranking = []
    for position, result in enumerate(ranked, start=1):
        # Plain lookups: no throwaway Candidate/DeepReview objects are built
        # just to read a default name or verdict.
        candidate = candidate_map.get(result.candidate_id)
        review = review_map.get(result.candidate_id)
        ranking.append(
            FinalRank(
                rank=position,
                candidate_id=result.candidate_id,
                name=candidate.name if candidate is not None else "Unknown",
                decision=review.verdict if review is not None else "consider",
                reason="Auto-ranked by rerank score.",
            )
        )
    return FinalShortlist(final_ranking=ranking)


async def perform_hybrid_evaluation(
    jd: str,
    candidates: List[Candidate],
    progress_cb: Optional[Callable[[str], None]] = None,
) -> EvaluationResponse:
    """Run the full 5-stage hybrid evaluation pipeline.

    Stages, as implemented here:
      1. Normalize every candidate concurrently via ``normalize_candidate``.
      2. Ask ``match_service.get_top_candidates`` for matches (kept to at most
         20); if that raises, fall back to the first 20 input candidates.
      3. Score those candidates concurrently via ``rerank_candidate`` and sort
         by ``final_score`` descending.
      4. Deep-review the 5 best-scored candidates via ``review_candidate``.
      5. Ask the LLM to synthesize a final shortlist from the reviews; if the
         call or parsing fails, rank automatically by rerank order.

    Args:
        jd: Job description text.
        candidates: Raw candidates to evaluate.
        progress_cb: Optional callable that receives each progress message,
            in addition to it being logged.

    Returns:
        An ``EvaluationResponse`` holding the final ranking and, per reviewed
        candidate id, the dumped deep review.
    """
    # NOTE(review): the "β" glyphs inside the progress strings look like
    # mis-decoded status symbols; they are kept byte-for-byte as found —
    # confirm against the original file.

    def log(msg: str) -> None:
        logger.info(msg)
        if progress_cb:
            progress_cb(msg)

    candidate_map = {c.id: c for c in candidates}

    # -- Stage 1: normalize all candidates ---------------------------------
    log(f"[Stage 1] Normalizing {len(candidates)} candidates...")
    normalized_list: List[NormalizedCandidate] = await asyncio.gather(
        *(normalize_candidate(jd, c) for c in candidates)
    )
    normalized_map = {n.candidate_id: n for n in normalized_list}
    log("[Stage 1] β Normalization complete.")

    # -- Stage 2: embedding match, keep at most 20 -------------------------
    log("[Stage 2] Running embedding match against Pinecone...")
    try:
        top_20 = await match_service.get_top_candidates(jd, candidates)
    except Exception as e:
        log(f"[Stage 2] β Pinecone unavailable ({e}). Falling back to all candidates.")
        top_20 = candidates[:20]
    # Clamp to available
    top_20 = top_20[:20]
    log(f"[Stage 2] β Retrieved {len(top_20)} candidates.")

    # -- Stage 3: LLM rerank, keep the 10 best scores ----------------------
    log(f"[Stage 3] Reranking {len(top_20)} candidates...")
    rerank_results: List[RerankResult] = await asyncio.gather(
        *(
            rerank_candidate(jd, normalized_map[c.id])
            for c in top_20
            if c.id in normalized_map
        )
    )
    rerank_results.sort(key=lambda x: x.final_score, reverse=True)
    top_10 = rerank_results[:10]
    log(f"[Stage 3] β Top 10 selected. Scores: {[round(r.final_score, 1) for r in top_10]}")

    # -- Stage 4: LLM deep review of the 5 best ----------------------------
    top_5_results = top_10[:5]
    log(f"[Stage 4] Deep reviewing top {len(top_5_results)} candidates...")
    reviews: List[DeepReview] = await asyncio.gather(
        *(
            review_candidate(jd, candidate_map[r.candidate_id], r.final_score)
            for r in top_5_results
            if r.candidate_id in candidate_map
        )
    )
    review_map = {rev.candidate_id: rev for rev in reviews}
    log("[Stage 4] β Deep reviews complete.")

    # -- Stage 5: final synthesis ------------------------------------------
    if reviews:
        log("[Stage 5] Synthesizing final shortlist...")
        reviews_json = json.dumps([r.model_dump() for r in reviews])
        try:
            # The LLM call is inside the try so a failed request falls back to
            # the automatic ranking instead of discarding Stages 1-4.
            final_resp = await _llm([
                {"role": "system", "content": "You are the final hiring decision officer. Output JSON ONLY. No markdown."},
                {"role": "user", "content": STAGE5_FINAL_SELECTION_PROMPT.format(
                    all_top_5_results=reviews_json
                )},
            ])
            shortlist = FinalShortlist(**_parse_json(final_resp))
        except Exception as e:
            log(f"[Stage 5] β Synthesis parse failed ({e}). Using automatic ranking.")
            shortlist = _auto_ranked_shortlist(top_5_results, candidate_map, review_map)
    else:
        # Nothing was reviewed, so there is nothing for the LLM to synthesize.
        log("[Stage 5] No reviewed candidates; skipping synthesis.")
        shortlist = FinalShortlist(final_ranking=[])

    log(f"[Stage 5] β Pipeline complete. {len(shortlist.final_ranking)} candidates shortlisted.")
    return EvaluationResponse(
        shortlist=shortlist.final_ranking,
        details={rev.candidate_id: rev.model_dump() for rev in reviews},
    )