Avinashnalla7 committed on
Commit
45de952
·
verified ·
1 Parent(s): 101c278

Update pipeline.py

Browse files
Files changed (1) hide show
  1. pipeline.py +248 -309
pipeline.py CHANGED
@@ -4,9 +4,10 @@ import os
4
  import re
5
  import hashlib
6
  import shutil
 
7
  from datetime import datetime, timezone
8
  from pathlib import Path
9
- from typing import Any, Dict, List, Optional
10
 
11
  import fitz # pymupdf
12
  import pytesseract
@@ -15,34 +16,12 @@ from openai import OpenAI
15
  from tenacity import retry, stop_after_attempt, wait_exponential
16
 
17
 
18
- # =========================
19
- # Constants / Contract
20
- # =========================
21
  SCHEMA_VERSION = "1.0"
22
  DEFAULT_MODEL = "gpt-4o-mini"
23
 
24
  ALLOWED_SCORE_KEYS = ["skill", "experience", "growth", "context_fit", "combined"]
25
 
26
- # New job folder layout (stable for future FastAPI/worker/SFTP)
27
- JOBS_DIRNAME = "jobs"
28
 
29
- INPUT_DIRNAME = "input"
30
- TEXT_DIRNAME = "extracted_text"
31
- EVAL_DIRNAME = "evaluations"
32
- REPORTS_DIRNAME = "reports"
33
-
34
- JOB_JSON_NAME = "job.json"
35
- JOB_INDEX_NAME = "resumes_index.json"
36
- ARTIFACTS_JSON_NAME = "artifacts.json"
37
-
38
- # Global persistent state (idempotency across runs)
39
- GLOBAL_REPORTS_DIRNAME = "reports"
40
- GLOBAL_MANIFEST_NAME = "processed_manifest.json"
41
-
42
-
43
- # =========================
44
- # Helpers
45
- # =========================
46
  def _now_ts() -> str:
47
  return datetime.now(timezone.utc).isoformat()
48
 
@@ -62,48 +41,37 @@ def _sha256_file(path: str) -> str:
62
  return h.hexdigest()
63
 
64
 
65
- def _atomic_write_json(path: Path, obj: Any) -> None:
66
  path.parent.mkdir(parents=True, exist_ok=True)
67
  tmp = path.with_suffix(path.suffix + ".tmp")
68
- tmp.write_text(json.dumps(obj, ensure_ascii=False, indent=2), encoding="utf-8")
69
  tmp.replace(path)
70
 
71
 
 
 
 
 
72
  def _read_json(path: Path, default: Any) -> Any:
73
  if not path.exists():
74
  return default
75
  try:
76
  return json.loads(path.read_text(encoding="utf-8"))
77
  except Exception:
78
- backup = path.with_suffix(path.suffix + ".corrupt.json")
79
  try:
80
- shutil.copy2(path, backup)
81
  except Exception:
82
  pass
83
  return default
84
 
85
 
86
- def _coerce_score(v: Any) -> float:
87
- try:
88
- f = float(v)
89
- except Exception:
90
- return 0.0
91
- if f < 0:
92
- return 0.0
93
- if f > 10:
94
- return 10.0
95
- return f
96
-
97
-
98
  def _pixmap_to_pil_rgb(pix: "fitz.Pixmap") -> Image.Image:
99
  if pix.alpha:
100
  pix = fitz.Pixmap(pix, 0)
101
  return Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
102
 
103
 
