"""Resume-evaluation pipeline.

Extracts text from PDF resumes (PyMuPDF, with a Tesseract OCR fallback for
scanned documents), scores each candidate via an OpenAI model, and writes a
self-contained job folder (inputs, extracted text, per-candidate JSON,
CSV/JSON reports) plus a zip archive. A persistent manifest under the output
root dedupes PDFs by SHA-256 across runs.
"""

import csv
import json
import os
import re
import hashlib
import shutil
import zipfile
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import fitz  # pymupdf
import pytesseract
from PIL import Image
from openai import OpenAI
from tenacity import retry, stop_after_attempt, wait_exponential

SCHEMA_VERSION = "1.0"
DEFAULT_MODEL = "gpt-4o-mini"
ALLOWED_SCORE_KEYS = ["skill", "experience", "growth", "context_fit", "combined"]


def _now_ts() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _resolve_model(config: Dict[str, Any]) -> str:
    """Resolve the model name: config value > OPENAI_MODEL env var > default.

    Centralizes a lookup that was previously duplicated at four call sites.
    """
    return config.get("model") or os.getenv("OPENAI_MODEL") or DEFAULT_MODEL


def _safe_slug(s: str, max_len: int = 80) -> str:
    """Turn *s* into a filesystem-safe slug.

    Whitespace runs become ``_``; everything outside ``[A-Za-z0-9_-]`` is
    dropped; the result is truncated to *max_len*. Empty input yields
    ``"UNKNOWN"`` so callers always get a usable filename component.
    """
    s = (s or "").strip()
    s = re.sub(r"\s+", "_", s)
    s = re.sub(r"[^A-Za-z0-9_\-]+", "", s)
    return s[:max_len] if s else "UNKNOWN"


def _sha256_file(path: str) -> str:
    """Return the SHA-256 hex digest of *path*, read in 1 MiB chunks."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            h.update(chunk)
    return h.hexdigest()


def _atomic_write_text(path: Path, text: str) -> None:
    """Write *text* to *path* atomically (write to a ``.tmp`` sibling, then
    rename). Parent directories are created as needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    tmp.write_text(text, encoding="utf-8")
    tmp.replace(path)


def _atomic_write_json(path: Path, obj: Any) -> None:
    """Atomically write *obj* to *path* as pretty-printed UTF-8 JSON."""
    _atomic_write_text(path, json.dumps(obj, ensure_ascii=False, indent=2))


def _read_json(path: Path, default: Any) -> Any:
    """Read JSON from *path*, returning *default* if missing or unparsable.

    On parse failure the corrupt file is preserved as ``<name>.corrupt``
    (best effort) so the persistent state can be inspected later.
    """
    if not path.exists():
        return default
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        # keep a backup of corrupt state and start fresh
        try:
            shutil.copy2(path, path.with_suffix(path.suffix + ".corrupt"))
        except Exception:
            pass
        return default


def _pixmap_to_pil_rgb(pix: "fitz.Pixmap") -> Image.Image:
    """Convert a PyMuPDF pixmap to an RGB PIL image (alpha is dropped)."""
    if pix.alpha:
        pix = fitz.Pixmap(pix, 0)
    return Image.frombytes("RGB", [pix.width, pix.height], pix.samples)


def extract_text_from_pdf(
    pdf_path: str,
    *,
    ocr_if_empty: bool = True,
    max_pages: int = 8,
    ocr_dpi: int = 200,
) -> str:
    """Extract text from the first *max_pages* pages of a PDF.

    1) Extract text with PyMuPDF.
    2) If empty and ocr_if_empty: OCR first max_pages pages.

    Returns "" when the file cannot be opened or yields no text.
    """
    try:
        doc = fitz.open(pdf_path)
    except Exception:
        return ""
    # BUGFIX: use try/finally so the document handle is released even if an
    # unexpected error escapes extraction (the original only closed the doc
    # on the happy paths).
    try:
        page_count = min(len(doc), max_pages)

        # Normal (embedded-text) extraction; per-page failures are skipped.
        parts: List[str] = []
        for i in range(page_count):
            try:
                t = doc[i].get_text("text") or ""
            except Exception:
                t = ""
            if t.strip():
                parts.append(t)
        text = "\n\n".join(parts).strip()
        if text or not ocr_if_empty:
            return text

        # OCR fallback for scanned / image-only PDFs. Best effort: pages
        # that fail to render or OCR are skipped rather than aborting.
        ocr_parts: List[str] = []
        for i in range(page_count):
            try:
                pix = doc[i].get_pixmap(dpi=ocr_dpi)
                img = _pixmap_to_pil_rgb(pix)
                ocr_txt = pytesseract.image_to_string(img) or ""
                if ocr_txt.strip():
                    ocr_parts.append(ocr_txt)
            except Exception:
                continue
        return "\n\n".join(ocr_parts).strip()
    finally:
        doc.close()


