Avinashnalla7 commited on
Commit
e786373
·
verified ·
1 Parent(s): fea1ab9

with OCR + dedupe + atomic index + schema

Browse files
Files changed (1) hide show
  1. pipeline.py +19 -64
pipeline.py CHANGED
@@ -3,7 +3,6 @@ import os
3
  import re
4
  import hashlib
5
  import shutil
6
- import time
7
  from datetime import datetime, timezone
8
  from pathlib import Path
9
  from typing import Any, Dict, List, Optional
@@ -15,25 +14,16 @@ from openai import OpenAI
15
  from tenacity import retry, stop_after_attempt, wait_exponential
16
 
17
 
18
- # -----------------------------
19
- # Schema / Constants
20
- # -----------------------------
21
-
22
  SCHEMA_VERSION = "1.0"
 
23
 
24
  ALLOWED_SCORE_KEYS = ["skill", "experience", "growth", "context_fit", "combined"]
25
 
26
- DEFAULT_MODEL = "gpt-4o-mini"
27
-
28
  INDEX_FILENAME = "resumes_index.json"
29
  EVAL_DIRNAME = "EVALUATIONS"
30
  TEXT_DIRNAME = "EXTRACTED_TEXT"
31
 
32
 
33
- # -----------------------------
34
- # Utilities
35
- # -----------------------------
36
-
37
  def _now_ts() -> str:
38
  return datetime.now(timezone.utc).isoformat()
39
 
@@ -57,7 +47,7 @@ def _atomic_write_json(path: Path, obj: Any) -> None:
57
  path.parent.mkdir(parents=True, exist_ok=True)
58
  tmp = path.with_suffix(path.suffix + ".tmp")
59
  tmp.write_text(json.dumps(obj, ensure_ascii=False, indent=2), encoding="utf-8")
60
- tmp.replace(path) # atomic on same filesystem
61
 
62
 
63
  def _load_index(index_path: Path) -> List[Dict[str, Any]]:
@@ -66,7 +56,6 @@ def _load_index(index_path: Path) -> List[Dict[str, Any]]:
66
  try:
67
  return json.loads(index_path.read_text(encoding="utf-8"))
68
  except Exception:
69
- # If corrupted, do not crash the whole pipeline. Start fresh but keep the old file.
70
  backup = index_path.with_suffix(".corrupt.json")
71
  try:
72
  shutil.copy2(index_path, backup)
@@ -76,12 +65,12 @@ def _load_index(index_path: Path) -> List[Dict[str, Any]]:
76
 
77
 
