Zayeemk's picture
Upload engine.py
b8f2a93 verified
"""
utils/engine.py — Main orchestration logic for the Resume Match Analyzer.
Consolidates the business logic previously handled by FastAPI routes.
"""
from __future__ import annotations
import uuid
import asyncio
from pathlib import Path
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete
from utils.db import crud, models
from utils.db.database import get_db, create_tables
from utils.nlp.text_extractor import text_extractor
from utils.nlp.preprocessor import preprocessor
from utils.nlp.similarity_engine import similarity_engine
from utils.nlp.skill_gap_analyzer import skill_gap_analyzer
from utils.nlp.ats_simulator import ats_simulator
from utils.nlp.section_analyzer import section_analyzer
from utils.nlp.llm_suggestions import llm_suggestions
from utils.core.config import settings
from utils.core.logging_config import get_logger
logger = get_logger(__name__)
async def init_engine():
"""Initialize the database tables."""
await create_tables()
logger.info("Database tables initialized.")
async def analyze_resume(
resume_content: bytes,
resume_filename: str,
jd_text: str,
db: AsyncSession
) -> dict:
"""
Analyze a single resume against a job description.
Integrated version of the original FastAPI /analyze route.
"""
# ── 1. Text extraction ─────────────────────────────────────────
file_ext = Path(resume_filename or "resume.pdf").suffix.lower()
source_type = file_ext.lstrip(".")
try:
resume_text = text_extractor.extract(resume_content, source_type=source_type)
except Exception as exc:
logger.error(f"Text extraction failed: {exc}")
raise ValueError(f"Could not extract text from resume: {exc}")
# JD may be raw text or a URL
if jd_text.strip().startswith("http"):
try:
jd_raw = text_extractor.extract(jd_text, source_type="url")
except Exception as exc:
raise ValueError(f"Could not fetch JD from URL: {exc}")
else:
jd_raw = jd_text
if len(resume_text) < 50:
raise ValueError("Resume appears to be empty or unreadable.")
if len(jd_raw) < 30:
raise ValueError("Job description is too short.")
# ── 2. Run NLP pipeline ─────────────────────────────────────
clean_resume = preprocessor.clean(resume_text)
clean_jd = preprocessor.clean(jd_raw)
similarity = similarity_engine.compute(clean_resume, clean_jd)
skill_gap = skill_gap_analyzer.analyze(clean_resume, clean_jd)
ats_report = ats_simulator.score(resume_text, jd_raw)
sections = section_analyzer.analyze(resume_text, jd_raw)
suggestions = llm_suggestions.generate(
resume_text=resume_text,
jd_text=jd_raw,
missing_skills=skill_gap.all_missing[:10],
match_score=similarity["ensemble_score"],
)
# ── 3. Persist to DB ────────────────────────────────────────
resume_db = await crud.create_resume_file(
db,
filename=resume_filename,
file_size=len(resume_content),
content_type="application/octet-stream",
)
analysis = await crud.create_analysis(
db,
resume_id =resume_db.id,
resume_filename =resume_filename,
jd_snippet =jd_raw[:300],
ensemble_score =similarity["ensemble_score"],
tfidf_score =similarity.get("tfidf_score"),
sbert_score =similarity.get("sbert_score"),
bert_score =similarity.get("bert_score"),
ats_score =ats_report.ats_score,
skill_coverage =skill_gap.coverage_score,
skill_gap_data ={
"missing_by_category": skill_gap.missing_by_category,
"all_missing": skill_gap.all_missing,
"all_jd_skills": skill_gap.all_jd_skills,
"coverage_score": skill_gap.coverage_score,
"priority_gaps": skill_gap.priority_gaps,
},
section_scores =sections.section_scores,
suggestions ={
"bullets": suggestions.resume_bullets,
"keywords": suggestions.keyword_suggestions,
"summary": suggestions.summary_rewrite,
"advice": suggestions.overall_advice,
"used_llm": suggestions.used_llm,
},
ats_report ={
"ats_score": ats_report.ats_score,
"keyword_match_score": ats_report.keyword_match_score,
"format_score": ats_report.format_score,
"keyword_density_score": ats_report.keyword_density_score,
"matched_keywords": ats_report.matched_keywords[:30],
"missing_keywords": ats_report.missing_keywords[:20],
"format_feedback": ats_report.format_feedback,
"improvement_tips": ats_report.improvement_tips,
},
missing_skills =skill_gap.all_missing,
)
return {
"analysis_id": analysis.id,
"resume_file": resume_filename,
"match_scores": similarity,
"skill_gap": {
"coverage_score": skill_gap.coverage_score,
"missing_by_category": skill_gap.missing_by_category,
"all_missing": skill_gap.all_missing,
"priority_gaps": skill_gap.priority_gaps,
"learning_resources": skill_gap.learning_resources,
},
"ats": {
"ats_score": ats_report.ats_score,
"keyword_match_score": ats_report.keyword_match_score,
"format_score": ats_report.format_score,
"matched_keywords": ats_report.matched_keywords[:20],
"missing_keywords": ats_report.missing_keywords[:10],
"improvement_tips": ats_report.improvement_tips,
},
"sections": {
"scores": sections.section_scores,
"feedback": sections.section_feedback,
"strongest": sections.strongest_section,
"weakest": sections.weakest_section,
"overall_score": sections.overall_section_score,
},
"suggestions": {
"bullets": suggestions.resume_bullets,
"keywords": suggestions.keyword_suggestions,
"summary": suggestions.summary_rewrite,
"advice": suggestions.overall_advice,
"used_llm": suggestions.used_llm,
},
}
async def rank_resumes(
resumes: list[tuple[str, bytes]],
jd_text: str
) -> dict:
"""
Rank multiple resumes against a job description.
"""
results = []
for filename, content in resumes:
file_ext = Path(filename or "resume.pdf").suffix.lower()
try:
resume_text = text_extractor.extract(content, source_type=file_ext.lstrip("."))
except Exception as exc:
results.append({
"filename": filename,
"error": f"Could not read file: {exc}",
"rank": None,
})
continue
clean_resume = preprocessor.clean(resume_text)
clean_jd = preprocessor.clean(jd_text)
sim = similarity_engine.compute(clean_resume, clean_jd)
gaps = skill_gap_analyzer.analyze(clean_resume, clean_jd)
results.append({
"filename": filename,
"ensemble_score": sim["ensemble_score"],
"tfidf_score": sim["tfidf_score"],
"sbert_score": sim["sbert_score"],
"confidence": sim["confidence"],
"skill_coverage": gaps.coverage_score,
"missing_skills": gaps.all_missing[:8],
"rank": None,
})
# Sort
valid = [r for r in results if r.get("ensemble_score") is not None]
valid.sort(key=lambda x: x["ensemble_score"], reverse=True)
for i, item in enumerate(valid, start=1):
item["rank"] = i
return {
"ranked_resumes": valid,
"errors": [r for r in results if r.get("ensemble_score") is None],
"jd_snippet": jd_text[:200],
"total_submitted": len(resumes),
"total_ranked": len(valid),
}
async def get_history(db: AsyncSession, limit: int = 20) -> list:
"""Retrieve recent analysis history."""
try:
stmt = select(models.AnalysisResult).order_by(models.AnalysisResult.created_at.desc()).limit(limit)
result = await db.execute(stmt)
return list(result.scalars().all())
except Exception as exc:
logger.error(f"History retrieval failed: {exc}")
return []
async def delete_history_item(db: AsyncSession, analysis_id: str) -> bool:
"""Delete an analysis from history."""
try:
stmt = delete(models.AnalysisResult).where(models.AnalysisResult.id == analysis_id)
await db.execute(stmt)
await db.commit()
return True
except Exception as exc:
logger.error(f"Deletion failed: {exc}")
return False