"""Gradio app: match candidate CV folders to an internal Excel sheet and rank them.

Pipeline (see ``run_pipeline``): load the internal Excel dataset, unpack the
uploaded ZIP, extract text from each PDF/DOCX, build one text profile per
folder, match each folder to an Excel row by name-token overlap, score every
matched candidate against the job description with a sentence-embedding
model, and write the above-median candidates to an Excel file.
"""

import os
import re
import shutil
import tempfile
import zipfile

import fitz  # PyMuPDF
import gradio as gr
import pandas as pd
import pdfplumber
from docx import Document
from sentence_transformers import SentenceTransformer, util

# =============================================================
# CONFIG
# =============================================================
# Upload this Excel file to the root of your HF Space
INTERNAL_EXCEL_FILE = "Summary_of_Faculty_Rankig_16th Feb 2025.xlsx"

# Your fine-tuned model on Hugging Face Hub
MODEL_NAME = "csAhmad/zoraiz-model"

# Exact output columns — matches your original Excel exactly
# (the "Qualifciation" typo and the trailing space in "Area " are intentional).
OUTPUT_COLUMNS = [
    "Name (Age)", "Contact", "Current Job", "Qualifciation", "Experience",
    "Publications", "Citation", "H-index", "Nationality",
    "Other Achievements", "Area ", "Comments"
]

# Ordered (keywords, doc_type) rules for detect_document_type; first match wins.
_DOC_TYPE_RULES = (
    (("cv", "resume"), "cv"),
    (("cover",), "cover_letter"),
    (("research",), "research_statement"),
    (("teaching",), "teaching_statement"),
    (("publication",), "publication_list"),
    (("reference",), "reference"),
    (("transcript", "degree", "certificate"), "academic_document"),
    (("passport", "visa"), "identity_document"),
)

# Substrings that mark a line of the "Name (Age)" cell as not being a name.
_NON_NAME_KEYWORDS = ('scholar', 'citation', 'http', 'www', 'email', 'phone', 'mobile')

# =============================================================
# LOAD MODEL (once at startup)
# =============================================================
print("Loading model...")
app_model = SentenceTransformer(MODEL_NAME)
print("Model loaded.")


# =============================================================
# HELPERS
# =============================================================
def normalize_text(text):
    """Return *text* lower-cased with only ``a-z``, ``0-9`` and whitespace kept.

    Missing values (``pd.isna``) become ``""``. Runs of whitespace are
    collapsed to a single space before the other characters are removed.
    """
    if pd.isna(text):
        return ""
    text = str(text).strip().lower()
    text = re.sub(r"\s+", " ", text)
    text = re.sub(r"[^a-z0-9\s]", "", text)
    return text


def extract_name_only(name_age_value):
    """Strips URLs, age brackets, and returns clean name only.

    The first remaining line that is not an e-mail, not longer than 60
    characters, not purely digits/phone punctuation and free of the known
    non-name keywords is returned with whitespace collapsed. Returns ``""``
    for missing values or when no line qualifies.
    """
    if pd.isna(name_age_value):
        return ""
    text = str(name_age_value).strip()
    # Remove URLs
    text = re.sub(r'https?://\S+', '', text)
    # Remove age/date in brackets e.g. (35) or (Date of birth: ...)
    text = re.sub(r'\([^)]*\)', '', text)

    # Find first line that looks like a real name
    for raw_line in text.split('\n'):
        line = raw_line.strip()
        if not line:
            continue
        # Skip emails, long lines, pure numbers, known non-name keywords
        if '@' in line or len(line) > 60:
            continue
        if re.match(r'^[\d\s\+\-\(\)]+$', line):
            continue
        lowered = line.lower()
        if any(kw in lowered for kw in _NON_NAME_KEYWORDS):
            continue
        return re.sub(r'\s+', ' ', line).strip()
    return ""


def name_to_tokens(name):
    """Split a name into normalized tokens, dropping tokens shorter than 2 chars."""
    return [t for t in normalize_text(name).split() if len(t) >= 2]


def detect_document_type(file_name):
    """Classify a document by keywords found in its lower-cased file name.

    Rules are checked in order and matched as substrings, so a name that
    contains both "cv" and "cover" is classified as "cv". Returns "other"
    when no keyword is present.
    """
    name = str(file_name).lower()
    for keywords, doc_type in _DOC_TYPE_RULES:
        if any(kw in name for kw in keywords):
            return doc_type
    return "other"


