| import os |
| import fitz |
| from docx import Document |
| from sentence_transformers import SentenceTransformer, util |
| from transformers import pipeline |
| import pandas as pd |
| from datetime import datetime |
| import zipfile |
| import re |
| import imaplib |
| import email |
|
|
| |
# Maps the human-readable model label (used as the lookup key by get_model)
# to the sentence-transformers model identifier that gets loaded for it.
MODELS = {
    "Fast (MiniLM)":
        "sentence-transformers/all-MiniLM-L6-v2",
    "Balanced (Recommended)":
        "sentence-transformers/all-mpnet-base-v2",
    "High Accuracy":
        "sentence-transformers/multi-qa-mpnet-base-dot-v1",
}

# Cache of already-constructed embedding models, keyed by the MODELS label.
loaded_models = {}

# Lazily created zero-shot classification pipeline; None until first needed.
skills_classifier = None
|
|
| |
def get_model(name):
    """Return the embedding model registered under ``name``.

    ``name`` is a key of ``MODELS``. The model is constructed with
    ``SentenceTransformer`` the first time it is requested and then kept in
    ``loaded_models`` so later calls reuse the same instance.

    Raises:
        KeyError: if ``name`` is not a key of ``MODELS``.
    """
    if name in loaded_models:
        return loaded_models[name]

    model_id = MODELS[name]
    loaded_models[name] = SentenceTransformer(model_id)
    return loaded_models[name]
|
|
def get_classifier():
    """Return the shared zero-shot classification pipeline.

    The pipeline is built on the first call and stored in the module-level
    ``skills_classifier`` variable; every later call returns that same object.
    """
    global skills_classifier

    if skills_classifier is not None:
        return skills_classifier

    skills_classifier = pipeline(
        "zero-shot-classification",
        model="facebook/bart-large-mnli",
    )
    return skills_classifier
|
|
| |
def extract_text(file_path):
    """Extract plain text from a resume file, best-effort.

    The reader is chosen from the file extension (case-insensitive):
    ``.pdf`` is opened with ``fitz`` and the text of every page is joined with
    newlines; ``.docx`` and ``.doc`` are opened with ``Document`` and the
    paragraph texts are joined with newlines.

    Args:
        file_path: Path of the file to read.

    Returns:
        The extracted text with surrounding whitespace stripped, or ``""``
        when the extension is unsupported or the file cannot be read.
    """
    import logging

    ext = os.path.splitext(file_path)[1].lower()

    try:
        if ext == ".pdf":
            doc = fitz.open(file_path)
            try:
                text = "\n".join(page.get_text() for page in doc)
            finally:
                # Release the document handle even if a page fails to parse.
                doc.close()
            return text.strip()

        if ext in (".docx", ".doc"):
            # NOTE(review): legacy .doc files go through the same reader as
            # .docx; presumably they end up in the error path below -- confirm.
            document = Document(file_path)
            return "\n".join(p.text for p in document.paragraphs).strip()
    except Exception:
        # Stay best-effort (an unreadable file must not abort a whole batch),
        # but leave a trace and do not swallow KeyboardInterrupt/SystemExit.
        logging.getLogger(__name__).warning(
            "Could not extract text from %r", file_path, exc_info=True
        )
        return ""

    return ""
|
|
| |
def fetch_from_gmail(email_user, app_password):
    """Download attachments from recent "resume" e-mails in a Gmail inbox.

    Connects to ``imap.gmail.com`` over SSL, searches the inbox for messages
    whose subject contains "resume", and looks at the last ten ids returned by
    the search. Every attachment that has a filename is written to the
    current working directory as ``temp_<filename>``.

    Args:
        email_user: Account name used for the IMAP login.
        app_password: Password used for the IMAP login.

    Returns:
        List of paths of the files that were written (possibly empty).

    Exceptions raised while connecting or logging in propagate to the caller.
    """
    saved_paths = []
    mail = imaplib.IMAP4_SSL("imap.gmail.com")
    try:
        mail.login(email_user, app_password)
        mail.select("inbox")

        status, data = mail.search(None, '(SUBJECT "resume")')
        if status != "OK" or not data or not data[0]:
            return saved_paths

        for msg_id in data[0].split()[-10:]:
            status, msg_data = mail.fetch(msg_id, "(RFC822)")
            if status != "OK" or not msg_data or not isinstance(msg_data[0], tuple):
                continue
            msg = email.message_from_bytes(msg_data[0][1])

            for part in msg.walk():
                if part.get_content_disposition() != "attachment":
                    continue

                filename = part.get_filename()
                if not filename:
                    continue

                # The filename comes from the sender: drop any directory
                # components so it cannot escape the working directory.
                safe_name = os.path.basename(filename.replace("\\", "/"))
                if not safe_name:
                    continue

                payload = part.get_payload(decode=True)
                if payload is None:
                    # Nothing decodable in this part (e.g. a nested multipart).
                    continue

                path = f"temp_{safe_name}"
                with open(path, "wb") as out:
                    out.write(payload)
                saved_paths.append(path)
    finally:
        # Always end the IMAP session; a failure while closing must not mask
        # the original error or discard the files already saved.
        try:
            mail.logout()
        except (imaplib.IMAP4.error, OSError):
            pass

    return saved_paths
