Spaces:
Runtime error
Runtime error
| import json | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import pandas as pd | |
| from .contracts import ClauseEvaluation, RegulatoryClause | |
| FRAMEWORK_TO_FILE = { | |
| "FDA CTP": "fda_ctp_v2024_06.json", | |
| "EPA": "epa_cancer_v2005.json", | |
| } | |
| EMPTY_STRINGS = {"", "not_reported", "insufficient_data", "none", "na", "n/a", "null"} | |
| def _is_non_empty(v: Any) -> bool: | |
| if v is None: | |
| return False | |
| if isinstance(v, list): | |
| vals = [str(x).strip() for x in v if str(x).strip()] | |
| if not vals: | |
| return False | |
| return not all(x.lower() in EMPTY_STRINGS for x in vals) | |
| s = str(v).strip() | |
| if not s: | |
| return False | |
| return s.lower() not in EMPTY_STRINGS | |
| def _normalize_payload(extraction_payload: Any) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: | |
| if isinstance(extraction_payload, dict): | |
| papers = extraction_payload.get("papers", []) | |
| if isinstance(papers, list): | |
| ext = extraction_payload.get("toxra_extensions", {}) | |
| return papers, (ext if isinstance(ext, dict) else {}) | |
| if isinstance(extraction_payload, list): | |
| return extraction_payload, {} | |
| raise ValueError("Unsupported extraction payload format. Expected list or object with papers.") | |
def load_framework_catalog(framework: str, catalog_dir: str = "regulatory_catalog") -> List[RegulatoryClause]:
    """Read the clause-catalog JSON for *framework* into RegulatoryClause objects.

    Raises:
        ValueError: *framework* has no registered catalog file.
        FileNotFoundError: the catalog file is absent under *catalog_dir*.
    """
    fname = FRAMEWORK_TO_FILE.get(framework)
    if not fname:
        raise ValueError(f"Unsupported framework: {framework}")
    path = Path(catalog_dir) / fname
    if not path.exists():
        raise FileNotFoundError(f"Catalog not found: {path}")

    data = json.loads(path.read_text(encoding="utf-8"))
    raw_clauses = data.get("clauses", []) if isinstance(data, dict) else []

    def as_clause(entry: Dict[str, Any]) -> RegulatoryClause:
        # Missing keys fall back to benign defaults so a sparse catalog still loads.
        return RegulatoryClause(
            clause_id=str(entry.get("clause_id", "")).strip(),
            framework=str(entry.get("framework", framework)).strip(),
            title=str(entry.get("title", "")).strip(),
            description=str(entry.get("description", "")).strip(),
            required_fields=list(entry.get("required_fields", []) or []),
            required_evidence_terms=list(entry.get("required_evidence_terms", []) or []),
            acceptance_rule=str(entry.get("acceptance_rule", "all_required_fields")).strip(),
            applicability=dict(entry.get("applicability", {}) or {}),
            source_reference=str(entry.get("source_reference", "")).strip(),
        )

    return [as_clause(c) for c in raw_clauses]
def _clause_applicable(extracted: Dict[str, Any], clause: RegulatoryClause) -> bool:
    """Decide whether *clause* applies to this record.

    The clause's applicability dict may name a field and an expected value;
    a clause with no condition (or no field name) applies universally.
    Comparison is case-insensitive; for list-valued fields any element may match.
    """
    condition = clause.applicability or {}
    if not condition:
        return True
    field_name = str(condition.get("field", "")).strip()
    if not field_name:
        return True
    expected = str(condition.get("equals", None)).strip().lower()
    value = extracted.get(field_name)
    if isinstance(value, list):
        normalized = [str(item).strip().lower() for item in value]
        return expected in normalized
    return str(value).strip().lower() == expected
