Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import zipfile | |
| import tempfile | |
| import pandas as pd | |
| import pdfplumber | |
| import fitz # PyMuPDF | |
| import gradio as gr | |
| from docx import Document | |
| from sentence_transformers import SentenceTransformer, util | |
| # ============================================================= | |
| # CONFIG | |
| # ============================================================= | |
# Internal candidate workbook; it must sit in the root of the HF Space.
INTERNAL_EXCEL_FILE = "Summary_of_Faculty_Rankig_16th Feb 2025.xlsx"

# Fine-tuned sentence-embedding model hosted on the Hugging Face Hub.
MODEL_NAME = "csAhmad/zoraiz-model"

# Columns written to the result sheet, in order. The spellings (including
# "Qualifciation" and the trailing space in "Area ") are kept on purpose so
# they line up with the headers of the original Excel file.
OUTPUT_COLUMNS = [
    "Name (Age)",
    "Contact",
    "Current Job",
    "Qualifciation",
    "Experience",
    "Publications",
    "Citation",
    "H-index",
    "Nationality",
    "Other Achievements",
    "Area ",
    "Comments",
]
| # ============================================================= | |
| # LOAD MODEL (once at startup) | |
| # ============================================================= | |
# The model is instantiated at import time and bound to the module-level
# name `app_model`; run_pipeline calls `app_model.encode(...)` on it, so
# every request reuses this single instance.
print("Loading model...")
app_model = SentenceTransformer(MODEL_NAME)
print("Model loaded.")
| # ============================================================= | |
| # HELPERS | |
| # ============================================================= | |
def normalize_text(text):
    """Return a lowercase, ASCII-alphanumeric, single-spaced form of *text*.

    Missing values (None / NaN) become the empty string. Every character
    that is not a-z, 0-9 or whitespace is dropped, so "Al-Hassan" becomes
    "alhassan".
    """
    if pd.isna(text):
        return ""
    text = str(text).lower()
    text = re.sub(r"[^a-z0-9\s]", "", text)
    # Collapse whitespace only after the punctuation is gone: removing a
    # free-standing "-" or "." would otherwise leave a double space or a
    # stray space at either end.
    return re.sub(r"\s+", " ", text).strip()
def extract_name_only(name_age_value):
    """Pull the bare candidate name out of a raw "Name (Age)" cell.

    URLs and anything inside parentheses are removed first. The first
    remaining line that is not an e-mail, not longer than 60 characters,
    not made only of digits/phone punctuation, and free of contact-style
    keywords is returned with its whitespace collapsed. Returns "" for a
    missing value or when no line qualifies.
    """
    if pd.isna(name_age_value):
        return ""

    cleaned = str(name_age_value).strip()
    cleaned = re.sub(r'https?://\S+', '', cleaned)   # drop links
    cleaned = re.sub(r'\([^)]*\)', '', cleaned)      # drop "(35)", "(Date of birth: ...)"

    blocked_words = ('scholar', 'citation', 'http', 'www', 'email', 'phone', 'mobile')

    for raw_line in cleaned.split('\n'):
        candidate = raw_line.strip()
        if not candidate:
            continue
        # E-mail addresses and overly long lines are never names.
        if '@' in candidate or len(candidate) > 60:
            continue
        # Phone numbers and other digit-only lines.
        if re.match(r'^[\d\s\+\-\(\)]+$', candidate):
            continue
        lowered = candidate.lower()
        if any(word in lowered for word in blocked_words):
            continue
        return re.sub(r'\s+', ' ', candidate).strip()

    return ""
def name_to_tokens(name):
    """Split a name into normalized words, discarding one-character words."""
    words = normalize_text(name).split()
    return list(filter(lambda word: len(word) > 1, words))
