hchevva commited on
Commit
8b1e2a5
·
verified ·
1 Parent(s): e46893f

Create pipeline.py

Browse files
Files changed (1) hide show
  1. toxra_core/pipeline.py +619 -0
toxra_core/pipeline.py ADDED
@@ -0,0 +1,619 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ toxra_core.pipeline — robust grounded extraction core for TOXRA.AI
3
+
4
+ Implements:
5
+ - PDF text extraction (text-based PDFs only)
6
+ - Page-aware chunking with overlap
7
+ - Keyword-based chunk selection to fit context limits
8
+ - OpenAI Responses API structured extraction (json_schema)
9
+ - Rich schema builder from Field Spec + Controlled Vocab
10
+ - Endpoint filtering: families + specific OECD TGs
11
+ - Row-mode logic: one_row_per_paper vs one_row_per_chemical_endpoint (policy + heuristics)
12
+ - Evidence management: per-field quote + page; verification against provided context
13
+ - Post-processing: normalize records, clamp confidence, cap runaway outputs
14
+ """
15
+
16
+ from __future__ import annotations
17
+
18
+ import os
19
+ import re
20
+ import json
21
+ import time
22
+ import hashlib
23
+ from dataclasses import dataclass
24
+ from typing import Any, Dict, List, Tuple, Optional
25
+
26
+ import pandas as pd
27
+ from pypdf import PdfReader
28
+
29
+ try:
30
+ from openai import OpenAI
31
+ except Exception: # pragma: no cover
32
+ OpenAI = None # type: ignore
33
+
34
+
35
# =============================
# Tunables (env overrides)
# =============================
# Character length of each page-text chunk fed to selection.
DEFAULT_CHUNK_SIZE = int(os.getenv("TOXRA_CHUNK_SIZE", "3200"))
# Characters of context carried over between consecutive chunks of one page.
DEFAULT_CHUNK_OVERLAP = int(os.getenv("TOXRA_CHUNK_OVERLAP", "250"))
# Hard cap on records kept per PDF (guards against runaway model output).
DEFAULT_MAX_RECORDS_PER_PDF = int(os.getenv("TOXRA_MAX_RECORDS_PER_PDF", "120"))
ENABLE_CHEM_SCAN = os.getenv("TOXRA_ENABLE_CHEM_SCAN", "1").strip() == "1"  # robust but costs extra call

# CAS registry numbers: 2-7 digits, 2 digits, 1 check digit (e.g. 50-00-0).
CAS_RE = re.compile(r"\b\d{2,7}-\d{2}-\d\b")
# Any whitespace run; used to collapse extracted text to single spaces.
WS_RE = re.compile(r"\s+")

# Closed vocabulary for the overall risk_stance field; out-of-vocabulary
# values are coerced to "insufficient_data" during normalization.
RISK_STANCE_ENUM = ["acceptable", "acceptable_with_uncertainty", "not_acceptable", "insufficient_data"]
47
+
48
+
49
@dataclass
class Chunk:
    """A contiguous slice of whitespace-normalized text from one PDF page."""

    # Stable identifier: page number plus either a content hash (whole-page
    # chunks) or the start/end character offsets (windowed chunks).
    chunk_id: str
    # 1-based page number the text came from.
    page: int
    # The chunk's text content.
    text: str
54
+
55
+
56
+ # =============================
57
+ # Utility
58
+ # =============================
59
+ def _clean_text(t: str) -> str:
60
+ t = (t or "").replace("\x00", " ")
61
+ t = WS_RE.sub(" ", t).strip()
62
+ return t
63
+
64
+
65
+ def _sha1(s: str) -> str:
66
+ return hashlib.sha1((s or "").encode("utf-8", errors="ignore")).hexdigest()[:12]
67
+
68
+
69
+ def _safe_json_loads(s: str, fallback: Any) -> Any:
70
+ try:
71
+ return json.loads(s) if s else fallback
72
+ except Exception:
73
+ return fallback
74
+
75
+
76
+ # =============================
77
+ # PDF extraction
78
+ # =============================
79
def extract_pages(pdf_path: str, max_pages: int) -> Tuple[List[Tuple[int, str]], int]:
    """Read text from up to *max_pages* pages of a PDF.

    Returns ([(1-based page number, raw extracted text), ...], total page
    count of the document). A page whose extraction fails contributes an
    empty string instead of aborting the whole file.
    """
    reader = PdfReader(pdf_path)
    total = len(reader.pages)
    # max_pages <= 0 (or falsy) means "no page limit".
    n = min(total, max_pages) if max_pages and max_pages > 0 else total
    out: List[Tuple[int, str]] = []
    for i in range(n):
        try:
            txt = reader.pages[i].extract_text() or ""
        except Exception:
            # Keep going: is_text_based() later decides if the PDF is usable.
            txt = ""
        out.append((i + 1, txt))
    return out, total
91
+
92
+
93
def is_text_based(pages: List[Tuple[int, str]]) -> bool:
    """Heuristic: the PDF counts as text-based when its pages together yield
    at least 200 characters of whitespace-normalized text."""
    cleaned: List[str] = []
    for _, raw in pages:
        norm = re.sub(r"\s+", " ", (raw or "").replace("\x00", " ")).strip()
        if norm:
            cleaned.append(norm)
    return len(" ".join(cleaned)) >= 200
96
+
97
+
98
def chunk_pages(
    pages: List[Tuple[int, str]],
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    overlap: int = DEFAULT_CHUNK_OVERLAP,
) -> List[Chunk]:
    """Split each page's cleaned text into (possibly overlapping) chunks.

    Pages that fit in one chunk get a hash-based chunk_id; longer pages are
    windowed with *overlap* characters of context carried between windows.

    Fix: the original loop made no forward-progress guarantee — an
    overlap >= chunk_size (or chunk_size <= 0) looped forever, and a
    negative overlap silently skipped text. Both are clamped here; the
    defaults (3200/250) are unaffected.
    """
    chunk_size = max(1, int(chunk_size))
    overlap = min(max(0, int(overlap)), chunk_size - 1)

    chunks: List[Chunk] = []
    for pno, raw in pages:
        txt = _clean_text(raw)
        if not txt:
            continue  # image-only or empty page
        if len(txt) <= chunk_size:
            chunks.append(Chunk(chunk_id=f"p{pno}_{_sha1(txt)}", page=pno, text=txt))
            continue

        start = 0
        while start < len(txt):
            end = min(len(txt), start + chunk_size)
            chunks.append(Chunk(chunk_id=f"p{pno}_{start}_{end}", page=pno, text=txt[start:end]))
            if end >= len(txt):
                break
            # Step forward by chunk_size - overlap (> 0 by the clamps above).
            start = max(0, end - overlap)
    return chunks
121
+
122
+
123
def select_chunks(
    chunks: List[Chunk],
    max_context_chars: int,
    query_terms: List[str],
    always_take_first_page: bool = True,
) -> Tuple[List[Chunk], Dict[str, Any]]:
    """Pick a subset of chunks that fits the character budget.

    Chunks are ranked by the number of query terms they contain; the first
    page is (optionally) force-included first so title/abstract context is
    always present. Returns (selected chunks, debug-info dict).
    """
    if not chunks:
        return [], {"reason": "no_chunks"}

    q = [t.lower() for t in (query_terms or []) if t and t.strip()]

    # Score each chunk by how many query terms appear in it (substring match).
    scored = []
    for ch in chunks:
        t = ch.text.lower()
        score = 0
        for term in q:
            if term in t:
                score += 1
        scored.append((score, ch))

    # Highest score first; on ties, -len with reverse=True puts the SHORTER
    # chunk first, so more distinct chunks fit into the budget.
    scored.sort(key=lambda x: (x[0], -len(x[1].text)), reverse=True)

    selected: List[Chunk] = []
    used = 0  # budget consumed, including ~60 chars of [PAGE ...] header each

    if always_take_first_page:
        first = [c for c in chunks if c.page == 1]
        if first:
            c0 = first[0]
            if used + len(c0.text) + 60 <= max_context_chars:
                selected.append(c0)
                used += len(c0.text) + 60

    for score, ch in scored:
        if ch in selected:
            continue  # already force-included above
        block_len = len(ch.text) + 60
        if used + block_len > max_context_chars:
            continue  # too big for the remaining budget; a smaller one may still fit
        selected.append(ch)
        used += block_len
        if used >= max_context_chars:
            break

    # Degenerate budget: nothing fit — fall back to a clipped first chunk.
    if not selected and chunks:
        ch = chunks[0]
        clip = ch.text[: max(0, max_context_chars - 60)]
        selected = [Chunk(chunk_id=ch.chunk_id, page=ch.page, text=clip)]

    debug = {
        "max_context_chars": max_context_chars,
        "query_terms": query_terms,
        "selected_count": len(selected),
        "selected_pages": sorted(list({c.page for c in selected})),
    }
    return selected, debug
179
+
180
+
181
def build_context(selected_chunks: List[Chunk], file_name: str) -> str:
    """Assemble the selected chunks into one prompt-context string, ordered
    by (page, chunk_id) and tagged with [FILE]/[PAGE ...] markers."""
    ordered = sorted(selected_chunks, key=lambda c: (c.page, c.chunk_id))
    blocks = [f"[FILE] {file_name}"]
    blocks.extend(f"\n[PAGE {ch.page} | {ch.chunk_id}]\n{ch.text}\n" for ch in ordered)
    return "\n".join(blocks).strip()
187
+
188
+
189
+ # =============================
190
+ # Admin JSON parsing + schema building
191
+ # =============================
192
def parse_admin_json(vocab_json: str, spec_json: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
    """Parse the admin-supplied controlled-vocab and field-spec JSON strings.

    Malformed or wrongly-typed input falls back to a minimal default vocab
    and an empty spec instead of raising.
    """
    default_vocab: Dict[str, Any] = {
        "risk_stance_enum": RISK_STANCE_ENUM,
        "approach_enum": ["in_vivo", "in_vitro", "in_silico", "nams", "mixed", "not_reported"],
        "genotoxicity_oecd_tg_in_vitro_enum": [],
        "genotoxicity_oecd_tg_in_vivo_enum": [],
    }

    vocab = _safe_json_loads(vocab_json, default_vocab)
    if not isinstance(vocab, dict):
        vocab = default_vocab

    spec = _safe_json_loads(spec_json, [])
    if not isinstance(spec, list):
        spec = []

    return vocab, spec
207
+
208
+
209
+ def _resolve_enum_list(vocab: Dict[str, Any], enum_values: str) -> List[str]:
210
+ enum_values = (enum_values or "").strip()
211
+ if not enum_values:
212
+ return []
213
+ if enum_values in vocab and isinstance(vocab[enum_values], list):
214
+ return [str(x) for x in vocab[enum_values]]
215
+ return [x.strip() for x in enum_values.split(",") if x.strip()]
216
+
217
+
218
def build_output_schema(vocab: Dict[str, Any], spec: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Strict JSON schema for OpenAI Responses API structured outputs.
    NOTE: required MUST include every property key (OpenAI validator requirement).

    Fix: an enum field whose vocab list is missing or empty previously
    produced {"enum": []}, which is unsatisfiable per JSON Schema (enum
    must be a non-empty array) and gets rejected by strict validators.
    Such fields now degrade to a free nullable string / string array.
    """

    def field_schema(f: Dict[str, Any]) -> Dict[str, Any]:
        # Map one Field Spec row to a JSON-schema fragment. All fields are
        # nullable so the model can express "not reported".
        ftype = (f.get("type") or "str").strip()
        enum_values = (f.get("enum_values") or "").strip()

        if ftype == "str":
            return {"type": ["string", "null"]}
        if ftype == "num":
            return {"type": ["number", "null"]}
        if ftype == "bool":
            return {"type": ["boolean", "null"]}
        if ftype == "list[str]":
            return {"type": ["array", "null"], "items": {"type": "string"}}
        if ftype == "list[num]":
            return {"type": ["array", "null"], "items": {"type": "number"}}
        if ftype == "enum":
            enum_list = _resolve_enum_list(vocab, enum_values)
            if not enum_list:
                return {"type": ["string", "null"]}  # empty enum is unsatisfiable
            return {"type": ["string", "null"], "enum": enum_list}
        if ftype == "list[enum]":
            enum_list = _resolve_enum_list(vocab, enum_values)
            if not enum_list:
                return {"type": ["array", "null"], "items": {"type": "string"}}
            return {"type": ["array", "null"], "items": {"type": "string", "enum": enum_list}}
        # Unknown type labels fall back to nullable string.
        return {"type": ["string", "null"]}

    # Core per-record keys, always present regardless of the Field Spec.
    record_props: Dict[str, Any] = {
        "file": {"type": "string"},
        "row_mode": {"type": "string", "enum": ["one_row_per_paper", "one_row_per_chemical_endpoint"]},
        "chemical": {"type": ["string", "null"]},
        "endpoint": {"type": ["string", "null"]},
    }

    for f in spec:
        name = (f.get("field") or "").strip()
        if not name:
            continue
        record_props[name] = field_schema(f)

    # OpenAI strict mode: every declared property must be listed as required.
    required_keys = list(record_props.keys())

    schema = {
        "type": "object",
        "properties": {
            "records": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": record_props,
                    "required": required_keys,
                    "additionalProperties": False,
                },
            },
            "evidence": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "record_index": {"type": "integer"},
                        "field": {"type": "string"},
                        "page": {"type": "integer"},
                        "quote": {"type": "string"},
                    },
                    "required": ["record_index", "field", "page", "quote"],
                    "additionalProperties": False,
                },
            },
            "notes": {"type": "string"},
        },
        "required": ["records", "evidence", "notes"],
        "additionalProperties": False,
    }
    return schema
