# Coderound_Comeback / evaluation_service.py
# Uploaded by cloud450 ("Upload 11 files"), commit b2efd24 (verified).
import asyncio
import json
import re
import logging
from typing import List, Dict, Any, Callable, Optional
from app.utils.groq_client import get_groq_completion
from app.models.schemas import (
Candidate, NormalizedCandidate, RerankResult,
DeepReview, FinalShortlist, FinalRank, EvaluationResponse,
)
from app.services.matching_service import match_service
from app.prompts.templates import (
STAGE1_NORMALIZATION_PROMPT,
STAGE3_RERANK_PROMPT,
STAGE4_DEEP_REVIEW_PROMPT,
STAGE5_FINAL_SELECTION_PROMPT,
)
logger = logging.getLogger(__name__)

# Upper bound on simultaneous Groq requests issued through ``_llm`` —
# every LLM call in this module acquires ``sem`` first.
_MAX_CONCURRENT_LLM_CALLS = 3
sem = asyncio.Semaphore(_MAX_CONCURRENT_LLM_CALLS)
async def _llm(messages: list) -> str:
    """Run one Groq chat completion for *messages* and return its text.

    The request is issued while holding the module-level ``sem``, which caps
    how many completions this module has in flight at the same time.
    """
    async with sem:
        reply = await get_groq_completion(messages)
    return reply
def _parse_json(raw: str) -> dict:
"""Extract first JSON object from LLM response."""
match = re.search(r'\{.*\}', raw, re.DOTALL)
if match:
return json.loads(match.group())
return json.loads(raw)
# ────────────────────────────────────────────────
# Stage 1 — Normalize
# ────────────────────────────────────────────────
async def normalize_candidate(jd: str, candidate: Candidate) -> NormalizedCandidate:
    """Stage 1: have the LLM convert a raw candidate record into a NormalizedCandidate.

    Args:
        jd: Job description text, inserted into the normalization prompt.
        candidate: The raw candidate. It is serialized with ``model_dump_json``
            for the prompt.

    Returns:
        The parsed NormalizedCandidate, with ``candidate_id`` always forced to
        ``candidate.id``. If the Groq call, JSON extraction, or model
        validation fails, the error is logged and a neutral placeholder
        flagged ``"Normalization Error"`` is returned instead of raising.
        This keeps a single failure from aborting a ``gather`` over all
        candidates.
    """
    prompt = STAGE1_NORMALIZATION_PROMPT.format(
        jd=jd,
        candidate_raw=candidate.model_dump_json(),
        candidate_id=candidate.id,
    )
    try:
        resp = await _llm([
            {"role": "system", "content": "You are a professional data normalizer. Output JSON ONLY. No markdown."},
            {"role": "user", "content": prompt},
        ])
        data = _parse_json(resp)
        data["candidate_id"] = candidate.id  # Ensure ID is always correct
        return NormalizedCandidate(**data)
    except Exception as e:
        # Best-effort by design: covers transport errors as well as bad JSON.
        logger.warning("[Stage1] Failed to normalize %s: %s", candidate.name, e)
        return NormalizedCandidate(
            candidate_id=candidate.id,
            name=candidate.name,
            normalized_title="Unknown",
            experience_years=0,
            primary_skills=[],
            secondary_skills=[],
            backend_score=0,
            frontend_score=0,
            cloud_score=0,
            database_score=0,
            notice_period_days=90,
            location="Unknown",
            employment_status="Unknown",
            salary_expectation="Unknown",
            flags=["Normalization Error"],
        )