def detect_document_type(file_name):
    """Classify a document by keywords found in its (lower-cased) file name.

    Rules are checked in order and the first hit wins. Matching is a plain
    substring test, so "cv" also fires inside longer words. Returns "other"
    when no rule applies.
    """
    lowered = str(file_name).lower()

    # (keywords, label) pairs; order matters because the first match wins.
    rules = (
        (("cv", "resume"), "cv"),
        (("cover",), "cover_letter"),
        (("research",), "research_statement"),
        (("teaching",), "teaching_statement"),
        (("publication",), "publication_list"),
        (("reference",), "reference"),
        (("transcript", "degree", "certificate"), "academic_document"),
        (("passport", "visa"), "identity_document"),
    )
    for keywords, label in rules:
        if any(keyword in lowered for keyword in keywords):
            return label
    return "other"
| # ============================================================= | |
| # TEXT EXTRACTION | |
| # ============================================================= | |
def extract_text_from_pdf(file_path):
    """Extract the text of a PDF, trying pdfplumber first and PyMuPDF second.

    The PyMuPDF pass runs only when pdfplumber yields nothing but
    whitespace. Extraction is best-effort: failures are reported on stdout
    and never raised, so an unreadable file simply returns "".
    """
    chunks = []

    # pdfplumber first
    try:
        with pdfplumber.open(file_path) as pdf:
            for page in pdf.pages:
                try:
                    page_text = page.extract_text()
                except Exception as e:
                    # One bad page must not discard the remaining pages.
                    print(f"[PDF warning] page skipped in {file_path}: {e}")
                    continue
                if page_text:
                    chunks.append(page_text + "\n")
    except Exception as e:
        print(f"[PDF warning] pdfplumber failed for {file_path}: {e}")

    text = "".join(chunks)

    # PyMuPDF fallback
    if not text.strip():
        fallback_chunks = []
        try:
            # The context manager closes the document even if a page raises.
            with fitz.open(file_path) as doc:
                for page in doc:
                    page_text = page.get_text("text")
                    if page_text:
                        fallback_chunks.append(page_text + "\n")
        except Exception as e:
            print(f"[PDF error] {file_path}: {e}")
        text += "".join(fallback_chunks)

    return text
def extract_text_from_docx(file_path):
    """Return the non-empty paragraphs of a Word document, one per line.

    Any failure while opening or reading the file is printed and whatever
    text was gathered up to that point is returned.
    """
    collected = []
    try:
        for paragraph in Document(file_path).paragraphs:
            if paragraph.text:
                collected.append(paragraph.text + "\n")
    except Exception as e:
        print(f"[DOCX error] {file_path}: {e}")
    return "".join(collected)
def extract_document_text(file_path):
    """Dispatch to the right text extractor based on the file extension.

    ".pdf" goes to extract_text_from_pdf, ".docx"/".doc" go to
    extract_text_from_docx, and ".txt" is read as UTF-8 with undecodable
    bytes ignored. Any other extension, or an unreadable text file,
    yields "".
    """
    suffix = os.path.splitext(file_path)[1].lower()

    if suffix == ".pdf":
        return extract_text_from_pdf(file_path)
    if suffix in (".docx", ".doc"):
        return extract_text_from_docx(file_path)
    if suffix != ".txt":
        return ""

    # A missing file fails inside open() and lands in the same fallback.
    try:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as handle:
            return handle.read()
    except Exception:
        return ""
| # ============================================================= | |
| # MATCHING: CV folder name → Excel row | |
| # ============================================================= | |
def match_by_token_overlap(matching_text, excel_df, min_hits=2):
    """Find the Excel row whose name tokens occur most often in *matching_text*.

    Each row's "candidate_name_tokens" are tested against the normalized
    text with a substring check. Rows are ranked by hit count first and by
    hits plus the covered fraction of the name second; the earliest row
    wins a full tie. Returns (row index, "candidate_name_only") for the
    winner, or (None, None) when the winner has fewer than *min_hits* hits.
    """
    haystack = normalize_text(matching_text)

    winner_idx, winner_name = None, None
    top_hits, top_score = -1, -1

    for row_idx, record in excel_df.iterrows():
        name_tokens = record["candidate_name_tokens"]
        if not name_tokens:
            continue

        hit_count = sum(1 for token in name_tokens if token in haystack)
        score = hit_count + hit_count / max(len(name_tokens), 1)

        # Lexicographic comparison: more hits wins, score breaks ties.
        if (hit_count, score) > (top_hits, top_score):
            winner_idx, winner_name = row_idx, record["candidate_name_only"]
            top_hits, top_score = hit_count, score

    if top_hits < min_hits:
        return (None, None)
    return (winner_idx, winner_name)