def _evaluate_clause(
    extracted: Dict[str, Any],
    evidence: List[Dict[str, Any]],
    clause: RegulatoryClause,
    override_notes: str = "",
) -> ClauseEvaluation:
    """Score a single clause against one paper's extracted fields and evidence.

    Status semantics:
        covered        -- field rule satisfied and (if terms required) evidence found
        partial        -- some fields or evidence present, but not enough
        missing        -- nothing relevant found
        not_applicable -- applicability condition not met
    """
    if not _clause_applicable(extracted, clause):
        return ClauseEvaluation(
            clause_id=clause.clause_id,
            framework=clause.framework,
            status="not_applicable",
            reason="Applicability condition not met.",
        )

    # Partition required fields by whether the extraction supplies a real value.
    present = [f for f in clause.required_fields if _is_non_empty(extracted.get(f))]
    missing = [f for f in clause.required_fields if not _is_non_empty(extracted.get(f))]

    # Case-insensitive substring search across all evidence quotes at once.
    quote_blob = " ".join(str(item.get("quote", "")) for item in evidence).lower()
    evidence_hits: List[str] = []
    for term in clause.required_evidence_terms:
        needle = str(term).strip().lower()
        if needle and needle in quote_blob:
            evidence_hits.append(term)

    # Field rule: "any_required_fields" needs one hit, default needs all.
    if not clause.required_fields:
        field_ok = True
    elif clause.acceptance_rule == "any_required_fields":
        field_ok = bool(present)
    else:
        field_ok = not missing

    evidence_ok = bool(evidence_hits) if clause.required_evidence_terms else True

    if field_ok and evidence_ok:
        status = "covered"
    elif present or evidence_hits:
        status = "partial"
    else:
        status = "missing"

    prompt = ""
    if status in {"missing", "partial"}:
        gap = ", ".join(missing) if missing else "additional corroborating evidence"
        prompt = (
            f"Provide evidence for clause {clause.clause_id} ({clause.title}). "
            f"Missing: {gap}."
        )
        notes = override_notes.strip()
        if notes:
            prompt += f" Notes: {notes}"

    return ClauseEvaluation(
        clause_id=clause.clause_id,
        framework=clause.framework,
        status=status,
        fields_present=present,
        missing_fields=missing,
        evidence_hits=evidence_hits,
        prompt=prompt,
        reason="",
    )
| def _paper_record_id(paper: Dict[str, Any]) -> str: | |
| file_name = str(paper.get("_file", "unknown.pdf")) | |
| extracted = paper.get("extracted", {}) or {} | |
| chems = extracted.get("chemicals", []) | |
| chem = "-" | |
| if isinstance(chems, list) and chems: | |
| chem = str(chems[0]).strip() or "-" | |
| return f"{file_name} | {chem} | Paper" | |
def map_extraction_to_framework(
    extraction_payload: Any,
    framework: str,
    catalog_dir: str = "regulatory_catalog",
    override_notes: str = "",
) -> Tuple[pd.DataFrame, Dict[str, Any], str]:
    """Assess every paper in the payload against every clause of *framework*.

    Returns:
        A triple of (dataframe with one row per paper x clause,
        report dict with status tallies and gap prompts,
        markdown gap-assessment summary).
    """
    papers, prior_extensions = _normalize_payload(extraction_payload)
    clauses = load_framework_catalog(framework, catalog_dir=catalog_dir)

    tally: Dict[str, int] = {"covered": 0, "partial": 0, "missing": 0, "not_applicable": 0}
    gap_prompts: List[str] = []
    rows: List[Dict[str, Any]] = []

    for paper in papers:
        extracted = paper.get("extracted", {}) or {}
        quotes = paper.get("evidence", []) or []
        record_id = _paper_record_id(paper)
        source_file = str(paper.get("_file", ""))
        paper_title = str(paper.get("paper_title", ""))
        for clause in clauses:
            outcome = _evaluate_clause(extracted, quotes, clause, override_notes=override_notes)
            tally[outcome.status] = tally.get(outcome.status, 0) + 1
            if outcome.prompt:
                gap_prompts.append(outcome.prompt)
            rows.append(
                {
                    "framework": framework,
                    "clause_id": clause.clause_id,
                    "clause_title": clause.title,
                    "file": source_file,
                    "paper_title": paper_title,
                    "record_id": record_id,
                    "status": outcome.status,
                    "fields_present": "; ".join(outcome.fields_present),
                    "missing_fields": "; ".join(outcome.missing_fields),
                    "evidence_hits": "; ".join(outcome.evidence_hits),
                    "prompt": outcome.prompt,
                    "source_reference": clause.source_reference,
                }
            )

    # Explicit column order keeps the frame stable even when `rows` is empty.
    df = pd.DataFrame(
        rows,
        columns=[
            "framework",
            "clause_id",
            "clause_title",
            "file",
            "paper_title",
            "record_id",
            "status",
            "fields_present",
            "missing_fields",
            "evidence_hits",
            "prompt",
            "source_reference",
        ],
    )

    report = {
        "framework": framework,
        "summary": tally,
        "missing_prompts": gap_prompts,
        "existing_toxra_extensions": prior_extensions,
    }

    md_lines = [
        f"# {framework} Regulatory Gap Assessment",
        "",
        "## Status Summary",
        f"- Covered: {tally.get('covered', 0)}",
        f"- Partial: {tally.get('partial', 0)}",
        f"- Missing: {tally.get('missing', 0)}",
        f"- Not applicable: {tally.get('not_applicable', 0)}",
        "",
        "## Priority Data Gaps",
    ]
    if gap_prompts:
        # Cap the markdown listing at 50 prompts; the full list stays in `report`.
        md_lines.extend(f"- {p}" for p in gap_prompts[:50])
    else:
        md_lines.append("- No immediate gaps identified.")

    return df, report, "\n".join(md_lines)