# ────────────────────────────────────────────────
# Stage 3 — Rerank
# ────────────────────────────────────────────────
async def rerank_candidate(jd: str, normalized: NormalizedCandidate) -> RerankResult:
    """Stage 3: score one normalized candidate against the job description via the LLM.

    Args:
        jd: Job description text, inserted into the rerank prompt.
        normalized: Output of Stage 1. It is serialized with ``model_dump_json``
            for the prompt.

    Returns:
        The parsed RerankResult, with ``candidate_id`` forced to
        ``normalized.candidate_id``. If the Groq call, JSON extraction, or
        model validation fails, the error is logged and a result with empty
        ``scores``, ``final_score=0`` and ``decision="reject"`` is returned
        instead of raising.
    """
    prompt = STAGE3_RERANK_PROMPT.format(
        jd=jd,
        normalized_candidate=normalized.model_dump_json(),
    )
    try:
        resp = await _llm([
            {"role": "system", "content": "You are a recruitment scoring engine. Output JSON ONLY. No markdown."},
            {"role": "user", "content": prompt},
        ])
        data = _parse_json(resp)
        data["candidate_id"] = normalized.candidate_id
        return RerankResult(**data)
    except Exception as e:
        # Best-effort: one failed or unparseable call must not abort the
        # concurrent rerank of every other candidate.
        logger.warning("[Stage3] Rerank failed for %s: %s", normalized.candidate_id, e)
        return RerankResult(
            candidate_id=normalized.candidate_id,
            scores={},
            final_score=0,
            decision="reject",
        )
# ────────────────────────────────────────────────
# Stage 4 — Deep Review
# ────────────────────────────────────────────────
async def review_candidate(
    jd: str, candidate: Candidate, score: float
) -> DeepReview:
    """Stage 4: ask the LLM for a qualitative deep review of one shortlisted candidate.

    Args:
        jd: Job description text, inserted into the review prompt.
        candidate: The raw candidate record. It is serialized with
            ``model_dump_json`` for the prompt.
        score: The candidate's Stage 3 score. It is shown to the model
            rounded to one decimal.

    Returns:
        The parsed DeepReview, with ``candidate_id`` forced to ``candidate.id``.
        If the Groq call, JSON extraction, or model validation fails, the
        error is logged and a ``"reject"`` review with ``confidence=0.0`` is
        returned instead of raising.
    """
    prompt = STAGE4_DEEP_REVIEW_PROMPT.format(
        jd=jd,
        candidate_data=candidate.model_dump_json(),
        score=round(score, 1),
    )
    try:
        resp = await _llm([
            {"role": "system", "content": "You are a senior hiring evaluator. Output JSON ONLY. No markdown."},
            {"role": "user", "content": prompt},
        ])
        data = _parse_json(resp)
        data["candidate_id"] = candidate.id
        return DeepReview(**data)
    except Exception as e:
        # Best-effort: a failed review degrades to "reject" rather than
        # aborting the concurrent reviews of the other finalists.
        logger.warning("[Stage4] Deep review failed for %s: %s", candidate.id, e)
        return DeepReview(
            candidate_id=candidate.id,
            verdict="reject",
            why="Evaluation error β€” could not parse LLM response.",
            strengths=[],
            risks=["Evaluation error"],
            hidden_signal="",
            confidence=0.0,
        )
# ────────────────────────────────────────────────
# Main Pipeline
# ────────────────────────────────────────────────
def _auto_shortlist(
    ranked: List[RerankResult],
    candidate_map: Dict[Any, Candidate],
    review_map: Dict[Any, DeepReview],
) -> FinalShortlist:
    """Build a shortlist that simply follows the given rerank order (Stage 5 fallback).

    Names come from ``candidate_map`` and decisions from the Stage 4 verdicts in
    ``review_map``. Missing entries fall back to ``"Unknown"`` / ``"consider"``.
    """
    ranking: List[FinalRank] = []
    for position, result in enumerate(ranked, start=1):
        # Plain lookups: never construct placeholder models just to read one
        # field (a bare Candidate(id=..., name=...) may fail validation).
        candidate = candidate_map.get(result.candidate_id)
        review = review_map.get(result.candidate_id)
        ranking.append(
            FinalRank(
                rank=position,
                candidate_id=result.candidate_id,
                name=candidate.name if candidate is not None else "Unknown",
                decision=review.verdict if review is not None else "consider",
                reason="Auto-ranked by rerank score.",
            )
        )
    return FinalShortlist(final_ranking=ranking)