| # ============================================================= | |
| # BUILD RICH PROFILE TEXT FOR SEMANTIC MODEL | |
| # ============================================================= | |
def _profile_cell_text(value):
    """Return *value* as stripped text, or "" when it is None or NaN."""
    if value is None:
        return ""
    text = str(value).strip()
    return "" if text.lower() == "nan" else text


def build_candidate_profile(row):
    """
    Combine the pre-filled Excel fields and the extracted CV document text
    into one string for the semantic model to score against the JD.

    *row* is anything with a dict-style ``get`` (a pandas Series or a plain
    dict). Empty and missing (None / NaN) values are skipped, both for the
    Excel fields and for the CV text. Returns "Label: value" lines followed
    by an optional "CV Documents:" section.
    """
    # Excel fields (already filled in by the team)
    fields = [
        ("Name", row.get("Name (Age)", "")),
        ("Current Job", row.get("Current Job", "")),
        ("Qualification", row.get("Qualifciation", "")),  # typo preserved from Excel
        ("Experience", row.get("Experience", "")),
        ("Publications", row.get("Publications", "")),
        ("Citations", row.get("Citation", "")),
        ("H-index", row.get("H-index", "")),
        ("Nationality", row.get("Nationality", "")),
        ("Achievements", row.get("Other Achievements", "")),
        ("Area", row.get("Area ", "")),  # trailing space preserved
        ("Comments", row.get("Comments", "")),
    ]
    parts = []
    for label, value in fields:
        text = _profile_cell_text(value)
        if text:
            parts.append(f"{label}: {text}")

    # Extracted CV document text; filtered like the fields above so a
    # missing value never reaches the model as the literal word "nan".
    cv_text = _profile_cell_text(row.get("combined_profile_text", ""))
    if cv_text:
        parts.append(f"CV Documents:\n{cv_text}")

    return "\n".join(parts).strip()
| # ============================================================= | |
| # MAIN PIPELINE | |
| # ============================================================= | |
def _load_internal_dataset():
    """Read the internal Excel sheet and add the columns used for name matching."""
    if not os.path.exists(INTERNAL_EXCEL_FILE):
        raise FileNotFoundError(
            f"Internal dataset not found: '{INTERNAL_EXCEL_FILE}'. "
            "Please upload it to the root of your HF Space."
        )
    df = pd.read_excel(INTERNAL_EXCEL_FILE)

    # Strip whitespace from all column names, then restore the trailing
    # space of "Area " so it stays consistent with OUTPUT_COLUMNS.
    df.columns = df.columns.str.strip()
    if "Area" in df.columns and "Area " not in df.columns:
        df = df.rename(columns={"Area": "Area "})

    if "Name (Age)" not in df.columns:
        raise ValueError(
            f"Column 'Name (Age)' is missing from '{INTERNAL_EXCEL_FILE}'."
        )

    df["candidate_name_raw"] = df["Name (Age)"].astype(str)
    # Work from the original cells, not the stringified copy: astype(str)
    # turns a blank cell into the text "nan", which would then be treated
    # as a real name.
    df["candidate_name_only"] = df["Name (Age)"].apply(extract_name_only)
    df["candidate_name_tokens"] = df["candidate_name_only"].apply(name_to_tokens)

    # Fill NaN in key columns
    for col in ["Other Achievements", "Area ", "Comments", "Contact",
                "Current Job", "Qualifciation", "Experience",
                "Publications", "Citation", "H-index", "Nationality"]:
        if col in df.columns:
            df[col] = df[col].fillna("")
    return df