# =============================================================
# TEXT EXTRACTION
# =============================================================
def extract_text_from_pdf(file_path):
    """Extract text from a PDF, trying pdfplumber first and PyMuPDF as fallback.

    Extraction is best effort: failures are printed and never raised, and an
    empty string is returned when neither library yields text. Each page's
    text is followed by a newline.
    """
    chunks = []

    # pdfplumber first
    try:
        with pdfplumber.open(file_path) as pdf:
            for page in pdf.pages:
                try:
                    page_text = page.extract_text()
                except Exception as e:
                    # One unreadable page must not discard the other pages.
                    print(f"[PDF page skipped] {file_path}: {e}")
                    continue
                if page_text:
                    chunks.append(page_text + "\n")
    except Exception as e:
        # Not fatal: the PyMuPDF fallback below may still succeed.
        print(f"[PDF pdfplumber error] {file_path}: {e}")

    # PyMuPDF fallback
    if not "".join(chunks).strip():
        try:
            doc = fitz.open(file_path)
            try:
                for page in doc:
                    page_text = page.get_text("text")
                    if page_text:
                        chunks.append(page_text + "\n")
            finally:
                # Release the file handle even if a page fails mid-iteration.
                doc.close()
        except Exception as e:
            print(f"[PDF error] {file_path}: {e}")

    return "".join(chunks)


def extract_text_from_docx(file_path):
    """Return the non-empty paragraphs of a DOCX file, one per line.

    Only ``doc.paragraphs`` is read. Errors are printed and result in ``""``.
    """
    try:
        doc = Document(file_path)
        return "".join(para.text + "\n" for para in doc.paragraphs if para.text)
    except Exception as e:
        print(f"[DOCX error] {file_path}: {e}")
        return ""


def extract_document_text(file_path):
    """Dispatch text extraction on the file extension (.pdf, .docx/.doc, .txt).

    Unsupported extensions, missing .txt files and read errors yield ``""``.
    """
    ext = os.path.splitext(file_path)[1].lower()
    if ext == ".pdf":
        return extract_text_from_pdf(file_path)
    if ext in (".docx", ".doc"):
        # NOTE(review): ".doc" is routed to the DOCX reader; if it cannot parse
        # the file, the error is logged there and "" is returned.
        return extract_text_from_docx(file_path)
    if ext == ".txt":
        if not os.path.exists(file_path):
            return ""
        try:
            with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
                return f.read()
        except Exception:
            return ""
    return ""


# =============================================================
# MATCHING: CV folder name → Excel row
# =============================================================
def match_by_token_overlap(matching_text, excel_df, min_hits=2):
    """Find the Excel row whose name tokens occur most often in *matching_text*.

    Args:
        matching_text: Free text describing one candidate folder.
        excel_df: DataFrame with ``candidate_name_tokens`` (list of str) and
            ``candidate_name_only`` columns.
        min_hits: Minimum number of name tokens that must be found.

    Returns:
        ``(index_label, candidate_name)`` of the best row, or ``(None, None)``
        when the best row has fewer than *min_hits* hits.

    Tokens are matched as substrings of the normalized text (which has had
    punctuation removed, so "john_smith" still contains "john" and "smith").
    Ties on hit count are broken by the fraction of the name's tokens found;
    on a full tie the earlier row wins.
    """
    text_clean = normalize_text(matching_text)

    best_idx = None
    best_hits = -1
    best_score = -1
    best_name = None

    candidates = zip(
        excel_df.index,
        excel_df["candidate_name_tokens"],
        excel_df["candidate_name_only"],
    )
    for idx, tokens, name in candidates:
        if not tokens:
            continue
        hits = sum(1 for t in tokens if t in text_clean)
        coverage = hits / max(len(tokens), 1)
        score = hits + coverage
        if hits > best_hits or (hits == best_hits and score > best_score):
            best_idx = idx
            best_hits = hits
            best_score = score
            best_name = name

    return (best_idx, best_name) if best_hits >= min_hits else (None, None)


# =============================================================
# BUILD RICH PROFILE TEXT FOR SEMANTIC MODEL
# =============================================================
def build_candidate_profile(row):
    """
    Combines the pre-filled Excel fields + extracted CV document text
    into one string for the semantic model to score against the JD.

    Fields that are empty or the literal "nan" are skipped; each remaining
    field becomes a "Label: value" line, followed by the CV text if present.
    """
    # Excel fields (already filled in by your team)
    fields = [
        ("Name", row.get("Name (Age)", "")),
        ("Current Job", row.get("Current Job", "")),
        ("Qualification", row.get("Qualifciation", "")),  # typo preserved from Excel
        ("Experience", row.get("Experience", "")),
        ("Publications", row.get("Publications", "")),
        ("Citations", row.get("Citation", "")),
        ("H-index", row.get("H-index", "")),
        ("Nationality", row.get("Nationality", "")),
        ("Achievements", row.get("Other Achievements", "")),
        ("Area", row.get("Area ", "")),  # trailing space preserved
        ("Comments", row.get("Comments", "")),
    ]

    parts = []
    for label, value in fields:
        value = str(value).strip()
        if value and value.lower() != "nan":
            parts.append(f"{label}: {value}")

    # Extracted CV document text
    cv_text = str(row.get("combined_profile_text", "")).strip()
    if cv_text:
        parts.append(f"CV Documents:\n{cv_text}")

    return "\n".join(parts).strip()


