# Source: Hugging Face Space file app.py by csAhmad — commit 8443fd7 ("Update app.py", verified)
import os
import re
import zipfile
import tempfile
import pandas as pd
import pdfplumber
import fitz # PyMuPDF
import gradio as gr
from docx import Document
from sentence_transformers import SentenceTransformer, util
# =============================================================
# CONFIG
# =============================================================
# Internal candidate workbook; upload it to the root of your HF Space.
# (The file name, including its spelling, must match the uploaded file.)
INTERNAL_EXCEL_FILE = "Summary_of_Faculty_Rankig_16th Feb 2025.xlsx"

# Hugging Face Hub id of the fine-tuned sentence-embedding model.
MODEL_NAME = "csAhmad/zoraiz-model"

# Output sheet columns, in order. The spellings ("Qualifciation", the
# trailing space in "Area ") deliberately mirror the original Excel headers.
OUTPUT_COLUMNS = [
    "Name (Age)",
    "Contact",
    "Current Job",
    "Qualifciation",
    "Experience",
    "Publications",
    "Citation",
    "H-index",
    "Nationality",
    "Other Achievements",
    "Area ",
    "Comments",
]
# =============================================================
# LOAD MODEL (once at startup)
# =============================================================
# Runs at import time: the model is instantiated a single time and kept in
# the module-level `app_model`, which run_pipeline uses for `.encode()`.
print("Loading model...")
app_model = SentenceTransformer(MODEL_NAME)  # built from the MODEL_NAME id above
print("Model loaded.")
# =============================================================
# HELPERS
# =============================================================
def normalize_text(text):
    """Return *text* as a lowercase, single-spaced, ASCII-alphanumeric string.

    Missing values (``None`` / NaN) become ``""``. Accented letters are
    folded to their base letter (``"é"`` -> ``"e"``) instead of being
    deleted, every other character outside ``a-z``, ``0-9`` and whitespace
    is removed, and whitespace runs are collapsed to one space.

    Args:
        text: Any scalar; it is converted with ``str()`` first.

    Returns:
        The normalized string, without leading or trailing whitespace.
    """
    import unicodedata  # local import: the file header does not import it

    if pd.isna(text):
        return ""
    # NFKD splits "é" into "e" + a combining accent; the accent is then
    # dropped by the character filter below while the base letter survives.
    folded = unicodedata.normalize("NFKD", str(text)).lower()
    folded = re.sub(r"[^a-z0-9\s]", "", folded)
    # Collapse whitespace last so removed punctuation cannot leave
    # double spaces behind ("a - b" -> "a b").
    return re.sub(r"\s+", " ", folded).strip()
def extract_name_only(name_age_value):
    """Pull a clean candidate name out of a raw "Name (Age)" cell.

    URLs and anything inside parentheses are removed, then the first
    remaining line that does not look like contact or profile data is
    returned with its inner whitespace collapsed. Returns "" for missing
    values or when no line qualifies.
    """
    if pd.isna(name_age_value):
        return ""

    cleaned = re.sub(r'https?://\S+', '', str(name_age_value).strip())
    # Bracketed parts, e.g. (35) or (Date of birth: ...)
    cleaned = re.sub(r'\([^)]*\)', '', cleaned)

    blocked_words = ('scholar', 'citation', 'http', 'www', 'email', 'phone', 'mobile')

    def looks_like_name(candidate):
        # Emails and over-long lines are never names.
        if '@' in candidate or len(candidate) > 60:
            return False
        # Lines made only of digits and phone punctuation.
        if re.match(r'^[\d\s\+\-\(\)]+$', candidate):
            return False
        lowered = candidate.lower()
        return not any(word in lowered for word in blocked_words)

    stripped_lines = (piece.strip() for piece in cleaned.split('\n'))
    first_name_line = next(
        (piece for piece in stripped_lines if piece and looks_like_name(piece)),
        "",
    )
    return re.sub(r'\s+', ' ', first_name_line).strip()
def name_to_tokens(name):
    """Split a name into normalized tokens, keeping those of 2+ characters."""
    tokens = []
    for piece in normalize_text(name).split():
        if len(piece) >= 2:
            tokens.append(piece)
    return tokens
