File size: 9,571 Bytes
b2efd24
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import asyncio
import json
import re
import logging
from typing import List, Dict, Any, Callable, Optional

from app.utils.groq_client import get_groq_completion
from app.models.schemas import (
    Candidate, NormalizedCandidate, RerankResult,
    DeepReview, FinalShortlist, FinalRank, EvaluationResponse,
)
from app.services.matching_service import match_service
from app.prompts.templates import (
    STAGE1_NORMALIZATION_PROMPT,
    STAGE3_RERANK_PROMPT,
    STAGE4_DEEP_REVIEW_PROMPT,
    STAGE5_FINAL_SELECTION_PROMPT,
)

logger = logging.getLogger(__name__)

# Concurrency throttle — upper bound on simultaneous Groq calls made via _llm().
MAX_CONCURRENT_LLM_CALLS = 3
# NOTE(review): on Python 3.10+ an asyncio.Semaphore binds to the first event
# loop in which a caller has to wait on it; reusing this module-level instance
# from a different loop (e.g. repeated asyncio.run() calls in tests/workers)
# can raise RuntimeError. Confirm the app runs a single long-lived loop.
sem = asyncio.Semaphore(MAX_CONCURRENT_LLM_CALLS)


async def _llm(messages: list) -> str:
    """Run one Groq chat completion while holding the module-wide semaphore.

    Every LLM call in this module goes through here so the number of
    in-flight requests is capped by ``sem``.

    Args:
        messages: Chat messages forwarded unchanged to ``get_groq_completion``
            (callers in this module pass ``{"role": ..., "content": ...}`` dicts).

    Returns:
        Whatever ``get_groq_completion`` returns (annotated as the reply text).
    """
    async with sem:
        reply = await get_groq_completion(messages)
    return reply


def _parse_json(raw: str) -> dict:
    """Extract first JSON object from LLM response."""
    match = re.search(r'\{.*\}', raw, re.DOTALL)
    if match:
        return json.loads(match.group())
    return json.loads(raw)


# ────────────────────────────────────────────────
# Stage 1 β€” Normalize
# ────────────────────────────────────────────────
async def normalize_candidate(jd: str, candidate: Candidate) -> NormalizedCandidate:
    """Stage 1: turn one raw candidate record into a ``NormalizedCandidate``.

    The candidate is serialized with ``model_dump_json`` and sent, together
    with the job description, through ``STAGE1_NORMALIZATION_PROMPT``.

    Args:
        jd: Job description text.
        candidate: Raw candidate; its ``id`` is always copied onto the result.

    Returns:
        The model-produced ``NormalizedCandidate``. If the reply cannot be
        parsed or validated, a placeholder is returned instead. The placeholder
        has zero experience/scores, "Unknown" text fields, a 90-day notice
        period and a ``"Normalization Error"`` flag. Exceptions raised by the
        LLM call itself are not caught here.
    """
    prompt_text = STAGE1_NORMALIZATION_PROMPT.format(
        jd=jd,
        candidate_raw=candidate.model_dump_json(),
        candidate_id=candidate.id,
    )
    conversation = [
        {"role": "system", "content": "You are a professional data normalizer. Output JSON ONLY. No markdown."},
        {"role": "user", "content": prompt_text},
    ]
    reply = await _llm(conversation)

    try:
        fields = _parse_json(reply)
        # Overwrite whatever id the model echoed back with the authoritative one.
        fields["candidate_id"] = candidate.id
        return NormalizedCandidate(**fields)
    except Exception as err:
        logger.warning(f"[Stage1] Failed to normalize {candidate.name}: {err}")
        placeholder = dict(
            candidate_id=candidate.id,
            name=candidate.name,
            normalized_title="Unknown",
            experience_years=0,
            primary_skills=[],
            secondary_skills=[],
            backend_score=0,
            frontend_score=0,
            cloud_score=0,
            database_score=0,
            notice_period_days=90,
            location="Unknown",
            employment_status="Unknown",
            salary_expectation="Unknown",
            flags=["Normalization Error"],
        )
        return NormalizedCandidate(**placeholder)