def build_prompt(text: str, config: Dict[str, Any]) -> str:
    """Build the evaluation prompt: strict-JSON instructions, the schema,
    the configured project list (for best-project matching), and the resume
    text."""
    projects = config.get("projects") or []
    projects_block = json.dumps(projects, ensure_ascii=False)
    return f"""
You are an expert technical recruiter. Evaluate the candidate resume text.
Return STRICT JSON ONLY. No markdown. No commentary.

Required JSON schema:
{{
  "candidate_name": string | null,
  "seniority": string | null,
  "scores": {{
    "skill": number,
    "experience": number,
    "growth": number,
    "context_fit": number,
    "combined": number
  }},
  "best_project": {{
    "project_name": string | null,
    "project_score": number
  }},
  "tags": [string, ...],
  "notes": string | null
}}

Rules:
- scores are 0..10 (float allowed)
- combined must be a reasonable aggregate of the others (not random)
- best_project.project_name must be one of the provided projects' names OR null
- tags should be short
- If uncertain, be conservative.

Projects (for matching):
{projects_block}

Resume text:
{text}
""".strip()


def _coerce_score(v: Any) -> float:
    """Coerce *v* to a float clamped to [0, 10]; non-numeric input → 0.0."""
    try:
        f = float(v)
    except Exception:
        return 0.0
    if f < 0:
        return 0.0
    if f > 10:
        return 10.0
    return f


def normalize_eval(
    raw: Dict[str, Any],
    config: Dict[str, Any],
    *,
    job_id: str,
    pdf_sha256: str,
    filename: str,
) -> Dict[str, Any]:
    """Normalize a raw LLM evaluation dict into the stable output schema.

    Scores are clamped to [0, 10]; ``best_project.project_name`` is nulled
    unless it matches a configured project name; tags are stringified,
    stripped, and capped at 25. Provenance (job id, PDF hash, filename,
    model, timestamp) is attached.
    """
    scores = raw.get("scores") if isinstance(raw.get("scores"), dict) else {}
    norm_scores = {k: _coerce_score(scores.get(k, 0)) for k in ALLOWED_SCORE_KEYS}

    best_project = raw.get("best_project") if isinstance(raw.get("best_project"), dict) else {}
    project_name = best_project.get("project_name")
    project_score = _coerce_score(best_project.get("project_score", 0))
    allowed_project_names = {
        p.get("name")
        for p in (config.get("projects") or [])
        if isinstance(p, dict) and p.get("name")
    }
    # Reject hallucinated project names: only configured projects are valid.
    if project_name not in allowed_project_names:
        project_name = None

    tags = raw.get("tags")
    if not isinstance(tags, list):
        tags = []
    tags = [str(t).strip() for t in tags if str(t).strip()]
    tags = tags[:25]

    return {
        "schema_version": SCHEMA_VERSION,
        "job_id": job_id,
        "pdf_sha256": pdf_sha256,
        "filename": filename,
        "candidate_name": raw.get("candidate_name"),
        "seniority": raw.get("seniority"),
        "scores": norm_scores,
        "best_project": {"project_name": project_name, "project_score": project_score},
        "tags": tags,
        "notes": raw.get("notes"),
        "meta": {"model": _resolve_model(config), "timestamp": _now_ts()},
    }


def _strip_code_fences(s: str) -> str:
    """Remove a wrapping markdown code fence (e.g. ```json ... ```) if present.

    Models sometimes fence their JSON despite "no markdown" instructions;
    unfenced input is returned stripped but otherwise unchanged.
    """
    s = s.strip()
    if s.startswith("```"):
        s = re.sub(r"^```[A-Za-z0-9_\-]*\s*", "", s)
        s = re.sub(r"\s*```$", "", s)
    return s


