# NLP_Project / toxra_core / regulatory_mapper.py
# (Hugging Face upload metadata: user hchevva, "Upload 43 files", commit 630d650 verified)
import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
from .contracts import ClauseEvaluation, RegulatoryClause
# Maps a supported regulatory-framework label to the JSON catalog file that
# holds its clauses; resolved against the catalog_dir argument in
# load_framework_catalog below.
FRAMEWORK_TO_FILE = {
    "FDA CTP": "fda_ctp_v2024_06.json",
    "EPA": "epa_cancer_v2005.json",
}
EMPTY_STRINGS = {"", "not_reported", "insufficient_data", "none", "na", "n/a", "null"}
def _is_non_empty(v: Any) -> bool:
if v is None:
return False
if isinstance(v, list):
vals = [str(x).strip() for x in v if str(x).strip()]
if not vals:
return False
return not all(x.lower() in EMPTY_STRINGS for x in vals)
s = str(v).strip()
if not s:
return False
return s.lower() not in EMPTY_STRINGS
def _normalize_payload(extraction_payload: Any) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
if isinstance(extraction_payload, dict):
papers = extraction_payload.get("papers", [])
if isinstance(papers, list):
ext = extraction_payload.get("toxra_extensions", {})
return papers, (ext if isinstance(ext, dict) else {})
if isinstance(extraction_payload, list):
return extraction_payload, {}
raise ValueError("Unsupported extraction payload format. Expected list or object with papers.")
def load_framework_catalog(framework: str, catalog_dir: str = "regulatory_catalog") -> List[RegulatoryClause]:
    """Load and parse the clause catalog for *framework*.

    Raises ValueError for an unknown framework and FileNotFoundError when the
    mapped catalog JSON is absent under *catalog_dir*. A payload that is not a
    dict (or lacks a ``clauses`` list) yields an empty result.
    """
    fname = FRAMEWORK_TO_FILE.get(framework)
    if not fname:
        raise ValueError(f"Unsupported framework: {framework}")

    catalog_path = Path(catalog_dir) / fname
    if not catalog_path.exists():
        raise FileNotFoundError(f"Catalog not found: {catalog_path}")

    raw = json.loads(catalog_path.read_text(encoding="utf-8"))
    clause_dicts = raw.get("clauses", []) if isinstance(raw, dict) else []

    # Every field is defensively stringified/copied so malformed catalog
    # entries cannot leak non-string values into RegulatoryClause.
    return [
        RegulatoryClause(
            clause_id=str(entry.get("clause_id", "")).strip(),
            framework=str(entry.get("framework", framework)).strip(),
            title=str(entry.get("title", "")).strip(),
            description=str(entry.get("description", "")).strip(),
            required_fields=list(entry.get("required_fields", []) or []),
            required_evidence_terms=list(entry.get("required_evidence_terms", []) or []),
            acceptance_rule=str(entry.get("acceptance_rule", "all_required_fields")).strip(),
            applicability=dict(entry.get("applicability", {}) or {}),
            source_reference=str(entry.get("source_reference", "")).strip(),
        )
        for entry in clause_dicts
    ]
def _clause_applicable(extracted: Dict[str, Any], clause: RegulatoryClause) -> bool:
    """Check the clause's single equality applicability condition.

    A clause with no applicability dict, or one lacking a ``field`` name,
    applies unconditionally. Otherwise the extracted value (or any element of
    a list value) must equal ``equals``, compared case-insensitively after
    stripping whitespace.
    """
    conditions = clause.applicability or {}
    if not conditions:
        return True
    field_name = str(conditions.get("field", "")).strip()
    if not field_name:
        return True

    wanted = str(conditions.get("equals", None)).strip().lower()
    value = extracted.get(field_name)
    if isinstance(value, list):
        normalized = [str(item).strip().lower() for item in value]
        return wanted in normalized
    return str(value).strip().lower() == wanted