# ────────────────────────────────────────────────
# Stage 3 β€” Rerank
# ────────────────────────────────────────────────
async def rerank_candidate(jd: str, normalized: NormalizedCandidate) -> RerankResult:
    """Stage 3: have the LLM score one normalized candidate against the JD.

    Args:
        jd: Job description text.
        normalized: Stage 1 output; serialized with ``model_dump_json`` into
            ``STAGE3_RERANK_PROMPT``.

    Returns:
        The model-produced ``RerankResult``, carrying ``normalized.candidate_id``.
        If the reply cannot be parsed or validated, a result with empty
        ``scores``, ``final_score=0`` and ``decision="reject"`` is returned.
        Exceptions raised by the LLM call itself propagate.
    """
    cid = normalized.candidate_id
    prompt_text = STAGE3_RERANK_PROMPT.format(
        jd=jd,
        normalized_candidate=normalized.model_dump_json(),
    )
    reply = await _llm([
        {"role": "system", "content": "You are a recruitment scoring engine. Output JSON ONLY. No markdown."},
        {"role": "user", "content": prompt_text},
    ])

    try:
        fields = _parse_json(reply)
        # The id is forced so results can always be joined back to candidates.
        fields["candidate_id"] = cid
        return RerankResult(**fields)
    except Exception as err:
        logger.warning(f"[Stage3] Rerank failed for {cid}: {err}")
        return RerankResult(candidate_id=cid, scores={}, final_score=0, decision="reject")


# ────────────────────────────────────────────────
# Stage 4 β€” Deep Review
# ────────────────────────────────────────────────
async def review_candidate(
    jd: str, candidate: Candidate, score: float
) -> DeepReview:
    """Stage 4: ask the LLM for a qualitative review of one shortlisted candidate.

    Args:
        jd: Job description text.
        candidate: Raw candidate record, serialized with ``model_dump_json``.
        score: Numeric score shown to the model, rounded to one decimal place.
            The pipeline in this module passes the Stage 3 ``final_score``.

    Returns:
        The model-produced ``DeepReview`` with ``candidate.id`` forced onto it.
        If the reply cannot be parsed or validated, a "reject" review with
        zero confidence and an "Evaluation error" risk is returned. Exceptions
        raised by the LLM call itself propagate.
    """
    prompt_text = STAGE4_DEEP_REVIEW_PROMPT.format(
        jd=jd,
        candidate_data=candidate.model_dump_json(),
        score=round(score, 1),
    )
    reply = await _llm([
        {"role": "system", "content": "You are a senior hiring evaluator. Output JSON ONLY. No markdown."},
        {"role": "user", "content": prompt_text},
    ])

    try:
        fields = _parse_json(reply)
        fields["candidate_id"] = candidate.id
        return DeepReview(**fields)
    except Exception as err:
        logger.warning(f"[Stage4] Deep review failed for {candidate.id}: {err}")
        placeholder = dict(
            candidate_id=candidate.id,
            verdict="reject",
            why="Evaluation error β€” could not parse LLM response.",
            strengths=[],
            risks=["Evaluation error"],
            hidden_signal="",
            confidence=0.0,
        )
        return DeepReview(**placeholder)


# ────────────────────────────────────────────────
# Main Pipeline
# ────────────────────────────────────────────────
def _fallback_shortlist(
    ranked: List[RerankResult],
    candidate_map: Dict[Any, Candidate],
    review_map: Dict[Any, DeepReview],
) -> FinalShortlist:
    """Build a shortlist in rerank order; used when Stage 5 output is unusable.

    A candidate missing from ``candidate_map`` is named "Unknown". A candidate
    without a review gets decision "consider". Defaults are resolved lazily
    rather than by constructing placeholder models as ``.get()`` defaults.
    Those placeholders would run schema validation on every row, and they
    would fail inside the error path if the models require more fields.
    """
    ranking: List[FinalRank] = []
    for position, result in enumerate(ranked, start=1):
        candidate = candidate_map.get(result.candidate_id)
        review = review_map.get(result.candidate_id)
        ranking.append(
            FinalRank(
                rank=position,
                candidate_id=result.candidate_id,
                name=candidate.name if candidate is not None else "Unknown",
                decision=review.verdict if review is not None else "consider",
                reason="Auto-ranked by rerank score.",
            )
        )
    return FinalShortlist(final_ranking=ranking)