def _extract_document_texts(zip_file_path, extract_folder):
    """Unpack the ZIP into *extract_folder* and return one text row per document."""
    os.makedirs(extract_folder, exist_ok=True)
    try:
        with zipfile.ZipFile(zip_file_path, "r") as archive:
            archive.extractall(extract_folder)
    except zipfile.BadZipFile as err:
        raise ValueError("Invalid ZIP file.") from err

    # Scan documents
    valid_ext = {".pdf", ".docx", ".doc"}
    doc_rows = []
    for root, _, files in os.walk(extract_folder):
        for fname in files:
            if fname.startswith(".") or fname.startswith("__"):
                continue
            ext = os.path.splitext(fname)[1].lower()
            if ext not in valid_ext:
                continue
            full_path = os.path.join(root, fname)
            rel_path = os.path.relpath(full_path, extract_folder)
            folder_name = os.path.dirname(rel_path)
            # A file at the ZIP root has no folder; its own stem stands in
            # as the candidate "folder".
            if folder_name in ("", "."):
                folder_name = os.path.splitext(fname)[0]
            doc_rows.append({
                "file_name": fname,
                "full_path": full_path,
                "folder_name": folder_name,
            })
    if not doc_rows:
        raise ValueError("No valid PDF or DOCX files found in the ZIP.")

    # Extract text
    text_rows = []
    for doc in doc_rows:
        text = extract_document_text(doc["full_path"])
        text = text.replace("\x00", " ")
        text = re.sub(r"[ \t]+", " ", text)
        text = re.sub(r"\n{3,}", "\n\n", text).strip()
        text_rows.append({
            "file_name": doc["file_name"],
            "folder_name": doc["folder_name"],
            "text": text,
            "status": "success" if text else "empty",
            "doc_type": detect_document_type(doc["file_name"]),
        })
    return pd.DataFrame(text_rows)


def _build_folder_profiles(text_df):
    """Merge the readable documents of each folder into one profile row."""
    # Keep useful doc types; fall back to all readable
    useful_types = {"cv", "cover_letter", "research_statement",
                    "teaching_statement", "publication_list"}
    readable = text_df["status"] == "success"
    useful_df = text_df[readable & text_df["doc_type"].isin(useful_types)].copy()
    if useful_df.empty:
        print("[Warning] No files matched standard doc types — using all readable files.")
        useful_df = text_df[readable].copy()
    if useful_df.empty:
        raise ValueError("No readable documents found in the ZIP.")

    # Lower number = earlier in the combined profile.
    doc_priority = {"cv": 1, "research_statement": 2, "teaching_statement": 3,
                    "publication_list": 4, "cover_letter": 5, "other": 99}
    useful_df["priority"] = useful_df["doc_type"].map(doc_priority).fillna(99)
    useful_df = useful_df.sort_values(
        ["folder_name", "priority", "file_name"]
    ).reset_index(drop=True)

    profiles = []
    for folder_name, group in useful_df.groupby("folder_name"):
        parts = []
        included_files = []
        included_types = []
        for _, doc_row in group.iterrows():
            doc_text = str(doc_row["text"]).strip()
            if not doc_text:
                continue
            parts.append(
                f"\n--- {doc_row['doc_type'].upper()} | {doc_row['file_name']} ---\n{doc_text}"
            )
            included_files.append(doc_row["file_name"])
            included_types.append(doc_row["doc_type"])
        profiles.append({
            "folder_name": folder_name,
            "combined_profile_text": "\n".join(parts).strip(),
            "included_files": " | ".join(included_files),
            "included_doc_types": " | ".join(sorted(set(included_types))),
        })

    profiles_df = pd.DataFrame(profiles)
    if profiles_df.empty:
        raise ValueError("No candidate profiles could be built.")

    # Build matching text (folder name + filenames + first 1500 chars of profile)
    profiles_df["matching_text"] = profiles_df.apply(
        lambda r: f"{r['folder_name']}\n{r['included_files']}\n{r['combined_profile_text'][:1500]}",
        axis=1
    )
    return profiles_df