def _evaluate_clause(
    extracted: Dict[str, Any],
    evidence: List[Dict[str, Any]],
    clause: RegulatoryClause,
    override_notes: str = "",
) -> ClauseEvaluation:
    """Grade one clause against a paper's extracted fields and evidence quotes.

    Status ladder: ``not_applicable`` (applicability condition fails),
    ``covered`` (field rule and evidence rule both satisfied), ``partial``
    (some field or evidence hit), ``missing`` (nothing at all). A follow-up
    prompt is attached for partial/missing results.
    """
    if not _clause_applicable(extracted, clause):
        return ClauseEvaluation(
            clause_id=clause.clause_id,
            framework=clause.framework,
            status="not_applicable",
            reason="Applicability condition not met.",
        )

    # Partition required fields into present vs. missing.
    present: List[str] = []
    missing: List[str] = []
    for field_name in clause.required_fields:
        (present if _is_non_empty(extracted.get(field_name)) else missing).append(field_name)

    # Substring search for each required term across all evidence quotes.
    quoted_text = " ".join(str(item.get("quote", "")) for item in evidence).lower()
    evidence_hits: List[str] = []
    for term in clause.required_evidence_terms:
        needle = str(term).strip().lower()
        if needle and needle in quoted_text:
            evidence_hits.append(term)

    # Field rule: default is all-required; "any_required_fields" relaxes it.
    if not clause.required_fields:
        field_ok = True
    elif clause.acceptance_rule == "any_required_fields":
        field_ok = bool(present)
    else:
        field_ok = not missing

    evidence_ok = bool(evidence_hits) if clause.required_evidence_terms else True

    if field_ok and evidence_ok:
        status = "covered"
    elif present or evidence_hits:
        status = "partial"
    else:
        status = "missing"

    prompt = ""
    if status in {"missing", "partial"}:
        gap = ", ".join(missing) if missing else "additional corroborating evidence"
        prompt = (
            f"Provide evidence for clause {clause.clause_id} ({clause.title}). "
            f"Missing: {gap}."
        )
        if override_notes.strip():
            prompt += f" Notes: {override_notes.strip()}"

    return ClauseEvaluation(
        clause_id=clause.clause_id,
        framework=clause.framework,
        status=status,
        fields_present=present,
        missing_fields=missing,
        evidence_hits=evidence_hits,
        prompt=prompt,
        reason="",
    )
def _paper_record_id(paper: Dict[str, Any]) -> str:
file_name = str(paper.get("_file", "unknown.pdf"))
extracted = paper.get("extracted", {}) or {}
chems = extracted.get("chemicals", [])
chem = "-"
if isinstance(chems, list) and chems:
chem = str(chems[0]).strip() or "-"
return f"{file_name} | {chem} | Paper"
def map_extraction_to_framework(
    extraction_payload: Any,
    framework: str,
    catalog_dir: str = "regulatory_catalog",
    override_notes: str = "",
) -> Tuple[pd.DataFrame, Dict[str, Any], str]:
    """Evaluate every paper against every clause of *framework*.

    Returns a triple of (per-paper-per-clause DataFrame, summary report dict,
    markdown gap-assessment text). Prompts for missing/partial clauses are
    collected across all papers; the markdown lists at most the first 50.
    """
    papers, existing_ext = _normalize_payload(extraction_payload)
    clauses = load_framework_catalog(framework, catalog_dir=catalog_dir)

    columns = [
        "framework",
        "clause_id",
        "clause_title",
        "file",
        "paper_title",
        "record_id",
        "status",
        "fields_present",
        "missing_fields",
        "evidence_hits",
        "prompt",
        "source_reference",
    ]
    rows: List[Dict[str, Any]] = []
    counts = {"covered": 0, "partial": 0, "missing": 0, "not_applicable": 0}
    prompts: List[str] = []

    for paper in papers:
        extracted = paper.get("extracted", {}) or {}
        evidence = paper.get("evidence", []) or []
        record_id = _paper_record_id(paper)
        # NOTE(review): empty-string default here vs. "unknown.pdf" inside
        # _paper_record_id — looks intentional for the table, but confirm.
        file_name = str(paper.get("_file", ""))
        paper_title = str(paper.get("paper_title", ""))

        for clause in clauses:
            result = _evaluate_clause(extracted, evidence, clause, override_notes=override_notes)
            counts[result.status] = counts.get(result.status, 0) + 1
            if result.prompt:
                prompts.append(result.prompt)
            rows.append(
                {
                    "framework": framework,
                    "clause_id": clause.clause_id,
                    "clause_title": clause.title,
                    "file": file_name,
                    "paper_title": paper_title,
                    "record_id": record_id,
                    "status": result.status,
                    "fields_present": "; ".join(result.fields_present),
                    "missing_fields": "; ".join(result.missing_fields),
                    "evidence_hits": "; ".join(result.evidence_hits),
                    "prompt": result.prompt,
                    "source_reference": clause.source_reference,
                }
            )

    df = pd.DataFrame(rows, columns=columns)
    report = {
        "framework": framework,
        "summary": counts,
        "missing_prompts": prompts,
        "existing_toxra_extensions": existing_ext,
    }

    md_lines = [
        f"# {framework} Regulatory Gap Assessment",
        "",
        "## Status Summary",
        f"- Covered: {counts.get('covered', 0)}",
        f"- Partial: {counts.get('partial', 0)}",
        f"- Missing: {counts.get('missing', 0)}",
        f"- Not applicable: {counts.get('not_applicable', 0)}",
        "",
        "## Priority Data Gaps",
    ]
    if prompts:
        md_lines.extend(f"- {p}" for p in prompts[:50])
    else:
        md_lines.append("- No immediate gaps identified.")

    return df, report, "\n".join(md_lines)