Spaces:
Sleeping
Sleeping
| import csv | |
| import json | |
| import os | |
| import re | |
| import hashlib | |
| import shutil | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import fitz # pymupdf | |
| import pytesseract | |
| from PIL import Image | |
| from openai import OpenAI | |
| from tenacity import retry, stop_after_attempt, wait_exponential | |
# Version tag stamped into every artifact (evaluations, job.json, manifest
# records) so downstream consumers can detect schema changes.
SCHEMA_VERSION = "1.0"
# Fallback model used when neither config["model"] nor $OPENAI_MODEL is set.
DEFAULT_MODEL = "gpt-4o-mini"
# The exact score keys the LLM must return; normalize_eval coerces these.
ALLOWED_SCORE_KEYS = ["skill", "experience", "growth", "context_fit", "combined"]
| def _now_ts() -> str: | |
| return datetime.now(timezone.utc).isoformat() | |
| def _safe_slug(s: str, max_len: int = 80) -> str: | |
| s = (s or "").strip() | |
| s = re.sub(r"\s+", "_", s) | |
| s = re.sub(r"[^A-Za-z0-9_\-]+", "", s) | |
| return s[:max_len] if s else "UNKNOWN" | |
| def _sha256_file(path: str) -> str: | |
| h = hashlib.sha256() | |
| with open(path, "rb") as f: | |
| for chunk in iter(lambda: f.read(1024 * 1024), b""): | |
| h.update(chunk) | |
| return h.hexdigest() | |
| def _atomic_write_text(path: Path, text: str) -> None: | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| tmp = path.with_suffix(path.suffix + ".tmp") | |
| tmp.write_text(text, encoding="utf-8") | |
| tmp.replace(path) | |
def _atomic_write_json(path: Path, obj: Any) -> None:
    """Serialize *obj* as pretty UTF-8 JSON and write it atomically."""
    payload = json.dumps(obj, ensure_ascii=False, indent=2)
    _atomic_write_text(path, payload)
| def _read_json(path: Path, default: Any) -> Any: | |
| if not path.exists(): | |
| return default | |
| try: | |
| return json.loads(path.read_text(encoding="utf-8")) | |
| except Exception: | |
| # keep a backup of corrupt state and start fresh | |
| try: | |
| shutil.copy2(path, path.with_suffix(path.suffix + ".corrupt")) | |
| except Exception: | |
| pass | |
| return default | |
def _pixmap_to_pil_rgb(pix: "fitz.Pixmap") -> Image.Image:
    """Convert a PyMuPDF pixmap into an RGB PIL image, flattening any alpha."""
    rgb_pix = fitz.Pixmap(pix, 0) if pix.alpha else pix
    return Image.frombytes("RGB", [rgb_pix.width, rgb_pix.height], rgb_pix.samples)
def extract_text_from_pdf(
    pdf_path: str,
    *,
    ocr_if_empty: bool = True,
    max_pages: int = 8,
    ocr_dpi: int = 200,
) -> str:
    """
    Extract text from *pdf_path*.

    1) Extract embedded text with PyMuPDF from the first *max_pages* pages.
    2) If that yields nothing and *ocr_if_empty* is set, rasterize the same
       pages at *ocr_dpi* and OCR them with Tesseract.

    Returns "" when the file cannot be opened or no text is found.
    """
    try:
        doc = fitz.open(pdf_path)
    except Exception:
        return ""
    # try/finally so the document handle is released even if an unexpected
    # error escapes below (the original only closed on the happy paths).
    try:
        page_count = min(len(doc), max_pages)
        # Pass 1: embedded text extraction.
        parts: List[str] = []
        for i in range(page_count):
            try:
                t = doc[i].get_text("text") or ""
            except Exception:
                t = ""
            if t.strip():
                parts.append(t)
        text = "\n\n".join(parts).strip()
        if text or not ocr_if_empty:
            return text
        # Pass 2: OCR fallback for scanned/image-only PDFs.
        ocr_parts: List[str] = []
        for i in range(page_count):
            try:
                page = doc[i]
                pix = page.get_pixmap(dpi=ocr_dpi)
                img = _pixmap_to_pil_rgb(pix)
                ocr_txt = pytesseract.image_to_string(img) or ""
                if ocr_txt.strip():
                    ocr_parts.append(ocr_txt)
            except Exception:
                # Best-effort: skip pages that fail to render or OCR.
                continue
        return "\n\n".join(ocr_parts).strip()
    finally:
        doc.close()