def detect_document_type(file_name):
    """Classify a document by keywords found in its (lower-cased) file name.

    Rules are checked in order and the first rule with a keyword contained
    in the name decides the label; names matching no rule are "other".
    """
    lowered = str(file_name).lower()
    ordered_rules = (
        (("cv", "resume"), "cv"),
        (("cover",), "cover_letter"),
        (("research",), "research_statement"),
        (("teaching",), "teaching_statement"),
        (("publication",), "publication_list"),
        (("reference",), "reference"),
        (("transcript", "degree", "certificate"), "academic_document"),
        (("passport", "visa"), "identity_document"),
    )
    for keywords, label in ordered_rules:
        if any(word in lowered for word in keywords):
            return label
    return "other"
# =============================================================
# TEXT EXTRACTION
# =============================================================
def extract_text_from_pdf(file_path):
    """Extract the text of a PDF, best effort, never raising.

    pdfplumber is tried first, page by page. If that yields no
    non-whitespace text, PyMuPDF (``fitz``) is used as a fallback. All
    failures are reported with ``print`` and extraction continues with
    whatever text was gathered.

    Args:
        file_path: Path of the PDF file.

    Returns:
        The page texts, each followed by a newline, concatenated in page
        order; "" when nothing could be extracted.
    """
    chunks = []

    # pdfplumber first
    try:
        with pdfplumber.open(file_path) as pdf:
            for page_no, page in enumerate(pdf.pages, start=1):
                try:
                    page_text = page.extract_text()
                except Exception as e:
                    # One bad page must not discard the rest of the file.
                    print(f"[PDF warning] {file_path} page {page_no}: {e}")
                    continue
                if page_text:
                    chunks.append(page_text + "\n")
    except Exception as e:
        # Not fatal: the PyMuPDF fallback below still gets its chance.
        print(f"[PDF warning] pdfplumber failed on {file_path}: {e}")

    # PyMuPDF fallback
    if not "".join(chunks).strip():
        try:
            doc = fitz.open(file_path)
            try:
                for page in doc:
                    page_text = page.get_text("text")
                    if page_text:
                        chunks.append(page_text + "\n")
            finally:
                # Close the document even when a page fails to render.
                doc.close()
        except Exception as e:
            print(f"[PDF error] {file_path}: {e}")

    return "".join(chunks)
def extract_text_from_docx(file_path):
    """Return the non-empty paragraph texts of a DOCX file, one per line.

    Any failure is printed as a "[DOCX error]" line; the text gathered up
    to that point (possibly "") is returned.
    """
    collected = []
    try:
        for paragraph in Document(file_path).paragraphs:
            if paragraph.text:
                collected.append(paragraph.text + "\n")
    except Exception as e:
        print(f"[DOCX error] {file_path}: {e}")
    return "".join(collected)
def extract_document_text(file_path):
    """Dispatch text extraction on the file extension (case-insensitive).

    ".pdf" goes to extract_text_from_pdf, ".docx"/".doc" go to
    extract_text_from_docx, and ".txt" is read as UTF-8 with undecodable
    bytes ignored. Unknown extensions, missing .txt files and read
    failures all give "".
    """
    suffix = os.path.splitext(file_path)[1].lower()
    if suffix == ".pdf":
        return extract_text_from_pdf(file_path)
    if suffix in (".docx", ".doc"):
        return extract_text_from_docx(file_path)
    if suffix != ".txt" or not os.path.exists(file_path):
        return ""
    try:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as handle:
            return handle.read()
    except Exception:
        return ""