def _match_profiles_to_dataset(profiles_df, df):
    """Join each folder profile to its Excel row; return (joined rows, unmatched count)."""
    matches = []
    for _, row in profiles_df.iterrows():
        matched_idx, matched_name = match_by_token_overlap(
            row["matching_text"], df, min_hits=2
        )
        matches.append({
            "folder_name": row["folder_name"],
            "matched_excel_index": matched_idx,
            "matched_name": matched_name,
        })
    matches_df = pd.DataFrame(matches)
    matched_only = matches_df[matches_df["matched_excel_index"].notna()].copy()
    if matched_only.empty:
        raise ValueError(
            "No candidates could be matched between ZIP folder names and the Excel dataset. "
            "Ensure ZIP folder names contain the candidate names from the Excel file."
        )

    # Merge with Excel rows
    merged_df = matched_only.merge(
        df.reset_index().rename(columns={"index": "excel_index"}),
        left_on="matched_excel_index",
        right_on="excel_index",
        how="left"
    )
    # Merge with profile texts
    final_df = merged_df.merge(
        profiles_df[["folder_name", "combined_profile_text",
                     "included_files", "included_doc_types"]],
        on="folder_name",
        how="left"
    )
    for col in ["combined_profile_text", "included_files", "included_doc_types"]:
        final_df[col] = final_df[col].fillna("")

    return final_df, len(matches_df) - len(matched_only)


def _write_shortlist_excel(final_output, output_path):
    """Write the shortlist to *output_path* with auto-sized columns (max width 60)."""
    with pd.ExcelWriter(output_path, engine="xlsxwriter") as writer:
        final_output.to_excel(writer, index=False, sheet_name="Shortlisted Candidates")
        worksheet = writer.sheets["Shortlisted Candidates"]
        for i, col in enumerate(final_output.columns):
            max_len = max(
                final_output[col].astype(str).map(len).max(),
                len(col)
            )
            worksheet.set_column(i, i, min(max_len + 2, 60))


def run_pipeline(zip_file_path, job_description_text):
    """Rank the candidates found in a ZIP of CVs against a job description.

    Steps: load the internal Excel dataset, unpack the ZIP and extract the
    text of every PDF/DOCX/DOC file, merge documents per folder, match each
    folder to an Excel row by name tokens, score the matched candidates
    with the sentence-embedding model, keep those at or above the median
    score, and save the shortlist as an Excel file.

    Args:
        zip_file_path: path of the uploaded ZIP archive.
        job_description_text: job description the candidates are scored against.

    Returns:
        (final_output, output_path, summary): the shortlist DataFrame
        restricted to OUTPUT_COLUMNS, the path of the written .xlsx file,
        and a multi-line text summary.

    Raises:
        FileNotFoundError: the internal Excel file is missing.
        ValueError: the ZIP is invalid, contains no usable documents, no
            folder could be matched, or the Excel sheet has no
            "Name (Age)" column.
    """
    import shutil  # local import: only needed for temp-directory cleanup

    # ------ STEP 1: Load internal Excel ------
    # Done before any temp directory exists, so a missing dataset leaves
    # nothing behind on disk.
    df = _load_internal_dataset()

    work_dir = tempfile.mkdtemp(prefix="cv_rank_")
    try:
        # ------ STEPS 2-4: Extract ZIP, scan documents, extract text ------
        extract_folder = os.path.join(work_dir, "documents")
        try:
            text_df = _extract_document_texts(zip_file_path, extract_folder)
        finally:
            # The unpacked CVs are not needed once their text is in memory.
            shutil.rmtree(extract_folder, ignore_errors=True)

        # ------ STEP 5: Build one combined profile per folder ------
        profiles_df = _build_folder_profiles(text_df)

        # ------ STEPS 6-7: Match folders → Excel rows, attach profile texts ------
        final_df, unmatched_count = _match_profiles_to_dataset(profiles_df, df)

        # Build rich profile string for model
        final_df["candidate_profile_for_model"] = final_df.apply(
            build_candidate_profile, axis=1
        )

        # ------ STEP 8: Semantic scoring ------
        job_embedding = app_model.encode(
            job_description_text,
            convert_to_tensor=True,
            normalize_embeddings=True
        )
        cand_embeddings = app_model.encode(
            final_df["candidate_profile_for_model"].tolist(),
            convert_to_tensor=True,
            normalize_embeddings=True
        )
        scores = util.cos_sim(job_embedding, cand_embeddings)[0]
        final_df["Match Score"] = scores.cpu().numpy().round(4)

        # ------ STEP 9: Rank and shortlist (at or above the median) ------
        ranked_df = final_df.sort_values(
            "Match Score", ascending=False
        ).reset_index(drop=True)
        threshold = ranked_df["Match Score"].median()
        shortlisted = ranked_df[
            ranked_df["Match Score"] >= threshold
        ].copy().reset_index(drop=True)

        # Clean up Name (Age) — strip URLs and show name only
        shortlisted["Name (Age)"] = shortlisted["Name (Age)"].apply(extract_name_only)

        # ------ STEP 10: Build final output with exact Excel columns ------
        for col in OUTPUT_COLUMNS:
            if col not in shortlisted.columns:
                shortlisted[col] = ""
        final_output = shortlisted[OUTPUT_COLUMNS].copy()

        # ------ STEP 11: Save Excel ------
        output_path = os.path.join(work_dir, "shortlisted_ranked_candidates.xlsx")
        _write_shortlist_excel(final_output, output_path)

        summary = (
            f"Total candidates processed : {len(ranked_df)}\n"
            f"Shortlisted (above median) : {len(final_output)}\n"
            f"Match score threshold : {threshold:.4f}\n"
            f"Unmatched folders skipped : {unmatched_count}"
        )
        return final_output, output_path, summary
    except Exception:
        # Nothing will be downloaded after a failure, so drop the whole
        # working directory instead of leaking it.
        shutil.rmtree(work_dir, ignore_errors=True)
        raise