104
- # =========================
105
- # Text extraction (PyMuPDF + OCR fallback)
106
- # =========================
107
  def extract_text_from_pdf(
108
  pdf_path: str,
109
  *,
@@ -123,7 +91,7 @@ def extract_text_from_pdf(
123
  parts: List[str] = []
124
  page_count = min(len(doc), max_pages)
125
 
126
- # Normal extraction
127
  for i in range(page_count):
128
  try:
129
  t = doc[i].get_text("text") or ""
@@ -154,9 +122,6 @@ def extract_text_from_pdf(
154
  return "\n\n".join(ocr_parts).strip()
155
 
156
 
157
- # =========================
158
- # LLM evaluation
159
- # =========================
160
  def build_prompt(text: str, config: Dict[str, Any]) -> str:
161
  projects = config.get("projects") or []
162
  projects_block = json.dumps(projects, ensure_ascii=False)
@@ -200,7 +165,19 @@ Resume text:
200
  """.strip()
201
 
202
 
203
- def normalize_eval(raw: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
 
 
 
 
 
 
 
 
 
 
 
 
204
  scores = raw.get("scores") if isinstance(raw.get("scores"), dict) else {}
205
  norm_scores = {k: _coerce_score(scores.get(k, 0)) for k in ALLOWED_SCORE_KEYS}
206
 
@@ -226,6 +203,9 @@ def normalize_eval(raw: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any
226
 
227
  return {
228
  "schema_version": SCHEMA_VERSION,
 
 
 
229
  "candidate_name": raw.get("candidate_name"),
230
  "seniority": raw.get("seniority"),
231
  "scores": norm_scores,
@@ -254,9 +234,7 @@ def llm_evaluate(text: str, config: Dict[str, Any]) -> Dict[str, Any]:
254
  try:
255
  raw = json.loads(content)
256
  except Exception as e:
257
- raise RuntimeError(
258
- f"LLM did not return valid JSON. First 200 chars: {content[:200]!r}"
259
- ) from e
260
 
261
  if not isinstance(raw, dict):
262
  raise RuntimeError("LLM JSON must be an object/dict at top-level.")
@@ -264,190 +242,130 @@ def llm_evaluate(text: str, config: Dict[str, Any]) -> Dict[str, Any]:
264
  return raw
265
 
266
 
267
- # =========================
268
- # Records / Derived Reports
269
- # =========================
270
- def _make_record_base(pdf_path: str, config: Dict[str, Any], project_name: str, sha: str) -> Dict[str, Any]:
271
- filename = os.path.basename(pdf_path)
272
- model = config.get("model") or os.getenv("OPENAI_MODEL") or DEFAULT_MODEL
273
- return {
274
- "schema_version": SCHEMA_VERSION,
275
- "pdf_sha256": sha,
276
- "filename": filename,
277
- "candidate_name": None,
278
- "project": project_name,
279
- "model": model,
280
- "status": None, # success|skipped|failed
281
- "error": None,
282
- "created_at": _now_ts(),
283
- "output_json": None, # relative under job_dir
284
- "extracted_text": None, # relative under job_dir
285
- }
286
-
287
-
288
- def _bucket_for_score(score: float) -> str:
289
- # Adjust thresholds as needed; keep deterministic.
290
- if score >= 8.0:
291
  return "top"
292
- if score >= 6.5:
293
  return "strong"
294
- if score >= 5.0:
295
  return "maybe"
296
  return "no"
297
 
298
 
299
- def _compute_reports(job_index: List[Dict[str, Any]]) -> Dict[str, Any]:
300
- """
301
- Derive:
302
- - project_buckets.{json,csv}
303
- - top_candidates.{json,csv}
304
- - candidate_analysis.csv
305
- from successful evaluations only.
306
- """
307
- rows = [r for r in job_index if r.get("status") == "success"]
308
-
309
- # project buckets (by "combined")
310
- buckets: Dict[str, Dict[str, List[Dict[str, Any]]]] = {}
311
- for r in rows:
312
- project = r.get("project") or "UNKNOWN"
313
- combined = r.get("combined_score", 0.0)
314
- b = _bucket_for_score(float(combined or 0.0))
315
- buckets.setdefault(project, {}).setdefault(b, []).append(r)
316
-
317
- # top candidates: sort by combined desc
318
- ranked = sorted(rows, key=lambda x: float(x.get("combined_score", 0.0)), reverse=True)
319
- top = ranked[:25]
320
-
321
- return {
322
- "project_buckets": buckets,
323
- "top_candidates": top,
324
- "ranked": ranked,
325
- }
326
-
327
-
328
- def _write_csv(path: Path, fieldnames: List[str], rows: List[Dict[str, Any]]) -> None:
329
  path.parent.mkdir(parents=True, exist_ok=True)
330
- with open(path, "w", newline="", encoding="utf-8") as f:
331
- w = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
 
332
  w.writeheader()
333
  for r in rows:
334
- w.writerow(r)
 
 
 
 
 
 
 
 
 
 
 
335
 
336
 
337
- # =========================
338
- # Main Pipeline (NEW CONTRACT)
339
- # =========================
340
  def run_pipeline(
341
  input_files: List[str],
342
  config: Dict[str, Any],
343
- output_dir: Optional[str] = None,
344
- job_id: Optional[str] = None,
 
345
  ) -> Dict[str, Any]:
346
  """
347
- New stable contract:
348
- - output_dir: persistent root (do not delete; holds global manifest)
349
- - job_id: required by UI/API; results written under output_dir/jobs/{job_id}/...
350
-
351
- Returns:
352
- {
353
- "job_id": str,
354
- "job_dir": str,
355
- "zip_path": Optional[str],
356
- "counts": {"total": int, "success": int, "skipped": int, "failed": int},
357
- "artifacts": {...}
358
- }
359
  """
360
- if not job_id:
361
- raise ValueError("job_id is required (must be provided by UI/API).")
362
-
363
- output_root = Path(output_dir or "/tmp/resume_eval_out").resolve()
364
- output_root.mkdir(parents=True, exist_ok=True)
365
 
366
- # Global manifest lives outside jobs so it persists across runs
367
- global_reports = output_root / GLOBAL_REPORTS_DIRNAME
368
- global_reports.mkdir(parents=True, exist_ok=True)
369
- manifest_path = global_reports / GLOBAL_MANIFEST_NAME
370
- manifest = _read_json(manifest_path, default={"schema_version": SCHEMA_VERSION, "by_sha": {}})
371
  if not isinstance(manifest, dict):
372
- manifest = {"schema_version": SCHEMA_VERSION, "by_sha": {}}
373
- manifest.setdefault("by_sha", {})
374
-
375
- # Job directory
376
- job_dir = (output_root / JOBS_DIRNAME / job_id).resolve()
377
- if job_dir.exists():
378
- # If job_id collides, fail fast. Don't silently overwrite.
379
- raise RuntimeError(f"job_dir already exists for job_id={job_id}: {job_dir}")
380
- job_dir.mkdir(parents=True, exist_ok=False)
381
-
382
- # Job subfolders
383
- input_dir = job_dir / INPUT_DIRNAME
384
- text_dir = job_dir / TEXT_DIRNAME
385
- eval_dir = job_dir / EVAL_DIRNAME
386
- reports_dir = job_dir / REPORTS_DIRNAME
387
-
388
- input_dir.mkdir(parents=True, exist_ok=True)
389
- text_dir.mkdir(parents=True, exist_ok=True)
390
- eval_dir.mkdir(parents=True, exist_ok=True)
391
- reports_dir.mkdir(parents=True, exist_ok=True)
392
-
393
- # Config knobs
394
  rewrite = bool(config.get("rewrite", False))
395
  projects = config.get("projects") or [{"name": "STANDARD"}]
396
- project_name = (projects[0] or {}).get("name", "STANDARD")
397
 
398
  ocr_max_pages = int(config.get("ocr_max_pages", 8))
399
  ocr_dpi = int(config.get("ocr_dpi", 200))
400
 
401
- job_index: List[Dict[str, Any]] = []
 
402
 
403
- counts = {"total": 0, "success": 0, "skipped": 0, "failed": 0}
 
404
 
405
- # Job metadata begins
406
- job_json = {
407
- "schema_version": SCHEMA_VERSION,
408
- "job_id": job_id,
409
- "created_at": _now_ts(),
410
- "status": "running",
411
- "config": {
412
- "model": config.get("model") or os.getenv("OPENAI_MODEL") or DEFAULT_MODEL,
413
- "rewrite": rewrite,
414
- "projects": projects,
415
- "ocr_max_pages": ocr_max_pages,
416
- "ocr_dpi": ocr_dpi,
417
- },
418
- "counts": dict(counts),
419
- }
420
- _atomic_write_json(job_dir / JOB_JSON_NAME, job_json)
421
 
422
- # Process each pdf
423
- for pdf_path in input_files or []:
424
- pdf_path = str(Path(pdf_path).resolve())
425
  counts["total"] += 1
426
 
427
- sha = _sha256_file(pdf_path)
428
- rec = _make_record_base(pdf_path, config, project_name, sha)
429
 
430
- # Optional: copy original into job input/
 
431
  try:
432
- dst_pdf = input_dir / f"{_safe_slug(Path(pdf_path).stem)}__{sha[:12]}.pdf"
433
- shutil.copy2(pdf_path, dst_pdf)
434
- rec["input_pdf"] = str(dst_pdf.relative_to(job_dir))
435
  except Exception:
436
- rec["input_pdf"] = None
437
-
438
- # Dedupe via global manifest
439
- if (sha in manifest["by_sha"]) and (not rewrite):
440
- rec["status"] = "skipped"
441
- rec["error"] = "duplicate_pdf_sha256"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
442
  counts["skipped"] += 1
443
-
444
- job_index.append(rec)
445
- _atomic_write_json(job_dir / JOB_INDEX_NAME, job_index)
446
  continue
447
 
448
  try:
449
  text = extract_text_from_pdf(
450
- pdf_path,
451
  ocr_if_empty=True,
452
  max_pages=ocr_max_pages,
453
  ocr_dpi=ocr_dpi,
@@ -455,152 +373,173 @@ def run_pipeline(
455
  if not text.strip():
456
  raise RuntimeError("No extractable text (even after OCR).")
457
 
458
- # Write extracted text
459
- text_name = f"{_safe_slug(Path(pdf_path).stem)}__{sha[:12]}.txt"
460
  tpath = text_dir / text_name
461
- tpath.write_text(text, encoding="utf-8")
462
- rec["extracted_text"] = str(tpath.relative_to(job_dir))
463
 
464
- # LLM eval
465
  raw = llm_evaluate(text, config)
466
- ev = normalize_eval(raw, config)
467
 
468
- ev["filename"] = os.path.basename(pdf_path)
469
- ev["pdf_sha256"] = sha
470
- ev["job_id"] = job_id
471
- ev["project"] = project_name
472
-
473
- safe_name = _safe_slug(ev.get("candidate_name") or Path(pdf_path).stem)
474
  out_path = eval_dir / f"{safe_name}__{sha[:12]}.json"
475
- out_path.write_text(json.dumps(ev, ensure_ascii=False, indent=2), encoding="utf-8")
476
-
477
- rec["status"] = "success"
478
- rec["candidate_name"] = ev.get("candidate_name")
479
- rec["output_json"] = str(out_path.relative_to(job_dir))
480
 
481
- # Pull combined score into index row for report sorting
482
- combined = (ev.get("scores") or {}).get("combined", 0.0)
483
- rec["combined_score"] = float(_coerce_score(combined))
484
 
485
  counts["success"] += 1
 
486
 
487
- # Update global manifest only on success
488
- manifest["by_sha"][sha] = {
489
  "pdf_sha256": sha,
490
- "first_seen_at": manifest["by_sha"].get(sha, {}).get("first_seen_at") or _now_ts(),
491
- "last_processed_at": _now_ts(),
492
  "last_job_id": job_id,
 
493
  "status": "success",
494
  }
495
- _atomic_write_json(manifest_path, manifest)
496
 
497
  except Exception as e:
498
- rec["status"] = "failed"
499
- rec["error"] = f"{type(e).__name__}: {e}"
500
  counts["failed"] += 1
501
 
502
- job_index.append(rec)
503
- _atomic_write_json(job_dir / JOB_INDEX_NAME, job_index)
504
-
505
- # update job.json counts continuously
506
- job_json["counts"] = dict(counts)
507
- _atomic_write_json(job_dir / JOB_JSON_NAME, job_json)
508
-
509
- # Derived reports
510
- reports = _compute_reports(job_index)
511
-
512
- # project_buckets.json
513
- project_buckets_json = reports_dir / "project_buckets.json"
514
- _atomic_write_json(project_buckets_json, reports["project_buckets"])
515
-
516
- # top_candidates.json
517
- top_candidates_json = reports_dir / "top_candidates.json"
518
- _atomic_write_json(top_candidates_json, reports["top_candidates"])
519
-
520
- # candidate_analysis.csv (flat)
521
- candidate_analysis_csv = reports_dir / "candidate_analysis.csv"
522
- flat_rows: List[Dict[str, Any]] = []
523
- for r in reports["ranked"]:
524
- flat_rows.append({
525
- "pdf_sha256": r.get("pdf_sha256"),
526
- "filename": r.get("filename"),
527
- "candidate_name": r.get("candidate_name"),
528
- "project": r.get("project"),
529
- "status": r.get("status"),
530
- "combined_score": r.get("combined_score"),
531
- "output_json": r.get("output_json"),
532
- "extracted_text": r.get("extracted_text"),
533
- "created_at": r.get("created_at"),
 
534
  })
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
535
  _write_csv(
536
- candidate_analysis_csv,
537
- fieldnames=list(flat_rows[0].keys()) if flat_rows else [
538
- "pdf_sha256","filename","candidate_name","project","status","combined_score",
539
- "output_json","extracted_text","created_at"
540
- ],
541
- rows=flat_rows,
542
  )
543
 
544
- # project_buckets.csv (summary counts)
545
- project_buckets_csv = reports_dir / "project_buckets.csv"
546
- bucket_rows: List[Dict[str, Any]] = []
547
- for proj, bmap in (reports["project_buckets"] or {}).items():
548
- for bucket_name, items in (bmap or {}).items():
549
- bucket_rows.append({
550
- "project": proj,
551
- "bucket": bucket_name,
552
- "count": len(items),
553
- })
554
- _write_csv(project_buckets_csv, fieldnames=["project","bucket","count"], rows=bucket_rows)
555
-
556
- # top_candidates.csv
557
- top_candidates_csv = reports_dir / "top_candidates.csv"
558
- tc_rows: List[Dict[str, Any]] = []
559
- for r in reports["top_candidates"]:
560
- tc_rows.append({
561
- "candidate_name": r.get("candidate_name"),
562
- "filename": r.get("filename"),
563
- "project": r.get("project"),
564
- "combined_score": r.get("combined_score"),
565
- "output_json": r.get("output_json"),
566
- })
567
  _write_csv(
568
- top_candidates_csv,
569
- fieldnames=["candidate_name","filename","project","combined_score","output_json"],
570
- rows=tc_rows,
571
  )
572
 
573
- # artifacts.json
574
- artifacts = {
 
 
 
 
 
 
 
575
  "schema_version": SCHEMA_VERSION,
576
  "job_id": job_id,
577
  "created_at": _now_ts(),
 
 
578
  "paths": {
579
- "job_json": JOB_JSON_NAME,
580
- "resumes_index": JOB_INDEX_NAME,
581
- "evaluations_dir": EVAL_DIRNAME,
582
- "extracted_text_dir": TEXT_DIRNAME,
583
- "reports_dir": REPORTS_DIRNAME,
584
- "project_buckets_json": str(project_buckets_json.relative_to(job_dir)),
585
- "project_buckets_csv": str(project_buckets_csv.relative_to(job_dir)),
586
- "top_candidates_json": str(top_candidates_json.relative_to(job_dir)),
587
- "top_candidates_csv": str(top_candidates_csv.relative_to(job_dir)),
588
- "candidate_analysis_csv": str(candidate_analysis_csv.relative_to(job_dir)),
 
 
 
 
 
 
 
 
 
 
 
589
  },
590
- "counts": dict(counts),
591
  }
592
- _atomic_write_json(job_dir / ARTIFACTS_JSON_NAME, artifacts)
 
 
 
593
 
594
- # finalize job.json
595
- job_json["status"] = "done"
596
- job_json["finished_at"] = _now_ts()
597
- job_json["counts"] = dict(counts)
598
- _atomic_write_json(job_dir / JOB_JSON_NAME, job_json)
599
 
600
  return {
601
  "job_id": job_id,
602
  "job_dir": str(job_dir),
603
- "zip_path": None, # UI zips job_dir; worker/API might zip here later
604
- "counts": dict(counts),
605
- "artifacts": artifacts,
606
  }
 
4
  import re
5
  import hashlib
6
  import shutil
7
+ from dataclasses import dataclass
8
  from datetime import datetime, timezone
9
  from pathlib import Path
10
+ from typing import Any, Dict, List, Optional, Tuple
11
 
12
  import fitz # pymupdf
13
  import pytesseract
 
16
  from tenacity import retry, stop_after_attempt, wait_exponential
17
 
18
 
 
 
 
19
  SCHEMA_VERSION = "1.0"
20
  DEFAULT_MODEL = "gpt-4o-mini"
21
 
22
  ALLOWED_SCORE_KEYS = ["skill", "experience", "growth", "context_fit", "combined"]
23
 
 
 
24
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
25
  def _now_ts() -> str:
26
  return datetime.now(timezone.utc).isoformat()
27
 
 
41
  return h.hexdigest()
42
 
43
 
44
+ def _atomic_write_text(path: Path, text: str) -> None:
45
  path.parent.mkdir(parents=True, exist_ok=True)
46
  tmp = path.with_suffix(path.suffix + ".tmp")
47
+ tmp.write_text(text, encoding="utf-8")
48
  tmp.replace(path)
49
 
50
 
def _atomic_write_json(path: Path, obj: Any) -> None:
    """Serialize *obj* as indented, non-ASCII-preserving JSON and store it at *path*.

    The actual write is delegated to ``_atomic_write_text``.
    """
    payload = json.dumps(obj, indent=2, ensure_ascii=False)
    _atomic_write_text(path, payload)
53
+
54
+
55
  def _read_json(path: Path, default: Any) -> Any:
56
  if not path.exists():
57
  return default
58
  try:
59
  return json.loads(path.read_text(encoding="utf-8"))
60
  except Exception:
61
+ # keep a backup of corrupt state and start fresh
62
  try:
63
+ shutil.copy2(path, path.with_suffix(path.suffix + ".corrupt"))
64
  except Exception:
65
  pass
66
  return default
67
 
68
 
 
 
 
 
 
 
 
 
 
 
 
 
def _pixmap_to_pil_rgb(pix: "fitz.Pixmap") -> Image.Image:
    """Build a PIL "RGB" image from a PyMuPDF pixmap.

    If the pixmap reports an alpha channel, it is first re-created via
    ``fitz.Pixmap(pix, 0)`` before its sample bytes are handed to PIL.
    NOTE(review): the samples are interpreted as 3-byte RGB; presumably the
    caller only passes RGB pixmaps — confirm.
    """
    opaque = fitz.Pixmap(pix, 0) if pix.alpha else pix
    size = [opaque.width, opaque.height]
    return Image.frombytes("RGB", size, opaque.samples)
73
 
74
 
 
 
 
75
  def extract_text_from_pdf(
76
  pdf_path: str,
77
  *,
 
91
  parts: List[str] = []
92
  page_count = min(len(doc), max_pages)
93
 
94
+ # normal extraction
95
  for i in range(page_count):
96
  try:
97
  t = doc[i].get_text("text") or ""
 
122
  return "\n\n".join(ocr_parts).strip()
123
 
124
 
 
 
 
125
  def build_prompt(text: str, config: Dict[str, Any]) -> str:
126
  projects = config.get("projects") or []
127
  projects_block = json.dumps(projects, ensure_ascii=False)
 
165
  """.strip()
166
 
167
 
168
+ def _coerce_score(v: Any) -> float:
169
+ try:
170
+ f = float(v)
171
+ except Exception:
172
+ return 0.0
173
+ if f < 0:
174
+ return 0.0
175
+ if f > 10:
176
+ return 10.0
177
+ return f
178
+
179
+
180
+ def normalize_eval(raw: Dict[str, Any], config: Dict[str, Any], *, job_id: str, pdf_sha256: str, filename: str) -> Dict[str, Any]:
181
  scores = raw.get("scores") if isinstance(raw.get("scores"), dict) else {}
182
  norm_scores = {k: _coerce_score(scores.get(k, 0)) for k in ALLOWED_SCORE_KEYS}
183
 
 
203
 
204
  return {
205
  "schema_version": SCHEMA_VERSION,
206
+ "job_id": job_id,
207
+ "pdf_sha256": pdf_sha256,
208
+ "filename": filename,
209
  "candidate_name": raw.get("candidate_name"),
210
  "seniority": raw.get("seniority"),
211
  "scores": norm_scores,
 
234
  try:
235
  raw = json.loads(content)
236
  except Exception as e:
237
+ raise RuntimeError(f"LLM did not return valid JSON. First 200 chars: {content[:200]!r}") from e
 
 
238
 
239
  if not isinstance(raw, dict):
240
  raise RuntimeError("LLM JSON must be an object/dict at top-level.")
 
242
  return raw
243
 
244
 
245
+ def _bucket_label(combined: float, thresholds: Dict[str, float]) -> str:
246
+ top = float(thresholds.get("top", 8.0))
247
+ strong = float(thresholds.get("strong", 6.5))
248
+ maybe = float(thresholds.get("maybe", 5.0))
249
+ if combined >= top:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
250
  return "top"
251
+ if combined >= strong:
252
  return "strong"
253
+ if combined >= maybe:
254
  return "maybe"
255
  return "no"
256
 
257
 
258
+ def _write_csv(path: Path, rows: List[Dict[str, Any]], fieldnames: List[str]) -> None:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
259
  path.parent.mkdir(parents=True, exist_ok=True)
260
+ tmp = path.with_suffix(path.suffix + ".tmp")
261
+ with tmp.open("w", newline="", encoding="utf-8") as f:
262
+ w = csv.DictWriter(f, fieldnames=fieldnames)
263
  w.writeheader()
264
  for r in rows:
265
+ w.writerow({k: r.get(k) for k in fieldnames})
266
+ tmp.replace(path)
267
+
268
+
269
+ def _zip_dir(src_dir: Path, zip_path: Path) -> None:
270
+ if zip_path.exists():
271
+ zip_path.unlink()
272
+ import zipfile
273
+ with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as z:
274
+ for p in sorted(src_dir.rglob("*")):
275
+ if p.is_file():
276
+ z.write(p, arcname=str(p.relative_to(src_dir)))
277
 
278
 
 
 
 
279
  def run_pipeline(
280
  input_files: List[str],
281
  config: Dict[str, Any],
282
+ *,
283
+ output_root: Optional[str] = None,
284
+ job_id: str,
285
  ) -> Dict[str, Any]:
286
  """
287
+ Stable contract:
288
+ - output_root contains persistent state (manifest)
289
+ - job_id creates isolated job folder under output_root/jobs/{job_id}
290
+ - returns job_dir + zip_path + counts
 
 
 
 
 
 
 
 
291
  """
292
+ output_root_path = Path(output_root or "/tmp/resume_eval_root").resolve()
293
+ output_root_path.mkdir(parents=True, exist_ok=True)
 
 
 
294
 
295
+ # Persistent manifest across runs (dedupe state)
296
+ manifest_path = output_root_path / "processed_manifest.json"
297
+ manifest: Dict[str, Any] = _read_json(manifest_path, default={})
 
 
298
  if not isinstance(manifest, dict):
299
+ manifest = {}
300
+
301
+ # Job layout
302
+ jobs_root = output_root_path / "jobs"
303
+ job_dir = jobs_root / job_id
304
+ input_dir = job_dir / "input"
305
+ text_dir = job_dir / "extracted_text"
306
+ eval_dir = job_dir / "evaluations"
307
+ reports_dir = job_dir / "reports"
308
+
309
+ for d in [input_dir, text_dir, eval_dir, reports_dir]:
310
+ d.mkdir(parents=True, exist_ok=True)
311
+
 
 
 
 
 
 
 
 
 
312
  rewrite = bool(config.get("rewrite", False))
313
  projects = config.get("projects") or [{"name": "STANDARD"}]
314
+ default_project_name = (projects[0] or {}).get("name", "STANDARD")
315
 
316
  ocr_max_pages = int(config.get("ocr_max_pages", 8))
317
  ocr_dpi = int(config.get("ocr_dpi", 200))
318
 
319
+ thresholds = config.get("bucket_thresholds") or {"top": 8.0, "strong": 6.5, "maybe": 5.0}
320
+ top_n = int(config.get("top_n", 25))
321
 
322
+ per_job_index: List[Dict[str, Any]] = []
323
+ evaluations: List[Dict[str, Any]] = []
324
 
325
+ counts = {"total": 0, "success": 0, "skipped": 0, "failed": 0}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
326
 
327
+ for src_path in input_files or []:
 
 
328
  counts["total"] += 1
329
 
330
+ src_path = str(Path(src_path).resolve())
331
+ filename = os.path.basename(src_path)
332
 
333
+ # Copy into job input/ (this is important for later SFTP job contract)
334
+ dst_pdf = input_dir / filename
335
  try:
336
+ shutil.copy2(src_path, dst_pdf)
 
 
337
  except Exception:
338
+ # if copy fails, still try reading original
339
+ dst_pdf = Path(src_path)
340
+
341
+ sha = _sha256_file(str(dst_pdf))
342
+
343
+ record = {
344
+ "schema_version": SCHEMA_VERSION,
345
+ "job_id": job_id,
346
+ "pdf_sha256": sha,
347
+ "filename": filename,
348
+ "candidate_name": None,
349
+ "project": default_project_name,
350
+ "model": config.get("model") or os.getenv("OPENAI_MODEL") or DEFAULT_MODEL,
351
+ "status": None, # success|skipped|failed
352
+ "error": None,
353
+ "created_at": _now_ts(),
354
+ "output_json": None, # relative to job_dir
355
+ "extracted_text": None, # relative to job_dir
356
+ }
357
+
358
+ # Dedupe via persistent manifest
359
+ if not rewrite and sha in manifest:
360
+ record["status"] = "skipped"
361
+ record["error"] = "duplicate_pdf_sha256"
362
  counts["skipped"] += 1
363
+ per_job_index.append(record)
 
 
364
  continue
365
 
366
  try:
367
  text = extract_text_from_pdf(
368
+ str(dst_pdf),
369
  ocr_if_empty=True,
370
  max_pages=ocr_max_pages,
371
  ocr_dpi=ocr_dpi,
 
373
  if not text.strip():
374
  raise RuntimeError("No extractable text (even after OCR).")
375
 
376
+ text_name = f"{_safe_slug(Path(filename).stem)}__{sha[:12]}.txt"
 
377
  tpath = text_dir / text_name
378
+ _atomic_write_text(tpath, text)
379
+ record["extracted_text"] = str(tpath.relative_to(job_dir))
380
 
 
381
  raw = llm_evaluate(text, config)
382
+ ev = normalize_eval(raw, config, job_id=job_id, pdf_sha256=sha, filename=filename)
383
 
384
+ safe_name = _safe_slug(ev.get("candidate_name") or Path(filename).stem)
 
 
 
 
 
385
  out_path = eval_dir / f"{safe_name}__{sha[:12]}.json"
386
+ _atomic_write_json(out_path, ev)
 
 
 
 
387
 
388
+ record["status"] = "success"
389
+ record["candidate_name"] = ev.get("candidate_name")
390
+ record["output_json"] = str(out_path.relative_to(job_dir))
391
 
392
  counts["success"] += 1
393
+ evaluations.append(ev)
394
 
395
+ # update global manifest
396
+ manifest[sha] = {
397
  "pdf_sha256": sha,
398
+ "first_seen_at": manifest.get(sha, {}).get("first_seen_at", _now_ts()),
399
+ "last_seen_at": _now_ts(),
400
  "last_job_id": job_id,
401
+ "filename": filename,
402
  "status": "success",
403
  }
 
404
 
405
  except Exception as e:
406
+ record["status"] = "failed"
407
+ record["error"] = f"{type(e).__name__}: {e}"
408
  counts["failed"] += 1
409
 
410
+ manifest[sha] = {
411
+ "pdf_sha256": sha,
412
+ "first_seen_at": manifest.get(sha, {}).get("first_seen_at", _now_ts()),
413
+ "last_seen_at": _now_ts(),
414
+ "last_job_id": job_id,
415
+ "filename": filename,
416
+ "status": "failed",
417
+ "error": record["error"],
418
+ }
419
+
420
+ per_job_index.append(record)
421
+
422
+ # Write per-job index
423
+ _atomic_write_json(job_dir / "resumes_index.json", per_job_index)
424
+
425
+ # Reports: project buckets + top candidates + candidate analysis
426
+ bucket_rows: List[Dict[str, Any]] = []
427
+ top_rows: List[Dict[str, Any]] = []
428
+ analysis_rows: List[Dict[str, Any]] = []
429
+
430
+ for ev in evaluations:
431
+ combined = float(ev.get("scores", {}).get("combined", 0.0))
432
+ b = _bucket_label(combined, thresholds)
433
+ project = (ev.get("best_project") or {}).get("project_name") or default_project_name
434
+
435
+ bucket_rows.append({
436
+ "job_id": job_id,
437
+ "pdf_sha256": ev.get("pdf_sha256"),
438
+ "candidate_name": ev.get("candidate_name"),
439
+ "seniority": ev.get("seniority"),
440
+ "project": project,
441
+ "bucket": b,
442
+ "combined": combined,
443
  })
444
+
445
+ analysis_rows.append({
446
+ "job_id": job_id,
447
+ "pdf_sha256": ev.get("pdf_sha256"),
448
+ "candidate_name": ev.get("candidate_name"),
449
+ "seniority": ev.get("seniority"),
450
+ "project": project,
451
+ "skill": ev.get("scores", {}).get("skill"),
452
+ "experience": ev.get("scores", {}).get("experience"),
453
+ "growth": ev.get("scores", {}).get("growth"),
454
+ "context_fit": ev.get("scores", {}).get("context_fit"),
455
+ "combined": combined,
456
+ "tags": ",".join(ev.get("tags") or []),
457
+ })
458
+
459
+ # sort for top candidates
460
+ evaluations_sorted = sorted(
461
+ evaluations,
462
+ key=lambda x: float((x.get("scores") or {}).get("combined", 0.0)),
463
+ reverse=True,
464
+ )[:max(0, top_n)]
465
+
466
+ for ev in evaluations_sorted:
467
+ combined = float(ev.get("scores", {}).get("combined", 0.0))
468
+ project = (ev.get("best_project") or {}).get("project_name") or default_project_name
469
+ top_rows.append({
470
+ "job_id": job_id,
471
+ "pdf_sha256": ev.get("pdf_sha256"),
472
+ "candidate_name": ev.get("candidate_name"),
473
+ "seniority": ev.get("seniority"),
474
+ "project": project,
475
+ "combined": combined,
476
+ })
477
+
478
+ # Write report files
479
+ _atomic_write_json(reports_dir / "project_buckets.json", bucket_rows)
480
  _write_csv(
481
+ reports_dir / "project_buckets.csv",
482
+ bucket_rows,
483
+ ["job_id", "pdf_sha256", "candidate_name", "seniority", "project", "bucket", "combined"],
 
 
 
484
  )
485
 
486
+ _atomic_write_json(reports_dir / "top_candidates.json", top_rows)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
487
  _write_csv(
488
+ reports_dir / "top_candidates.csv",
489
+ top_rows,
490
+ ["job_id", "pdf_sha256", "candidate_name", "seniority", "project", "combined"],
491
  )
492
 
493
+ _write_csv(
494
+ reports_dir / "candidate_analysis.csv",
495
+ analysis_rows,
496
+ ["job_id", "pdf_sha256", "candidate_name", "seniority", "project",
497
+ "skill", "experience", "growth", "context_fit", "combined", "tags"],
498
+ )
499
+
500
+ # Job + artifacts descriptors
501
+ job_json = {
502
  "schema_version": SCHEMA_VERSION,
503
  "job_id": job_id,
504
  "created_at": _now_ts(),
505
+ "model": config.get("model") or os.getenv("OPENAI_MODEL") or DEFAULT_MODEL,
506
+ "counts": counts,
507
  "paths": {
508
+ "input_dir": "input/",
509
+ "extracted_text_dir": "extracted_text/",
510
+ "evaluations_dir": "evaluations/",
511
+ "reports_dir": "reports/",
512
+ },
513
+ }
514
+ _atomic_write_json(job_dir / "job.json", job_json)
515
+
516
+ artifacts = {
517
+ "schema_version": SCHEMA_VERSION,
518
+ "job_id": job_id,
519
+ "files": {
520
+ "job_json": "job.json",
521
+ "resumes_index": "resumes_index.json",
522
+ "reports": {
523
+ "project_buckets_json": "reports/project_buckets.json",
524
+ "project_buckets_csv": "reports/project_buckets.csv",
525
+ "top_candidates_json": "reports/top_candidates.json",
526
+ "top_candidates_csv": "reports/top_candidates.csv",
527
+ "candidate_analysis_csv": "reports/candidate_analysis.csv",
528
+ },
529
  },
 
530
  }
531
+ _atomic_write_json(job_dir / "artifacts.json", artifacts)
532
+
533
+ # Persist manifest last (atomic)
534
+ _atomic_write_json(manifest_path, manifest)
535
 
536
+ # Zip the job folder ONLY
537
+ zip_path = output_root_path / f"{job_id}.zip"
538
+ _zip_dir(job_dir, zip_path)
 
 
539
 
540
  return {
541
  "job_id": job_id,
542
  "job_dir": str(job_dir),
543
+ "zip_path": str(zip_path),
544
+ "counts": counts,
 
545
  }