def build_prompt(text: str, config: Dict[str, Any]) -> str:
    """Build the strict-JSON evaluation prompt for the given resume text.

    Embeds the configured project list (as JSON) so the model can match
    the candidate against a known project name.
    """
    projects_block = json.dumps(config.get("projects") or [], ensure_ascii=False)
    prompt = f"""
You are an expert technical recruiter. Evaluate the candidate resume text.
Return STRICT JSON ONLY. No markdown. No commentary.
Required JSON schema:
{{
"candidate_name": string | null,
"seniority": string | null,
"scores": {{
"skill": number,
"experience": number,
"growth": number,
"context_fit": number,
"combined": number
}},
"best_project": {{
"project_name": string | null,
"project_score": number
}},
"tags": [string, ...],
"notes": string | null
}}
Rules:
- scores are 0..10 (float allowed)
- combined must be a reasonable aggregate of the others (not random)
- best_project.project_name must be one of the provided projects' names OR null
- tags should be short
- If uncertain, be conservative.
Projects (for matching):
{projects_block}
Resume text:
{text}
""".strip()
    return prompt
| def _coerce_score(v: Any) -> float: | |
| try: | |
| f = float(v) | |
| except Exception: | |
| return 0.0 | |
| if f < 0: | |
| return 0.0 | |
| if f > 10: | |
| return 10.0 | |
| return f | |
def normalize_eval(raw: Dict[str, Any], config: Dict[str, Any], *, job_id: str, pdf_sha256: str, filename: str) -> Dict[str, Any]:
    """Validate and clamp a raw LLM evaluation into the stable output schema.

    Scores are clamped to [0, 10], the best-project name is nulled unless
    it matches a configured project, and tags are trimmed to at most 25
    non-empty strings.
    """
    raw_scores = raw.get("scores")
    if not isinstance(raw_scores, dict):
        raw_scores = {}
    norm_scores = {key: _coerce_score(raw_scores.get(key, 0)) for key in ALLOWED_SCORE_KEYS}

    bp = raw.get("best_project")
    if not isinstance(bp, dict):
        bp = {}
    project_name = bp.get("project_name")
    project_score = _coerce_score(bp.get("project_score", 0))

    # Only accept project names that were actually offered to the model.
    valid_names = set()
    for p in (config.get("projects") or []):
        if isinstance(p, dict) and p.get("name"):
            valid_names.add(p.get("name"))
    if project_name not in valid_names:
        project_name = None

    raw_tags = raw.get("tags")
    clean_tags: List[str] = []
    if isinstance(raw_tags, list):
        for t in raw_tags:
            label = str(t).strip()
            if label:
                clean_tags.append(label)
    clean_tags = clean_tags[:25]

    model = config.get("model") or os.getenv("OPENAI_MODEL") or DEFAULT_MODEL
    return {
        "schema_version": SCHEMA_VERSION,
        "job_id": job_id,
        "pdf_sha256": pdf_sha256,
        "filename": filename,
        "candidate_name": raw.get("candidate_name"),
        "seniority": raw.get("seniority"),
        "scores": norm_scores,
        "best_project": {"project_name": project_name, "project_score": project_score},
        "tags": clean_tags,
        "notes": raw.get("notes"),
        "meta": {"model": model, "timestamp": _now_ts()},
    }