78
  def _index_by_sha(index: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
79
- m: Dict[str, Dict[str, Any]] = {}
80
  for r in index:
81
  sha = r.get("pdf_sha256")
82
  if sha:
83
- m[sha] = r
84
- return m
85
 
86
 
87
  def _coerce_score(v: Any) -> float:
@@ -96,12 +85,7 @@ def _coerce_score(v: Any) -> float:
96
  return f
97
 
98
 
99
- # -----------------------------
100
- # PDF text extraction + OCR fallback
101
- # -----------------------------
102
-
103
  def _pixmap_to_pil_rgb(pix: "fitz.Pixmap") -> Image.Image:
104
- # Ensure RGB (no alpha)
105
  if pix.alpha:
106
  pix = fitz.Pixmap(pix, 0)
107
  return Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
@@ -115,17 +99,18 @@ def extract_text_from_pdf(
115
  ocr_dpi: int = 200,
116
  ) -> str:
117
  """
118
- 1) Try normal text extraction via PyMuPDF.
119
- 2) If empty and ocr_if_empty: render pages -> pytesseract OCR.
120
  """
121
  try:
122
  doc = fitz.open(pdf_path)
123
  except Exception:
124
  return ""
125
 
126
- # Fast text extraction
127
  parts: List[str] = []
128
  page_count = min(len(doc), max_pages)
 
 
129
  for i in range(page_count):
130
  try:
131
  t = doc[i].get_text("text") or ""
@@ -156,10 +141,6 @@ def extract_text_from_pdf(
156
  return "\n\n".join(ocr_parts).strip()
157
 
158
 
159
- # -----------------------------
160
- # LLM prompt + normalization
161
- # -----------------------------
162
-
163
  def build_prompt(text: str, config: Dict[str, Any]) -> str:
164
  projects = config.get("projects") or []
165
  projects_block = json.dumps(projects, ensure_ascii=False)
@@ -192,7 +173,7 @@ Rules:
192
  - scores are 0..10 (float allowed)
193
  - combined must be a reasonable aggregate of the others (not random)
194
  - best_project.project_name must be one of the provided projects' names OR null
195
- - tags should be short, e.g. "Backend", "Data", "Python", "Senior", "Leadership"
196
  - If uncertain, be conservative.
197
 
198
  Projects (for matching):
@@ -227,7 +208,7 @@ def normalize_eval(raw: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any
227
 
228
  model = config.get("model") or os.getenv("OPENAI_MODEL") or DEFAULT_MODEL
229
 
230
- out = {
231
  "schema_version": SCHEMA_VERSION,
232
  "candidate_name": raw.get("candidate_name"),
233
  "seniority": raw.get("seniority"),
@@ -235,12 +216,8 @@ def normalize_eval(raw: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any
235
  "best_project": {"project_name": project_name, "project_score": project_score},
236
  "tags": tags,
237
  "notes": raw.get("notes"),
238
- "meta": {
239
- "model": model,
240
- "timestamp": _now_ts(),
241
- },
242
  }
243
- return out
244
 
245
 
246
  @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=8))
@@ -254,7 +231,6 @@ def llm_evaluate(text: str, config: Dict[str, Any]) -> Dict[str, Any]:
254
  prompt = build_prompt(text, config)
255
 
256
  resp = client.responses.create(model=model, input=prompt)
257
-
258
  content = resp.output_text
259
  if not content or not content.strip():
260
  raise RuntimeError("LLM returned empty response.")
@@ -272,10 +248,6 @@ def llm_evaluate(text: str, config: Dict[str, Any]) -> Dict[str, Any]:
272
  return raw
273
 
274
 
275
- # -----------------------------
276
- # Pipeline (Notebook parity)
277
- # -----------------------------
278
-
279
  def _make_record_base(pdf_path: str, config: Dict[str, Any], project_name: str) -> Dict[str, Any]:
280
  filename = os.path.basename(pdf_path)
281
  model = config.get("model") or os.getenv("OPENAI_MODEL") or DEFAULT_MODEL
@@ -286,11 +258,11 @@ def _make_record_base(pdf_path: str, config: Dict[str, Any], project_name: str)
286
  "candidate_name": None,
287
  "project": project_name,
288
  "model": model,
289
- "status": None, # success|skipped|failed
290
  "error": None,
291
  "created_at": _now_ts(),
292
- "output_json": None, # relative path under output_dir
293
- "extracted_text": None, # relative path under output_dir
294
  }
295
 
296
 
@@ -299,20 +271,6 @@ def run_pipeline(
299
  config: Dict[str, Any],
300
  output_dir: Optional[str] = None,
301
  ) -> str:
302
- """
303
- Writes outputs into output_dir:
304
- - resumes_index.json (append-only audit trail)
305
- - EVALUATIONS/*.json (per resume normalized evaluation)
306
- - EXTRACTED_TEXT/*.txt (extracted text/OCR text)
307
- Implements:
308
- - OCR fallback
309
- - dedupe by pdf_sha256 unless config["rewrite"] == True
310
- - atomic writes to index
311
- - consistent schema versioning
312
- Returns:
313
- output_dir (string path)
314
- """
315
-
316
  base_out = Path(output_dir or "/tmp/resume_eval_out").resolve()
317
  base_out.mkdir(parents=True, exist_ok=True)
318
 
@@ -330,7 +288,6 @@ def run_pipeline(
330
  projects = config.get("projects") or [{"name": "STANDARD"}]
331
  project_name = (projects[0] or {}).get("name", "STANDARD")
332
 
333
- # OCR knobs (configurable)
334
  ocr_max_pages = int(config.get("ocr_max_pages", 8))
335
  ocr_dpi = int(config.get("ocr_dpi", 200))
336
 
@@ -339,7 +296,7 @@ def run_pipeline(
339
  rec = _make_record_base(pdf_path, config, project_name)
340
  sha = rec["pdf_sha256"]
341
 
342
- # Dedupe
343
  if sha in index_map and not rewrite:
344
  rec["status"] = "skipped"
345
  rec["error"] = "duplicate_pdf_sha256"
@@ -359,14 +316,13 @@ def run_pipeline(
359
 
360
  # Persist extracted text
361
  text_name = f"{_safe_slug(Path(pdf_path).stem)}__{sha[:12]}.txt"
362
- text_path = text_dir / text_name
363
- text_path.write_text(text, encoding="utf-8")
364
- rec["extracted_text"] = str(text_path.relative_to(base_out))
365
 
366
  raw = llm_evaluate(text, config)
367
  ev = normalize_eval(raw, config)
368
 
369
- # Add file identity
370
  ev["filename"] = os.path.basename(pdf_path)
371
  ev["pdf_sha256"] = sha
372
 
@@ -383,7 +339,6 @@ def run_pipeline(
383
  rec["error"] = f"{type(e).__name__}: {e}"
384
 
385
  index.append(rec)
386
- # Persist after each file so partial progress is safe
387
  _atomic_write_json(index_path, index)
388
 
389
  return str(base_out)
 
3
  import re
4
  import hashlib
5
  import shutil
 
6
  from datetime import datetime, timezone
7
  from pathlib import Path
8
  from typing import Any, Dict, List, Optional
 
14
  from tenacity import retry, stop_after_attempt, wait_exponential
15
 
16
 
 
 
 
 
17
  SCHEMA_VERSION = "1.0"
18
+ DEFAULT_MODEL = "gpt-4o-mini"
19
 
20
  ALLOWED_SCORE_KEYS = ["skill", "experience", "growth", "context_fit", "combined"]
21
 
 
 
22
  INDEX_FILENAME = "resumes_index.json"
23
  EVAL_DIRNAME = "EVALUATIONS"
24
  TEXT_DIRNAME = "EXTRACTED_TEXT"
25
 
26
 
 
 
 
 
27
  def _now_ts() -> str:
28
  return datetime.now(timezone.utc).isoformat()
29
 
 
47
  path.parent.mkdir(parents=True, exist_ok=True)
48
  tmp = path.with_suffix(path.suffix + ".tmp")
49
  tmp.write_text(json.dumps(obj, ensure_ascii=False, indent=2), encoding="utf-8")
50
+ tmp.replace(path)
51
 
52
 
53
  def _load_index(index_path: Path) -> List[Dict[str, Any]]:
 
56
  try:
57
  return json.loads(index_path.read_text(encoding="utf-8"))
58
  except Exception:
 
59
  backup = index_path.with_suffix(".corrupt.json")
60
  try:
61
  shutil.copy2(index_path, backup)
 
65
 
66
 
67
  def _index_by_sha(index: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
68
+ out: Dict[str, Dict[str, Any]] = {}
69
  for r in index:
70
  sha = r.get("pdf_sha256")
71
  if sha:
72
+ out[sha] = r
73
+ return out
74
 
75
 
76
  def _coerce_score(v: Any) -> float:
 
85
  return f
86
 
87
 
 
 
 
 
88
  def _pixmap_to_pil_rgb(pix: "fitz.Pixmap") -> Image.Image:
 
89
  if pix.alpha:
90
  pix = fitz.Pixmap(pix, 0)
91
  return Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
 
99
  ocr_dpi: int = 200,
100
  ) -> str:
101
  """
102
+ 1) Extract text with PyMuPDF.
103
+ 2) If empty and ocr_if_empty: OCR first max_pages pages.
104
  """
105
  try:
106
  doc = fitz.open(pdf_path)
107
  except Exception:
108
  return ""
109
 
 
110
  parts: List[str] = []
111
  page_count = min(len(doc), max_pages)
112
+
113
+ # Normal extraction
114
  for i in range(page_count):
115
  try:
116
  t = doc[i].get_text("text") or ""
 
141
  return "\n\n".join(ocr_parts).strip()
142
 
143
 
 
 
 
 
144
  def build_prompt(text: str, config: Dict[str, Any]) -> str:
145
  projects = config.get("projects") or []
146
  projects_block = json.dumps(projects, ensure_ascii=False)
 
173
  - scores are 0..10 (float allowed)
174
  - combined must be a reasonable aggregate of the others (not random)
175
  - best_project.project_name must be one of the provided projects' names OR null
176
+ - tags should be short
177
  - If uncertain, be conservative.
178
 
179
  Projects (for matching):
 
208
 
209
  model = config.get("model") or os.getenv("OPENAI_MODEL") or DEFAULT_MODEL
210
 
211
+ return {
212
  "schema_version": SCHEMA_VERSION,
213
  "candidate_name": raw.get("candidate_name"),
214
  "seniority": raw.get("seniority"),
 
216
  "best_project": {"project_name": project_name, "project_score": project_score},
217
  "tags": tags,
218
  "notes": raw.get("notes"),
219
+ "meta": {"model": model, "timestamp": _now_ts()},
 
 
 
220
  }
 
221
 
222
 
223
  @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=8))
 
231
  prompt = build_prompt(text, config)
232
 
233
  resp = client.responses.create(model=model, input=prompt)
 
234
  content = resp.output_text
235
  if not content or not content.strip():
236
  raise RuntimeError("LLM returned empty response.")
 
248
  return raw
249
 
250
 
 
 
 
 
251
  def _make_record_base(pdf_path: str, config: Dict[str, Any], project_name: str) -> Dict[str, Any]:
252
  filename = os.path.basename(pdf_path)
253
  model = config.get("model") or os.getenv("OPENAI_MODEL") or DEFAULT_MODEL
 
258
  "candidate_name": None,
259
  "project": project_name,
260
  "model": model,
261
+ "status": None, # success|skipped|failed
262
  "error": None,
263
  "created_at": _now_ts(),
264
+ "output_json": None, # relative under output_dir
265
+ "extracted_text": None, # relative under output_dir
266
  }
267
 
268
 
 
271
  config: Dict[str, Any],
272
  output_dir: Optional[str] = None,
273
  ) -> str:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
274
  base_out = Path(output_dir or "/tmp/resume_eval_out").resolve()
275
  base_out.mkdir(parents=True, exist_ok=True)
276
 
 
288
  projects = config.get("projects") or [{"name": "STANDARD"}]
289
  project_name = (projects[0] or {}).get("name", "STANDARD")
290
 
 
291
  ocr_max_pages = int(config.get("ocr_max_pages", 8))
292
  ocr_dpi = int(config.get("ocr_dpi", 200))
293
 
 
296
  rec = _make_record_base(pdf_path, config, project_name)
297
  sha = rec["pdf_sha256"]
298
 
299
+ # dedupe
300
  if sha in index_map and not rewrite:
301
  rec["status"] = "skipped"
302
  rec["error"] = "duplicate_pdf_sha256"
 
316
 
317
  # Persist extracted text
318
  text_name = f"{_safe_slug(Path(pdf_path).stem)}__{sha[:12]}.txt"
319
+ tpath = text_dir / text_name
320
+ tpath.write_text(text, encoding="utf-8")
321
+ rec["extracted_text"] = str(tpath.relative_to(base_out))
322
 
323
  raw = llm_evaluate(text, config)
324
  ev = normalize_eval(raw, config)
325
 
 
326
  ev["filename"] = os.path.basename(pdf_path)
327
  ev["pdf_sha256"] = sha
328
 
 
339
  rec["error"] = f"{type(e).__name__}: {e}"
340
 
341
  index.append(rec)
 
342
  _atomic_write_json(index_path, index)
343
 
344
  return str(base_out)