293
+
294
+
295
+ # =============================
296
+ # Selection → query term expansion (for chunk selection)
297
+ # =============================
298
def keyword_terms_for_selection(endpoint_families: List[str], oecd_tgs: List[str], vocab: Dict[str, Any]) -> List[str]:
    """Build the keyword list used to rank chunks during selection.

    Combines endpoint family labels, OECD TG labels (plus their bare
    three-digit numbers), fixed NAM/in-silico cues, and up to 25 terms from
    each genotoxicity TG vocab list. Deduplicated case-insensitively with
    first occurrence winning.
    """
    raw: List[str] = list(endpoint_families or [])

    for tg in oecd_tgs or []:
        raw.append(tg)
        digits = re.search(r"\b(\d{3})\b", tg)
        if digits:
            raw.append(digits.group(1))

    # NAMs/in silico cues
    raw += ["in silico", "QSAR", "read-across", "NAMs", "NAMS", "AOP", "pathway", "transcript", "omics"]

    # Pull common TG vocab terms to help ranking
    for key in ("genotoxicity_oecd_tg_in_vitro_enum", "genotoxicity_oecd_tg_in_vivo_enum"):
        values = vocab.get(key)
        if isinstance(values, list):
            raw.extend(str(v) for v in values[:25])

    # Case-insensitive dedupe, preserving order and original casing.
    deduped: List[str] = []
    seen = set()
    for term in raw:
        term = (term or "").strip()
        if not term:
            continue
        folded = term.lower()
        if folded not in seen:
            seen.add(folded)
            deduped.append(term)
    return deduped