# =============================================================
# MATCHING: CV folder name → Excel row
# =============================================================
def match_by_token_overlap(matching_text, excel_df, min_hits=2):
    """Find the Excel row whose name tokens occur most often in a text.

    Each row's "candidate_name_tokens" are tested as substrings of the
    normalized *matching_text*. Rows are ranked by hit count, then by
    hits + coverage (hits / token count); on a full tie the earlier row
    is kept. Rows without tokens are ignored.

    Returns:
        (row index, "candidate_name_only" value) of the best row, or
        (None, None) when the best row has fewer than *min_hits* hits.
    """
    haystack = normalize_text(matching_text)
    leader = (-1, -1)  # (hits, score) of the best row seen so far
    winner_idx = None
    winner_name = None

    for row_idx, record in excel_df.iterrows():
        name_tokens = record["candidate_name_tokens"]
        if not name_tokens:
            continue
        found = [token in haystack for token in name_tokens].count(True)
        ranking = (found, found + found / max(len(name_tokens), 1))
        # Tuple comparison: more hits first, then the higher score.
        if ranking > leader:
            leader = ranking
            winner_idx = row_idx
            winner_name = record["candidate_name_only"]

    if leader[0] < min_hits:
        return (None, None)
    return (winner_idx, winner_name)
# =============================================================
# BUILD RICH PROFILE TEXT FOR SEMANTIC MODEL
# =============================================================
def build_candidate_profile(row):
    """Render one candidate row as the text scored against the job description.

    Emits a "Label: value" line for each Excel field whose stringified
    value is neither empty nor "nan" (case-insensitive), followed by a
    "CV Documents:" section holding the row's "combined_profile_text"
    when that is non-empty. *row* only needs a dict-style ``.get``.
    """
    # (label shown to the model, column read from the row)
    labelled_columns = (
        ("Name", "Name (Age)"),
        ("Current Job", "Current Job"),
        ("Qualification", "Qualifciation"),  # column name keeps the Excel typo
        ("Experience", "Experience"),
        ("Publications", "Publications"),
        ("Citations", "Citation"),
        ("H-index", "H-index"),
        ("Nationality", "Nationality"),
        ("Achievements", "Other Achievements"),
        ("Area", "Area "),  # trailing space is part of the column name
        ("Comments", "Comments"),
    )

    sections = []
    for label, column in labelled_columns:
        rendered = str(row.get(column, "")).strip()
        if not rendered or rendered.lower() == "nan":
            continue
        sections.append(f"{label}: {rendered}")

    documents = str(row.get("combined_profile_text", "")).strip()
    if documents:
        sections.append(f"CV Documents:\n{documents}")

    return "\n".join(sections).strip()
# =============================================================
# MAIN PIPELINE
# =============================================================
def _load_candidate_sheet():
    """Read the internal Excel dataset and add the name-matching columns."""
    if not os.path.exists(INTERNAL_EXCEL_FILE):
        raise FileNotFoundError(
            f"Internal dataset not found: '{INTERNAL_EXCEL_FILE}'. "
            "Please upload it to the root of your HF Space."
        )
    df = pd.read_excel(INTERNAL_EXCEL_FILE)
    # Strip whitespace from all column names
    df.columns = df.columns.str.strip()
    # Stripping turns "Area " into "Area"; restore the trailing space so the
    # column keeps the spelling used in OUTPUT_COLUMNS.
    if "Area" in df.columns and "Area " not in df.columns:
        df = df.rename(columns={"Area": "Area "})
    # Fail with a readable message instead of a bare KeyError.
    if "Name (Age)" not in df.columns:
        raise ValueError(
            f"Internal dataset '{INTERNAL_EXCEL_FILE}' has no 'Name (Age)' column."
        )
    df["candidate_name_raw"] = df["Name (Age)"].astype(str)
    df["candidate_name_only"] = df["candidate_name_raw"].apply(extract_name_only)
    df["candidate_name_tokens"] = df["candidate_name_only"].apply(name_to_tokens)
    # Fill NaN in key columns
    for col in ["Other Achievements", "Area ", "Comments", "Contact",
                "Current Job", "Qualifciation", "Experience",
                "Publications", "Citation", "H-index", "Nationality"]:
        if col in df.columns:
            df[col] = df[col].fillna("")
    return df


