# (Hugging Face Spaces status banner removed from scrape: "Spaces: Sleeping")
| # app.py | |
| import os | |
| import re | |
| import json | |
| import time | |
| import csv | |
| import hashlib | |
| import tempfile | |
| from typing import List, Dict, Any, Optional, Tuple | |
| import numpy as np | |
| import gradio as gr | |
| import pandas as pd | |
| from sentence_transformers import SentenceTransformer, CrossEncoder | |
| from huggingface_hub import InferenceClient | |
| from pydantic import BaseModel, Field | |
| from pypdf import PdfReader | |
| import docx2txt | |
# =========================================================
# Models
# =========================================================
# Model identifiers; each can be overridden via environment variables.
EMBED_MODEL_NAME = os.getenv("EMBED_MODEL_NAME", "BAAI/bge-base-en-v1.5")  # bi-encoder for retrieval
RERANK_MODEL_NAME = os.getenv("RERANK_MODEL_NAME", "BAAI/bge-reranker-large")  # cross-encoder reranker
LLM_MODEL = os.getenv("LLM_MODEL", "meta-llama/Meta-Llama-3.1-8B-Instruct")  # judge LLM (HF Inference)
# =========================================================
# Controls
# =========================================================
CHUNK_SIZE_CHARS = 1100  # characters per CV chunk
CHUNK_OVERLAP_CHARS = 180  # overlap between consecutive chunks
TOP_CHUNKS_PER_CV = 10  # chunks kept per CV after embedding retrieval
EVIDENCE_CHUNKS_PER_CV = 4  # chunks forwarded to the reranker / LLM as evidence
LLM_BATCH_SIZE = int(os.getenv("LLM_BATCH_SIZE", "4"))  # candidates judged per LLM call
LLM_MAX_TOKENS = int(os.getenv("LLM_MAX_TOKENS", "3500"))  # completion token budget
LLM_TEMPERATURE = float(os.getenv("LLM_TEMPERATURE", "0.15"))
MAX_CV_CHARS = 120_000  # hard cap on text extracted per CV
MAX_JD_CHARS = 60_000  # hard cap on job-description text
MAX_CV_UPLOADS = 20  # ✅ requested max
# Global singletons, created lazily (see ensure_models / get_hf_client).
_embedder: Optional[SentenceTransformer] = None
_reranker: Optional[CrossEncoder] = None
_hf_client: Optional[InferenceClient] = None
| # ========================================================= | |
| # Output schemas (LLM returns JSON) | |
| # ========================================================= | |
class RequirementCheck(BaseModel):
    """One JD requirement with the LLM's verdict and a supporting CV quote."""
    requirement: str
    status: str = Field(..., description="met | partial | missing")
    evidence: str = Field(..., description="short CV snippet quote, <=160 chars, or empty if missing")
class CandidateLLMResult(BaseModel):
    """Per-candidate evaluation returned by the LLM (or built as a local fallback)."""
    filename: str
    final_score: float = Field(..., description="0-100")
    fit_level: str = Field(..., description="excellent | good | maybe | weak")
    summary: str
    strengths: List[str]
    gaps: List[str]
    risks: List[str]
    checklist: List[RequirementCheck]
    top_evidence: List[str]
class LLMRankingOutput(BaseModel):
    """Full ranking for one LLM call: candidates sorted by score plus free-form notes."""
    ranked: List[CandidateLLMResult]
    overall_notes: str
| # ========================================================= | |
| # Utilities | |
| # ========================================================= | |
def ensure_models():
    """Lazily instantiate the shared embedding and reranking models (idempotent)."""
    global _embedder, _reranker
    if _embedder is None:
        _embedder = SentenceTransformer(EMBED_MODEL_NAME)
    if _reranker is None:
        _reranker = CrossEncoder(RERANK_MODEL_NAME)
def get_hf_client() -> InferenceClient:
    """Return the cached Hugging Face InferenceClient, creating it on first use.

    Raises:
        RuntimeError: if the HF_TOKEN environment variable is missing or blank.
    """
    global _hf_client
    if _hf_client is None:
        token = os.getenv("HF_TOKEN", "").strip()
        if not token:
            raise RuntimeError("HF_TOKEN is not set. Add it in Space Settings → Repository secrets.")
        _hf_client = InferenceClient(token=token)
    return _hf_client
def gr_file_to_path(f: Any) -> Optional[str]:
    """Best-effort extraction of a filesystem path from a Gradio file value.

    Accepts a plain path string, a dict carrying a "path" key, or an object
    exposing a ``name`` attribute (e.g. tempfile wrappers); returns None otherwise.
    """
    if isinstance(f, str):
        return f
    if isinstance(f, dict) and "path" in f:
        return f["path"]
    return getattr(f, "name", None)
def clean_text(t: str) -> str:
    """Normalize extracted text: drop NUL bytes, collapse runs of spaces/tabs,
    and squeeze 3+ consecutive newlines down to a blank line."""
    normalized = (t or "").replace("\x00", " ")
    normalized = re.sub(r"[ \t]+", " ", normalized)
    normalized = re.sub(r"\n{3,}", "\n\n", normalized)
    return normalized.strip()