def llm_evaluate(text: str, config: Dict[str, Any]) -> Dict[str, Any]:
    """Send the resume text to the OpenAI Responses API and parse strict JSON.

    The network call is retried up to 3 times with exponential backoff
    (the tenacity imports at the top of the file are now actually used);
    `reraise=True` preserves the original exception type for callers.

    Raises:
        RuntimeError: missing API key, empty response, non-JSON output,
            or a top-level value that is not a JSON object.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise RuntimeError("Missing OPENAI_API_KEY (set it in HF Space Secrets).")
    client = OpenAI(api_key=api_key)
    model = config.get("model") or os.getenv("OPENAI_MODEL") or DEFAULT_MODEL
    prompt = build_prompt(text, config)

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10), reraise=True)
    def _call():
        # Retry wraps ONLY the network call: validation failures below
        # (empty/invalid JSON) must surface immediately, not be retried.
        return client.responses.create(model=model, input=prompt)

    resp = _call()
    content = resp.output_text
    if not content or not content.strip():
        raise RuntimeError("LLM returned empty response.")
    try:
        raw = json.loads(content)
    except Exception as e:
        raise RuntimeError(f"LLM did not return valid JSON. First 200 chars: {content[:200]!r}") from e
    if not isinstance(raw, dict):
        raise RuntimeError("LLM JSON must be an object/dict at top-level.")
    return raw
| def _bucket_label(combined: float, thresholds: Dict[str, float]) -> str: | |
| top = float(thresholds.get("top", 8.0)) | |
| strong = float(thresholds.get("strong", 6.5)) | |
| maybe = float(thresholds.get("maybe", 5.0)) | |
| if combined >= top: | |
| return "top" | |
| if combined >= strong: | |
| return "strong" | |
| if combined >= maybe: | |
| return "maybe" | |
| return "no" | |
| def _write_csv(path: Path, rows: List[Dict[str, Any]], fieldnames: List[str]) -> None: | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| tmp = path.with_suffix(path.suffix + ".tmp") | |
| with tmp.open("w", newline="", encoding="utf-8") as f: | |
| w = csv.DictWriter(f, fieldnames=fieldnames) | |
| w.writeheader() | |
| for r in rows: | |
| w.writerow({k: r.get(k) for k in fieldnames}) | |
| tmp.replace(path) | |
| def _zip_dir(src_dir: Path, zip_path: Path) -> None: | |
| if zip_path.exists(): | |
| zip_path.unlink() | |
| import zipfile | |
| with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as z: | |
| for p in sorted(src_dir.rglob("*")): | |
| if p.is_file(): | |
| z.write(p, arcname=str(p.relative_to(src_dir))) | |
def run_pipeline(
    input_files: List[str],
    config: Dict[str, Any],
    *,
    output_root: Optional[str] = None,
    job_id: str,
) -> Dict[str, Any]:
    """
    Run the resume-evaluation pipeline over *input_files*.

    Stable contract:
      - output_root contains persistent state (manifest)
      - job_id creates isolated job folder under output_root/jobs/{job_id}
      - returns job_dir + zip_path + counts

    Per file: copy into the job's input/, hash, dedupe against the
    persistent manifest, extract text (OCR fallback), evaluate via the
    LLM, normalize, and write one JSON per candidate. Afterwards,
    bucket / top-N / analysis reports are written and the job folder is
    zipped next to the jobs/ root.
    """
    output_root_path = Path(output_root or "/tmp/resume_eval_root").resolve()
    output_root_path.mkdir(parents=True, exist_ok=True)
    # Persistent manifest across runs (dedupe state, keyed by PDF sha256)
    manifest_path = output_root_path / "processed_manifest.json"
    manifest: Dict[str, Any] = _read_json(manifest_path, default={})
    if not isinstance(manifest, dict):
        manifest = {}
    # Job layout: everything for this run lives under jobs/{job_id}
    jobs_root = output_root_path / "jobs"
    job_dir = jobs_root / job_id
    input_dir = job_dir / "input"
    text_dir = job_dir / "extracted_text"
    eval_dir = job_dir / "evaluations"
    reports_dir = job_dir / "reports"
    for d in [input_dir, text_dir, eval_dir, reports_dir]:
        d.mkdir(parents=True, exist_ok=True)
    # Config knobs (all optional; rewrite=True bypasses dedupe)
    rewrite = bool(config.get("rewrite", False))
    projects = config.get("projects") or [{"name": "STANDARD"}]
    default_project_name = (projects[0] or {}).get("name", "STANDARD")
    ocr_max_pages = int(config.get("ocr_max_pages", 8))
    ocr_dpi = int(config.get("ocr_dpi", 200))
    thresholds = config.get("bucket_thresholds") or {"top": 8.0, "strong": 6.5, "maybe": 5.0}
    top_n = int(config.get("top_n", 25))
    per_job_index: List[Dict[str, Any]] = []
    evaluations: List[Dict[str, Any]] = []
    counts = {"total": 0, "success": 0, "skipped": 0, "failed": 0}
    for src_path in input_files or []:
        counts["total"] += 1
        src_path = str(Path(src_path).resolve())
        filename = os.path.basename(src_path)
        # Copy into job input/ (this is important for later SFTP job contract)
        dst_pdf = input_dir / filename
        try:
            shutil.copy2(src_path, dst_pdf)
        except Exception:
            # if copy fails, still try reading original
            dst_pdf = Path(src_path)
        # NOTE(review): _sha256_file runs outside the try below — an
        # unreadable/missing file here would abort the whole batch;
        # consider wrapping it in the per-file error handling too.
        sha = _sha256_file(str(dst_pdf))
        record = {
            "schema_version": SCHEMA_VERSION,
            "job_id": job_id,
            "pdf_sha256": sha,
            "filename": filename,
            "candidate_name": None,
            "project": default_project_name,
            "model": config.get("model") or os.getenv("OPENAI_MODEL") or DEFAULT_MODEL,
            "status": None,  # success|skipped|failed
            "error": None,
            "created_at": _now_ts(),
            "output_json": None,  # relative to job_dir
            "extracted_text": None,  # relative to job_dir
        }
        # Dedupe via persistent manifest
        if not rewrite and sha in manifest:
            record["status"] = "skipped"
            record["error"] = "duplicate_pdf_sha256"
            counts["skipped"] += 1
            per_job_index.append(record)
            continue
        try:
            text = extract_text_from_pdf(
                str(dst_pdf),
                ocr_if_empty=True,
                max_pages=ocr_max_pages,
                ocr_dpi=ocr_dpi,
            )
            if not text.strip():
                raise RuntimeError("No extractable text (even after OCR).")
            # Persist the extracted text for auditing/debugging
            text_name = f"{_safe_slug(Path(filename).stem)}__{sha[:12]}.txt"
            tpath = text_dir / text_name
            _atomic_write_text(tpath, text)
            record["extracted_text"] = str(tpath.relative_to(job_dir))
            raw = llm_evaluate(text, config)
            ev = normalize_eval(raw, config, job_id=job_id, pdf_sha256=sha, filename=filename)
            safe_name = _safe_slug(ev.get("candidate_name") or Path(filename).stem)
            out_path = eval_dir / f"{safe_name}__{sha[:12]}.json"
            _atomic_write_json(out_path, ev)
            record["status"] = "success"
            record["candidate_name"] = ev.get("candidate_name")
            record["output_json"] = str(out_path.relative_to(job_dir))
            counts["success"] += 1
            evaluations.append(ev)
            # update global manifest
            manifest[sha] = {
                "pdf_sha256": sha,
                "first_seen_at": manifest.get(sha, {}).get("first_seen_at", _now_ts()),
                "last_seen_at": _now_ts(),
                "last_job_id": job_id,
                "filename": filename,
                "status": "success",
            }
        except Exception as e:
            # Any per-file failure marks this record failed but never
            # aborts the rest of the batch.
            record["status"] = "failed"
            record["error"] = f"{type(e).__name__}: {e}"
            counts["failed"] += 1
            # Failed files are also written to the manifest, so without
            # rewrite=True they are skipped (not retried) on re-runs.
            manifest[sha] = {
                "pdf_sha256": sha,
                "first_seen_at": manifest.get(sha, {}).get("first_seen_at", _now_ts()),
                "last_seen_at": _now_ts(),
                "last_job_id": job_id,
                "filename": filename,
                "status": "failed",
                "error": record["error"],
            }
        per_job_index.append(record)
    # Write per-job index
    _atomic_write_json(job_dir / "resumes_index.json", per_job_index)
    # Reports: project buckets + top candidates + candidate analysis
    bucket_rows: List[Dict[str, Any]] = []
    top_rows: List[Dict[str, Any]] = []
    analysis_rows: List[Dict[str, Any]] = []
    for ev in evaluations:
        combined = float(ev.get("scores", {}).get("combined", 0.0))
        b = _bucket_label(combined, thresholds)
        # Fall back to the first configured project when the LLM matched none
        project = (ev.get("best_project") or {}).get("project_name") or default_project_name
        bucket_rows.append({
            "job_id": job_id,
            "pdf_sha256": ev.get("pdf_sha256"),
            "candidate_name": ev.get("candidate_name"),
            "seniority": ev.get("seniority"),
            "project": project,
            "bucket": b,
            "combined": combined,
        })
        analysis_rows.append({
            "job_id": job_id,
            "pdf_sha256": ev.get("pdf_sha256"),
            "candidate_name": ev.get("candidate_name"),
            "seniority": ev.get("seniority"),
            "project": project,
            "skill": ev.get("scores", {}).get("skill"),
            "experience": ev.get("scores", {}).get("experience"),
            "growth": ev.get("scores", {}).get("growth"),
            "context_fit": ev.get("scores", {}).get("context_fit"),
            "combined": combined,
            "tags": ",".join(ev.get("tags") or []),
        })
    # sort for top candidates (descending combined score, capped at top_n)
    evaluations_sorted = sorted(
        evaluations,
        key=lambda x: float((x.get("scores") or {}).get("combined", 0.0)),
        reverse=True,
    )[:max(0, top_n)]
    for ev in evaluations_sorted:
        combined = float(ev.get("scores", {}).get("combined", 0.0))
        project = (ev.get("best_project") or {}).get("project_name") or default_project_name
        top_rows.append({
            "job_id": job_id,
            "pdf_sha256": ev.get("pdf_sha256"),
            "candidate_name": ev.get("candidate_name"),
            "seniority": ev.get("seniority"),
            "project": project,
            "combined": combined,
        })
    # Write report files (JSON + CSV pairs share the same row dicts)
    _atomic_write_json(reports_dir / "project_buckets.json", bucket_rows)
    _write_csv(
        reports_dir / "project_buckets.csv",
        bucket_rows,
        ["job_id", "pdf_sha256", "candidate_name", "seniority", "project", "bucket", "combined"],
    )
    _atomic_write_json(reports_dir / "top_candidates.json", top_rows)
    _write_csv(
        reports_dir / "top_candidates.csv",
        top_rows,
        ["job_id", "pdf_sha256", "candidate_name", "seniority", "project", "combined"],
    )
    _write_csv(
        reports_dir / "candidate_analysis.csv",
        analysis_rows,
        ["job_id", "pdf_sha256", "candidate_name", "seniority", "project",
         "skill", "experience", "growth", "context_fit", "combined", "tags"],
    )
    # Job + artifacts descriptors (paths are relative to job_dir)
    job_json = {
        "schema_version": SCHEMA_VERSION,
        "job_id": job_id,
        "created_at": _now_ts(),
        "model": config.get("model") or os.getenv("OPENAI_MODEL") or DEFAULT_MODEL,
        "counts": counts,
        "paths": {
            "input_dir": "input/",
            "extracted_text_dir": "extracted_text/",
            "evaluations_dir": "evaluations/",
            "reports_dir": "reports/",
        },
    }
    _atomic_write_json(job_dir / "job.json", job_json)
    artifacts = {
        "schema_version": SCHEMA_VERSION,
        "job_id": job_id,
        "files": {
            "job_json": "job.json",
            "resumes_index": "resumes_index.json",
            "reports": {
                "project_buckets_json": "reports/project_buckets.json",
                "project_buckets_csv": "reports/project_buckets.csv",
                "top_candidates_json": "reports/top_candidates.json",
                "top_candidates_csv": "reports/top_candidates.csv",
                "candidate_analysis_csv": "reports/candidate_analysis.csv",
            },
        },
    }
    _atomic_write_json(job_dir / "artifacts.json", artifacts)
    # Persist manifest last (atomic)
    _atomic_write_json(manifest_path, manifest)
    # Zip the job folder ONLY (the manifest stays outside the archive)
    zip_path = output_root_path / f"{job_id}.zip"
    _zip_dir(job_dir, zip_path)
    return {
        "job_id": job_id,
        "job_dir": str(job_dir),
        "zip_path": str(zip_path),
        "counts": counts,
    }