331
+
332
+
333
+ # =============================
334
+ # OpenAI client + calls
335
+ # =============================
336
def get_openai_client(api_key: str) -> OpenAI:
    """Build an OpenAI client.

    Prefers the explicitly supplied key; falls back to the OPENAI_API_KEY
    environment variable.

    Raises:
        RuntimeError: if the openai package failed to import at module load.
        ValueError: if no API key is available from either source.
    """
    if OpenAI is None:
        raise RuntimeError("openai package not installed in toxra_core runtime.")
    key = (api_key or "").strip() or os.getenv("OPENAI_API_KEY", "").strip()
    if not key:
        raise ValueError("Missing OpenAI API key. Provide it or set OPENAI_API_KEY.")
    return OpenAI(api_key=key)
343
+
344
+
345
def openai_structured_extract(
    client: OpenAI,
    model: str,
    schema: Dict[str, Any],
    system_prompt: str,
    user_prompt: str,
) -> Dict[str, Any]:
    """Run one structured-output call against the OpenAI Responses API and
    return the parsed JSON object.

    Fix: `responses.create` has no `response_format` parameter (that is the
    Chat Completions argument); structured outputs on the Responses API are
    configured via `text={"format": {...}}` with name/schema/strict at the
    format level.

    Raises json.JSONDecodeError if the model output is not valid JSON
    (should not happen under strict json_schema formatting).
    """
    resp = client.responses.create(
        model=model,
        input=[{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}],
        text={
            "format": {
                "type": "json_schema",
                "name": "toxra_extraction",
                "schema": schema,
                "strict": True,
            }
        },
    )
    txt = (resp.output_text or "").strip()
    return json.loads(txt)