def mask_pii(text: str) -> str:
    """Replace email addresses and phone-like digit runs with placeholder tags."""
    masked = re.sub(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", "[EMAIL]", text)
    masked = re.sub(r"(\+?\d[\d\-\s]{7,}\d)", "[PHONE]", masked)
    return masked
def chunk_text_safe(text: str, chunk_size: int = CHUNK_SIZE_CHARS, overlap: int = CHUNK_OVERLAP_CHARS) -> List[str]:
    """Split text into overlapping, whitespace-trimmed character chunks.

    Args:
        text: Source text; None/empty yields [].
        chunk_size: Maximum characters per chunk.
        overlap: Characters of context repeated between consecutive chunks.

    Returns:
        List of non-empty chunk strings in document order.
    """
    text = (text or "").strip()
    if not text:
        return []
    chunks: List[str] = []
    i = 0
    n = len(text)
    while i < n:
        j = min(i + chunk_size, n)
        ch = text[i:j].strip()
        if ch:
            chunks.append(ch)
        if j == n:
            break
        # Step back by `overlap` for context; max(i + 1, ...) guarantees forward
        # progress even if overlap >= chunk_size (the original `max(0, j - overlap)`
        # could loop forever on such parameters).
        i = max(i + 1, j - overlap)
    return chunks
def read_file_to_text(file_path: str) -> str:
    """Extract plain text from a PDF, DOCX, or arbitrary file.

    PDFs go through pypdf page-by-page; DOCX through docx2txt; anything else is
    read as raw bytes and decoded as UTF-8 with errors ignored.
    """
    lower = file_path.lower()
    if lower.endswith(".pdf"):
        reader = PdfReader(file_path)
        pages = [page.extract_text() or "" for page in reader.pages]
        return "\n".join(pages).strip()
    if lower.endswith(".docx"):
        return (docx2txt.process(file_path) or "").strip()
    with open(file_path, "rb") as fh:
        raw = fh.read()
    try:
        return raw.decode("utf-8", errors="ignore").strip()
    except Exception:
        return raw.decode(errors="ignore").strip()
def file_bytes_hash(path: str) -> str:
    """Return the SHA-256 hex digest of a file's raw bytes (duplicate detection)."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        digest.update(fh.read())
    return digest.hexdigest()
def cosine_sim_matrix(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """Pairwise cosine similarity between row vectors of `a` and `b`.

    Returns a (len(a), len(b)) matrix; a tiny epsilon guards zero-norm rows.
    """
    eps = 1e-12
    unit_a = a / (np.linalg.norm(a, axis=1, keepdims=True) + eps)
    unit_b = b / (np.linalg.norm(b, axis=1, keepdims=True) + eps)
    return unit_a @ unit_b.T
def sigmoid(x: float) -> float:
    """Logistic function 1 / (1 + e^(-x)), mapping the reals onto (0, 1)."""
    return 1.0 / (1.0 + np.exp(-x))
def clamp(x: float, lo: float, hi: float) -> float:
    """Clamp x into the closed interval [lo, hi] (assumes lo <= hi)."""
    if x < lo:
        return lo
    if x > hi:
        return hi
    return x
| # ========================================================= | |
| # Contact extraction (Name / Email / Phone) | |
| # ========================================================= | |
| _EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b") | |
| _PHONE_RE = re.compile(r"(?:\+?\d{1,3}[\s\-]?)?(?:\(?\d{2,4}\)?[\s\-]?)?\d{3,4}[\s\-]?\d{3,4}") | |
| def _normalize_phone(p: str) -> str: | |
| p = re.sub(r"[^\d+]", "", p) | |
| return p | |
def guess_name(text: str) -> str:
    """Heuristically pick a candidate name from the top of a CV.

    Scans the first 12 non-empty lines, rejecting lines that contain an email,
    exceed 45 characters, contain 3+ consecutive digits, or are generic headers
    like "CV"/"Resume". A surviving line must contain at least one Latin or
    Arabic letter. Returns "" when nothing plausible is found.
    """
    generic_headers = {"curriculum vitae", "cv", "resume", "profile"}
    lines = [ln.strip() for ln in (text or "").splitlines() if ln.strip()]
    for line in lines[:12]:
        if "@" in line or len(line) > 45:
            continue
        if re.search(r"\d{3,}", line):
            continue
        has_letter = re.search(r"[A-Za-z\u0600-\u06FF]", line) is not None
        if has_letter and line.lower() not in generic_headers:
            return line
    return ""
def extract_contact_info(text: str) -> Dict[str, str]:
    """Pull the first plausible name, email, and phone number from CV text.

    Phone matches are normalized to digits/'+' and kept only when they hold
    8-16 digits. Returns {"name", "email", "phone"} with "" for missing fields.
    """
    src = text or ""
    emails = _EMAIL_RE.findall(src)
    phones: List[str] = []
    for raw in _PHONE_RE.findall(src):
        normalized = _normalize_phone(raw)
        digit_count = len(re.sub(r"\D", "", normalized))
        if 8 <= digit_count <= 16:
            phones.append(normalized)
    return {
        "name": guess_name(src),
        "email": emails[0] if emails else "",
        "phone": phones[0] if phones else "",
    }
| # ========================================================= | |
| # Better local scoring | |
| # ========================================================= | |
def compute_local_score(retr_sims: List[float], rerank_logits: List[float]) -> float:
    """Blend retrieval similarities and reranker logits into a 0-100 score.

    Retrieval component: weighted mean/max (0.65/0.35) of the top-5 cosine
    similarities. Rerank component: sigmoid-squashed mean/max (0.55/0.45) of the
    cross-encoder logits. Final score is 80% rerank + 20% retrieval, clamped.
    """
    sims = retr_sims if retr_sims else [0.0]
    logits = rerank_logits if rerank_logits else [0.0]
    top5 = sorted(sims, reverse=True)[:5]
    retrieval_part = 100.0 * clamp(
        0.65 * float(np.mean(top5)) + 0.35 * float(np.max(top5)), 0.0, 1.0
    )
    rerank_part = 100.0 * clamp(
        0.55 * sigmoid(float(np.mean(logits))) + 0.45 * sigmoid(float(np.max(logits))), 0.0, 1.0
    )
    blended = 0.80 * rerank_part + 0.20 * retrieval_part
    return float(clamp(blended, 0.0, 100.0))
| # ========================================================= | |
| # LLM Prompt (compact to avoid truncation) | |
| # ========================================================= | |
def build_llm_prompt(jd_text: str, must_haves: str, candidates: List[Dict[str, Any]]) -> str:
    """Build the single recruiter-evaluation prompt for a batch of candidates.

    The prompt embeds a literal JSON schema example plus hard length limits so the
    model's output stays parseable within the token budget. JD text is capped at
    4000 chars and must-haves at 1200; candidates are serialized as-is (each dict
    is expected to carry "filename" and "evidence_chunks").
    """
    # Inlined example anchors the model to the exact output shape.
    schema_example = {
        "ranked": [
            {
                "filename": "<cv_filename>",
                "final_score": 0,
                "fit_level": "weak",
                "summary": "one short paragraph",
                "strengths": ["max 4 items"],
                "gaps": ["max 4 items"],
                "risks": ["max 3 items"],
                "checklist": [
                    {"requirement": "SHORT label (<=8 words)", "status": "met", "evidence": "short quote <=160 chars"}
                ],
                "top_evidence": ["max 3 short quotes"]
            }
        ],
        "overall_notes": "short"
    }
    return f"""
You are an expert recruiter and ATS evaluator.
Return ONLY one JSON object, EXACTLY matching this schema:
{json.dumps(schema_example, ensure_ascii=False)}
Hard limits (MUST follow):
- strengths: max 4 bullets
- gaps: max 4 bullets
- risks: max 3 bullets
- checklist: max 6 requirements total
- requirement: SHORT label (<=8 words). Do NOT paste long JD sentences.
- evidence: <=160 characters or empty
- top_evidence: max 3 short quotes
Rules:
- Use ONLY the provided evidence_chunks. Do NOT invent experience.
- final_score must be 0-100.
- fit_level: excellent | good | maybe | weak
- status: met | partial | missing
Job Description (compressed):
\"\"\"{jd_text[:4000]}\"\"\"
Must-haves (optional):
\"\"\"{(must_haves or '').strip()[:1200]}\"\"\"
Candidates:
{json.dumps(candidates, ensure_ascii=False)}
Output JSON only. No markdown. No extra text.
""".strip()
| def _extract_first_complete_json_object(text: str) -> Optional[str]: | |
| if not text: | |
| return None | |
| start = text.find("{") | |
| if start < 0: | |
| return None | |
| depth = 0 | |
| in_str = False | |
| esc = False | |
| for i in range(start, len(text)): | |
| ch = text[i] | |
| if in_str: | |
| if esc: | |
| esc = False | |
| elif ch == "\\": | |
| esc = True | |
| elif ch == '"': | |
| in_str = False | |
| continue | |
| else: | |
| if ch == '"': | |
| in_str = True | |
| continue | |
| if ch == "{": | |
| depth += 1 | |
| elif ch == "}": | |
| depth -= 1 | |
| if depth == 0: | |
| return text[start:i + 1] | |
| return None | |
def fit_level_from_score(score: float) -> str:
    """Map a 0-100 score to a fit bucket: >=85 excellent, >=70 good, >=55 maybe, else weak."""
    value = float(score)
    for threshold, label in ((85, "excellent"), (70, "good"), (55, "maybe")):
        if value >= threshold:
            return label
    return "weak"
def fallback_candidate(filename: str, score: float) -> CandidateLLMResult:
    """Build a minimal result from the local score when LLM output is unusable.

    All qualitative fields are left empty; the summary explains the fallback.
    """
    return CandidateLLMResult(
        filename=filename,
        final_score=float(round(score, 2)),
        fit_level=fit_level_from_score(score),
        summary="LLM output incomplete for this candidate; score based on local semantic + rerank signals.",
        strengths=[],
        gaps=[],
        risks=[],
        checklist=[],
        top_evidence=[],
    )
def llm_judge_rank_batch(jd_text: str, must_haves: str, batch: List[Dict[str, Any]]) -> LLMRankingOutput:
    """Ask the judge LLM to score/rank one batch of candidates against the JD.

    Parse strategy, in order:
      1. first call at configured temperature; parse as JSON, else salvage the
         first balanced JSON object from the raw text;
      2. retry once at temperature 0 with a larger token budget;
      3. if still unparseable, fall back to local scores for the whole batch.
    Any candidate the LLM omitted is re-judged individually (or falls back).
    Returns an LLMRankingOutput sorted by final_score descending.
    """
    client = get_hf_client()
    prompt = build_llm_prompt(
        jd_text,
        must_haves or "",
        [{"filename": b["filename"], "evidence_chunks": b["evidence_chunks"]} for b in batch],
    )
    def _call(temp: float, max_toks: int, content: str) -> str:
        # Single chat-completion round trip; returns the stripped raw text.
        resp = client.chat_completion(
            model=LLM_MODEL,
            messages=[
                {"role": "system", "content": "Return ONLY valid JSON exactly matching the schema. No markdown."},
                {"role": "user", "content": content},
            ],
            max_tokens=max_toks,
            temperature=temp,
        )
        return (resp.choices[0].message.content or "").strip()
    out: Optional[LLMRankingOutput] = None
    text = _call(LLM_TEMPERATURE, LLM_MAX_TOKENS, prompt)
    try:
        out = LLMRankingOutput.model_validate(json.loads(text))
    except Exception:
        # Model wrapped the JSON in prose/markdown — try to salvage it.
        obj = _extract_first_complete_json_object(text)
        if obj:
            out = LLMRankingOutput.model_validate(json.loads(obj))
    if out is None:
        # Deterministic retry with a bigger completion budget.
        text2 = _call(0.0, max(LLM_MAX_TOKENS, 4500), prompt)
        try:
            out = LLMRankingOutput.model_validate(json.loads(text2))
        except Exception:
            obj2 = _extract_first_complete_json_object(text2)
            if obj2:
                out = LLMRankingOutput.model_validate(json.loads(obj2))
    if out is None:
        # Both attempts failed to parse: local scores for the whole batch.
        ranked = [fallback_candidate(b["filename"], b.get("local_score", 50.0)) for b in batch]
        return LLMRankingOutput(ranked=ranked, overall_notes="LLM parsing failed; used local scoring fallback.")
    # The LLM may silently drop candidates — detect and re-judge them one by one.
    returned = {c.filename: c for c in out.ranked}
    missing = [b for b in batch if b["filename"] not in returned]
    for b in missing:
        single_prompt = build_llm_prompt(
            jd_text,
            must_haves or "",
            [{"filename": b["filename"], "evidence_chunks": b["evidence_chunks"]}],
        )
        single_text = _call(0.0, min(2200, LLM_MAX_TOKENS), single_prompt)
        single_out: Optional[LLMRankingOutput] = None
        try:
            single_out = LLMRankingOutput.model_validate(json.loads(single_text))
        except Exception:
            single_obj = _extract_first_complete_json_object(single_text)
            if single_obj:
                single_out = LLMRankingOutput.model_validate(json.loads(single_obj))
        if single_out and single_out.ranked:
            returned[b["filename"]] = single_out.ranked[0]
        else:
            returned[b["filename"]] = fallback_candidate(b["filename"], b.get("local_score", 50.0))
    merged_ranked = sorted(returned.values(), key=lambda x: float(x.final_score), reverse=True)
    merged_notes = (out.overall_notes or "").strip()
    if missing:
        merged_notes = (merged_notes + " | Missing candidates re-judged individually / fallback used.").strip(" |")
    return LLMRankingOutput(ranked=merged_ranked, overall_notes=merged_notes)
def merge_llm_batches(batch_outputs: List[LLMRankingOutput]) -> LLMRankingOutput:
    """Concatenate per-batch rankings into one list sorted by score (descending).

    Batch notes are joined with " | " and capped at 1200 characters.
    """
    combined: List[CandidateLLMResult] = []
    notes: List[str] = []
    for batch in batch_outputs:
        notes.append(batch.overall_notes)
        combined.extend(batch.ranked)
    combined.sort(key=lambda c: float(c.final_score), reverse=True)
    return LLMRankingOutput(
        ranked=combined,
        overall_notes=" | ".join(n for n in notes if n)[:1200],
    )
| # ========================================================= | |
| # UI rendering (SGS) | |
| # ========================================================= | |
def fit_badge(level: str) -> str:
    """Render a fit level ("excellent"/"good"/"maybe"/other) as an HTML badge span."""
    normalized = (level or "").lower().strip()
    badges = {
        "excellent": '<span class="badge b-exc">Excellent</span>',
        "good": '<span class="badge b-good">Good</span>',
        "maybe": '<span class="badge b-maybe">Potential</span>',
    }
    return badges.get(normalized, '<span class="badge b-weak">Weak</span>')
def score_pill(score: float) -> str:
    """Render a score (1 decimal place) as a colored pill; tiers at 80/65/45."""
    value = float(score)
    if value >= 80:
        tier = "p-high"
    elif value >= 65:
        tier = "p-mid"
    elif value >= 45:
        tier = "p-low"
    else:
        tier = "p-bad"
    return f'<span class="pill {tier}">{value:.1f}</span>'
def _esc_html(s: str) -> str:
    """Minimal HTML-escape for text interpolated into the report markup."""
    return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")

def candidate_card_html(rank: int, c: CandidateLLMResult) -> str:
    """Render one candidate as an HTML report card (badge, score bar, checklist, evidence).

    FIX: the original escaping was corrupted into no-ops (`.replace("<", "<")`),
    so LLM-provided requirement/evidence text went into the page unescaped.
    Escaping is now done via _esc_html (which also handles '&').
    """
    score = float(c.final_score)
    w = max(0, min(100, int(round(score))))  # bar width as an integer percentage
    checklist_rows = ""
    for item in (c.checklist or [])[:6]:
        st = (item.status or "").lower().strip()
        cls = "ok" if st == "met" else ("partial" if st == "partial" else "miss")
        ev = _esc_html((item.evidence or "").strip())
        req = _esc_html((item.requirement or "").strip())
        checklist_rows += f"""
<div class="checkrow {cls}">
  <div class="req">{req}</div>
  <div class="st">{st.upper()}</div>
  <div class="ev">{ev if ev else "—"}</div>
</div>
"""
    strengths = "".join([f"<li>{s}</li>" for s in (c.strengths or [])[:4]]) or "<li>—</li>"
    gaps = "".join([f"<li>{g}</li>" for g in (c.gaps or [])[:4]]) or "<li>—</li>"
    risks = "".join([f"<li>{r}</li>" for r in (c.risks or [])[:3]]) or "<li>—</li>"
    evidence_html = ""
    for q in (c.top_evidence or [])[:3]:
        evidence_html += f'<div class="quote">“{_esc_html(q)}”</div>'
    return f"""
<div class="card">
  <div class="card-top">
    <div class="card-title">
      <div class="rank">#{rank}</div>
      <div class="file">{c.filename}</div>
    </div>
    <div class="card-meta">
      {fit_badge(c.fit_level)}
      {score_pill(score)}
    </div>
  </div>
  <div class="bar"><div class="fill" style="width:{w}%"></div></div>
  <div class="summary">{c.summary}</div>
  <div class="grid">
    <div>
      <div class="section-title">Strengths</div>
      <ul class="list">{strengths}</ul>
    </div>
    <div>
      <div class="section-title">Gaps</div>
      <ul class="list">{gaps}</ul>
    </div>
  </div>
  <div class="section-title">Risks</div>
  <ul class="list">{risks}</ul>
  <div class="section-title">Requirements Checklist</div>
  <div class="checklist">
    {checklist_rows if checklist_rows else '<div class="quote muted">No checklist produced.</div>'}
  </div>
  <div class="section-title">Evidence</div>
  <div class="quotes">
    {evidence_html if evidence_html else '<div class="quote muted">No evidence produced.</div>'}
  </div>
</div>
"""
def render_top10_html(ranked: List[CandidateLLMResult], total_count: int) -> str:
    """Build the hero header (KPIs) plus cards for the top 10 ranked candidates."""
    leaders = ranked[:10]
    cards = "".join(candidate_card_html(pos, cand) for pos, cand in enumerate(leaders, start=1))
    best_score = ranked[0].final_score if ranked else 0.0
    return f"""
<div class="hero">
  <div class="hero-left">
    <div class="hero-title">SGS Candidate Fit Report</div>
    <div class="hero-sub">Top 10 ranked candidates (evidence-based)</div>
  </div>
  <div class="hero-right">
    <div class="kpi">
      <div class="kpi-label">Total Ranked</div>
      <div class="kpi-val">{total_count}</div>
    </div>
    <div class="kpi">
      <div class="kpi-label">Top Score</div>
      <div class="kpi-val">{best_score:.1f}</div>
    </div>
  </div>
</div>
<div class="cards">{cards}</div>
"""
| # ========================================================= | |
| # Shortlist export (DataFrame-safe) | |
| # ========================================================= | |
def export_shortlist(shortlist_table: pd.DataFrame) -> Tuple[str, str, str]:
    """Export ticked shortlist rows to CSV and collect their email addresses.

    Column 0 is the "Shortlisted" checkbox, column 6 the "Email" column (matches
    the Dataframe schema built by rank_app). Returns (csv_path, status_message,
    comma-separated de-duplicated emails). Raises gr.Error when the table is
    empty or nothing is ticked.
    """
    if shortlist_table is None or shortlist_table.empty:
        raise gr.Error("No shortlist data yet. Run ranking first.")
    picked = shortlist_table[shortlist_table.iloc[:, 0] == True]  # noqa: E712 — column may hold numpy bools
    if picked.empty:
        raise gr.Error("No candidates marked as shortlisted.")
    out_file = tempfile.NamedTemporaryFile(delete=False, suffix=".csv")
    picked.to_csv(out_file.name, index=False)
    email_values = picked.iloc[:, 6].dropna().astype(str).str.strip().tolist()
    unique_emails = sorted({e for e in email_values if e})
    message = f"Exported {len(picked)} shortlisted candidate(s)."
    return out_file.name, message, ", ".join(unique_emails)
| # ========================================================= | |
| # Main pipeline (with progress bar) | |
| # ========================================================= | |
def rank_app(
    jd_file_obj,
    cv_file_objs,
    must_haves: str,
    mask_pii_toggle: bool,
    show_contacts_toggle: bool,
    progress=gr.Progress(track_tqdm=False),  # ✅ progress bar
):
    """End-to-end ranking pipeline wired to the "Generate" button.

    Steps: read JD -> validate/dedupe CV uploads (SHA-256) -> embed + retrieve
    top chunks per CV -> cross-encoder rerank -> local score -> LLM judging in
    batches -> merged report. Returns (report_html, meta_markdown, full_csv_path,
    shortlist_dataframe, "", "") — the two trailing strings clear the shortlist
    message and email boxes. Raises gr.Error for all user-facing input problems.
    """
    t0 = time.time()
    ensure_models()
    embedder = _embedder
    reranker = _reranker
    progress(0.02, desc="Loading Job Description...")
    jd_path = gr_file_to_path(jd_file_obj)
    if not jd_path:
        raise gr.Error("Please upload a Job Description file (PDF/DOCX/TXT).")
    jd_text = clean_text(read_file_to_text(jd_path))[:MAX_JD_CHARS]
    if not jd_text:
        raise gr.Error("Could not extract text from the Job Description file.")
    if not cv_file_objs:
        raise gr.Error("Please upload at least 1 CV.")
    # ✅ enforce max 20
    if len(cv_file_objs) > MAX_CV_UPLOADS:
        raise gr.Error(f"Maximum allowed CV uploads is {MAX_CV_UPLOADS}. You uploaded {len(cv_file_objs)}.")
    cv_paths = []
    for f in cv_file_objs:
        p = gr_file_to_path(f)
        if p:
            cv_paths.append(p)
    if not cv_paths:
        raise gr.Error("Could not read uploaded CV files (no valid paths).")
    progress(0.06, desc="Checking duplicates...")
    # Dedupe by content hash; falls back to hashing the extracted text when the
    # raw file cannot be read.
    seen = {}
    duplicates = []
    unique_paths = []
    for p in cv_paths:
        fname = os.path.basename(p)
        try:
            h = file_bytes_hash(p)
        except Exception:
            h = hashlib.sha256(clean_text(read_file_to_text(p)).encode("utf-8", errors="ignore")).hexdigest()
        if h in seen:
            duplicates.append((fname, seen[h]))
            continue
        seen[h] = fname
        unique_paths.append(p)
    progress(0.10, desc="Embedding Job Description...")
    jd_vec = np.array(embedder.encode([jd_text], normalize_embeddings=True), dtype=np.float32)
    local_pool = []
    contacts_map: Dict[str, Dict[str, str]] = {}
    total = len(unique_paths)
    for idx, p in enumerate(unique_paths, start=1):
        # progress 10% -> 70% while processing CVs
        prog = 0.10 + 0.60 * (idx / max(1, total))
        progress(prog, desc=f"Processing CVs ({idx}/{total}) — {os.path.basename(p)}")
        raw = clean_text(read_file_to_text(p))[:MAX_CV_CHARS]
        if not raw:
            continue
        filename = os.path.basename(p)
        info = extract_contact_info(raw) if show_contacts_toggle else {"name": "", "email": "", "phone": ""}
        contacts_map[filename] = info
        chunks = chunk_text_safe(raw)
        if not chunks:
            continue
        # Retrieve the chunks most similar to the JD, then rerank the top few.
        chunk_vecs = np.array(embedder.encode(chunks, normalize_embeddings=True), dtype=np.float32)
        sims = cosine_sim_matrix(jd_vec, chunk_vecs)[0]
        idxs = np.argsort(sims)[::-1][:TOP_CHUNKS_PER_CV]
        top_chunks = [(int(i), float(sims[int(i)]), chunks[int(i)]) for i in idxs]
        evidence_chunks = [txt for _, _, txt in top_chunks[:EVIDENCE_CHUNKS_PER_CV]]
        if mask_pii_toggle:
            evidence_chunks = [mask_pii(x) for x in evidence_chunks]
        pairs = [(jd_text, ev) for ev in evidence_chunks]
        logits = reranker.predict(pairs) if pairs else [0.0]
        logits = [float(x) for x in logits]
        retr_sims = [s for _, s, _ in top_chunks]
        local_score = compute_local_score(retr_sims, logits)
        local_pool.append({
            "filename": filename,
            "local_score": local_score,
            "evidence_chunks": evidence_chunks,
        })
    if not local_pool:
        raise gr.Error("Could not extract usable text from the uploaded CVs.")
    progress(0.72, desc="Preparing LLM ranking...")
    # Strongest local matches first, so batching degrades gracefully on truncation.
    local_pool = sorted(local_pool, key=lambda x: float(x["local_score"]), reverse=True)
    batch_outputs: List[LLMRankingOutput] = []
    batches = max(1, (len(local_pool) + LLM_BATCH_SIZE - 1) // LLM_BATCH_SIZE)
    for b in range(batches):
        start = b * LLM_BATCH_SIZE
        end = start + LLM_BATCH_SIZE
        batch = local_pool[start:end]
        # progress 72% -> 92% while LLM runs
        prog = 0.72 + 0.20 * ((b + 1) / batches)
        progress(prog, desc=f"LLM judging batches ({b+1}/{batches})...")
        llm_batch = [
            {
                "filename": c["filename"],
                "evidence_chunks": c["evidence_chunks"],
                "local_score": c["local_score"],
            }
            for c in batch
        ]
        out = llm_judge_rank_batch(jd_text, must_haves or "", llm_batch)
        batch_outputs.append(out)
    progress(0.94, desc="Finalizing report...")
    judged = merge_llm_batches(batch_outputs)
    ranked = judged.ranked
    if not ranked:
        raise gr.Error("LLM returned an empty ranking.")
    report_html = render_top10_html(ranked, total_count=len(ranked))
    # Full-ranking CSV (all candidates, with extracted contacts).
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv")
    with open(tmp.name, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["Rank", "Filename", "FinalScore(0-100)", "FitLevel", "Name", "Email", "Phone", "Summary"])
        for ridx, c in enumerate(ranked, start=1):
            ci = contacts_map.get(c.filename, {"name": "", "email": "", "phone": ""})
            w.writerow([
                ridx,
                c.filename,
                round(float(c.final_score), 2),
                c.fit_level,
                ci.get("name", ""),
                ci.get("email", ""),
                ci.get("phone", ""),
                c.summary,
            ])
    # Interactive shortlist table; column order must match export_shortlist's
    # positional access (checkbox at 0, email at 6).
    shortlist_rows = []
    for ridx, c in enumerate(ranked, start=1):
        ci = contacts_map.get(c.filename, {"name": "", "email": "", "phone": ""})
        shortlist_rows.append([
            False,
            ridx,
            c.filename,
            round(float(c.final_score), 2),
            c.fit_level,
            ci.get("name", ""),
            ci.get("email", ""),
            ci.get("phone", ""),
        ])
    shortlist_df = pd.DataFrame(
        shortlist_rows,
        columns=["Shortlisted", "Rank", "Filename", "Score", "Fit", "Name", "Email", "Phone"],
    )
    elapsed = time.time() - t0
    meta = (
        f"**CVs uploaded:** {len(cv_paths)} → **Unique processed:** {len(unique_paths)} (Max allowed: {MAX_CV_UPLOADS})  \n"
        f"**Ranked (ALL):** {len(ranked)}  \n"
        f"**LLM batches:** {batches} (batch size={LLM_BATCH_SIZE})  \n"
        f"**Time:** {elapsed:.2f}s  \n"
        f"**Duplicates skipped:** {len(duplicates)}  \n\n"
        f"**LLM Notes:** {(judged.overall_notes or '').strip()}"
    )
    progress(1.0, desc="Done ✅")
    return report_html, meta, tmp.name, shortlist_df, "", ""
| # ========================================================= | |
| # SGS Theme / CSS (white text + MET green + nice touches) | |
| # ========================================================= | |
# CSS injected into gr.Blocks: SGS brand palette (blue/green), dark gradient
# background, and classes consumed by the HTML built in candidate_card_html /
# render_top10_html (.hero, .card, .badge, .pill, .checkrow, .quote, ...).
CUSTOM_CSS = """
:root{
  --sgs-blue:#0B3D91;
  --sgs-green:#00A651;
  --text:#F3F7FF;
  --line:rgba(255,255,255,.14);
}
.gradio-container{max-width:1180px !important;}
body, .gradio-container{
  background: radial-gradient(1200px 700px at 10% 10%, rgba(11,61,145,.28), transparent 55%),
              radial-gradient(900px 600px at 90% 20%, rgba(0,166,81,.20), transparent 60%),
              linear-gradient(180deg, #060914, #060914) !important;
}
.gradio-container, .gradio-container *{ color: var(--text); }
/* Hero */
.hero{
  border:1px solid var(--line);
  background: linear-gradient(135deg, rgba(11,61,145,.40), rgba(0,166,81,.20));
  border-radius: 22px;
  padding: 18px;
  display:flex;
  align-items:flex-end;
  justify-content:space-between;
  gap:16px;
  box-shadow: 0 18px 40px rgba(0,0,0,.38);
  margin: 12px 0 16px;
  position: relative;
  overflow: hidden;
}
.hero:before{
  content:"";
  position:absolute;
  inset:-40%;
  background: radial-gradient(circle at 30% 30%, rgba(255,255,255,.10), transparent 45%);
  transform: rotate(18deg);
  pointer-events:none;
}
.hero-title{font-weight:900;font-size:22px;position:relative;}
.hero-sub{color:rgba(243,247,255,.90);margin-top:6px;font-size:13px;position:relative;}
.hero-right{display:flex;gap:10px;flex-wrap:wrap;justify-content:flex-end;position:relative;}
.kpi{
  background: rgba(255,255,255,.08);
  border:1px solid rgba(255,255,255,.14);
  border-radius: 16px;
  padding: 10px 12px;
  min-width: 140px;
  backdrop-filter: blur(6px);
}
.kpi-label{color:rgba(243,247,255,.82);font-size:12px;font-weight:700;}
.kpi-val{font-size:18px;font-weight:900;margin-top:2px;}
/* Cards */
.cards{display:grid;grid-template-columns: 1fr; gap: 12px;}
.card{
  background: linear-gradient(180deg, rgba(16,26,44,.98), rgba(12,19,34,.88));
  border:1px solid rgba(255,255,255,.14);
  border-radius: 18px;
  padding: 14px;
  box-shadow: 0 14px 28px rgba(0,0,0,.28);
  transition: transform .18s ease, box-shadow .18s ease, border-color .18s ease;
}
.card:hover{
  transform: translateY(-2px);
  box-shadow: 0 20px 40px rgba(0,0,0,.38);
  border-color: rgba(255,255,255,.20);
}
.card-top{display:flex;align-items:flex-start;justify-content:space-between;gap:10px;}
.card-title{display:flex;gap:10px;align-items:baseline;flex-wrap:wrap;}
.rank{
  background: rgba(11,61,145,.35);
  border:1px solid rgba(11,61,145,.45);
  font-weight: 900;
  border-radius: 999px;
  padding: 6px 10px;
  font-size: 12px;
}
.file{font-weight:900;font-size:16px;}
.card-meta{display:flex;gap:8px;align-items:center;flex-wrap:wrap;justify-content:flex-end;}
/* Badges */
.badge{
  display:inline-flex;align-items:center;
  padding: 6px 10px;border-radius: 999px;font-size:12px;font-weight:900;
  border:1px solid rgba(255,255,255,.12);
}
.b-exc{ background: rgba(0,166,81,.20); border-color: rgba(0,166,81,.30); }
.b-good{ background: rgba(11,61,145,.20); border-color: rgba(11,61,145,.32); }
.b-maybe{ background: rgba(245,158,11,.18); border-color: rgba(245,158,11,.28); }
.b-weak{ background: rgba(239,68,68,.16); border-color: rgba(239,68,68,.28); }
.pill{
  display:inline-flex;align-items:center;justify-content:center;
  min-width:60px;padding: 6px 10px;border-radius: 999px;font-weight: 900;
  border:1px solid rgba(255,255,255,.12);
  background: rgba(255,255,255,.08);
}
.p-high{ background: rgba(0,166,81,.18); border-color: rgba(0,166,81,.30); }
.p-mid{ background: rgba(11,61,145,.18); border-color: rgba(11,61,145,.30); }
.p-low{ background: rgba(245,158,11,.16); border-color: rgba(245,158,11,.28); }
.p-bad{ background: rgba(239,68,68,.14); border-color: rgba(239,68,68,.28); }
/* Score bar */
.bar{
  width: 100%; height: 10px; border-radius: 999px;
  background: rgba(255,255,255,.10); overflow: hidden;
  border:1px solid rgba(255,255,255,.10);
  margin: 10px 0 10px;
}
.fill{
  height:100%; border-radius: 999px;
  background: linear-gradient(90deg, var(--sgs-green), #4fb2ff, var(--sgs-blue));
}
.summary{font-size:13px;line-height:1.55rem;margin: 6px 0 10px;color:#fff;}
.section-title{font-size:13px;font-weight:900;margin:10px 0 6px;color:#fff;}
.grid{display:grid;grid-template-columns: 1fr 1fr; gap: 14px;}
@media(max-width:860px){.grid{grid-template-columns:1fr;}}
.list{margin:0;padding-left:18px;color:#fff;}
.list li{margin:6px 0;line-height:1.30rem;color:#fff;}
/* Quotes / Evidence */
.quotes{display:grid;gap:10px;margin-top:6px;}
.quote{
  background: rgba(255,255,255,.10);
  border:1px solid rgba(255,255,255,.16);
  border-radius: 14px;
  padding: 10px 12px;
  color: #fff;
  font-size: 13px;
  line-height: 1.45rem;
}
/* Checklist */
.checklist{display:grid;gap:8px;margin-top:6px;}
.checkrow{
  display:grid; grid-template-columns: 1.1fr .4fr 1.5fr; gap:10px;
  padding:10px 12px; border-radius:14px;
  border:1px solid rgba(255,255,255,.18);
  background: rgba(255,255,255,.10);
  font-size:13px;
  position: relative;
  overflow: hidden;
}
.checkrow:before{
  content:"";
  position:absolute;
  left:0; top:0; bottom:0;
  width:4px;
  background: rgba(255,255,255,.20);
}
.checkrow .req{font-weight:900;color:#fff;}
.checkrow .ev{color:rgba(255,255,255,0.95);}
.checkrow .st{font-weight:1000;text-align:center;letter-spacing:.4px;}
/* ✅ Status colors (MET green) */
.checkrow.ok:before{ background: rgba(0,166,81,.95); }
.checkrow.partial:before{ background: rgba(245,158,11,.95); }
.checkrow.miss:before{ background: rgba(239,68,68,.95); }
.checkrow.ok .st{ color:#22ffb6 !important; text-shadow: 0 0 10px rgba(34,255,182,.18); }
.checkrow.partial .st{ color:#ffd27a !important; }
.checkrow.miss .st{ color:#ff9a9a !important; }
/* Dataframe border */
table { border-color: rgba(255,255,255,.14) !important; }
"""
| # ========================================================= | |
| # Gradio UI | |
| # ========================================================= | |
# Gradio theme built on Soft with SGS brand hues; CUSTOM_CSS does the heavy lifting.
theme = gr.themes.Soft(
    primary_hue="blue",
    secondary_hue="green",
    neutral_hue="slate",
    radius_size="lg",
    font=[gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui"],
)
with gr.Blocks(title="SGS ATS Candidate Matcher", theme=theme, css=CUSTOM_CSS) as demo:
    gr.Markdown(f"""
# SGS ATS Candidate Matcher
Evidence-based CV ranking against a Job Description (Top 10 Report + Shortlisting).
**Max CV uploads:** {MAX_CV_UPLOADS}
**Important:** set `HF_TOKEN` in Space secrets.
""")
    # Inputs: one JD file plus up to MAX_CV_UPLOADS CVs.
    with gr.Row():
        jd_file = gr.File(label="Job Description file (PDF/DOCX/TXT)", file_types=[".pdf", ".docx", ".txt"])
        cv_files = gr.File(label=f"Upload CVs (max {MAX_CV_UPLOADS})", file_count="multiple", file_types=[".pdf", ".docx", ".txt"])
    with gr.Accordion("Settings", open=False):
        must_haves = gr.Textbox(
            label="Must-have requirements (optional) — one per line",
            lines=5,
            placeholder="Example:\nRecruitment lifecycle\nATS usage\nInterview scheduling\nOffer negotiation"
        )
        mask_pii_toggle = gr.Checkbox(label="Mask PII (emails/phones) in evidence", value=True)
        show_contacts_toggle = gr.Checkbox(label="Extract contact info (Name / Email / Phone) from CVs", value=True)
        gr.Markdown("""
**Stability tips**
- If truncation happens: set `LLM_BATCH_SIZE=3` and/or `LLM_MAX_TOKENS=4500` in Space Variables.
- CPU Space: set `RERANK_MODEL_NAME=BAAI/bge-reranker-base`
""")
    run_btn = gr.Button("Generate Candidate Fit Report", variant="primary")
    with gr.Tabs():
        with gr.Tab("Executive Report (Top 10)"):
            report_html = gr.HTML()
            meta_md = gr.Markdown()
            export_full = gr.File(label="Download Full Ranking CSV (includes contacts)")
        with gr.Tab("Shortlist & Export"):
            gr.Markdown("Tick **Shortlisted** candidates, then click **Export Shortlist**.")
            # Column order must stay in sync with rank_app / export_shortlist.
            shortlist_df = gr.Dataframe(
                headers=["Shortlisted", "Rank", "Filename", "Score", "Fit", "Name", "Email", "Phone"],
                datatype=["bool", "number", "str", "number", "str", "str", "str", "str"],
                interactive=True,
            )
            with gr.Row():
                export_shortlist_btn = gr.Button("Export Shortlist CSV", variant="secondary")
                export_shortlist_file = gr.File(label="Download Shortlist CSV")
            export_shortlist_msg = gr.Markdown()
            email_list = gr.Textbox(
                label="Email list (copy/paste) — shortlisted only",
                lines=3,
                placeholder="Emails will appear here after exporting shortlist..."
            )
    # Event wiring: main ranking pipeline and shortlist CSV export.
    run_btn.click(
        fn=rank_app,
        inputs=[jd_file, cv_files, must_haves, mask_pii_toggle, show_contacts_toggle],
        outputs=[report_html, meta_md, export_full, shortlist_df, export_shortlist_msg, email_list],
    )
    export_shortlist_btn.click(
        fn=export_shortlist,
        inputs=[shortlist_df],
        outputs=[export_shortlist_file, export_shortlist_msg, email_list],
    )
demo.launch()