import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd

from .contracts import ClauseEvaluation, RegulatoryClause

# Maps a framework display name to its clause-catalog JSON file on disk.
FRAMEWORK_TO_FILE = {
    "FDA CTP": "fda_ctp_v2024_06.json",
    "EPA": "epa_cancer_v2005.json",
}

# Sentinel strings (lower-cased) that count as "no data" for a field value.
EMPTY_STRINGS = {"", "not_reported", "insufficient_data", "none", "na", "n/a", "null"}


def _is_non_empty(v: Any) -> bool:
    """Return True when *v* carries real data (not None, blank, or a sentinel).

    A list is non-empty only if, after stripping blanks, at least one
    remaining entry is not a known "no data" sentinel. Any other value is
    stringified and checked the same way.
    """
    if v is None:
        return False
    if isinstance(v, list):
        vals = [str(x).strip() for x in v if str(x).strip()]
        if not vals:
            return False
        # At least one real (non-sentinel) entry makes the list non-empty.
        return not all(x.lower() in EMPTY_STRINGS for x in vals)
    s = str(v).strip()
    if not s:
        return False
    return s.lower() not in EMPTY_STRINGS


def _normalize_payload(extraction_payload: Any) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """Coerce an extraction payload into ``(papers, toxra_extensions)``.

    Accepts either a dict with a ``papers`` list (and optional
    ``toxra_extensions`` dict) or a bare list of papers.

    Raises:
        ValueError: if the payload is neither form.
    """
    if isinstance(extraction_payload, dict):
        papers = extraction_payload.get("papers", [])
        if isinstance(papers, list):
            ext = extraction_payload.get("toxra_extensions", {})
            return papers, (ext if isinstance(ext, dict) else {})
    if isinstance(extraction_payload, list):
        return extraction_payload, {}
    raise ValueError("Unsupported extraction payload format. Expected list or object with papers.")


def load_framework_catalog(framework: str, catalog_dir: str = "regulatory_catalog") -> List[RegulatoryClause]:
    """Load and parse the clause catalog for *framework*.

    Args:
        framework: Key into ``FRAMEWORK_TO_FILE`` (e.g. ``"FDA CTP"``).
        catalog_dir: Directory containing the catalog JSON files.

    Returns:
        The catalog's clauses as ``RegulatoryClause`` objects.

    Raises:
        ValueError: if *framework* is not a supported key.
        FileNotFoundError: if the catalog file does not exist.
    """
    fname = FRAMEWORK_TO_FILE.get(framework)
    if not fname:
        raise ValueError(f"Unsupported framework: {framework}")
    path = Path(catalog_dir) / fname
    if not path.exists():
        raise FileNotFoundError(f"Catalog not found: {path}")
    data = json.loads(path.read_text(encoding="utf-8"))
    clauses = data.get("clauses", []) if isinstance(data, dict) else []
    out: List[RegulatoryClause] = []
    for c in clauses:
        # Defensively normalize every field: missing keys fall back to
        # sensible defaults and `or []` / `or {}` guards against explicit
        # nulls in the JSON.
        out.append(
            RegulatoryClause(
                clause_id=str(c.get("clause_id", "")).strip(),
                framework=str(c.get("framework", framework)).strip(),
                title=str(c.get("title", "")).strip(),
                description=str(c.get("description", "")).strip(),
                required_fields=list(c.get("required_fields", []) or []),
                required_evidence_terms=list(c.get("required_evidence_terms", []) or []),
                acceptance_rule=str(c.get("acceptance_rule", "all_required_fields")).strip(),
                applicability=dict(c.get("applicability", {}) or {}),
                source_reference=str(c.get("source_reference", "")).strip(),
            )
        )
    return out


def _clause_applicable(extracted: Dict[str, Any], clause: RegulatoryClause) -> bool:
    """Return True when *clause* applies to this *extracted* record.

    A clause with no applicability condition (or no ``field`` in it) always
    applies. Otherwise the extracted value for ``field`` must equal the
    clause's ``equals`` value, compared case-insensitively; for list values
    a single matching element suffices.
    """
    app = clause.applicability or {}
    if not app:
        return True
    field = str(app.get("field", "")).strip()
    equals = app.get("equals")
    if not field:
        return True
    val = extracted.get(field)
    if isinstance(val, list):
        vals = [str(x).strip().lower() for x in val]
        return str(equals).strip().lower() in vals
    # NOTE: a missing field compares as the string "none"; this matches the
    # original behavior and only bites if a catalog uses equals="None".
    return str(val).strip().lower() == str(equals).strip().lower()