def _extract_document_texts(zip_file_path, extract_folder):
    """Unpack the ZIP into *extract_folder* and return one text row per document."""
    try:
        with zipfile.ZipFile(zip_file_path, "r") as z:
            z.extractall(extract_folder)
    except zipfile.BadZipFile as exc:
        raise ValueError("Invalid ZIP file.") from exc

    valid_ext = {".pdf", ".docx", ".doc"}
    doc_rows = []
    for root, _, files in os.walk(extract_folder):
        for fname in files:
            # Skip hidden files and "__"-prefixed entries.
            if fname.startswith((".", "__")):
                continue
            ext = os.path.splitext(fname)[1].lower()
            if ext not in valid_ext:
                continue
            full_path = os.path.join(root, fname)
            rel_path = os.path.relpath(full_path, extract_folder)
            folder_name = os.path.dirname(rel_path)
            # Files at the ZIP root have no folder: group them by file stem.
            if folder_name in ("", "."):
                folder_name = os.path.splitext(fname)[0]
            doc_rows.append({
                "file_name": fname,
                "full_path": full_path,
                "folder_name": folder_name,
            })
    if not doc_rows:
        raise ValueError("No valid PDF or DOCX files found in the ZIP.")

    text_rows = []
    for doc in doc_rows:
        text = extract_document_text(doc["full_path"])
        text = text.replace("\x00", " ")
        text = re.sub(r"[ \t]+", " ", text)
        text = re.sub(r"\n{3,}", "\n\n", text).strip()
        text_rows.append({
            "file_name": doc["file_name"],
            "folder_name": doc["folder_name"],
            "text": text,
            "status": "success" if text else "empty",
            "doc_type": detect_document_type(doc["file_name"]),
        })
    return pd.DataFrame(text_rows)


def _build_folder_profiles(text_df):
    """Combine the readable documents of each folder into one profile row."""
    # Keep useful doc types; fall back to all readable
    useful_types = {"cv", "cover_letter", "research_statement",
                    "teaching_statement", "publication_list"}
    readable = text_df[text_df["status"] == "success"]
    useful_df = readable[readable["doc_type"].isin(useful_types)].copy()
    if useful_df.empty:
        print("[Warning] No files matched standard doc types — using all readable files.")
        useful_df = readable.copy()
    if useful_df.empty:
        raise ValueError("No readable documents found in the ZIP.")

    # Lower number = earlier in the combined text; unknown types go last.
    doc_priority = {"cv": 1, "research_statement": 2, "teaching_statement": 3,
                    "publication_list": 4, "cover_letter": 5, "other": 99}
    useful_df["priority"] = useful_df["doc_type"].map(doc_priority).fillna(99)
    useful_df = useful_df.sort_values(
        ["folder_name", "priority", "file_name"]
    ).reset_index(drop=True)

    profiles = []
    for folder_name, group in useful_df.groupby("folder_name"):
        parts = []
        included_files = []
        included_types = []
        for _, doc_row in group.iterrows():
            t = str(doc_row["text"]).strip()
            if not t:
                continue
            parts.append(
                f"\n--- {doc_row['doc_type'].upper()} | {doc_row['file_name']} ---\n{t}"
            )
            included_files.append(doc_row["file_name"])
            included_types.append(doc_row["doc_type"])
        profiles.append({
            "folder_name": folder_name,
            "combined_profile_text": "\n".join(parts).strip(),
            "included_files": " | ".join(included_files),
            "included_doc_types": " | ".join(sorted(set(included_types))),
        })
    profiles_df = pd.DataFrame(profiles)
    if profiles_df.empty:
        raise ValueError("No candidate profiles could be built.")

    # Build matching text (folder name + filenames + first 1500 chars of profile)
    profiles_df["matching_text"] = profiles_df.apply(
        lambda r: f"{r['folder_name']}\n{r['included_files']}\n{r['combined_profile_text'][:1500]}",
        axis=1
    )
    return profiles_df