# =============================================================
# MAIN PIPELINE
# =============================================================
def _load_candidate_table():
    """Load the internal Excel sheet and add the name-matching helper columns."""
    if not os.path.exists(INTERNAL_EXCEL_FILE):
        raise FileNotFoundError(
            f"Internal dataset not found: '{INTERNAL_EXCEL_FILE}'. "
            "Please upload it to the root of your HF Space."
        )

    df = pd.read_excel(INTERNAL_EXCEL_FILE)

    # Strip whitespace from all column names (str() guards non-string headers)
    df.columns = [str(c).strip() for c in df.columns]

    # NOTE: After stripping, "Area " becomes "Area" — re-add trailing space
    # to stay consistent with Excel original
    if "Area" in df.columns and "Area " not in df.columns:
        df = df.rename(columns={"Area": "Area "})

    df["candidate_name_raw"] = df["Name (Age)"].astype(str)
    # Use the raw column so empty cells give "" instead of the string "nan".
    df["candidate_name_only"] = df["Name (Age)"].apply(extract_name_only)
    df["candidate_name_tokens"] = df["candidate_name_only"].apply(name_to_tokens)

    # Fill NaN in key columns
    for col in ["Other Achievements", "Area ", "Comments", "Contact",
                "Current Job", "Qualifciation", "Experience",
                "Publications", "Citation", "H-index", "Nationality"]:
        if col in df.columns:
            df[col] = df[col].fillna("")
    return df


def _scan_documents(extract_folder):
    """List the PDF/DOCX/DOC files under *extract_folder* with their folder names."""
    valid_ext = {".pdf", ".docx", ".doc"}
    doc_rows = []
    for root, _, files in os.walk(extract_folder):
        for fname in files:
            if fname.startswith((".", "__")):
                continue
            ext = os.path.splitext(fname)[1].lower()
            if ext not in valid_ext:
                continue
            full_path = os.path.join(root, fname)
            rel_path = os.path.relpath(full_path, extract_folder)
            folder_name = os.path.dirname(rel_path)
            # Files at the ZIP root have no folder: use the file stem instead.
            if folder_name in ("", "."):
                folder_name = os.path.splitext(fname)[0]
            doc_rows.append({
                "file_name": fname,
                "full_path": full_path,
                "folder_name": folder_name,
                "extension": ext
            })
    return doc_rows


def _extract_texts(doc_rows):
    """Extract and clean the text of every scanned document into a DataFrame."""
    text_rows = []
    for doc in doc_rows:
        text = extract_document_text(doc["full_path"])
        text = text.replace("\x00", " ")
        text = re.sub(r"[ \t]+", " ", text)
        text = re.sub(r"\n{3,}", "\n\n", text).strip()
        text_rows.append({
            "file_name": doc["file_name"],
            "folder_name": doc["folder_name"],
            "text": text,
            "status": "success" if text else "empty",
            "doc_type": detect_document_type(doc["file_name"])
        })
    return pd.DataFrame(text_rows)


def _build_profiles(useful_df):
    """Combine each folder's documents (CV first) into one profile row."""
    doc_priority = {"cv": 1, "research_statement": 2, "teaching_statement": 3,
                    "publication_list": 4, "cover_letter": 5, "other": 99}
    ordered = useful_df.assign(
        priority=useful_df["doc_type"].map(doc_priority).fillna(99)
    )
    ordered = ordered.sort_values(["folder_name", "priority", "file_name"]).reset_index(drop=True)

    profiles = []
    for folder_name, group in ordered.groupby("folder_name"):
        parts = []
        included_files = []
        included_types = []
        for _, doc_row in group.iterrows():
            t = str(doc_row["text"]).strip()
            if not t:
                continue
            parts.append(
                f"\n--- {doc_row['doc_type'].upper()} | {doc_row['file_name']} ---\n{t}"
            )
            included_files.append(doc_row["file_name"])
            included_types.append(doc_row["doc_type"])

        profiles.append({
            "folder_name": folder_name,
            "combined_profile_text": "\n".join(parts).strip(),
            "included_files": " | ".join(included_files),
            "included_doc_types": " | ".join(sorted(set(included_types)))
        })
    return pd.DataFrame(profiles)