async def perform_hybrid_evaluation(
    jd: str,
    candidates: List[Candidate],
    progress_cb: Optional[Callable[[str], None]] = None,
) -> EvaluationResponse:
    """
    Full 5-stage hybrid evaluation pipeline.

    Stages:
      1. Normalize every candidate with the LLM (``normalize_candidate``).
      2. Retrieve up to 20 candidates via ``match_service.get_top_candidates``.
         On any error, fall back to the first 20 input candidates.
      3. Rerank those with the LLM and keep the 10 highest ``final_score``s.
      4. Deep-review the top 5 of those with the LLM.
      5. Have the LLM synthesize a final shortlist from the reviews. If its
         reply cannot be parsed, rank the top 5 by rerank score instead.

    Args:
        jd: Job description text passed to every stage.
        candidates: Candidates to evaluate; later stages join on ``id``.
        progress_cb: optional callable for streaming progress logs to UI.
            Called synchronously with every progress message.

    Returns:
        EvaluationResponse with the final ranking and, keyed by candidate id,
        the dumped Stage 4 review of each reviewed candidate.
    """

    def log(msg: str):
        logger.info(msg)
        if progress_cb:
            progress_cb(msg)

    candidate_map = {c.id: c for c in candidates}

    # ── Stage 1: Normalize all candidates ──────────────────────
    log(f"[Stage 1] Normalizing {len(candidates)} candidates...")
    norm_tasks = [normalize_candidate(jd, c) for c in candidates]
    normalized_list: List[NormalizedCandidate] = await asyncio.gather(*norm_tasks)
    normalized_map = {n.candidate_id: n for n in normalized_list}
    log("[Stage 1] βœ“ Normalization complete.")

    # ── Stage 2: Embedding matching -> Top 20 ───────────────────
    log("[Stage 2] Running embedding match against Pinecone...")
    try:
        top_20 = await match_service.get_top_candidates(jd, candidates)
    except Exception as e:
        # Best-effort: keep the pipeline alive using input order.
        log(f"[Stage 2] ⚠ Pinecone unavailable ({e}). Falling back to all candidates.")
        top_20 = candidates[:20]

    # Clamp to at most 20, whatever the matcher returned.
    top_20 = top_20[:20]
    log(f"[Stage 2] βœ“ Retrieved {len(top_20)} candidates.")

    # ── Stage 3: LLM rerank -> Top 10 ─────────────────────────
    log(f"[Stage 3] Reranking {len(top_20)} candidates...")
    rerank_tasks = [
        rerank_candidate(jd, normalized_map[c.id])
        for c in top_20
        if c.id in normalized_map  # skip matcher results not in the input set
    ]
    rerank_results: List[RerankResult] = await asyncio.gather(*rerank_tasks)
    rerank_results.sort(key=lambda x: x.final_score, reverse=True)
    top_10 = rerank_results[:10]
    log(f"[Stage 3] βœ“ Top 10 selected. Scores: {[round(r.final_score, 1) for r in top_10]}")

    # ── Stage 4: LLM deep review -> Top 5 ──────────────────────
    top_5_results = top_10[:5]
    log(f"[Stage 4] Deep reviewing top {len(top_5_results)} candidates...")
    review_tasks = [
        review_candidate(jd, candidate_map[r.candidate_id], r.final_score)
        for r in top_5_results
        if r.candidate_id in candidate_map
    ]
    reviews: List[DeepReview] = await asyncio.gather(*review_tasks)
    review_map = {rev.candidate_id: rev for rev in reviews}
    log("[Stage 4] βœ“ Deep reviews complete.")

    # ── Stage 5: Final synthesis ───────────────────────────────
    if not reviews:
        # Nothing reached deep review (e.g. empty input): do not ask the LLM
        # to rank an empty list, which invites hallucinated candidates.
        log("[Stage 5] No reviewed candidates; returning empty shortlist.")
        return EvaluationResponse(shortlist=[], details={})

    log("[Stage 5] Synthesizing final shortlist...")
    reviews_json = json.dumps([r.model_dump() for r in reviews])
    final_resp = await _llm([
        {"role": "system", "content": "You are the final hiring decision officer. Output JSON ONLY. No markdown."},
        {"role": "user", "content": STAGE5_FINAL_SELECTION_PROMPT.format(
            all_top_5_results=reviews_json
        )},
    ])

    try:
        shortlist = FinalShortlist(**_parse_json(final_resp))
    except Exception as e:
        log(f"[Stage 5] ⚠ Synthesis parse failed ({e}). Using automatic ranking.")
        shortlist = _fallback_shortlist(top_5_results, candidate_map, review_map)

    log(f"[Stage 5] βœ“ Pipeline complete. {len(shortlist.final_ranking)} candidates shortlisted.")

    return EvaluationResponse(
        shortlist=shortlist.final_ranking,
        details={rev.candidate_id: rev.model_dump() for rev in reviews},
    )