def _match_profiles_to_sheet(profiles_df, df):
    """Join folder profiles to Excel rows; return (joined frame, unmatched count)."""
    matches = []
    for _, row in profiles_df.iterrows():
        matched_idx, matched_name = match_by_token_overlap(
            row["matching_text"], df, min_hits=2
        )
        matches.append({
            "folder_name": row["folder_name"],
            "matched_excel_index": matched_idx,
            "matched_name": matched_name,
        })
    matches_df = pd.DataFrame(matches)
    matched_only = matches_df[matches_df["matched_excel_index"].notna()].copy()
    if matched_only.empty:
        raise ValueError(
            "No candidates could be matched between ZIP folder names and the Excel dataset. "
            "Ensure ZIP folder names contain the candidate names from the Excel file."
        )

    # Merge with Excel rows
    merged_df = matched_only.merge(
        df.reset_index().rename(columns={"index": "excel_index"}),
        left_on="matched_excel_index",
        right_on="excel_index",
        how="left"
    )
    # Merge with profile texts
    final_df = merged_df.merge(
        profiles_df[["folder_name", "combined_profile_text",
                     "included_files", "included_doc_types"]],
        on="folder_name",
        how="left"
    )
    for col in ["combined_profile_text", "included_files", "included_doc_types"]:
        final_df[col] = final_df[col].fillna("")
    return final_df, len(matches_df) - len(matched_only)


def _write_shortlist_workbook(final_output, output_path):
    """Write the shortlist to *output_path* with width-fitted columns."""
    with pd.ExcelWriter(output_path, engine="xlsxwriter") as writer:
        final_output.to_excel(writer, index=False, sheet_name="Shortlisted Candidates")
        # Auto-adjust column widths (capped at 60)
        worksheet = writer.sheets["Shortlisted Candidates"]
        for i, col in enumerate(final_output.columns):
            max_len = max(
                final_output[col].astype(str).map(len).max(),
                len(col)
            )
            worksheet.set_column(i, i, min(max_len + 2, 60))


def run_pipeline(zip_file_path, job_description_text):
    """Match the CVs in a ZIP to the internal dataset, score and shortlist them.

    Steps: load the internal Excel sheet, unpack the ZIP and extract text
    from its PDF/DOCX files, build one combined profile per folder, match
    each folder to an Excel row by name tokens, score every matched
    candidate against the job description with ``app_model`` (cosine
    similarity), keep candidates scoring at or above the median, and write
    them to an Excel file.

    Args:
        zip_file_path: Path of the uploaded ZIP archive.
        job_description_text: Job description to score candidates against.

    Returns:
        Tuple ``(final_output, output_path, summary)``: the shortlist
        DataFrame restricted to ``OUTPUT_COLUMNS``, the path of the written
        .xlsx file, and a multi-line text summary.

    Raises:
        FileNotFoundError: The internal Excel dataset is missing.
        ValueError: Invalid ZIP, missing "Name (Age)" column, no usable
            documents, or no folder could be matched to an Excel row.
    """
    import shutil  # local import: only needed for temp-dir cleanup

    # ------ STEP 1: Load internal Excel (before any temp dir exists) ------
    df = _load_candidate_sheet()

    work_dir = tempfile.mkdtemp(prefix="cv_rank_")
    try:
        extract_folder = os.path.join(work_dir, "documents")
        os.makedirs(extract_folder, exist_ok=True)

        # ------ STEPS 2-4: Extract ZIP, scan documents, extract text ------
        try:
            text_df = _extract_document_texts(zip_file_path, extract_folder)
        finally:
            # The unpacked uploads are not needed once their text is in
            # memory; remove them so they do not pile up on disk.
            shutil.rmtree(extract_folder, ignore_errors=True)

        # ------ STEP 5: Build one combined profile per folder ------
        profiles_df = _build_folder_profiles(text_df)

        # ------ STEPS 6-7: Match folders to Excel rows, attach profiles ------
        final_df, unmatched_count = _match_profiles_to_sheet(profiles_df, df)

        # Build rich profile string for model
        final_df["candidate_profile_for_model"] = final_df.apply(
            build_candidate_profile, axis=1
        )

        # ------ STEP 8: Semantic scoring ------
        job_embedding = app_model.encode(
            job_description_text,
            convert_to_tensor=True,
            normalize_embeddings=True
        )
        cand_embeddings = app_model.encode(
            final_df["candidate_profile_for_model"].tolist(),
            convert_to_tensor=True,
            normalize_embeddings=True
        )
        scores = util.cos_sim(job_embedding, cand_embeddings)[0]
        final_df["Match Score"] = scores.cpu().numpy().round(4)

        # ------ STEP 9: Rank and shortlist (at or above median) ------
        ranked_df = final_df.sort_values("Match Score", ascending=False).reset_index(drop=True)
        threshold = ranked_df["Match Score"].median()
        shortlisted = ranked_df[ranked_df["Match Score"] >= threshold].copy().reset_index(drop=True)
        # Clean up Name (Age) — strip URLs and show name only
        shortlisted["Name (Age)"] = shortlisted["Name (Age)"].apply(extract_name_only)

        # ------ STEP 10: Build final output with exact Excel columns ------
        for col in OUTPUT_COLUMNS:
            if col not in shortlisted.columns:
                shortlisted[col] = ""
        final_output = shortlisted[list(OUTPUT_COLUMNS)].copy()

        # ------ STEP 11: Save Excel ------
        # The workbook stays in work_dir on success so the caller can serve it.
        output_path = os.path.join(work_dir, "shortlisted_ranked_candidates.xlsx")
        _write_shortlist_workbook(final_output, output_path)

        summary = (
            f"Total candidates processed : {len(ranked_df)}\n"
            f"Shortlisted (above median) : {len(final_output)}\n"
            f"Match score threshold : {threshold:.4f}\n"
            f"Unmatched folders skipped : {unmatched_count}"
        )
        return final_output, output_path, summary
    except Exception:
        # Nothing is returned on failure, so the work dir would be orphaned.
        shutil.rmtree(work_dir, ignore_errors=True)
        raise