def _evaluate_clause(
    extracted: Dict[str, Any],
    evidence: List[Dict[str, Any]],
    clause: RegulatoryClause,
    override_notes: str = "",
) -> ClauseEvaluation:
    """Evaluate one clause against one paper's extracted fields and evidence.

    Status semantics:
      * ``not_applicable`` — applicability condition not met.
      * ``covered`` — field rule satisfied AND (if required) at least one
        evidence term found in the concatenated evidence quotes.
      * ``partial`` — some fields present or some evidence hits, but not
        enough to be covered.
      * ``missing`` — nothing present at all.

    For ``missing``/``partial`` a remediation prompt is generated,
    optionally suffixed with *override_notes*.
    """
    if not _clause_applicable(extracted, clause):
        return ClauseEvaluation(
            clause_id=clause.clause_id,
            framework=clause.framework,
            status="not_applicable",
            reason="Applicability condition not met.",
        )

    present: List[str] = []
    missing: List[str] = []
    for f in clause.required_fields:
        if _is_non_empty(extracted.get(f)):
            present.append(f)
        else:
            missing.append(f)

    # Substring search of each required term over all evidence quotes,
    # case-insensitively.
    evidence_hits: List[str] = []
    ev_text = " ".join([str(x.get("quote", "")) for x in evidence]).lower()
    for term in clause.required_evidence_terms:
        t = str(term).strip().lower()
        if t and t in ev_text:
            evidence_hits.append(term)

    if clause.required_fields:
        if clause.acceptance_rule == "any_required_fields":
            field_ok = len(present) > 0
        else:
            # Default rule: every required field must be present.
            field_ok = len(missing) == 0
    else:
        field_ok = True

    evidence_ok = True
    if clause.required_evidence_terms:
        evidence_ok = len(evidence_hits) > 0

    if field_ok and evidence_ok:
        status = "covered"
    elif present or evidence_hits:
        status = "partial"
    else:
        status = "missing"

    missing_prompt = ""
    if status in {"missing", "partial"}:
        need_fields = ", ".join(missing) if missing else "additional corroborating evidence"
        missing_prompt = (
            f"Provide evidence for clause {clause.clause_id} ({clause.title}). "
            f"Missing: {need_fields}."
        )
        if override_notes.strip():
            missing_prompt += f" Notes: {override_notes.strip()}"

    return ClauseEvaluation(
        clause_id=clause.clause_id,
        framework=clause.framework,
        status=status,
        fields_present=present,
        missing_fields=missing,
        evidence_hits=evidence_hits,
        prompt=missing_prompt,
        reason="",
    )


def _paper_record_id(paper: Dict[str, Any]) -> str:
    """Build a human-readable record id: ``<file> | <first chemical> | Paper``."""
    file_name = str(paper.get("_file", "unknown.pdf"))
    extracted = paper.get("extracted", {}) or {}
    chems = extracted.get("chemicals", [])
    chem = "-"
    if isinstance(chems, list) and chems:
        # First listed chemical; fall back to "-" if it strips to empty.
        chem = str(chems[0]).strip() or "-"
    return f"{file_name} | {chem} | Paper"


def map_extraction_to_framework(
    extraction_payload: Any,
    framework: str,
    catalog_dir: str = "regulatory_catalog",
    override_notes: str = "",
) -> Tuple[pd.DataFrame, Dict[str, Any], str]:
    """Evaluate every paper in the payload against every clause of *framework*.

    Args:
        extraction_payload: List of papers or dict with a ``papers`` list
            (see ``_normalize_payload``).
        framework: Framework key (see ``FRAMEWORK_TO_FILE``).
        catalog_dir: Directory holding the clause catalogs.
        override_notes: Free-text appended to every generated gap prompt.

    Returns:
        ``(df, report, markdown)``:
          * ``df`` — one row per (paper, clause) pair.
          * ``report`` — status counts, gap prompts, and any
            ``toxra_extensions`` carried through from the payload.
          * ``markdown`` — a gap-assessment summary document.

    Raises:
        ValueError / FileNotFoundError: propagated from payload
        normalization and catalog loading.
    """
    papers, existing_ext = _normalize_payload(extraction_payload)
    clauses = load_framework_catalog(framework, catalog_dir=catalog_dir)

    rows: List[Dict[str, Any]] = []
    status_counts = {"covered": 0, "partial": 0, "missing": 0, "not_applicable": 0}
    prompts: List[str] = []

    for paper in papers:
        extracted = paper.get("extracted", {}) or {}
        evidence = paper.get("evidence", []) or []
        rec_id = _paper_record_id(paper)
        file_name = str(paper.get("_file", ""))
        title = str(paper.get("paper_title", ""))
        for clause in clauses:
            ev = _evaluate_clause(extracted, evidence, clause, override_notes=override_notes)
            # .get guards against a status outside the pre-seeded four.
            status_counts[ev.status] = status_counts.get(ev.status, 0) + 1
            if ev.prompt:
                prompts.append(ev.prompt)
            rows.append(
                {
                    "framework": framework,
                    "clause_id": clause.clause_id,
                    "clause_title": clause.title,
                    "file": file_name,
                    "paper_title": title,
                    "record_id": rec_id,
                    "status": ev.status,
                    "fields_present": "; ".join(ev.fields_present),
                    "missing_fields": "; ".join(ev.missing_fields),
                    "evidence_hits": "; ".join(ev.evidence_hits),
                    "prompt": ev.prompt,
                    "source_reference": clause.source_reference,
                }
            )

    # Explicit column order keeps the DataFrame stable even when rows is empty.
    df = pd.DataFrame(
        rows,
        columns=[
            "framework",
            "clause_id",
            "clause_title",
            "file",
            "paper_title",
            "record_id",
            "status",
            "fields_present",
            "missing_fields",
            "evidence_hits",
            "prompt",
            "source_reference",
        ],
    )

    report = {
        "framework": framework,
        "summary": status_counts,
        "missing_prompts": prompts,
        "existing_toxra_extensions": existing_ext,
    }

    md_lines = [
        f"# {framework} Regulatory Gap Assessment",
        "",
        "## Status Summary",
        f"- Covered: {status_counts.get('covered', 0)}",
        f"- Partial: {status_counts.get('partial', 0)}",
        f"- Missing: {status_counts.get('missing', 0)}",
        f"- Not applicable: {status_counts.get('not_applicable', 0)}",
        "",
        "## Priority Data Gaps",
    ]
    if prompts:
        # Cap the markdown listing at 50 prompts to keep the report readable.
        for prompt_line in prompts[:50]:
            md_lines.append(f"- {prompt_line}")
    else:
        md_lines.append("- No immediate gaps identified.")
    markdown = "\n".join(md_lines)

    return df, report, markdown