359
+
360
+
361
+ # =============================
362
+ # Optional: quick chemical scan (robust row-mode seed)
363
+ # =============================
364
def quick_chem_scan(
    client: OpenAI,
    model: str,
    context: str,
) -> Dict[str, Any]:
    """Cheap pre-pass: ask the model which chemicals the text mentions.

    Used only to seed the row-mode heuristic (multiple chemicals implies
    one_row_per_chemical_endpoint). Returns {"chemicals": [...], "notes": str};
    a non-dict model response is replaced by an empty result so callers can
    rely on the shape.
    """
    # Minimal strict schema: a list of {name, cas} pairs plus free-form notes.
    chem_schema = {
        "type": "object",
        "properties": {
            "chemicals": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {"name": {"type": "string"}, "cas": {"type": ["string", "null"]}},
                    "required": ["name", "cas"],
                    "additionalProperties": False,
                },
            },
            "notes": {"type": "string"},
        },
        "required": ["chemicals", "notes"],
        "additionalProperties": False,
    }

    sys = (
        "Extract primary chemical names mentioned in the provided text. "
        "Return up to 10 chemicals. Stay grounded; if unsure, omit."
    )
    user = f"TEXT:\n{context}\n\nReturn JSON per schema."

    out = openai_structured_extract(client, model, chem_schema, sys, user)
    # Defensive: guarantee a dict with the expected keys.
    return out if isinstance(out, dict) else {"chemicals": [], "notes": "invalid"}