def _write_excel(final_output, output_path):
    """Write the shortlist to *output_path* and size each column to its content."""
    with pd.ExcelWriter(output_path, engine="xlsxwriter") as writer:
        final_output.to_excel(writer, index=False, sheet_name="Shortlisted Candidates")

        # Auto-adjust column widths
        worksheet = writer.sheets["Shortlisted Candidates"]
        for i, col in enumerate(final_output.columns):
            max_len = max(
                final_output[col].astype(str).map(len).max(),
                len(col)
            )
            worksheet.set_column(i, i, min(max_len + 2, 60))


def run_pipeline(zip_file_path, job_description_text):
    """Rank the candidates in an uploaded ZIP against a job description.

    Args:
        zip_file_path: Path to a ZIP of candidate documents, ideally one
            folder per candidate named after the candidate.
        job_description_text: The job description to score against.

    Returns:
        ``(final_output, output_path, summary)``: the shortlisted candidates
        as a DataFrame with ``OUTPUT_COLUMNS``, the path of the Excel file
        written inside a fresh temporary directory, and a text summary.

    Raises:
        FileNotFoundError: the internal Excel dataset is missing.
        ValueError: the ZIP is invalid, holds no readable documents, or no
            folder could be matched to an Excel row.

    The extracted documents are always deleted before returning. On failure
    the whole temporary directory is removed; on success it is kept because
    it holds the returned Excel file.
    """
    # ------ STEP 1: Load internal Excel ------
    df = _load_candidate_table()

    work_dir = tempfile.mkdtemp(prefix="cv_rank_")
    extract_folder = os.path.join(work_dir, "documents")
    try:
        os.makedirs(extract_folder, exist_ok=True)

        # ------ STEP 2: Extract ZIP ------
        try:
            with zipfile.ZipFile(zip_file_path, "r") as z:
                z.extractall(extract_folder)
        except zipfile.BadZipFile as err:
            raise ValueError("Invalid ZIP file.") from err

        # ------ STEP 3: Scan documents ------
        doc_rows = _scan_documents(extract_folder)
        if not doc_rows:
            raise ValueError("No valid PDF or DOCX files found in the ZIP.")

        # ------ STEP 4: Extract text ------
        text_df = _extract_texts(doc_rows)

        # Keep useful doc types; fall back to all readable
        useful_types = {"cv", "cover_letter", "research_statement",
                        "teaching_statement", "publication_list"}
        readable = text_df["status"] == "success"
        useful_df = text_df[readable & (text_df["doc_type"].isin(useful_types))].copy()

        if useful_df.empty:
            print("[Warning] No files matched standard doc types — using all readable files.")
            useful_df = text_df[readable].copy()

        if useful_df.empty:
            raise ValueError("No readable documents found in the ZIP.")

        # ------ STEP 5: Build one combined profile per folder ------
        profiles_df = _build_profiles(useful_df)
        if profiles_df.empty:
            raise ValueError("No candidate profiles could be built.")

        # Build matching text (folder name + filenames + first 1500 chars of profile)
        profiles_df["matching_text"] = profiles_df.apply(
            lambda r: f"{r['folder_name']}\n{r['included_files']}\n{r['combined_profile_text'][:1500]}",
            axis=1
        )

        # ------ STEP 6: Match folders → Excel rows ------
        matches = []
        for _, row in profiles_df.iterrows():
            matched_idx, matched_name = match_by_token_overlap(
                row["matching_text"], df, min_hits=2
            )
            matches.append({
                "folder_name": row["folder_name"],
                "matched_excel_index": matched_idx,
                "matched_name": matched_name
            })

        matches_df = pd.DataFrame(matches)
        matched_only = matches_df[matches_df["matched_excel_index"].notna()].copy()

        if matched_only.empty:
            raise ValueError(
                "No candidates could be matched between ZIP folder names and the Excel dataset. "
                "Ensure ZIP folder names contain the candidate names from the Excel file."
            )

        # Merge with Excel rows
        merged_df = matched_only.merge(
            df.reset_index().rename(columns={"index": "excel_index"}),
            left_on="matched_excel_index",
            right_on="excel_index",
            how="left"
        )

        # ------ STEP 7: Merge with profile texts ------
        final_df = merged_df.merge(
            profiles_df[["folder_name", "combined_profile_text",
                         "included_files", "included_doc_types"]],
            on="folder_name",
            how="left"
        )
        for col in ["combined_profile_text", "included_files", "included_doc_types"]:
            final_df[col] = final_df[col].fillna("")

        # Build rich profile string for model
        final_df["candidate_profile_for_model"] = final_df.apply(build_candidate_profile, axis=1)

        # ------ STEP 8: Semantic scoring ------
        job_embedding = app_model.encode(
            job_description_text,
            convert_to_tensor=True,
            normalize_embeddings=True
        )
        cand_embeddings = app_model.encode(
            final_df["candidate_profile_for_model"].tolist(),
            convert_to_tensor=True,
            normalize_embeddings=True
        )
        scores = util.cos_sim(job_embedding, cand_embeddings)[0]
        final_df["Match Score"] = scores.cpu().numpy().round(4)

        # ------ STEP 9: Rank and shortlist (above median) ------
        ranked_df = final_df.sort_values("Match Score", ascending=False).reset_index(drop=True)
        threshold = ranked_df["Match Score"].median()
        shortlisted = ranked_df[ranked_df["Match Score"] >= threshold].copy().reset_index(drop=True)

        # Clean up Name (Age) — strip URLs and show name only
        shortlisted["Name (Age)"] = shortlisted["Name (Age)"].apply(extract_name_only)

        # ------ STEP 10: Build final output with exact Excel columns ------
        # Ensure all output columns exist
        for col in OUTPUT_COLUMNS:
            if col not in shortlisted.columns:
                shortlisted[col] = ""
        final_output = shortlisted[OUTPUT_COLUMNS].copy()

        # ------ STEP 11: Save Excel ------
        output_path = os.path.join(work_dir, "shortlisted_ranked_candidates.xlsx")
        _write_excel(final_output, output_path)

        summary = (
            f"Total candidates processed : {len(ranked_df)}\n"
            f"Shortlisted (above median) : {len(final_output)}\n"
            f"Match score threshold : {threshold:.4f}\n"
            f"Unmatched folders skipped : {len(matches_df) - len(matched_only)}"
        )

        return final_output, output_path, summary
    except Exception:
        # Nothing useful is left behind on failure: drop the whole work dir.
        shutil.rmtree(work_dir, ignore_errors=True)
        raise
    finally:
        # The extracted CVs are only needed for text extraction; never keep them.
        shutil.rmtree(extract_folder, ignore_errors=True)


