Avinashnalla7 committed on
Commit
fea1ab9
·
verified ·
1 Parent(s): a917544

Update pipeline.py

Browse files
Files changed (1) hide show
  1. pipeline.py +236 -74
pipeline.py CHANGED
@@ -1,22 +1,41 @@
1
  import json
2
  import os
3
  import re
 
4
  import shutil
5
  import time
6
- from dataclasses import dataclass
7
  from pathlib import Path
8
- from typing import Any, Dict, List, Optional, Tuple
9
 
10
- from pypdf import PdfReader
 
 
11
  from openai import OpenAI
12
  from tenacity import retry, stop_after_attempt, wait_exponential
13
 
14
 
 
 
 
 
 
 
15
  ALLOWED_SCORE_KEYS = ["skill", "experience", "growth", "context_fit", "combined"]
16
 
 
 
 
 
 
 
 
 
 
 
17
 
18
  def _now_ts() -> str:
19
- return time.strftime("%Y-%m-%d %H:%M:%S")
20
 
21
 
22
  def _safe_slug(s: str, max_len: int = 80) -> str:
@@ -26,21 +45,122 @@ def _safe_slug(s: str, max_len: int = 80) -> str:
26
  return s[:max_len] if s else "UNKNOWN"
27
 
28
 
29
- def extract_text_from_pdf(pdf_path: str) -> str:
30
- reader = PdfReader(pdf_path)
31
- parts = []
32
- for page in reader.pages:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33
  try:
34
- t = page.extract_text() or ""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35
  except Exception:
36
  t = ""
37
  if t.strip():
38
  parts.append(t)
39
- return "\n\n".join(parts).strip()
40
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
41
 
42
  def build_prompt(text: str, config: Dict[str, Any]) -> str:
43
- # You can extend this later with per-project criteria.
44
  projects = config.get("projects") or []
45
  projects_block = json.dumps(projects, ensure_ascii=False)
46
 