395
+
396
+
397
+ # =============================
398
+ # Evidence verification
399
+ # =============================
400
def verify_evidence_quotes(evidence: List[Dict[str, Any]], selected_chunks: List[Chunk]) -> Dict[str, Any]:
    """Count evidence quotes that cannot be found verbatim (case-insensitive)
    in the context that was actually shown to the model."""
    haystack = "\n".join(c.text for c in selected_chunks).lower()
    unverified = sum(
        1
        for item in evidence
        if (quote := (item.get("quote") or "").strip().lower()) and quote not in haystack
    )
    return {"evidence_items": len(evidence), "unverified_quotes": unverified}
408
+
409
+
410
+ # =============================
411
+ # Normalization / post-processing
412
+ # =============================
413
def normalize_record(rec: Dict[str, Any], file_name: str, fallback_row_mode: str) -> Dict[str, Any]:
    """Post-process one model-produced record.

    Fills file/row_mode defaults, ensures chemical/endpoint keys exist,
    validates risk_stance against the controlled vocabulary, clamps
    risk_confidence into [0, 1], and converts literal "null" strings to None.
    """
    out = dict(rec or {})
    out["file"] = out.get("file") or file_name

    mode = out.get("row_mode") or fallback_row_mode
    if mode not in ("one_row_per_paper", "one_row_per_chemical_endpoint"):
        mode = fallback_row_mode
    out["row_mode"] = mode

    # Guarantee the core identity keys exist (values may legitimately be None).
    out.setdefault("chemical", None)
    out.setdefault("endpoint", None)

    stance = out.get("risk_stance")
    if stance is not None and stance not in RISK_STANCE_ENUM:
        out["risk_stance"] = "insufficient_data"

    conf = out.get("risk_confidence")
    if conf is not None:
        try:
            out["risk_confidence"] = max(0.0, min(1.0, float(conf)))
        except Exception:
            out["risk_confidence"] = None

    # Convert literal "null" strings into real nulls.
    for key, value in list(out.items()):
        if isinstance(value, str) and value.strip().lower() == "null":
            out[key] = None

    return out