async def perform_hybrid_evaluation(
    jd: str,
    candidates: List[Candidate],
    progress_cb: Optional[Callable[[str], None]] = None,
) -> EvaluationResponse:
    """
    Full 5-stage hybrid evaluation pipeline.

    1. Normalize every candidate with the LLM (concurrently).
    2. Embedding match via ``match_service`` and keep at most 20 candidates.
       If the matcher raises, fall back to the first 20 input candidates.
    3. LLM rerank of those candidates, sorted by ``final_score`` descending,
       keeping the top 10.
    4. LLM deep review of the top 5.
    5. LLM synthesis of the reviews into a FinalShortlist. If that call fails
       or its output does not parse, the shortlist follows the Stage 3 order.
       When there is nothing to review, no synthesis call is made.

    Args:
        jd: Job description text used by every stage.
        candidates: Raw candidates to evaluate.
        progress_cb: Optional callable for streaming progress logs to the UI.
            It receives every message that is also logged at INFO.

    Returns:
        EvaluationResponse whose ``shortlist`` is the final ranking and whose
        ``details`` maps each reviewed candidate_id to its dumped DeepReview.
    """
    def log(msg: str) -> None:
        logger.info(msg)
        if progress_cb:
            progress_cb(msg)

    candidate_map = {c.id: c for c in candidates}

    # ── Stage 1: Normalize all candidates ──────────────────────
    log(f"[Stage 1] Normalizing {len(candidates)} candidates...")
    normalized_list: List[NormalizedCandidate] = await asyncio.gather(
        *(normalize_candidate(jd, c) for c in candidates)
    )
    normalized_map = {n.candidate_id: n for n in normalized_list}
    log("[Stage 1] βœ“ Normalization complete.")

    # ── Stage 2: Embedding matching → Top 20 ───────────────────
    log("[Stage 2] Running embedding match against Pinecone...")
    try:
        top_20 = await match_service.get_top_candidates(jd, candidates)
    except Exception as e:
        # Best-effort: a retrieval outage should not abort the evaluation.
        log(f"[Stage 2] ⚠ Pinecone unavailable ({e}). Falling back to all candidates.")
        top_20 = candidates[:20]
    # Clamp to available, whatever the matcher returned.
    top_20 = top_20[:20]
    log(f"[Stage 2] βœ“ Retrieved {len(top_20)} candidates.")

    # ── Stage 3: LLM rerank → Top 10 ──────────────────────────
    log(f"[Stage 3] Reranking {len(top_20)} candidates...")
    rerank_results: List[RerankResult] = await asyncio.gather(
        *(
            rerank_candidate(jd, normalized_map[c.id])
            for c in top_20
            if c.id in normalized_map
        )
    )
    rerank_results.sort(key=lambda x: x.final_score, reverse=True)
    top_10 = rerank_results[:10]
    log(f"[Stage 3] βœ“ Top 10 selected. Scores: {[round(r.final_score, 1) for r in top_10]}")

    # ── Stage 4: LLM deep review → Top 5 ──────────────────────
    top_5_results = top_10[:5]
    log(f"[Stage 4] Deep reviewing top {len(top_5_results)} candidates...")
    reviews: List[DeepReview] = await asyncio.gather(
        *(
            review_candidate(jd, candidate_map[r.candidate_id], r.final_score)
            for r in top_5_results
            if r.candidate_id in candidate_map
        )
    )
    review_map = {rev.candidate_id: rev for rev in reviews}
    log("[Stage 4] βœ“ Deep reviews complete.")

    # ── Stage 5: Final synthesis ───────────────────────────────
    log("[Stage 5] Synthesizing final shortlist...")
    shortlist: Optional[FinalShortlist] = None
    if reviews:  # Nothing reviewed -> don't ask the LLM to invent a shortlist.
        reviews_json = json.dumps([r.model_dump() for r in reviews])
        try:
            # The LLM call sits inside the try so transport errors also use the fallback.
            final_resp = await _llm([
                {"role": "system", "content": "You are the final hiring decision officer. Output JSON ONLY. No markdown."},
                {"role": "user", "content": STAGE5_FINAL_SELECTION_PROMPT.format(
                    all_top_5_results=reviews_json
                )},
            ])
            shortlist = FinalShortlist(**_parse_json(final_resp))
        except Exception as e:
            log(f"[Stage 5] ⚠ Synthesis parse failed ({e}). Using automatic ranking.")
    if shortlist is None:
        shortlist = _auto_shortlist(top_5_results, candidate_map, review_map)

    log(f"[Stage 5] βœ“ Pipeline complete. {len(shortlist.final_ranking)} candidates shortlisted.")
    return EvaluationResponse(
        shortlist=shortlist.final_ranking,
        details={rev.candidate_id: rev.model_dump() for rev in reviews},
    )