|
|
| |
def extract_skills(text):
    """Return a comma-separated list of skills detected in ``text``.

    The first 2000 characters of ``text`` are scored against a fixed list of
    skill labels with the zero-shot classifier from ``get_classifier`` in
    multi-label mode; labels scoring above 0.4 are kept.

    Args:
        text: Resume text to analyse.

    Returns:
        The matching labels joined with ", " (an empty string when no label
        passes the cut-off), or "N/A" if classification fails for any reason.
    """
    labels = [
        "Python", "Machine Learning", "Deep Learning",
        "SQL", "AWS", "Docker", "Communication",
    ]

    try:
        # Slice first so unusable input fails before the model is loaded.
        snippet = text[:2000]
        classifier = get_classifier()
        res = classifier(snippet, labels, multi_label=True)

        return ", ".join(
            label
            for label, score in zip(res["labels"], res["scores"])
            if score > 0.4
        )
    except Exception:
        # Best-effort column: never let skill tagging abort the screening run,
        # but do not intercept KeyboardInterrupt/SystemExit.
        return "N/A"
|
|
def extract_qualifications(text):
    """Return the degree keywords mentioned in ``text``.

    Matching is case-insensitive and on whole words only. Each keyword is
    reported once, upper-cased, and the keywords are sorted alphabetically so
    the same text always produces the same string.

    Args:
        text: Resume text to scan.

    Returns:
        The keywords joined with ", ", or "Not mentioned" when none is found.
    """
    pattern = r'\b(bba|bs|bsc|ba|mba|msc|phd|bachelor|master)\b'
    found = re.findall(pattern, text.lower())
    if not found:
        return "Not mentioned"
    # sorted() makes the order deterministic; joining a bare set does not.
    return ", ".join(sorted(set(found))).upper()
|
|
| |
def screen_resumes_backend(job_desc, files, model_name="Fast (MiniLM)", threshold=0.65,
                           gmail=None, password=None):
    """Score resumes against a job description and write the results to disk.

    Each resume is embedded with the selected model and compared to the job
    description by cosine similarity. Resumes whose extracted text is shorter
    than 50 characters are skipped. A CSV report is written under
    ``outputs/`` and, when at least one candidate is shortlisted, the
    shortlisted files are bundled into ``outputs/shortlisted.zip``.

    Args:
        job_desc: Job description text.
        files: Iterable of file paths and/or file-like objects that have
            ``read()`` and ``name``. Ignored when both ``gmail`` and
            ``password`` are given. ``None`` is treated as no files.
        model_name: Key of ``MODELS`` selecting the embedding model.
        threshold: Minimum cosine similarity for the "Shortlisted" status.
        gmail: Optional account name; together with ``password`` it makes the
            function fetch the resumes with ``fetch_from_gmail`` instead.
        password: Password used with ``gmail``.

    Returns:
        Tuple ``(results, report_path, zip_path)``: the list of per-candidate
        result dicts, the path of the CSV report, and the path of the zip
        archive (``None`` when nobody was shortlisted).
    """
    if gmail and password:
        files = fetch_from_gmail(gmail, password)
    files = files or []

    model = get_model(model_name)
    job_emb = model.encode(job_desc, convert_to_tensor=True)

    os.makedirs("outputs", exist_ok=True)
    results = []
    shortlisted_paths = []

    for item in files:
        if hasattr(item, "read"):
            # File-like upload: persist it so the extractors can open it by
            # path. basename() guards against a ``name`` that includes
            # directories, which would make the temp path invalid.
            name = os.path.basename(item.name)
            fpath = f"temp_{name}"
            with open(fpath, "wb") as out:
                out.write(item.read())
        else:
            fpath = item
            name = os.path.basename(item)

        text = extract_text(fpath)
        if len(text) < 50:
            # Unreadable or nearly empty document: nothing meaningful to score.
            continue

        emb = model.encode(text, convert_to_tensor=True)
        score = util.cos_sim(job_emb, emb)[0][0].item()

        is_shortlisted = score >= threshold
        if is_shortlisted:
            shortlisted_paths.append(fpath)

        results.append({
            "Candidate": name,
            "Score (%)": round(score * 100, 2),
            "Skills": extract_skills(text),
            "Qualification": extract_qualifications(text),
            "Status": "Shortlisted" if is_shortlisted else "Rejected",
        })

    # Explicit columns keep the report header (and the frame's schema) intact
    # even when no resume produced a result.
    columns = ["Candidate", "Score (%)", "Skills", "Qualification", "Status"]
    df = pd.DataFrame(results, columns=columns)
    report_path = f"outputs/report_{datetime.now().strftime('%Y%m%d_%H%M')}.csv"
    df.to_csv(report_path, index=False)

    zip_path = None
    if shortlisted_paths:
        zip_path = "outputs/shortlisted.zip"
        with zipfile.ZipFile(zip_path, "w") as archive:
            # Only the shortlisted resumes go in, referenced by on-disk path
            # (never by the original upload object).
            for path in shortlisted_paths:
                if os.path.exists(path):
                    archive.write(path)

    return results, report_path, zip_path