444
+
445
+
446
def cap_records(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Truncate runaway model output to DEFAULT_MAX_RECORDS_PER_PDF records;
    lists already under the cap are returned unchanged."""
    if len(records) > DEFAULT_MAX_RECORDS_PER_PDF:
        return records[:DEFAULT_MAX_RECORDS_PER_PDF]
    return records
448
+
449
+
450
def build_overview_df(records: List[Dict[str, Any]]) -> pd.DataFrame:
    """Project the extracted records onto the key overview columns.

    Falls back to the first 50 raw rows when none of the preferred columns
    are present in the data.
    """
    preferred = ["file", "paper_title", "risk_stance", "risk_confidence", "row_mode", "chemical", "endpoint"]
    if not records:
        return pd.DataFrame(columns=preferred)
    df = pd.DataFrame(records)
    cols = [name for name in preferred if name in df.columns]
    # Some outputs carry a plural "chemicals" column instead of "chemical".
    if "chemicals" in df.columns and "chemical" not in cols:
        cols.append("chemicals")
    return df[cols].copy() if cols else df.head(50)
458
+
459
+
460
+ # =============================
461
+ # Entrypoint called by app.py
462
+ # =============================
463
def run_extraction(
    files,
    api_key: str,
    model: str,
    max_pages: int,
    max_context_chars: int,
    endpoint_families: List[str],
    oecd_tgs: List[str],
    vocab_json: str,
    spec_json: str,
) -> Tuple[Dict[str, Any], str, pd.DataFrame, str, str]:
    """
    End-to-end extraction over a batch of uploaded PDFs (entrypoint for app.py).

    Per file: page extraction -> text-based check -> chunking -> keyword chunk
    selection -> (optional) chemical scan -> structured LLM extraction ->
    record normalization + evidence verification.

    Returns:
      run_state (dict), status (str), overview_df (pd.DataFrame), csv_path (str), details_path (str)
    """
    if not files:
        empty = {"records": [], "evidence": [], "details": []}
        return empty, "Upload at least one PDF.", build_overview_df([]), "", ""

    vocab, spec = parse_admin_json(vocab_json, spec_json)
    schema = build_output_schema(vocab, spec)
    client = get_openai_client(api_key)

    query_terms = keyword_terms_for_selection(endpoint_families, oecd_tgs, vocab)

    system_prompt = (
        "You are a toxicology literature extraction assistant for an industry safety assessor.\n"
        "Hard rules:\n"
        "1) Stay strictly grounded to provided PAGE text. If missing, use null or 'not_reported'.\n"
        "2) Neutral synthesis only; do not over-interpret.\n"
        "3) Row-mode policy:\n"
        "   - If paper focuses on a single primary chemical => one_row_per_paper.\n"
        "   - If multiple chemicals => one_row_per_chemical_endpoint.\n"
        "4) Endpoint filtering:\n"
        "   - Only include endpoint-related records for user-selected endpoint families / OECD TGs.\n"
        "   - If TGs are provided, prefer them; do not invent TG numbers.\n"
        "5) Evidence:\n"
        "   - Provide evidence quotes with page numbers for key fields (risk_stance, risk_summary, key_findings, conclusion, OECD TG fields).\n"
        "6) For one_row_per_chemical_endpoint:\n"
        "   - Each record should map to ONE chemical and ONE endpoint (family or TG).\n"
        "   - Put the chemical name in 'chemical' and the endpoint label in 'endpoint'.\n"
        "7) For one_row_per_paper:\n"
        "   - Use 'chemical' only if the primary chemical is explicit; 'endpoint' may be null.\n"
    )

    all_records: List[Dict[str, Any]] = []
    all_evidence: List[Dict[str, Any]] = []
    details: List[Dict[str, Any]] = []

    for f in files:
        # Upload objects expose the temp path via .name (gradio-style uploads).
        pdf_path = f.name
        file_name = os.path.basename(pdf_path)

        pages, total_pages = extract_pages(pdf_path, max_pages=max_pages)
        if not is_text_based(pages):
            # Scanned/image-only PDF: emit one placeholder row so the file
            # still appears in the output, then skip the LLM entirely.
            rec = {"file": file_name, "row_mode": "one_row_per_paper", "chemical": None, "endpoint": None}
            for row in spec:
                fld = (row.get("field") or "").strip()
                if not fld:
                    continue
                rec[fld] = "insufficient_data" if fld == "risk_stance" else None
            all_records.append(rec)
            details.append({"file": file_name, "text_based": False, "pages_total": total_pages, "pages_indexed": 0, "reason": "no_extractable_text"})
            continue

        chunks = chunk_pages(pages, chunk_size=DEFAULT_CHUNK_SIZE, overlap=DEFAULT_CHUNK_OVERLAP)
        selected_chunks, sel_debug = select_chunks(chunks, max_context_chars=max_context_chars, query_terms=query_terms)
        context = build_context(selected_chunks, file_name=file_name)

        # heuristic seed using CAS hits: >1 distinct CAS number suggests a
        # multi-chemical paper
        cas_hits = sorted(list({m.group(0) for _, t in pages for m in CAS_RE.finditer(t or "")}))
        fallback_row_mode = "one_row_per_paper" if len(cas_hits) <= 1 else "one_row_per_chemical_endpoint"

        # optional LLM chem scan to seed row-mode more robustly
        chem_scan = {"chemicals": [], "notes": "disabled"}
        if ENABLE_CHEM_SCAN:
            # keep scan context smaller
            scan_ctx = context[: min(len(context), 12000)]
            try:
                chem_scan = quick_chem_scan(client, model, scan_ctx)
                names = [c.get("name") for c in (chem_scan.get("chemicals") or []) if isinstance(c, dict)]
                names = [n for n in names if isinstance(n, str) and n.strip()]
                if len(names) > 1:
                    fallback_row_mode = "one_row_per_chemical_endpoint"
            except Exception as e:
                # best-effort: a failed scan must not abort the extraction
                chem_scan = {"chemicals": [], "notes": f"scan_failed: {e}"}

        user_prompt = (
            f"USER_SELECTED_ENDPOINTS:\n{json.dumps({'families': endpoint_families or [], 'oecd_tgs': oecd_tgs or []}, indent=2)}\n\n"
            f"CHEM_SCAN_HINT:\n{json.dumps(chem_scan, indent=2)}\n\n"
            f"FIELD_SPEC:\n{json.dumps(spec, indent=2)}\n\n"
            f"PAGE_TEXT:\n{context}\n\n"
            "Return JSON matching the schema."
        )

        t0 = time.time()
        parsed = openai_structured_extract(client, model, schema, system_prompt, user_prompt)
        dt = time.time() - t0

        recs = cap_records([(r or {}) for r in (parsed.get("records") or [])])
        ev = (parsed.get("evidence") or []) if isinstance(parsed.get("evidence"), list) else []

        recs2 = [normalize_record(r, file_name, fallback_row_mode) for r in recs]

        # record_index from the model is per-file; shift it so it indexes
        # into the batch-wide all_records list.
        base_index = len(all_records)
        all_records.extend(recs2)

        ev2: List[Dict[str, Any]] = []
        for e in ev:
            if not isinstance(e, dict):
                continue
            try:
                ridx = int(e.get("record_index", 0))
            except Exception:
                ridx = 0
            e2 = dict(e)
            # clamp the model's index to this file's records, then shift
            e2["record_index"] = base_index + max(0, min(ridx, len(recs2) - 1 if recs2 else 0))
            try:
                e2["page"] = int(e2.get("page", 0))
            except Exception:
                e2["page"] = 0
            e2["field"] = str(e2.get("field", ""))
            e2["quote"] = str(e2.get("quote", "")).strip()
            if e2["quote"]:
                # evidence without a usable quote is dropped
                ev2.append(e2)

        all_evidence.extend(ev2)
        ver = verify_evidence_quotes(ev2, selected_chunks)

        details.append({
            "file": file_name,
            "text_based": True,
            "pages_total": total_pages,
            "pages_indexed": min(total_pages, max_pages) if max_pages and max_pages > 0 else total_pages,
            "chunk_size": DEFAULT_CHUNK_SIZE,
            "chunk_overlap": DEFAULT_CHUNK_OVERLAP,
            "selection": sel_debug,
            "runtime_s": round(dt, 2),
            "cas_hits": cas_hits[:30],
            "chem_scan_notes": chem_scan.get("notes", ""),
            "evidence_verification": ver,
            "notes": parsed.get("notes", ""),
        })

    overview_df = build_overview_df(all_records)

    # Persist outputs to /tmp so the UI can offer them as downloads.
    ts = int(time.time())
    csv_path = f"/tmp/toxra_extraction_{ts}.csv"
    details_path = f"/tmp/toxra_details_{ts}.json"

    pd.DataFrame(all_records).to_csv(csv_path, index=False)
    with open(details_path, "w", encoding="utf-8") as f:
        json.dump({"records": all_records, "evidence": all_evidence, "details": details}, f, indent=2)

    status = f"✅ Extracted {len(all_records)} record(s) from {len(files)} PDF(s)."
    run_state = {"records": all_records, "evidence": all_evidence, "details": details, "csv_path": csv_path, "details_path": details_path}
    return run_state, status, overview_df, csv_path, details_path