# resume-evaluator / pipeline.py
# Author: Avinashnalla7 — "Update pipeline.py" (commit 45de952, verified)
import csv
import json
import os
import re
import hashlib
import shutil
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import fitz # pymupdf
import pytesseract
from PIL import Image
from openai import OpenAI
from tenacity import retry, stop_after_attempt, wait_exponential
SCHEMA_VERSION = "1.0"
DEFAULT_MODEL = "gpt-4o-mini"
ALLOWED_SCORE_KEYS = ["skill", "experience", "growth", "context_fit", "combined"]
def _now_ts() -> str:
return datetime.now(timezone.utc).isoformat()
def _safe_slug(s: str, max_len: int = 80) -> str:
s = (s or "").strip()
s = re.sub(r"\s+", "_", s)
s = re.sub(r"[^A-Za-z0-9_\-]+", "", s)
return s[:max_len] if s else "UNKNOWN"
def _sha256_file(path: str) -> str:
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
h.update(chunk)
return h.hexdigest()
def _atomic_write_text(path: Path, text: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
tmp.write_text(text, encoding="utf-8")
tmp.replace(path)
def _atomic_write_json(path: Path, obj: Any) -> None:
_atomic_write_text(path, json.dumps(obj, ensure_ascii=False, indent=2))
def _read_json(path: Path, default: Any) -> Any:
if not path.exists():
return default
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
# keep a backup of corrupt state and start fresh
try:
shutil.copy2(path, path.with_suffix(path.suffix + ".corrupt"))
except Exception:
pass
return default
def _pixmap_to_pil_rgb(pix: "fitz.Pixmap") -> Image.Image:
if pix.alpha:
pix = fitz.Pixmap(pix, 0)
return Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
def extract_text_from_pdf(
pdf_path: str,
*,
ocr_if_empty: bool = True,
max_pages: int = 8,
ocr_dpi: int = 200,
) -> str:
"""
1) Extract text with PyMuPDF.
2) If empty and ocr_if_empty: OCR first max_pages pages.
"""
try:
doc = fitz.open(pdf_path)
except Exception:
return ""
parts: List[str] = []
page_count = min(len(doc), max_pages)
# normal extraction
for i in range(page_count):
try:
t = doc[i].get_text("text") or ""
except Exception:
t = ""
if t.strip():
parts.append(t)
text = "\n\n".join(parts).strip()
if text or not ocr_if_empty:
doc.close()
return text
# OCR fallback
ocr_parts: List[str] = []
for i in range(page_count):
try:
page = doc[i]
pix = page.get_pixmap(dpi=ocr_dpi)
img = _pixmap_to_pil_rgb(pix)
ocr_txt = pytesseract.image_to_string(img) or ""
if ocr_txt.strip():
ocr_parts.append(ocr_txt)
except Exception:
continue
doc.close()
return "\n\n".join(ocr_parts).strip()
def build_prompt(text: str, config: Dict[str, Any]) -> str:
projects = config.get("projects") or []
projects_block = json.dumps(projects, ensure_ascii=False)
return f"""
You are an expert technical recruiter. Evaluate the candidate resume text.
Return STRICT JSON ONLY. No markdown. No commentary.
Required JSON schema:
{{
"candidate_name": string | null,
"seniority": string | null,
"scores": {{
"skill": number,
"experience": number,
"growth": number,
"context_fit": number,
"combined": number
}},
"best_project": {{
"project_name": string | null,
"project_score": number
}},
"tags": [string, ...],
"notes": string | null
}}
Rules:
- scores are 0..10 (float allowed)
- combined must be a reasonable aggregate of the others (not random)
- best_project.project_name must be one of the provided projects' names OR null
- tags should be short
- If uncertain, be conservative.
Projects (for matching):
{projects_block}
Resume text:
{text}
""".strip()
def _coerce_score(v: Any) -> float:
try:
f = float(v)
except Exception:
return 0.0
if f < 0:
return 0.0
if f > 10:
return 10.0
return f
def normalize_eval(raw: Dict[str, Any], config: Dict[str, Any], *, job_id: str, pdf_sha256: str, filename: str) -> Dict[str, Any]:
scores = raw.get("scores") if isinstance(raw.get("scores"), dict) else {}
norm_scores = {k: _coerce_score(scores.get(k, 0)) for k in ALLOWED_SCORE_KEYS}
best_project = raw.get("best_project") if isinstance(raw.get("best_project"), dict) else {}
project_name = best_project.get("project_name")
project_score = _coerce_score(best_project.get("project_score", 0))
allowed_project_names = {
p.get("name")
for p in (config.get("projects") or [])
if isinstance(p, dict) and p.get("name")
}
if project_name not in allowed_project_names:
project_name = None
tags = raw.get("tags")
if not isinstance(tags, list):
tags = []
tags = [str(t).strip() for t in tags if str(t).strip()]
tags = tags[:25]
model = config.get("model") or os.getenv("OPENAI_MODEL") or DEFAULT_MODEL
return {
"schema_version": SCHEMA_VERSION,
"job_id": job_id,
"pdf_sha256": pdf_sha256,
"filename": filename,
"candidate_name": raw.get("candidate_name"),
"seniority": raw.get("seniority"),
"scores": norm_scores,
"best_project": {"project_name": project_name, "project_score": project_score},
"tags": tags,
"notes": raw.get("notes"),
"meta": {"model": model, "timestamp": _now_ts()},
}
@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=8))
def llm_evaluate(text: str, config: Dict[str, Any]) -> Dict[str, Any]:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise RuntimeError("Missing OPENAI_API_KEY (set it in HF Space Secrets).")
client = OpenAI(api_key=api_key)
model = config.get("model") or os.getenv("OPENAI_MODEL") or DEFAULT_MODEL
prompt = build_prompt(text, config)
resp = client.responses.create(model=model, input=prompt)
content = resp.output_text
if not content or not content.strip():
raise RuntimeError("LLM returned empty response.")
try:
raw = json.loads(content)
except Exception as e:
raise RuntimeError(f"LLM did not return valid JSON. First 200 chars: {content[:200]!r}") from e
if not isinstance(raw, dict):
raise RuntimeError("LLM JSON must be an object/dict at top-level.")
return raw
def _bucket_label(combined: float, thresholds: Dict[str, float]) -> str:
top = float(thresholds.get("top", 8.0))
strong = float(thresholds.get("strong", 6.5))
maybe = float(thresholds.get("maybe", 5.0))
if combined >= top:
return "top"
if combined >= strong:
return "strong"
if combined >= maybe:
return "maybe"
return "no"
def _write_csv(path: Path, rows: List[Dict[str, Any]], fieldnames: List[str]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
with tmp.open("w", newline="", encoding="utf-8") as f:
w = csv.DictWriter(f, fieldnames=fieldnames)
w.writeheader()
for r in rows:
w.writerow({k: r.get(k) for k in fieldnames})
tmp.replace(path)
def _zip_dir(src_dir: Path, zip_path: Path) -> None:
if zip_path.exists():
zip_path.unlink()
import zipfile
with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as z:
for p in sorted(src_dir.rglob("*")):
if p.is_file():
z.write(p, arcname=str(p.relative_to(src_dir)))
def run_pipeline(
input_files: List[str],
config: Dict[str, Any],
*,
output_root: Optional[str] = None,
job_id: str,
) -> Dict[str, Any]:
"""
Stable contract:
- output_root contains persistent state (manifest)
- job_id creates isolated job folder under output_root/jobs/{job_id}
- returns job_dir + zip_path + counts
"""
output_root_path = Path(output_root or "/tmp/resume_eval_root").resolve()
output_root_path.mkdir(parents=True, exist_ok=True)
# Persistent manifest across runs (dedupe state)
manifest_path = output_root_path / "processed_manifest.json"
manifest: Dict[str, Any] = _read_json(manifest_path, default={})
if not isinstance(manifest, dict):
manifest = {}
# Job layout
jobs_root = output_root_path / "jobs"
job_dir = jobs_root / job_id
input_dir = job_dir / "input"
text_dir = job_dir / "extracted_text"
eval_dir = job_dir / "evaluations"
reports_dir = job_dir / "reports"
for d in [input_dir, text_dir, eval_dir, reports_dir]:
d.mkdir(parents=True, exist_ok=True)
rewrite = bool(config.get("rewrite", False))
projects = config.get("projects") or [{"name": "STANDARD"}]
default_project_name = (projects[0] or {}).get("name", "STANDARD")
ocr_max_pages = int(config.get("ocr_max_pages", 8))
ocr_dpi = int(config.get("ocr_dpi", 200))
thresholds = config.get("bucket_thresholds") or {"top": 8.0, "strong": 6.5, "maybe": 5.0}
top_n = int(config.get("top_n", 25))
per_job_index: List[Dict[str, Any]] = []
evaluations: List[Dict[str, Any]] = []
counts = {"total": 0, "success": 0, "skipped": 0, "failed": 0}
for src_path in input_files or []:
counts["total"] += 1
src_path = str(Path(src_path).resolve())
filename = os.path.basename(src_path)
# Copy into job input/ (this is important for later SFTP job contract)
dst_pdf = input_dir / filename
try:
shutil.copy2(src_path, dst_pdf)
except Exception:
# if copy fails, still try reading original
dst_pdf = Path(src_path)
sha = _sha256_file(str(dst_pdf))
record = {
"schema_version": SCHEMA_VERSION,
"job_id": job_id,
"pdf_sha256": sha,
"filename": filename,
"candidate_name": None,
"project": default_project_name,
"model": config.get("model") or os.getenv("OPENAI_MODEL") or DEFAULT_MODEL,
"status": None, # success|skipped|failed
"error": None,
"created_at": _now_ts(),
"output_json": None, # relative to job_dir
"extracted_text": None, # relative to job_dir
}
# Dedupe via persistent manifest
if not rewrite and sha in manifest:
record["status"] = "skipped"
record["error"] = "duplicate_pdf_sha256"
counts["skipped"] += 1
per_job_index.append(record)
continue
try:
text = extract_text_from_pdf(
str(dst_pdf),
ocr_if_empty=True,
max_pages=ocr_max_pages,
ocr_dpi=ocr_dpi,
)
if not text.strip():
raise RuntimeError("No extractable text (even after OCR).")
text_name = f"{_safe_slug(Path(filename).stem)}__{sha[:12]}.txt"
tpath = text_dir / text_name
_atomic_write_text(tpath, text)
record["extracted_text"] = str(tpath.relative_to(job_dir))
raw = llm_evaluate(text, config)
ev = normalize_eval(raw, config, job_id=job_id, pdf_sha256=sha, filename=filename)
safe_name = _safe_slug(ev.get("candidate_name") or Path(filename).stem)
out_path = eval_dir / f"{safe_name}__{sha[:12]}.json"
_atomic_write_json(out_path, ev)
record["status"] = "success"
record["candidate_name"] = ev.get("candidate_name")
record["output_json"] = str(out_path.relative_to(job_dir))
counts["success"] += 1
evaluations.append(ev)
# update global manifest
manifest[sha] = {
"pdf_sha256": sha,
"first_seen_at": manifest.get(sha, {}).get("first_seen_at", _now_ts()),
"last_seen_at": _now_ts(),
"last_job_id": job_id,
"filename": filename,
"status": "success",
}
except Exception as e:
record["status"] = "failed"
record["error"] = f"{type(e).__name__}: {e}"
counts["failed"] += 1
manifest[sha] = {
"pdf_sha256": sha,
"first_seen_at": manifest.get(sha, {}).get("first_seen_at", _now_ts()),
"last_seen_at": _now_ts(),
"last_job_id": job_id,
"filename": filename,
"status": "failed",
"error": record["error"],
}
per_job_index.append(record)
# Write per-job index
_atomic_write_json(job_dir / "resumes_index.json", per_job_index)
# Reports: project buckets + top candidates + candidate analysis
bucket_rows: List[Dict[str, Any]] = []
top_rows: List[Dict[str, Any]] = []
analysis_rows: List[Dict[str, Any]] = []
for ev in evaluations:
combined = float(ev.get("scores", {}).get("combined", 0.0))
b = _bucket_label(combined, thresholds)
project = (ev.get("best_project") or {}).get("project_name") or default_project_name
bucket_rows.append({
"job_id": job_id,
"pdf_sha256": ev.get("pdf_sha256"),
"candidate_name": ev.get("candidate_name"),
"seniority": ev.get("seniority"),
"project": project,
"bucket": b,
"combined": combined,
})
analysis_rows.append({
"job_id": job_id,
"pdf_sha256": ev.get("pdf_sha256"),
"candidate_name": ev.get("candidate_name"),
"seniority": ev.get("seniority"),
"project": project,
"skill": ev.get("scores", {}).get("skill"),
"experience": ev.get("scores", {}).get("experience"),
"growth": ev.get("scores", {}).get("growth"),
"context_fit": ev.get("scores", {}).get("context_fit"),
"combined": combined,
"tags": ",".join(ev.get("tags") or []),
})
# sort for top candidates
evaluations_sorted = sorted(
evaluations,
key=lambda x: float((x.get("scores") or {}).get("combined", 0.0)),
reverse=True,
)[:max(0, top_n)]
for ev in evaluations_sorted:
combined = float(ev.get("scores", {}).get("combined", 0.0))
project = (ev.get("best_project") or {}).get("project_name") or default_project_name
top_rows.append({
"job_id": job_id,
"pdf_sha256": ev.get("pdf_sha256"),
"candidate_name": ev.get("candidate_name"),
"seniority": ev.get("seniority"),
"project": project,
"combined": combined,
})
# Write report files
_atomic_write_json(reports_dir / "project_buckets.json", bucket_rows)
_write_csv(
reports_dir / "project_buckets.csv",
bucket_rows,
["job_id", "pdf_sha256", "candidate_name", "seniority", "project", "bucket", "combined"],
)
_atomic_write_json(reports_dir / "top_candidates.json", top_rows)
_write_csv(
reports_dir / "top_candidates.csv",
top_rows,
["job_id", "pdf_sha256", "candidate_name", "seniority", "project", "combined"],
)
_write_csv(
reports_dir / "candidate_analysis.csv",
analysis_rows,
["job_id", "pdf_sha256", "candidate_name", "seniority", "project",
"skill", "experience", "growth", "context_fit", "combined", "tags"],
)
# Job + artifacts descriptors
job_json = {
"schema_version": SCHEMA_VERSION,
"job_id": job_id,
"created_at": _now_ts(),
"model": config.get("model") or os.getenv("OPENAI_MODEL") or DEFAULT_MODEL,
"counts": counts,
"paths": {
"input_dir": "input/",
"extracted_text_dir": "extracted_text/",
"evaluations_dir": "evaluations/",
"reports_dir": "reports/",
},
}
_atomic_write_json(job_dir / "job.json", job_json)
artifacts = {
"schema_version": SCHEMA_VERSION,
"job_id": job_id,
"files": {
"job_json": "job.json",
"resumes_index": "resumes_index.json",
"reports": {
"project_buckets_json": "reports/project_buckets.json",
"project_buckets_csv": "reports/project_buckets.csv",
"top_candidates_json": "reports/top_candidates.json",
"top_candidates_csv": "reports/top_candidates.csv",
"candidate_analysis_csv": "reports/candidate_analysis.csv",
},
},
}
_atomic_write_json(job_dir / "artifacts.json", artifacts)
# Persist manifest last (atomic)
_atomic_write_json(manifest_path, manifest)
# Zip the job folder ONLY
zip_path = output_root_path / f"{job_id}.zip"
_zip_dir(job_dir, zip_path)
return {
"job_id": job_id,
"job_dir": str(job_dir),
"zip_path": str(zip_path),
"counts": counts,
}