# =============================================================
# GRADIO WRAPPER
# =============================================================
def gradio_app(zip_file, job_description_text):
    """Gradio click handler: validate the inputs, then run the pipeline.

    *zip_file* may be a path string or an object exposing ``.name``.
    Returns run_pipeline's (DataFrame, Excel path, summary) triple. Every
    failure reaches the UI as a ``gr.Error``; non-Gradio exceptions are
    wrapped with an "Error: " prefix.
    """
    try:
        if zip_file is None:
            raise gr.Error("Please upload the ZIP file containing candidate CVs.")

        description_missing = (
            not job_description_text or not str(job_description_text).strip()
        )
        if description_missing:
            raise gr.Error("Please provide the job description.")

        if isinstance(zip_file, str):
            zip_path = zip_file
        else:
            zip_path = zip_file.name

        table, workbook_path, report = run_pipeline(zip_path, job_description_text)
        return table, workbook_path, report
    except gr.Error:
        # Already user-facing; pass it through untouched.
        raise
    except Exception as e:
        raise gr.Error(f"Error: {str(e)}")
# =============================================================
# GRADIO UI
# =============================================================
# Page layout: a heading, then one row with two columns — inputs on the
# left (ZIP upload, job description, run button) and results on the right
# (summary text, shortlist table, Excel download).
with gr.Blocks(title="AI CV Matching & Ranking System") as demo:
    gr.Markdown("""
# AI-Based CV Matching & Ranking System
Upload a ZIP file of candidate CVs and paste the job description.
The system matches CVs to the internal candidate dataset, scores them
with a fine-tuned semantic model, and returns a ranked shortlist Excel file.
""")
    with gr.Row():
        # Left column: user inputs.
        with gr.Column():
            zip_input = gr.File(
                label="Upload Candidate CV ZIP File",
                file_types=[".zip"],
                type="filepath"
            )
            job_input = gr.Textbox(
                label="Paste Job Description",
                lines=15,
                placeholder="Paste the full job description here..."
            )
            run_button = gr.Button("Match & Rank Candidates", variant="primary")
        # Right column: read-only results.
        with gr.Column():
            summary_output = gr.Textbox(
                label="Processing Summary",
                lines=5,
                interactive=False
            )
            results_output = gr.Dataframe(
                label="Shortlisted Ranked Candidates",
                interactive=False,
                wrap=True
            )
            excel_download = gr.File(
                label="Download Ranked Excel Output"
            )
    # `outputs` order follows gradio_app's return order:
    # (results DataFrame, Excel file path, summary text).
    run_button.click(
        fn=gradio_app,
        inputs=[zip_input, job_input],
        outputs=[results_output, excel_download, summary_output]
    )
# Starts the app as soon as this module is executed.
demo.launch()