@@ -83,18 +203,6 @@ Resume text:
83
  """.strip()
84
 
85
 
86
- def _coerce_score(v: Any) -> float:
87
- try:
88
- f = float(v)
89
- except Exception:
90
- return 0.0
91
- if f < 0:
92
- return 0.0
93
- if f > 10:
94
- return 10.0
95
- return f
96
-
97
-
98
  def normalize_eval(raw: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
99
  scores = raw.get("scores") if isinstance(raw.get("scores"), dict) else {}
100
  norm_scores = {k: _coerce_score(scores.get(k, 0)) for k in ALLOWED_SCORE_KEYS}
@@ -103,7 +211,11 @@ def normalize_eval(raw: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any
103
  project_name = best_project.get("project_name")
104
  project_score = _coerce_score(best_project.get("project_score", 0))
105
 
106
- allowed_project_names = {p.get("name") for p in (config.get("projects") or []) if isinstance(p, dict)}
 
 
 
 
107
  if project_name not in allowed_project_names:
108
  project_name = None
109
 
@@ -113,7 +225,10 @@ def normalize_eval(raw: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any
113
  tags = [str(t).strip() for t in tags if str(t).strip()]
114
  tags = tags[:25]
115
 
 
 
116
  out = {
 
117
  "candidate_name": raw.get("candidate_name"),
118
  "seniority": raw.get("seniority"),
119
  "scores": norm_scores,
@@ -121,7 +236,7 @@ def normalize_eval(raw: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any
121
  "tags": tags,
122
  "notes": raw.get("notes"),
123
  "meta": {
124
- "model": config.get("model"),
125
  "timestamp": _now_ts(),
126
  },
127
  }
@@ -135,25 +250,21 @@ def llm_evaluate(text: str, config: Dict[str, Any]) -> Dict[str, Any]:
135
  raise RuntimeError("Missing OPENAI_API_KEY (set it in HF Space Secrets).")
136
 
137
  client = OpenAI(api_key=api_key)
138
- model = config.get("model") or os.getenv("OPENAI_MODEL") or "gpt-4o-mini"
139
-
140
  prompt = build_prompt(text, config)
141
 
142
- # Use Responses API. Enforce JSON by instruction + parsing.
143
- resp = client.responses.create(
144
- model=model,
145
- input=prompt,
146
- )
147
 
148
  content = resp.output_text
149
  if not content or not content.strip():
150
  raise RuntimeError("LLM returned empty response.")
151
 
152
- # Hard parse JSON (no tolerance for garbage)
153
  try:
154
  raw = json.loads(content)
155
  except Exception as e:
156
- raise RuntimeError(f"LLM did not return valid JSON. First 200 chars: {content[:200]!r}") from e
 
 
157
 
158
  if not isinstance(raw, dict):
159
  raise RuntimeError("LLM JSON must be an object/dict at top-level.")
@@ -161,67 +272,118 @@ def llm_evaluate(text: str, config: Dict[str, Any]) -> Dict[str, Any]:
161
  return raw
162
 
163
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
164
  def run_pipeline(
165
  input_files: List[str],
166
  config: Dict[str, Any],
167
- base_out_dir: Optional[str] = None,
168
  ) -> str:
169
- base_out = Path(base_out_dir or "/tmp/resume_eval_out").resolve()
170
- if base_out.exists():
171
- shutil.rmtree(base_out)
 
 
 
 
 
 
 
 
 
 
 
 
172
  base_out.mkdir(parents=True, exist_ok=True)
173
 
174
- eval_dir = base_out / "EVALUATIONS"
175
  eval_dir.mkdir(parents=True, exist_ok=True)
176
 
177
- evaluations: List[Dict[str, Any]] = []
178
- errors: List[Dict[str, Any]] = []
 
 
 
 
179
 
180
- for pdf_path in input_files:
 
 
 
 
 
 
 
 
181
  pdf_path = str(Path(pdf_path).resolve())
182
- filename = os.path.basename(pdf_path)
 
 
 
 
 
 
 
 
 
183
 
184
  try:
185
- text = extract_text_from_pdf(pdf_path)
186
- if not text:
187
- raise RuntimeError("No extractable text from PDF (scanned image / empty).")
 
 
 
 
 
 
 
 
 
 
 
188
 
189
  raw = llm_evaluate(text, config)
190
  ev = normalize_eval(raw, config)
191
 
192
  # Add file identity
193
- ev["filename"] = filename
 
194
 
195
- # Write per-file json
196
- safe_name = _safe_slug(ev.get("candidate_name") or Path(filename).stem)
197
- out_path = eval_dir / f"{safe_name}__{Path(filename).stem}.json"
198
  out_path.write_text(json.dumps(ev, ensure_ascii=False, indent=2), encoding="utf-8")
199
 
200
- evaluations.append(ev)
 
 
201
 
202
  except Exception as e:
203
- err = {"filename": filename, "error": str(e)}
204
- errors.append(err)
205
-
206
- master = {
207
- "count": len(evaluations),
208
- "errors_count": len(errors),
209
- "evaluations": evaluations,
210
- "errors": errors,
211
- "meta": {
212
- "model": config.get("model") or os.getenv("OPENAI_MODEL") or "gpt-4o-mini",
213
- "timestamp": _now_ts(),
214
- },
215
- }
216
-
217
- (base_out / "master_index.json").write_text(
218
- json.dumps(master, ensure_ascii=False, indent=2),
219
- encoding="utf-8",
220
- )
221
 
222
- zip_path = str(base_out.parent / "results.zip")
223
- if os.path.exists(zip_path):
224
- os.remove(zip_path)
225
 
226
- shutil.make_archive(zip_path.replace(".zip", ""), "zip", str(base_out))
227
- return zip_path
 
1
  import json
2
  import os
3
  import re
4
+ import hashlib
5
  import shutil
6
  import time
7
+ from datetime import datetime, timezone
8
  from pathlib import Path
9
+ from typing import Any, Dict, List, Optional
10
 
11
+ import fitz # pymupdf
12
+ import pytesseract
13
+ from PIL import Image
14
  from openai import OpenAI
15
  from tenacity import retry, stop_after_attempt, wait_exponential
16
 
17
 
18
+ # -----------------------------
19
+ # Schema / Constants
20
+ # -----------------------------
21
+
22
+ SCHEMA_VERSION = "1.0"
23
+
24
  ALLOWED_SCORE_KEYS = ["skill", "experience", "growth", "context_fit", "combined"]
25
 
26
+ DEFAULT_MODEL = "gpt-4o-mini"
27
+
28
+ INDEX_FILENAME = "resumes_index.json"
29
+ EVAL_DIRNAME = "EVALUATIONS"
30
+ TEXT_DIRNAME = "EXTRACTED_TEXT"
31
+
32
+
33
+ # -----------------------------
34
+ # Utilities
35
+ # -----------------------------
36
 
37
  def _now_ts() -> str:
38
+ return datetime.now(timezone.utc).isoformat()
39
 
40
 
41
  def _safe_slug(s: str, max_len: int = 80) -> str:
 
45
  return s[:max_len] if s else "UNKNOWN"
46
 
47
 
48
+ def _sha256_file(path: str) -> str:
49
+ h = hashlib.sha256()
50
+ with open(path, "rb") as f:
51
+ for chunk in iter(lambda: f.read(1024 * 1024), b""):
52
+ h.update(chunk)
53
+ return h.hexdigest()
54
+
55
+
56
+ def _atomic_write_json(path: Path, obj: Any) -> None:
57
+ path.parent.mkdir(parents=True, exist_ok=True)
58
+ tmp = path.with_suffix(path.suffix + ".tmp")
59
+ tmp.write_text(json.dumps(obj, ensure_ascii=False, indent=2), encoding="utf-8")
60
+ tmp.replace(path) # atomic on same filesystem
61
+
62
+
63
+ def _load_index(index_path: Path) -> List[Dict[str, Any]]:
64
+ if not index_path.exists():
65
+ return []
66
+ try:
67
+ return json.loads(index_path.read_text(encoding="utf-8"))
68
+ except Exception:
69
+ # If corrupted, do not crash the whole pipeline. Start fresh but keep the old file.
70
+ backup = index_path.with_suffix(".corrupt.json")
71
  try:
72
+ shutil.copy2(index_path, backup)
73
+ except Exception:
74
+ pass
75
+ return []
76
+
77
+
78
+ def _index_by_sha(index: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
79
+ m: Dict[str, Dict[str, Any]] = {}
80
+ for r in index:
81
+ sha = r.get("pdf_sha256")
82
+ if sha:
83
+ m[sha] = r
84
+ return m
85
+
86
+
87
+ def _coerce_score(v: Any) -> float:
88
+ try:
89
+ f = float(v)
90
+ except Exception:
91
+ return 0.0
92
+ if f < 0:
93
+ return 0.0
94
+ if f > 10:
95
+ return 10.0
96
+ return f
97
+
98
+
99
+ # -----------------------------
100
+ # PDF text extraction + OCR fallback
101
+ # -----------------------------
102
+
103
def _pixmap_to_pil_rgb(pix: "fitz.Pixmap") -> Image.Image:
    """Convert a PyMuPDF pixmap into a PIL ``Image`` in RGB mode.

    NOTE(review): assumes the pixmap's colorspace is RGB once alpha is
    stripped (true for the default ``get_pixmap()`` rendering used by the
    OCR fallback) — confirm for grayscale/CMYK sources.
    """
    # Drop the alpha channel if present: frombytes("RGB", ...) expects
    # exactly 3 bytes per pixel in pix.samples.
    if pix.alpha:
        pix = fitz.Pixmap(pix, 0)
    return Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
108
+
109
+
110
def extract_text_from_pdf(
    pdf_path: str,
    *,
    ocr_if_empty: bool = True,
    max_pages: int = 8,
    ocr_dpi: int = 200,
) -> str:
    """Extract text from a PDF, with an OCR fallback for scanned files.

    1) Try normal text extraction via PyMuPDF (first *max_pages* pages).
    2) If that yields nothing and *ocr_if_empty* is set, render each page
       at *ocr_dpi* and OCR it with pytesseract.

    Returns "" when the file cannot be opened at all.
    """
    try:
        doc = fitz.open(pdf_path)
    except Exception:
        return ""

    # Fix: close the document on ALL exit paths. Previously an unexpected
    # exception between open() and close() leaked the document handle.
    try:
        # Fast text extraction
        parts: List[str] = []
        page_count = min(len(doc), max_pages)
        for i in range(page_count):
            try:
                t = doc[i].get_text("text") or ""
            except Exception:
                t = ""
            if t.strip():
                parts.append(t)

        text = "\n\n".join(parts).strip()
        if text or not ocr_if_empty:
            return text

        # OCR fallback: render page -> PIL image -> tesseract.
        ocr_parts: List[str] = []
        for i in range(page_count):
            try:
                page = doc[i]
                pix = page.get_pixmap(dpi=ocr_dpi)
                img = _pixmap_to_pil_rgb(pix)
                ocr_txt = pytesseract.image_to_string(img) or ""
                if ocr_txt.strip():
                    ocr_parts.append(ocr_txt)
            except Exception:
                # Best-effort OCR: skip pages that fail to render/OCR.
                continue

        return "\n\n".join(ocr_parts).strip()
    finally:
        doc.close()
157
+
158
+
159
+ # -----------------------------
160
+ # LLM prompt + normalization
161
+ # -----------------------------
162
 
163
  def build_prompt(text: str, config: Dict[str, Any]) -> str:
 
164
  projects = config.get("projects") or []
165
  projects_block = json.dumps(projects, ensure_ascii=False)
166
 
 
203
  """.strip()