| # ============================================================= | |
| # GRADIO WRAPPER | |
| # ============================================================= | |
def gradio_app(zip_file, job_description_text):
    """Gradio callback: validate the inputs and run the ranking pipeline.

    Returns the (results DataFrame, Excel path, summary) triple produced by
    run_pipeline. Every failure reaches the UI as a gr.Error; errors that
    are already gr.Error pass through unchanged.
    """
    # Input validation: both checks raise gr.Error directly.
    if zip_file is None:
        raise gr.Error("Please upload the ZIP file containing candidate CVs.")
    if not (job_description_text and str(job_description_text).strip()):
        raise gr.Error("Please provide the job description.")

    try:
        # The upload arrives either as a path string or as an object
        # exposing the path through `.name`.
        zip_path = zip_file.name if not isinstance(zip_file, str) else zip_file
        return run_pipeline(zip_path, job_description_text)
    except gr.Error:
        raise
    except Exception as e:
        raise gr.Error(f"Error: {str(e)}")
| # ============================================================= | |
| # GRADIO UI | |
| # ============================================================= | |
# Two-column layout: inputs and the run button on the left, the summary,
# result table and Excel download on the right.
with gr.Blocks(title="AI CV Matching & Ranking System") as demo:
    gr.Markdown("""
    # AI-Based CV Matching & Ranking System
    Upload a ZIP file of candidate CVs and paste the job description.
    The system matches CVs to the internal candidate dataset, scores them
    with a fine-tuned semantic model, and returns a ranked shortlist Excel file.
    """)
    with gr.Row():
        with gr.Column():
            zip_input = gr.File(
                label="Upload Candidate CV ZIP File",
                file_types=[".zip"],
                type="filepath"
            )
            job_input = gr.Textbox(
                label="Paste Job Description",
                lines=15,
                placeholder="Paste the full job description here..."
            )
            run_button = gr.Button("Match & Rank Candidates", variant="primary")
        with gr.Column():
            summary_output = gr.Textbox(
                label="Processing Summary",
                lines=5,
                interactive=False
            )
            results_output = gr.Dataframe(
                label="Shortlisted Ranked Candidates",
                interactive=False,
                wrap=True
            )
            excel_download = gr.File(
                label="Download Ranked Excel Output"
            )
    # The order of `outputs` mirrors the tuple returned by gradio_app:
    # (results DataFrame, Excel path, summary text).
    run_button.click(
        fn=gradio_app,
        inputs=[zip_input, job_input],
        outputs=[results_output, excel_download, summary_output]
    )

# Start the server only when the file is executed as a script, so that
# importing this module (e.g. to call run_pipeline) does not block on it.
if __name__ == "__main__":
    demo.launch()