# =============================================================
# GRADIO WRAPPER
# =============================================================
def gradio_app(zip_file, job_description_text):
    """Validate the UI inputs, run the pipeline and surface errors as ``gr.Error``.

    Returns the ``(results_df, output_path, summary)`` tuple produced by
    ``run_pipeline``. *zip_file* may be a path string or an object with a
    ``.name`` attribute holding the path.
    """
    try:
        if zip_file is None:
            raise gr.Error("Please upload the ZIP file containing candidate CVs.")
        if not job_description_text or not str(job_description_text).strip():
            raise gr.Error("Please provide the job description.")

        zip_path = zip_file if isinstance(zip_file, str) else zip_file.name
        results_df, output_path, summary = run_pipeline(zip_path, job_description_text)
        return results_df, output_path, summary

    except gr.Error:
        raise
    except Exception as e:
        # Keep the original traceback attached for the server logs.
        raise gr.Error(f"Error: {str(e)}") from e


# =============================================================
# GRADIO UI
# =============================================================
with gr.Blocks(title="AI CV Matching & Ranking System") as demo:
    gr.Markdown("""
    # AI-Based CV Matching & Ranking System
    Upload a ZIP file of candidate CVs and paste the job description.
    The system matches CVs to the internal candidate dataset, scores them with a fine-tuned
    semantic model, and returns a ranked shortlist Excel file.
    """)

    with gr.Row():
        with gr.Column():
            zip_input = gr.File(
                label="Upload Candidate CV ZIP File",
                file_types=[".zip"],
                type="filepath"
            )
            job_input = gr.Textbox(
                label="Paste Job Description",
                lines=15,
                placeholder="Paste the full job description here..."
            )
            run_button = gr.Button("Match & Rank Candidates", variant="primary")

        with gr.Column():
            summary_output = gr.Textbox(
                label="Processing Summary",
                lines=5,
                interactive=False
            )
            results_output = gr.Dataframe(
                label="Shortlisted Ranked Candidates",
                interactive=False,
                wrap=True
            )
            excel_download = gr.File(
                label="Download Ranked Excel Output"
            )

    run_button.click(
        fn=gradio_app,
        inputs=[zip_input, job_input],
        outputs=[results_output, excel_download, summary_output]
    )

demo.launch()