204
 
205
 
 
 
 
 
 
 
 
 
 
 
 
 
206
  def normalize_eval(raw: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
207
  scores = raw.get("scores") if isinstance(raw.get("scores"), dict) else {}
208
  norm_scores = {k: _coerce_score(scores.get(k, 0)) for k in ALLOWED_SCORE_KEYS}
 
211
  project_name = best_project.get("project_name")
212
  project_score = _coerce_score(best_project.get("project_score", 0))
213
 
214
+ allowed_project_names = {
215
+ p.get("name")
216
+ for p in (config.get("projects") or [])
217
+ if isinstance(p, dict) and p.get("name")
218
+ }
219
  if project_name not in allowed_project_names:
220
  project_name = None
221
 
 
225
  tags = [str(t).strip() for t in tags if str(t).strip()]
226
  tags = tags[:25]
227
 
228
+ model = config.get("model") or os.getenv("OPENAI_MODEL") or DEFAULT_MODEL
229
+
230
  out = {
231
+ "schema_version": SCHEMA_VERSION,
232
  "candidate_name": raw.get("candidate_name"),
233
  "seniority": raw.get("seniority"),
234
  "scores": norm_scores,
 
236
  "tags": tags,
237
  "notes": raw.get("notes"),
238
  "meta": {
239
+ "model": model,
240
  "timestamp": _now_ts(),
241
  },
242
  }
 
250
  raise RuntimeError("Missing OPENAI_API_KEY (set it in HF Space Secrets).")
251
 
252
  client = OpenAI(api_key=api_key)
253
+ model = config.get("model") or os.getenv("OPENAI_MODEL") or DEFAULT_MODEL
 
254
  prompt = build_prompt(text, config)
255
 
256
+ resp = client.responses.create(model=model, input=prompt)
 
 
 
 
257
 
258
  content = resp.output_text
259
  if not content or not content.strip():
260
  raise RuntimeError("LLM returned empty response.")
261
 
 
262
  try:
263
  raw = json.loads(content)
264
  except Exception as e:
265
+ raise RuntimeError(
266
+ f"LLM did not return valid JSON. First 200 chars: {content[:200]!r}"
267
+ ) from e
268
 
269
  if not isinstance(raw, dict):
270
  raise RuntimeError("LLM JSON must be an object/dict at top-level.")
 
272
  return raw
273
 
274
 
275
+ # -----------------------------
276
+ # Pipeline (Notebook parity)
277
+ # -----------------------------
278
+
279
def _make_record_base(pdf_path: str, config: Dict[str, Any], project_name: str) -> Dict[str, Any]:
    """Build the skeleton audit-trail record for one input PDF.

    Hashes the file (so may raise if it is unreadable) and resolves the
    model name from config, then the OPENAI_MODEL env var, then the default.
    """
    chosen_model = config.get("model") or os.getenv("OPENAI_MODEL") or DEFAULT_MODEL
    record: Dict[str, Any] = {
        "schema_version": SCHEMA_VERSION,
        "pdf_sha256": _sha256_file(pdf_path),
        "filename": os.path.basename(pdf_path),
        "candidate_name": None,
        "project": project_name,
        "model": chosen_model,
        "status": None,  # one of: success | skipped | failed
        "error": None,
        "created_at": _now_ts(),
        "output_json": None,  # relative path under output_dir
        "extracted_text": None,  # relative path under output_dir
    }
    return record
295
+
296
+
297
def run_pipeline(
    input_files: List[str],
    config: Dict[str, Any],
    output_dir: Optional[str] = None,
) -> str:
    """Evaluate resume PDFs and write results under *output_dir*.

    Writes outputs into output_dir:
      - resumes_index.json (append-only audit trail)
      - EVALUATIONS/*.json (per resume normalized evaluation)
      - EXTRACTED_TEXT/*.txt (extracted text/OCR text)
    Implements:
      - OCR fallback
      - dedupe by pdf_sha256 unless config["rewrite"] == True
      - atomic writes to index after every file (partial progress is safe)
      - consistent schema versioning
    Returns:
      output_dir (string path)
    """
    base_out = Path(output_dir or "/tmp/resume_eval_out").resolve()
    base_out.mkdir(parents=True, exist_ok=True)

    eval_dir = base_out / EVAL_DIRNAME
    eval_dir.mkdir(parents=True, exist_ok=True)

    text_dir = base_out / TEXT_DIRNAME
    text_dir.mkdir(parents=True, exist_ok=True)

    index_path = base_out / INDEX_FILENAME
    index = _load_index(index_path)
    index_map = _index_by_sha(index)

    rewrite = bool(config.get("rewrite", False))
    projects = config.get("projects") or [{"name": "STANDARD"}]
    # Fix: guard against a truthy non-dict first entry (e.g. a plain string),
    # which previously raised AttributeError on .get().
    first_project = projects[0] if isinstance(projects[0], dict) else {}
    project_name = first_project.get("name", "STANDARD")

    # OCR knobs (configurable)
    ocr_max_pages = int(config.get("ocr_max_pages", 8))
    ocr_dpi = int(config.get("ocr_dpi", 200))

    model_name = config.get("model") or os.getenv("OPENAI_MODEL") or DEFAULT_MODEL

    for pdf_path in input_files or []:
        pdf_path = str(Path(pdf_path).resolve())

        # Fix: _make_record_base hashes the file and previously ran OUTSIDE
        # any try, so one missing/unreadable input crashed the whole run.
        # Record the failure and continue instead.
        try:
            rec = _make_record_base(pdf_path, config, project_name)
        except Exception as e:
            rec = {
                "schema_version": SCHEMA_VERSION,
                "pdf_sha256": None,
                "filename": os.path.basename(pdf_path),
                "candidate_name": None,
                "project": project_name,
                "model": model_name,
                "status": "failed",
                "error": f"{type(e).__name__}: {e}",
                "created_at": _now_ts(),
                "output_json": None,
                "extracted_text": None,
            }
            index.append(rec)
            _atomic_write_json(index_path, index)
            continue

        sha = rec["pdf_sha256"]

        # Dedupe by content hash unless the caller asked for a rewrite.
        if sha in index_map and not rewrite:
            rec["status"] = "skipped"
            rec["error"] = "duplicate_pdf_sha256"
            index.append(rec)
            _atomic_write_json(index_path, index)
            continue

        try:
            text = extract_text_from_pdf(
                pdf_path,
                ocr_if_empty=True,
                max_pages=ocr_max_pages,
                ocr_dpi=ocr_dpi,
            )
            if not text.strip():
                raise RuntimeError("No extractable text (even after OCR).")

            # Persist extracted text so failures downstream are debuggable.
            text_name = f"{_safe_slug(Path(pdf_path).stem)}__{sha[:12]}.txt"
            text_path = text_dir / text_name
            text_path.write_text(text, encoding="utf-8")
            rec["extracted_text"] = str(text_path.relative_to(base_out))

            raw = llm_evaluate(text, config)
            ev = normalize_eval(raw, config)

            # Add file identity
            ev["filename"] = os.path.basename(pdf_path)
            ev["pdf_sha256"] = sha

            safe_name = _safe_slug(ev.get("candidate_name") or Path(pdf_path).stem)
            out_path = eval_dir / f"{safe_name}__{sha[:12]}.json"
            out_path.write_text(json.dumps(ev, ensure_ascii=False, indent=2), encoding="utf-8")

            rec["status"] = "success"
            rec["candidate_name"] = ev.get("candidate_name")
            rec["output_json"] = str(out_path.relative_to(base_out))

        except Exception as e:
            rec["status"] = "failed"
            rec["error"] = f"{type(e).__name__}: {e}"

        index.append(rec)
        # Persist after each file so partial progress is safe
        _atomic_write_json(index_path, index)

    return str(base_out)