@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=8))
def llm_evaluate(text: str, config: Dict[str, Any]) -> Dict[str, Any]:
    """Send the resume *text* to the LLM and return the parsed JSON dict.

    Retries up to 3 times with exponential backoff. Raises RuntimeError on
    missing API key, empty response, invalid JSON, or a non-object payload
    (wrapped in tenacity's RetryError once attempts are exhausted).
    """
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise RuntimeError("Missing OPENAI_API_KEY (set it in HF Space Secrets).")
    client = OpenAI(api_key=api_key)
    model = _resolve_model(config)
    prompt = build_prompt(text, config)
    resp = client.responses.create(model=model, input=prompt)
    content = resp.output_text
    if not content or not content.strip():
        raise RuntimeError("LLM returned empty response.")
    # BUGFIX: tolerate a markdown code fence around the JSON instead of
    # failing the whole resume on an otherwise-valid response.
    content = _strip_code_fences(content)
    try:
        raw = json.loads(content)
    except Exception as e:
        raise RuntimeError(
            f"LLM did not return valid JSON. First 200 chars: {content[:200]!r}"
        ) from e
    if not isinstance(raw, dict):
        raise RuntimeError("LLM JSON must be an object/dict at top-level.")
    return raw


def _bucket_label(combined: float, thresholds: Dict[str, float]) -> str:
    """Map a combined score onto "top" / "strong" / "maybe" / "no" using the
    configured thresholds (defaults: 8.0 / 6.5 / 5.0)."""
    top = float(thresholds.get("top", 8.0))
    strong = float(thresholds.get("strong", 6.5))
    maybe = float(thresholds.get("maybe", 5.0))
    if combined >= top:
        return "top"
    if combined >= strong:
        return "strong"
    if combined >= maybe:
        return "maybe"
    return "no"


def _write_csv(path: Path, rows: List[Dict[str, Any]], fieldnames: List[str]) -> None:
    """Atomically write *rows* as CSV, keeping only *fieldnames* columns."""
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    with tmp.open("w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        for r in rows:
            w.writerow({k: r.get(k) for k in fieldnames})
    tmp.replace(path)


def _zip_dir(src_dir: Path, zip_path: Path) -> None:
    """Zip the contents of *src_dir* (recursively) into *zip_path*,
    overwriting any existing archive. Arcnames are relative to *src_dir*."""
    if zip_path.exists():
        zip_path.unlink()
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as z:
        for p in sorted(src_dir.rglob("*")):
            if p.is_file():
                z.write(p, arcname=str(p.relative_to(src_dir)))


def _manifest_entry(
    manifest: Dict[str, Any],
    sha: str,
    *,
    job_id: str,
    filename: str,
    status: str,
    error: Optional[str] = None,
) -> Dict[str, Any]:
    """Build a manifest record for *sha*, preserving ``first_seen_at`` from
    any prior entry. Shared by the success and failure paths."""
    entry = {
        "pdf_sha256": sha,
        "first_seen_at": manifest.get(sha, {}).get("first_seen_at", _now_ts()),
        "last_seen_at": _now_ts(),
        "last_job_id": job_id,
        "filename": filename,
        "status": status,
    }
    if error is not None:
        entry["error"] = error
    return entry


def run_pipeline(
    input_files: List[str],
    config: Dict[str, Any],
    *,
    output_root: Optional[str] = None,
    job_id: str,
) -> Dict[str, Any]:
    """
    Stable contract:
    - output_root contains persistent state (manifest)
    - job_id creates isolated job folder under output_root/jobs/{job_id}
    - returns job_dir + zip_path + counts
    """
    output_root_path = Path(output_root or "/tmp/resume_eval_root").resolve()
    output_root_path.mkdir(parents=True, exist_ok=True)

    # Persistent manifest across runs (dedupe state)
    manifest_path = output_root_path / "processed_manifest.json"
    manifest: Dict[str, Any] = _read_json(manifest_path, default={})
    if not isinstance(manifest, dict):
        manifest = {}

    # Job layout
    jobs_root = output_root_path / "jobs"
    job_dir = jobs_root / job_id
    input_dir = job_dir / "input"
    text_dir = job_dir / "extracted_text"
    eval_dir = job_dir / "evaluations"
    reports_dir = job_dir / "reports"
    for d in [input_dir, text_dir, eval_dir, reports_dir]:
        d.mkdir(parents=True, exist_ok=True)

    rewrite = bool(config.get("rewrite", False))
    projects = config.get("projects") or [{"name": "STANDARD"}]
    default_project_name = (projects[0] or {}).get("name", "STANDARD")
    ocr_max_pages = int(config.get("ocr_max_pages", 8))
    ocr_dpi = int(config.get("ocr_dpi", 200))
    thresholds = config.get("bucket_thresholds") or {"top": 8.0, "strong": 6.5, "maybe": 5.0}
    top_n = int(config.get("top_n", 25))
    model = _resolve_model(config)

    per_job_index: List[Dict[str, Any]] = []
    evaluations: List[Dict[str, Any]] = []
    counts = {"total": 0, "success": 0, "skipped": 0, "failed": 0}

    for src_path in input_files or []:
        counts["total"] += 1
        src_path = str(Path(src_path).resolve())
        filename = os.path.basename(src_path)

        # Copy into job input/ (this is important for later SFTP job contract)
        dst_pdf = input_dir / filename
        try:
            shutil.copy2(src_path, dst_pdf)
        except Exception:
            # if copy fails, still try reading original
            dst_pdf = Path(src_path)

        sha = _sha256_file(str(dst_pdf))
        record = {
            "schema_version": SCHEMA_VERSION,
            "job_id": job_id,
            "pdf_sha256": sha,
            "filename": filename,
            "candidate_name": None,
            "project": default_project_name,
            "model": model,
            "status": None,  # success|skipped|failed
            "error": None,
            "created_at": _now_ts(),
            "output_json": None,  # relative to job_dir
            "extracted_text": None,  # relative to job_dir
        }

        # Dedupe via persistent manifest.
        # BUGFIX: only skip PDFs whose previous run SUCCEEDED. The failure
        # path also writes a manifest entry, so the original `sha in manifest`
        # check permanently blacklisted resumes after one transient failure.
        prior = manifest.get(sha)
        if (
            not rewrite
            and isinstance(prior, dict)
            and prior.get("status") == "success"
        ):
            record["status"] = "skipped"
            record["error"] = "duplicate_pdf_sha256"
            counts["skipped"] += 1
            per_job_index.append(record)
            continue

        try:
            text = extract_text_from_pdf(
                str(dst_pdf),
                ocr_if_empty=True,
                max_pages=ocr_max_pages,
                ocr_dpi=ocr_dpi,
            )
            if not text.strip():
                raise RuntimeError("No extractable text (even after OCR).")

            text_name = f"{_safe_slug(Path(filename).stem)}__{sha[:12]}.txt"
            tpath = text_dir / text_name
            _atomic_write_text(tpath, text)
            record["extracted_text"] = str(tpath.relative_to(job_dir))

            raw = llm_evaluate(text, config)
            ev = normalize_eval(raw, config, job_id=job_id, pdf_sha256=sha, filename=filename)

            safe_name = _safe_slug(ev.get("candidate_name") or Path(filename).stem)
            out_path = eval_dir / f"{safe_name}__{sha[:12]}.json"
            _atomic_write_json(out_path, ev)

            record["status"] = "success"
            record["candidate_name"] = ev.get("candidate_name")
            record["output_json"] = str(out_path.relative_to(job_dir))
            counts["success"] += 1
            evaluations.append(ev)

            # update global manifest
            manifest[sha] = _manifest_entry(
                manifest, sha, job_id=job_id, filename=filename, status="success"
            )
        except Exception as e:
            record["status"] = "failed"
            record["error"] = f"{type(e).__name__}: {e}"
            counts["failed"] += 1
            manifest[sha] = _manifest_entry(
                manifest,
                sha,
                job_id=job_id,
                filename=filename,
                status="failed",
                error=record["error"],
            )

        per_job_index.append(record)

    # Write per-job index
    _atomic_write_json(job_dir / "resumes_index.json", per_job_index)

    # Reports: project buckets + top candidates + candidate analysis
    bucket_rows: List[Dict[str, Any]] = []
    top_rows: List[Dict[str, Any]] = []
    analysis_rows: List[Dict[str, Any]] = []

    for ev in evaluations:
        combined = float(ev.get("scores", {}).get("combined", 0.0))
        b = _bucket_label(combined, thresholds)
        project = (ev.get("best_project") or {}).get("project_name") or default_project_name
        bucket_rows.append({
            "job_id": job_id,
            "pdf_sha256": ev.get("pdf_sha256"),
            "candidate_name": ev.get("candidate_name"),
            "seniority": ev.get("seniority"),
            "project": project,
            "bucket": b,
            "combined": combined,
        })
        analysis_rows.append({
            "job_id": job_id,
            "pdf_sha256": ev.get("pdf_sha256"),
            "candidate_name": ev.get("candidate_name"),
            "seniority": ev.get("seniority"),
            "project": project,
            "skill": ev.get("scores", {}).get("skill"),
            "experience": ev.get("scores", {}).get("experience"),
            "growth": ev.get("scores", {}).get("growth"),
            "context_fit": ev.get("scores", {}).get("context_fit"),
            "combined": combined,
            "tags": ",".join(ev.get("tags") or []),
        })

    # sort for top candidates
    evaluations_sorted = sorted(
        evaluations,
        key=lambda x: float((x.get("scores") or {}).get("combined", 0.0)),
        reverse=True,
    )[:max(0, top_n)]
    for ev in evaluations_sorted:
        combined = float(ev.get("scores", {}).get("combined", 0.0))
        project = (ev.get("best_project") or {}).get("project_name") or default_project_name
        top_rows.append({
            "job_id": job_id,
            "pdf_sha256": ev.get("pdf_sha256"),
            "candidate_name": ev.get("candidate_name"),
            "seniority": ev.get("seniority"),
            "project": project,
            "combined": combined,
        })

    # Write report files
    _atomic_write_json(reports_dir / "project_buckets.json", bucket_rows)
    _write_csv(
        reports_dir / "project_buckets.csv",
        bucket_rows,
        ["job_id", "pdf_sha256", "candidate_name", "seniority", "project", "bucket", "combined"],
    )
    _atomic_write_json(reports_dir / "top_candidates.json", top_rows)
    _write_csv(
        reports_dir / "top_candidates.csv",
        top_rows,
        ["job_id", "pdf_sha256", "candidate_name", "seniority", "project", "combined"],
    )
    _write_csv(
        reports_dir / "candidate_analysis.csv",
        analysis_rows,
        ["job_id", "pdf_sha256", "candidate_name", "seniority", "project",
         "skill", "experience", "growth", "context_fit", "combined", "tags"],
    )

    # Job + artifacts descriptors
    job_json = {
        "schema_version": SCHEMA_VERSION,
        "job_id": job_id,
        "created_at": _now_ts(),
        "model": model,
        "counts": counts,
        "paths": {
            "input_dir": "input/",
            "extracted_text_dir": "extracted_text/",
            "evaluations_dir": "evaluations/",
            "reports_dir": "reports/",
        },
    }
    _atomic_write_json(job_dir / "job.json", job_json)

    artifacts = {
        "schema_version": SCHEMA_VERSION,
        "job_id": job_id,
        "files": {
            "job_json": "job.json",
            "resumes_index": "resumes_index.json",
            "reports": {
                "project_buckets_json": "reports/project_buckets.json",
                "project_buckets_csv": "reports/project_buckets.csv",
                "top_candidates_json": "reports/top_candidates.json",
                "top_candidates_csv": "reports/top_candidates.csv",
                "candidate_analysis_csv": "reports/candidate_analysis.csv",
            },
        },
    }
    _atomic_write_json(job_dir / "artifacts.json", artifacts)

    # Persist manifest last (atomic)
    _atomic_write_json(manifest_path, manifest)

    # Zip the job folder ONLY
    zip_path = output_root_path / f"{job_id}.zip"
    _zip_dir(job_dir, zip_path)

    return {
        "job_id": job_id,
        "job_dir": str(job_dir),
        "zip_path": str(zip_path),
        "counts": counts,
    }