Spaces:
Running
Running
| import os | |
| import re | |
| import json | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Dict, List, Tuple, Any, Optional | |
| import gradio as gr | |
| import numpy as np | |
| import pandas as pd | |
| from pypdf import PdfReader | |
| try: | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| except Exception: # pragma: no cover - fallback path for minimal runtime | |
| TfidfVectorizer = None | |
| from openai import OpenAI | |
| from literature_explorer import build_literature_explorer_tab | |
| from toxra_core.artifacts import make_run_dir, write_dataframe_csv, write_json, write_markdown | |
| from toxra_core.calculation_client import MCPClientError, run_batch_cancer_risk | |
| from toxra_core.contracts import CANCER_RISK_TEMPLATE_COLUMNS | |
| from toxra_core.nlp_pipeline import extract_evidence_span, expand_regulatory_queries, hybrid_rank_text_items | |
| from toxra_core.regulatory_mapper import map_extraction_to_framework | |
# =============================
# UI theme
# =============================
# Custom stylesheet injected into the Gradio app (passed as `css=` at Blocks
# construction time elsewhere in the file). Defines the light theme palette as
# CSS variables plus hero-banner, card, tab, and form-control styling.
APP_CSS = """
@import url('https://fonts.googleapis.com/css2?family=IBM+Plex+Sans:wght@400;500;600;700&display=swap');
:root {
  --bg: #f5f7fb;
  --panel: #ffffff;
  --ink: #0f172a;
  --muted: #516079;
  --line: #e2e8f0;
  --accent: #2563eb;
  --accent-2: #0ea5e9;
  --accent-soft: #e6efff;
  --shadow: 0 10px 28px rgba(15, 23, 42, 0.08);
  --radius: 14px;
}
.gradio-container {
  background: var(--bg);
  color: var(--ink);
  font-family: "IBM Plex Sans", ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, "Helvetica Neue", Arial, "Noto Sans", "Apple Color Emoji", "Segoe UI Emoji";
}
.hero {
  background: linear-gradient(180deg, #edf3ff 0%, #f4f8ff 100%);
  color: var(--ink);
  border-radius: 16px;
  padding: 18px 22px;
  box-shadow: var(--shadow);
  border: 1px solid #dbe5f4;
  display: flex;
  align-items: center;
  justify-content: space-between;
  gap: 16px;
  flex-wrap: wrap;
}
.hero-left { min-width: 240px; }
.hero-right { margin-left: auto; }
.hero-title { font-size: 22px; font-weight: 700; letter-spacing: 0.08em; }
.hero-sub { margin-top: 4px; font-size: 13px; color: #3b4b63; }
.hero-pills { margin-top: 10px; display: flex; gap: 8px; flex-wrap: wrap; }
.hero-pill {
  background: var(--accent-soft);
  color: #1e3a8a;
  border: 1px solid #d6e3f6;
  border-radius: 999px;
  padding: 4px 10px;
  font-size: 11px;
  font-weight: 600;
}
.hero-status {
  background: #ffffff;
  color: #334155;
  border: 1px solid #d9e2ef;
  border-radius: 999px;
  padding: 6px 12px;
  font-size: 12px;
  font-weight: 600;
  box-shadow: 0 6px 16px rgba(15, 23, 42, 0.06);
}
.split-row { gap: 18px; }
.card {
  background: var(--panel);
  border: 1px solid var(--line);
  border-radius: var(--radius);
  padding: 16px;
  box-shadow: var(--shadow);
}
.left-rail .card + .card { margin-top: 16px; }
.right-panel .card { margin-bottom: 14px; }
.section-title {
  font-size: 12px;
  text-transform: uppercase;
  letter-spacing: 0.14em;
  color: var(--muted);
  margin-bottom: 8px;
}
.gradio-container input,
.gradio-container textarea,
.gradio-container select {
  border-radius: 10px !important;
  border-color: var(--line) !important;
}
.gradio-container button.primary {
  background: var(--accent) !important;
  border-color: var(--accent) !important;
}
.gradio-container button.primary:hover { background: #1d4ed8 !important; }
.gradio-container .tab-nav { gap: 8px; }
.gradio-container .tab-nav button {
  background: var(--panel);
  border: 1px solid var(--line);
  border-radius: 999px;
  padding: 6px 14px;
  font-size: 12px;
  color: var(--muted);
}
.gradio-container .tab-nav button.selected {
  background: var(--accent);
  border-color: var(--accent);
  color: #ffffff;
}
.gradio-container .accordion {
  border: 1px solid var(--line);
  border-radius: var(--radius);
}
"""
# =============================
# Defaults
# =============================
# Default controlled vocabulary, stored as a JSON string so the UI's vocab
# editor can round-trip it through json.loads/json.dumps. Keys ending in
# "_enum" are closed term lists used to constrain LLM extraction output.
# NOTE: enum member strings (including their exact spacing/casing, e.g.
# "enough data is not available") are contractual values matched verbatim by
# the extraction prompt — do not normalize them.
DEFAULT_CONTROLLED_VOCAB_JSON = """{
  "risk_stance_enum": ["acceptable","acceptable_with_uncertainty","not_acceptable","insufficient_data"],
  "fda_ctp_tier_enum": ["Tier_1_high_priority","Tier_2_moderate_priority","Tier_3_lower_priority","enough data is not available"],
  "approach_enum": ["in_vivo","in_vitro","in_silico","nams","mixed","not_reported"],
  "in_silico_method_enum": [
    "qsar","read_across","molecular_docking","molecular_dynamics","pbpk_pbtK","aop_based","ml_model","other","not_reported"
  ],
  "nams_method_enum": [
    "high_throughput_screening_hts","omics_transcriptomics","omics_proteomics","omics_metabolomics",
    "organ_on_chip","microphysiological_system_mps","3d_tissue_model","in_chemico_assay",
    "in_silico_as_nams","other","not_reported"
  ],
  "exposure_route_enum": ["oral","inhalation","dermal","parenteral","multiple","not_reported"],
  "species_enum": ["human","rat","mouse","rabbit","dog","non_human_primate","cell_line","other","not_reported"],
  "genotoxicity_oecd_tg_in_vitro_enum": [
    "OECD_TG_471_Bacterial Reverse mutation test(AMES test)",
    "OECD_TG_473_In Vitro Mammalian Chromosomal Aberration Test",
    "OECD_TG_476_In Vitro Mammalian Cell Gene Mutation Tests (Hprt & xprt)",
    "OECD_TG_487_In Vitro Mammalian Cell Micronucleus Test",
    "OECD_TG_490_In Vitro Mammalian Cell Gene Mutation Tests (Thymidine Kinase)",
    "not_reported"
  ],
  "genotoxicity_oecd_tg_in_vivo_enum": [
    "OECD_TG_474_In Vivo Mammalian Erythrocyte Micronucleus Test",
    "OECD_TG_475_Mammalian Bone Marrow Chromosomal Aberration Test",
    "OECD_TG_488_Transgenic Rodent Somatic & Germ Cell Gene Mutation Assays",
    "OECD_TG_489_In Vivo Mammalian Alkaline Comet Assay",
    "not_reported"
  ],
  "genotoxicity_result_enum": ["positive","negative","equivocal","not_reported"],
  "binary_result_enum": ["positive","negative","equivocal","not_reported"],
  "carcinogenicity_result_enum": ["carcinogenic","not_carcinogenic","insufficient_data","not_reported"]
}"""
# =============================
# Endpoint modules (what users choose)
# =============================
# Each PRESET_* list defines extraction fields as rows of
# {field, type, enum_values, instructions}; `type` is one of TYPE_CHOICES and
# `enum_values` is a comma-separated string for enum/list[enum] types.
# PRESET_CORE is always included regardless of endpoint selection.
PRESET_CORE = [
    {"field": "chemicals", "type": "list[str]", "enum_values": "", "instructions": "List chemical(s) studied. If multiple, include each separately."},
    {"field": "cas_numbers", "type": "list[str]", "enum_values": "", "instructions": "Extract CAS number(s) mentioned (may be multiple)."},
    {"field": "study_type", "type": "enum", "enum_values": "in_vivo,in_vitro,epidemiology,in_silico,review,methodology,other,not_reported", "instructions": "Choose best match."},
    {"field": "exposure_route", "type": "enum", "enum_values": "oral,inhalation,dermal,parenteral,multiple,not_reported", "instructions": "Choose best match."},
    {"field": "species", "type": "enum", "enum_values": "human,rat,mouse,rabbit,dog,non_human_primate,cell_line,other,not_reported", "instructions": "Choose best match."},
    {"field": "dose_metrics", "type": "list[str]", "enum_values": "", "instructions": "Capture NOAEL/LOAEL/BMD/BMDL/LD50/LC50 etc with units and route if available."},
    {"field": "key_findings", "type": "str", "enum_values": "", "instructions": "2–4 short sentences summarizing major findings. Grounded to text."},
    {"field": "conclusion", "type": "str", "enum_values": "", "instructions": "Paper's conclusion about safety/risk (grounded)."},
]
# Optional module: NAMs (new approach methodologies) and in-silico study fields.
PRESET_NAMS_INSILICO = [
    {"field": "approach", "type": "enum", "enum_values": "in_vivo,in_vitro,in_silico,nams,mixed,not_reported", "instructions": "Identify if results are in silico or NAMs; use mixed if multiple."},
    {"field": "in_silico_methods", "type": "list[enum]", "enum_values": "qsar,read_across,molecular_docking,molecular_dynamics,pbpk_pbtK,aop_based,ml_model,other,not_reported", "instructions": "If in_silico, list methods used (multiple allowed)."},
    {"field": "nams_methods", "type": "list[enum]", "enum_values": "high_throughput_screening_hts,omics_transcriptomics,omics_proteomics,omics_metabolomics,organ_on_chip,microphysiological_system_mps,3d_tissue_model,in_chemico_assay,in_silico_as_nams,other,not_reported", "instructions": "If NAMs, list methods used (multiple allowed)."},
    {"field": "nams_or_insilico_key_results", "type": "str", "enum_values": "", "instructions": "Summarize in silico / NAMs results and key metrics (grounded)."},
]
# Optional module: genotoxicity test-guideline capture plus FDA CTP tiering.
PRESET_GENOTOX_OECD = [
    {
        "field": "genotox_oecd_tg_in_vitro",
        "type": "list[enum]",
        "enum_values": "OECD_TG_471_Bacterial Reverse mutation test(AMES test),OECD_TG_473_In Vitro Mammalian Chromosomal Aberration Test,OECD_TG_476_In Vitro Mammalian Cell Gene Mutation Tests (Hprt & xprt),OECD_TG_487_In Vitro Mammalian Cell Micronucleus Test,OECD_TG_490_In Vitro Mammalian Cell Gene Mutation Tests (Thymidine Kinase),not_reported",
        "instructions": "Select all in vitro OECD TGs explicitly reported (or clearly described). If none, use not_reported."
    },
    {
        "field": "genotox_oecd_tg_in_vivo",
        "type": "list[enum]",
        "enum_values": "OECD_TG_474_In Vivo Mammalian Erythrocyte Micronucleus Test,OECD_TG_475_Mammalian Bone Marrow Chromosomal Aberration Test,OECD_TG_488_Transgenic Rodent Somatic & Germ Cell Gene Mutation Assays,OECD_TG_489_In Vivo Mammalian Alkaline Comet Assay,not_reported",
        "instructions": "Select all in vivo OECD TGs explicitly reported (or clearly described). If none, use not_reported."
    },
    {
        "field": "fda_ctp_carcinogenicity_tier",
        "type": "enum",
        "enum_values": "Tier_1_high_priority,Tier_2_moderate_priority,Tier_3_lower_priority,enough data is not available",
        "instructions": "Assign FDA CTP carcinogenicity/genotoxicity tier based strictly on provided evidence. If decision cannot be made from excerpts, use exactly: enough data is not available."
    },
    {"field": "genotoxicity_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Classify overall genotoxicity outcome as reported. If unclear, not_reported."},
    {"field": "genotoxicity_result_notes", "type": "str", "enum_values": "", "instructions": "Short explanation grounded to text + test context (e.g., AMES, micronucleus)."},
]
# Optional module: acute toxicity outcomes and metrics.
PRESET_ACUTE_TOX = [
    {"field": "acute_toxicity_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "If acute toxicity is assessed, classify as positive/negative/equivocal; otherwise not_reported."},
    {"field": "acute_toxicity_key_metrics", "type": "list[str]", "enum_values": "", "instructions": "Extract LD50/LC50/EC50/IC50 etc with units/route/species if available."},
    {"field": "acute_toxicity_notes", "type": "str", "enum_values": "", "instructions": "Grounded summary of acute toxicity findings."},
]
# Optional module: repeated-dose (subchronic/chronic) toxicity.
PRESET_REPEATED_DOSE = [
    {"field": "repeated_dose_noael_loael", "type": "list[str]", "enum_values": "", "instructions": "Extract NOAEL/LOAEL (and study duration) with units/route if available."},
    {"field": "repeated_dose_target_organs", "type": "list[str]", "enum_values": "", "instructions": "List target organs/critical effects explicitly reported."},
    {"field": "repeated_dose_notes", "type": "str", "enum_values": "", "instructions": "Grounded summary of repeated-dose toxicity conclusions."},
]
# Optional module: irritation and sensitization endpoints.
PRESET_IRR_SENS = [
    {"field": "skin_irritation_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Skin irritation outcome (as reported)."},
    {"field": "eye_irritation_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Eye irritation outcome (as reported)."},
    {"field": "skin_sensitization_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Skin sensitization outcome (as reported)."},
    {"field": "irritation_sensitization_notes", "type": "str", "enum_values": "", "instructions": "Grounded notes including method/model if stated."},
]
# Optional module: reproductive and developmental toxicity.
PRESET_REPRO_DEV = [
    {"field": "reproductive_toxicity_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Reproductive toxicity outcome (as reported)."},
    {"field": "developmental_toxicity_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Developmental toxicity outcome (as reported)."},
    {"field": "repro_dev_notes", "type": "str", "enum_values": "", "instructions": "Grounded notes including endpoints and study design if stated."},
]
# Optional module: carcinogenicity outcome.
PRESET_CARCINOGENICITY = [
    {"field": "carcinogenicity_result", "type": "enum", "enum_values": "carcinogenic,not_carcinogenic,insufficient_data,not_reported", "instructions": "As reported. If evidence insufficient, insufficient_data."},
    {"field": "carcinogenicity_notes", "type": "str", "enum_values": "", "instructions": "Grounded notes including species, duration, tumor findings if stated."},
]
# UI label -> preset rows; these labels are also the keys of
# ENDPOINT_PRESETS values and ENDPOINT_QUERY_HINTS, so keep them in sync.
ENDPOINT_MODULES: Dict[str, List[Dict[str, Any]]] = {
    "Genotoxicity (OECD TG)": PRESET_GENOTOX_OECD,
    "NAMs / In Silico": PRESET_NAMS_INSILICO,
    "Acute toxicity": PRESET_ACUTE_TOX,
    "Repeated dose toxicity": PRESET_REPEATED_DOSE,
    "Irritation / Sensitization": PRESET_IRR_SENS,
    "Repro / Developmental": PRESET_REPRO_DEV,
    "Carcinogenicity": PRESET_CARCINOGENICITY,
}
# Endpoint presets (requested)
# Named bundles of endpoint modules selectable in one click.
ENDPOINT_PRESETS: Dict[str, List[str]] = {
    "Required – Safety Assessor": [
        "Genotoxicity (OECD TG)",
        "Repeated dose toxicity",
        "Irritation / Sensitization",
        "Repro / Developmental",
        "Acute toxicity",
    ],
    "Core only (fast)": [],
    "Screening – NAMs + Genotox": ["NAMs / In Silico", "Genotoxicity (OECD TG)"],
    "Full – All endpoints": list(ENDPOINT_MODULES.keys()),
}
# Retrieval query terms per endpoint module, fed to chunk selection so the
# LLM context prioritizes passages relevant to the chosen endpoints.
ENDPOINT_QUERY_HINTS: Dict[str, List[str]] = {
    "Genotoxicity (OECD TG)": ["genotoxicity", "mutagenicity", "AMES", "micronucleus", "comet assay", "chromosomal aberration", "OECD TG 471 473 476 487 490 474 489", "carcinogenicity tiering", "FDA CTP tier"],
    "NAMs / In Silico": ["in silico", "QSAR", "read-across", "AOP", "PBPK", "high-throughput", "omics", "organ-on-chip", "microphysiological"],
    "Acute toxicity": ["acute toxicity", "LD50", "LC50", "single dose", "lethality", "mortality"],
    "Repeated dose toxicity": ["repeated dose", "subchronic", "chronic", "NOAEL", "LOAEL", "target organ", "90-day", "28-day"],
    "Irritation / Sensitization": ["skin irritation", "eye irritation", "sensitization", "LLNA", "Draize"],
    "Repro / Developmental": ["reproductive toxicity", "fertility", "developmental toxicity", "teratogenic", "prenatal", "postnatal"],
    "Carcinogenicity": ["carcinogenicity", "tumor", "neoplasm", "cancer", "two-year bioassay"],
}
| # ============================= | |
| # PDF extraction (text-based PDFs only) | |
| # ============================= | |
def extract_pages_from_pdf(pdf_path: str, max_pages: int = 0) -> Tuple[List[Tuple[int, str]], int]:
    """Extract text from a (text-based) PDF.

    Returns a list of (1-based page number, page text) pairs plus the total
    page count. A max_pages of None or <= 0 means read every page. Pages whose
    extraction raises are represented with empty text rather than failing.
    """
    reader = PdfReader(pdf_path)
    total_pages = len(reader.pages)
    if max_pages is None or max_pages <= 0:
        limit = total_pages
    else:
        limit = min(total_pages, int(max_pages))
    extracted: List[Tuple[int, str]] = []
    for idx in range(limit):
        try:
            text = reader.pages[idx].extract_text() or ""
        except Exception:
            text = ""
        extracted.append((idx + 1, text or ""))
    return extracted, total_pages
def clean_text(t: str) -> str:
    """Normalize a text fragment: drop NUL bytes, collapse all whitespace runs
    to single spaces, and trim the ends. None-safe (returns "")."""
    without_nul = (t or "").replace("\x00", " ")
    return re.sub(r"\s+", " ", without_nul).strip()
def chunk_pages(pages: List[Tuple[int, str]], target_chars: int = 3000) -> List[Dict[str, Any]]:
    """Group per-page text into chunks of roughly ``target_chars`` characters.

    Each chunk is ``{"pages": "start-end", "text": ...}`` where the page range
    covers only pages that actually contributed text.

    BUG FIX vs. the original: the page range now ends at the last page that
    contributed text. Previously a flush labeled the chunk up to ``pno - 1``
    (which could include intervening empty pages), and the final chunk always
    ended at ``pages[-1][0]`` even when trailing pages were empty or skipped.
    """
    chunks: List[Dict[str, Any]] = []
    buf: List[str] = []
    start_page = None
    last_page = None  # last page whose text is currently in `buf`
    cur_len = 0
    for pno, txt in pages:
        txt = clean_text(txt)
        if not txt:
            continue  # empty/unextractable pages never widen a chunk's range
        if start_page is None:
            start_page = pno
        if cur_len + len(txt) + 1 > target_chars and buf:
            # Flush the full buffer; it ends at the last contributing page.
            chunks.append({"pages": f"{start_page}-{last_page}", "text": " ".join(buf)})
            buf = [txt]
            start_page = pno
            cur_len = len(txt)
        else:
            buf.append(txt)
            cur_len += len(txt) + 1
        last_page = pno
    if buf and start_page is not None:
        chunks.append({"pages": f"{start_page}-{last_page}", "text": " ".join(buf)})
    return chunks
def _text_based_pdf_warning(pages: List[Tuple[int, str]]) -> bool:
    """Return True when the PDF yielded almost no text (< 200 chars total),
    which usually means it is scanned/image-only rather than text-based."""
    cleaned = [clean_text(txt) for _, txt in pages]
    combined = " ".join(piece for piece in cleaned if piece)
    return len(combined.strip()) < 200
| # ============================= | |
| # Lightweight retrieval (TF-IDF) | |
| # ============================= | |
def select_relevant_chunks(
    chunks: List[Dict[str, Any]],
    queries: List[str],
    top_per_query: int = 2,
    max_chunks: int = 12
) -> List[Dict[str, Any]]:
    """Rank chunks against the queries and return up to ``max_chunks`` of them.

    Uses TF-IDF cosine-style scoring when scikit-learn is importable; otherwise
    falls back to a simple keyword-overlap count. If no query selects anything,
    the first ``max_chunks`` chunks are returned as-is.

    BUG FIX vs. the original: the fallback tokenizer used r"[a-zA-Z0-9\\-]+"
    — the doubled backslash inside a raw string put a literal backslash into
    the character class, so tokens could include backslashes. The class is now
    alphanumerics and hyphens only. Also removed a shadowing re-declaration of
    ``selected_idx``.
    """
    texts = [c["text"] for c in chunks]
    if not texts:
        return []
    selected_idx: List[int] = []
    if TfidfVectorizer is None:
        # Lightweight fallback: count how many query tokens (len >= 3) appear
        # in each chunk, keep the top_per_query best per query.
        for q in queries:
            q_tokens = {w for w in re.findall(r"[a-zA-Z0-9-]+", (q or "").lower()) if len(w) >= 3}
            scored = []
            for i, t in enumerate(texts):
                tl = t.lower()
                scored.append((sum(1 for tok in q_tokens if tok in tl), i))
            scored.sort(key=lambda x: x[0], reverse=True)  # stable: ties keep page order
            for _, i in scored[:top_per_query]:
                if i not in selected_idx:
                    selected_idx.append(i)
    else:
        vectorizer = TfidfVectorizer(stop_words="english", ngram_range=(1, 2), max_features=20000)
        X = vectorizer.fit_transform(texts)
        for q in queries:
            q = (q or "").strip()
            if not q:
                continue
            qv = vectorizer.transform([q])
            sims = (X @ qv.T).toarray().ravel()
            for i in np.argsort(sims)[::-1][:top_per_query]:
                if i not in selected_idx:
                    selected_idx.append(i)
    if not selected_idx:
        selected_idx = list(range(min(len(chunks), max_chunks)))
    return [chunks[i] for i in selected_idx[:max_chunks]]
def build_context(selected_chunks: List[Dict[str, Any]], max_chars: int = 20000) -> str:
    """Join chunks into a single prompt context, each prefixed with its page
    range, stopping before the total would exceed ``max_chars``."""
    assembled: List[str] = []
    used = 0
    for chunk in selected_chunks:
        piece = f"[pages {chunk['pages']}]\n{chunk['text']}\n"
        if used + len(piece) > max_chars:
            break  # adding this chunk would blow the budget; stop here
        assembled.append(piece)
        used += len(piece)
    return "\n".join(assembled).strip()
| # ============================= | |
| # Spec -> JSON schema | |
| # ============================= | |
def slugify_field(name: str) -> str:
    """Convert a display name into a lowercase snake_case schema key.

    Strips punctuation, turns whitespace/hyphen runs into underscores, caps the
    result at 80 characters, and falls back to "field" for empty input.
    """
    stripped = (name or "").strip()
    no_punct = re.sub(r"[^\w\s-]", "", stripped)
    slug = re.sub(r"[\s-]+", "_", no_punct).lower()
    return slug[:80] if slug else "field"
def parse_field_spec(spec: str) -> Tuple[Dict[str, Any], Dict[str, str]]:
    """Parse pipe-delimited spec lines into JSON-schema properties + instructions.

    Each non-comment line is "Field Name | type | instructions" where type is
    one of: str, num, bool, list[str], list[num], enum[a,b,c], list[enum[a,b,c]].
    Unknown types fall back to string. Returns (properties, instructions), both
    keyed by the slugified field name.
    """
    props: Dict[str, Any] = {}
    instr: Dict[str, str] = {}
    scalar_types = {"str": "string", "num": "number", "bool": "boolean"}
    for raw_line in (spec or "").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue  # blank lines and comments are skipped
        cells = [c.strip() for c in line.split("|")]
        if len(cells) < 2:
            continue  # a usable line needs at least name and type
        key = slugify_field(cells[0])
        ftype = cells[1]
        instr[key] = cells[2] if len(cells) >= 3 else ""
        if ftype in scalar_types:
            schema: Dict[str, Any] = {"type": scalar_types[ftype]}
        elif ftype.startswith("list[enum[") and ftype.endswith("]]"):
            inner = ftype[len("list[enum["):-2].strip()
            options = [v.strip() for v in inner.split(",") if v.strip()]
            schema = {"type": "array", "items": {"type": "string", "enum": options}}
        elif ftype.startswith("list[str]"):
            schema = {"type": "array", "items": {"type": "string"}}
        elif ftype.startswith("list[num]"):
            schema = {"type": "array", "items": {"type": "number"}}
        elif ftype.startswith("enum[") and ftype.endswith("]"):
            inner = ftype[len("enum["):-1].strip()
            options = [v.strip() for v in inner.split(",") if v.strip()]
            schema = {"type": "string", "enum": options}
        else:
            schema = {"type": "string"}  # unrecognized type -> plain string
        props[key] = schema
    return props, instr
def build_extraction_schema(field_props: Dict[str, Any], vocab: Dict[str, Any]) -> Dict[str, Any]:
    """Assemble the strict JSON schema for LLM structured extraction.

    The top level requires paper metadata, a risk stance constrained to the
    vocab's risk_stance_enum (with a built-in default), the 'extracted' object
    requiring every configured field, and an 'evidence' array tying fields to
    quotes and page ranges.
    """
    fallback_stances = ["acceptable", "acceptable_with_uncertainty", "not_acceptable", "insufficient_data"]
    risk_enum = vocab.get("risk_stance_enum", fallback_stances)
    evidence_item = {
        "type": "object",
        "additionalProperties": False,
        "properties": {
            "field": {"type": "string"},
            "quote": {"type": "string"},
            "pages": {"type": "string"},
        },
        "required": ["field", "quote", "pages"],
    }
    extracted_object = {
        "type": "object",
        "additionalProperties": False,
        "properties": field_props,
        "required": list(field_props.keys()),
    }
    return {
        "type": "object",
        "additionalProperties": False,
        "properties": {
            "paper_title": {"type": "string"},
            "risk_stance": {"type": "string", "enum": risk_enum},
            "risk_confidence": {"type": "number", "minimum": 0, "maximum": 1},
            "risk_summary": {"type": "string"},
            "extracted": extracted_object,
            "evidence": {"type": "array", "items": evidence_item},
        },
        "required": ["paper_title", "risk_stance", "risk_confidence", "risk_summary", "extracted", "evidence"],
    }
| # ============================= | |
| # OpenAI client + extraction | |
| # ============================= | |
def get_openai_client(api_key: str) -> OpenAI:
    """Create an OpenAI client from the UI-provided key, falling back to the
    OPENAI_API_KEY environment variable.

    Raises ValueError when no key is available from either source.
    """
    candidate = (api_key or "").strip()
    if not candidate:
        candidate = os.getenv("OPENAI_API_KEY", "").strip()
    if not candidate:
        raise ValueError("Missing OpenAI API key. Provide it in the UI or set OPENAI_API_KEY secret in Hugging Face.")
    return OpenAI(api_key=candidate)
def openai_structured_extract(
    client: OpenAI,
    model: str,
    schema: Dict[str, Any],
    controlled_vocab: Dict[str, Any],
    field_instructions: Dict[str, str],
    context: str
) -> Dict[str, Any]:
    """Run one strict structured-extraction call against the OpenAI Responses API.

    Args:
        client: authenticated OpenAI client (see get_openai_client).
        model: model name passed straight through to the API.
        schema: JSON schema the response must satisfy (see build_extraction_schema).
        controlled_vocab: vocab dict embedded verbatim (as JSON) in the prompt.
        field_instructions: slugified field key -> per-field extraction guidance.
        context: page-annotated excerpts produced by build_context.

    Returns:
        The parsed JSON object emitted by the model.

    Raises:
        json.JSONDecodeError if the model output is not valid JSON (strict
        schema mode makes this unlikely), plus any OpenAI API errors.
    """
    # One prompt bullet per configured field so the model sees its guidance.
    field_instr_lines = [f"- {k}: {v if v else '(no extra instructions)'}" for k, v in field_instructions.items()]
    vocab_text = json.dumps(controlled_vocab, indent=2)
    # The FDA tier reminder is only appended when that field was requested.
    has_fda_tier_field = "fda_ctp_carcinogenicity_tier" in field_instructions
    system_msg = (
        "You are a toxicology research paper data-extraction assistant for an industry safety assessor.\n"
        "Grounding rules (must follow):\n"
        "1) Use ONLY the provided excerpts; do NOT invent details.\n"
        "2) If a value is not explicitly stated, output empty string or empty list, OR the enum value 'not_reported'/'insufficient_data' when applicable.\n"
        "3) Provide evidence quotes + page ranges for extracted fields.\n"
        "4) risk_stance is regulatory: acceptable / acceptable_with_uncertainty / not_acceptable / insufficient_data.\n"
        "5) Prefer controlled vocab terms when applicable.\n"
        "6) Use an INTERNAL Tree-of-Thought process before finalizing JSON:\n"
        "   - Branch evidence by endpoint/theme.\n"
        "   - Test competing interpretations.\n"
        "   - Prune branches that are not directly supported by excerpts.\n"
        "   - Select the most evidence-grounded branch only.\n"
        "   - Do NOT output reasoning traces; output JSON only.\n"
        "7) If the FDA CTP tier field is requested but evidence is insufficient, output exactly: 'enough data is not available'.\n"
    )
    user_msg = (
        "CONTROLLED VOCAB (JSON):\n"
        f"{vocab_text}\n\n"
        "TREE-OF-THOUGHT EXECUTION FRAMEWORK (internal only, do not output):\n"
        "A) Build evidence map: claims -> quotes -> page ranges.\n"
        "B) Generate candidate interpretations per endpoint.\n"
        "C) Eliminate candidates lacking direct quote support.\n"
        "D) Select final grounded interpretation and populate schema fields.\n"
        "E) For uncertain fields, use explicit fallback values from enum/instructions.\n\n"
        "FIELD INSTRUCTIONS:\n"
        + "\n".join(field_instr_lines)
        + "\n\n"
        "EXCERPTS (with page ranges):\n"
        f"{context}\n\n"
        + (
            "IMPORTANT: `fda_ctp_carcinogenicity_tier` must be one of "
            "[Tier_1_high_priority, Tier_2_moderate_priority, Tier_3_lower_priority, enough data is not available].\n"
            if has_fda_tier_field else ""
        )
    )
    # Responses API with strict json_schema output: the model is constrained
    # to emit JSON matching `schema` exactly.
    resp = client.responses.create(
        model=model,
        input=[
            {"role": "system", "content": system_msg},
            {"role": "user", "content": user_msg}
        ],
        text={
            "format": {
                "type": "json_schema",
                "name": "tox_extraction",
                "schema": schema,
                "strict": True
            }
        }
    )
    return json.loads(resp.output_text)
def openai_synthesize_across_papers(client: OpenAI, model: str, rows: List[Dict[str, Any]]) -> str:
    """Ask the model for a cross-paper synthesis grounded in the extracted rows.

    Returns plain text covering consensus, disagreements, gaps, and next steps.
    """
    system_msg = (
        "You are a senior toxicology safety assessor summarizing multiple papers.\n"
        "Create a concise synthesis: consensus, disagreements, data gaps, and actionable next steps.\n"
        "Base strictly on the provided extracted JSON (which is evidence-backed).\n"
    )
    user_msg = "EXTRACTED_ROWS_JSON:\n" + json.dumps(rows, indent=2)
    messages = [
        {"role": "system", "content": system_msg},
        {"role": "user", "content": user_msg},
    ]
    resp = client.responses.create(model=model, input=messages)
    return resp.output_text
| # ============================= | |
| # Controlled vocab editor helpers (lists only) + search filter | |
| # ============================= | |
| def _filter_terms_df(df: pd.DataFrame, query: str) -> pd.DataFrame: | |
| if df is None or df.empty: | |
| return pd.DataFrame(columns=["term"]) | |
| q = (query or "").strip().lower() | |
| if not q: | |
| return df[["term"]].copy() | |
| mask = df["term"].astype(str).str.lower().str.contains(q, na=False) | |
| return df.loc[mask, ["term"]].copy() | |
def vocab_init_state(vocab_json: str):
    """Parse the vocab JSON (falling back to the built-in default on any
    failure) and produce the initial editor state.

    Returns (vocab dict, sorted list-category keys, first category, full terms
    df, filtered terms df, pretty JSON, status message).
    """
    try:
        vocab = json.loads(vocab_json or DEFAULT_CONTROLLED_VOCAB_JSON)
    except Exception:
        vocab = json.loads(DEFAULT_CONTROLLED_VOCAB_JSON)
    list_keys = sorted(k for k, v in vocab.items() if isinstance(v, list))
    default_key = list_keys[0] if list_keys else None
    terms = vocab.get(default_key, []) if default_key else []
    full_df = pd.DataFrame({"term": terms})
    return (
        vocab,
        list_keys,
        default_key,
        full_df,
        _filter_terms_df(full_df, ""),
        json.dumps(vocab, indent=2),
        "✅ Vocab loaded.",
    )
def vocab_reset_defaults_ui():
    """Restore the built-in vocab and refresh every dependent UI component."""
    vocab, keys, first_key, full_df, filtered_df, vjson, msg = vocab_init_state(DEFAULT_CONTROLLED_VOCAB_JSON)
    dropdown_update = gr.update(choices=keys, value=first_key)
    return vocab, dropdown_update, full_df, filtered_df, vjson, msg, vjson
def vocab_load_category(vocab_state: Dict[str, Any], category: str, search: str):
    """Load one vocab category into the editor tables.

    Returns (full df, filtered df, status message); unknown or non-list
    categories yield empty tables with an explanatory message.
    """
    if not category or category not in vocab_state:
        blank = pd.DataFrame(columns=["term"])
        return blank, blank, "Select a category."
    terms = vocab_state.get(category, [])
    if not isinstance(terms, list):
        blank = pd.DataFrame(columns=["term"])
        return blank, blank, "This category is not a list."
    full = pd.DataFrame({"term": terms})
    return full, _filter_terms_df(full, search), f"Editing: {category}"
def vocab_add_term(vocab_state: Dict[str, Any], category: str, term: str, search: str):
    """Append a term to the chosen category (ignoring duplicates) and refresh
    the tables. Returns (full df, filtered df, cleared input, status)."""
    cleaned = (term or "").strip()
    if not cleaned:
        return gr.update(), gr.update(), "", "Enter a term to add."
    if not category or category not in vocab_state or not isinstance(vocab_state.get(category), list):
        return gr.update(), gr.update(), "", "Pick a list category first."
    if cleaned not in vocab_state[category]:
        vocab_state[category].append(cleaned)
    full = pd.DataFrame({"term": vocab_state[category]})
    return full, _filter_terms_df(full, search), "", f"Added: {cleaned}"
def vocab_remove_term(vocab_state: Dict[str, Any], category: str, term: str, search: str):
    """Drop every occurrence of a term from the chosen category and refresh
    the tables. Returns (full df, filtered df, cleared input, status)."""
    cleaned = (term or "").strip()
    if not cleaned:
        return gr.update(), gr.update(), "", "Enter a term to remove."
    if not category or category not in vocab_state or not isinstance(vocab_state.get(category), list):
        return gr.update(), gr.update(), "", "Pick a list category first."
    vocab_state[category] = [t for t in vocab_state[category] if t != cleaned]
    full = pd.DataFrame({"term": vocab_state[category]})
    return full, _filter_terms_df(full, search), "", f"Removed: {cleaned}"
def vocab_apply_df(vocab_state: Dict[str, Any], category: str, terms_df: Any, search: str):
    """Replace a category's terms with the edited table's rows, de-duplicated
    and stripped of blanks.

    Returns (vocab JSON string, filtered preview df, status message).

    BUG FIX vs. the original: ``df.get("term", []).tolist()`` raised
    AttributeError when the incoming DataFrame lacked a 'term' column, because
    ``DataFrame.get`` returns the plain-list default which has no ``.tolist``.
    The column presence is now checked explicitly.
    """
    if not category or category not in vocab_state or not isinstance(vocab_state.get(category), list):
        return json.dumps(vocab_state, indent=2), pd.DataFrame(columns=["term"]), "Pick a list category first."
    try:
        df = terms_df if isinstance(terms_df, pd.DataFrame) else pd.DataFrame(terms_df, columns=["term"])
    except Exception:
        return json.dumps(vocab_state, indent=2), pd.DataFrame(columns=["term"]), "Could not parse terms table."
    raw_terms = df["term"].tolist() if "term" in df.columns else []
    terms: List[str] = []
    for t in raw_terms:
        t = (str(t) if t is not None else "").strip()
        if t and t not in terms:  # keep first occurrence, drop blanks/dupes
            terms.append(t)
    vocab_state[category] = terms
    vjson = json.dumps(vocab_state, indent=2)
    filtered = _filter_terms_df(pd.DataFrame({"term": terms}), search)
    return vjson, filtered, f"✅ Applied {len(terms)} terms to {category}."
def vocab_filter_preview(terms_df, search):
    """Re-filter the currently edited terms table for the preview pane,
    tolerating any table-like input (falls back to an empty frame)."""
    if isinstance(terms_df, pd.DataFrame):
        df = terms_df
    else:
        try:
            df = pd.DataFrame(terms_df, columns=["term"])
        except Exception:
            df = pd.DataFrame(columns=["term"])
    return _filter_terms_df(df, search)
# =============================
# Field mapping from endpoints
# =============================
# Allowed values for the "type" column in field rows; "enum"/"list[enum]" are
# expanded with their enum_values by build_spec_from_field_rows.
TYPE_CHOICES = ["str", "num", "bool", "list[str]", "list[num]", "enum", "list[enum]"]
def build_spec_from_field_rows(rows: List[Dict[str, Any]]) -> str:
    """Render field rows into the pipe-delimited spec text that
    parse_field_spec consumes.

    Rows missing a field name or type are skipped; an enum/list[enum] type with
    no enum_values degrades to str/list[str].
    """
    lines = [
        "# One field per line: Field Name | type | instructions",
        "# types: str, num, bool, list[str], list[num], enum[a,b,c], list[enum[a,b,c]]",
        "",
    ]
    for row in rows:
        field = str(row.get("field", "")).strip()
        ftype = str(row.get("type", "")).strip()
        enums = str(row.get("enum_values", "")).strip()
        instructions = str(row.get("instructions", "")).strip()
        if not field or not ftype:
            continue
        options = [v.strip() for v in enums.split(",") if v.strip()]
        if ftype == "enum":
            type_str = f"enum[{','.join(options)}]" if options else "str"
        elif ftype == "list[enum]":
            type_str = f"list[enum[{','.join(options)}]]" if options else "list[str]"
        else:
            type_str = ftype
        lines.append(f"{field} | {type_str} | {instructions}")
    return "\n".join(lines).strip() + "\n"
def build_rows_from_endpoints(selected_endpoints: List[str]) -> Tuple[List[Dict[str, Any]], Dict[str, str], Dict[str, List[str]]]:
    """Assemble builder rows from the Core preset plus selected endpoint modules.

    Returns ``(deduped_rows, field_key -> module, module -> field_keys)``.
    Duplicate field names (case-insensitive) keep their first occurrence.
    """
    ordered_presets = [("Core", PRESET_CORE)]
    for module in (selected_endpoints or []):
        preset = ENDPOINT_MODULES.get(module)
        if preset:
            ordered_presets.append((module, preset))

    rows: List[Dict[str, Any]] = []
    field_key_to_module: Dict[str, str] = {}
    module_to_keys: Dict[str, List[str]] = {}
    for module, preset in ordered_presets:
        for r in preset:
            rows.append(dict(r))
            key = slugify_field(r["field"])
            field_key_to_module[key] = module
            module_to_keys.setdefault(module, []).append(key)

    # De-duplicate by lowercased field name, first occurrence wins.
    deduped: List[Dict[str, Any]] = []
    seen = set()
    for r in rows:
        name = str(r.get("field", "")).strip().lower()
        if not name or name in seen:
            continue
        seen.add(name)
        deduped.append(r)

    # Restrict the module map to keys that survived de-duplication.
    surviving = {slugify_field(r["field"]) for r in deduped}
    module_to_keys = {m: [k for k in ks if k in surviving] for m, ks in module_to_keys.items()}
    return deduped, field_key_to_module, module_to_keys
def apply_endpoint_preset(preset_name: str):
    """Resolve a named preset to its endpoint list and push it to the dropdown."""
    return gr.update(value=ENDPOINT_PRESETS.get(preset_name, []))
def sync_fields_from_endpoints(selected_endpoints: List[str], admin_mode: bool, current_rows: List[Dict[str, Any]], current_spec: str):
    """Rebuild builder rows + spec from the endpoint selection, unless Admin mode owns the columns."""
    builder_cols = ["field", "type", "enum_values", "instructions"]
    if admin_mode:
        # Admin-defined columns win: leave rows and spec untouched.
        frame = pd.DataFrame(current_rows or [], columns=builder_cols)
        return current_rows, frame, current_spec, "Admin mode: endpoint selection will not overwrite custom columns."
    rows, _, _ = build_rows_from_endpoints(selected_endpoints or [])
    frame = pd.DataFrame(rows, columns=builder_cols)
    return rows, frame, build_spec_from_field_rows(rows), "✅ Columns updated from selected endpoints."
def admin_apply_endpoints(selected_endpoints: List[str]):
    """Admin 'Replace' action: load the selected endpoint presets into the builder."""
    rows, _, _ = build_rows_from_endpoints(selected_endpoints or [])
    frame = pd.DataFrame(rows, columns=["field", "type", "enum_values", "instructions"])
    return rows, frame, build_spec_from_field_rows(rows), "✅ Loaded selected endpoints into the builder (Replace)."
def fields_add_or_update(field_name: str, ftype: str, enum_values: str, instructions: str, field_rows: List[Dict[str, Any]]):
    """Add a new builder row, or update the existing row whose field name
    matches case-insensitively.

    Returns ``(rows, builder_df, field_spec_text, status_message)``.

    Fix vs. previous version: works on a defensive copy of ``field_rows``
    instead of mutating the caller's gr.State list in place, and tolerates
    a ``None`` rows value instead of crashing on iteration.
    """
    field_name = (field_name or "").strip()
    ftype = (ftype or "").strip()
    enum_values = (enum_values or "").strip()
    instructions = (instructions or "").strip()
    # Copy so the session state object is never mutated before we return.
    rows = [dict(r) for r in (field_rows or [])]
    if not field_name or not ftype:
        df = pd.DataFrame(rows, columns=["field", "type", "enum_values", "instructions"])
        return rows, df, build_spec_from_field_rows(rows), "Field name and type are required."
    updated = False
    for r in rows:
        if str(r.get("field", "")).strip().lower() == field_name.lower():
            r["type"] = ftype
            r["enum_values"] = enum_values
            r["instructions"] = instructions
            updated = True
            break
    if not updated:
        rows.append({"field": field_name, "type": ftype, "enum_values": enum_values, "instructions": instructions})
    df = pd.DataFrame(rows, columns=["field", "type", "enum_values", "instructions"])
    return rows, df, build_spec_from_field_rows(rows), ("Updated field." if updated else "Added field.")
def fields_apply_df(field_rows: List[Dict[str, Any]], df_in: Any):
    """Validate the edited builder table and regenerate rows + field spec from it.

    Blank field/type rows are dropped and duplicate field names
    (case-insensitive) keep their first occurrence.
    """
    cols = ["field", "type", "enum_values", "instructions"]
    if isinstance(df_in, pd.DataFrame):
        df = df_in
    else:
        try:
            df = pd.DataFrame(df_in, columns=cols)
        except Exception:
            fallback = pd.DataFrame(field_rows, columns=cols)
            return field_rows, fallback, build_spec_from_field_rows(field_rows), "Could not parse builder table."
    cleaned: List[Dict[str, Any]] = []
    seen = set()
    for _, rec in df.iterrows():
        name = str(rec.get("field", "")).strip()
        declared = str(rec.get("type", "")).strip()
        if not name or not declared:
            continue
        lowered = name.lower()
        if lowered in seen:
            continue
        seen.add(lowered)
        cleaned.append({
            "field": name,
            "type": declared,
            "enum_values": str(rec.get("enum_values", "")).strip(),
            "instructions": str(rec.get("instructions", "")).strip(),
        })
    out_df = pd.DataFrame(cleaned, columns=cols)
    return cleaned, out_df, build_spec_from_field_rows(cleaned), f"✅ Applied builder table ({len(cleaned)} fields)."
| # ============================= | |
| # Row building + “non-empty module” logic | |
| # ============================= | |
| def _as_list(x) -> List[str]: | |
| if x is None: | |
| return [] | |
| if isinstance(x, list): | |
| out = [] | |
| for v in x: | |
| s = str(v).strip() | |
| if s: | |
| out.append(s) | |
| return out | |
| s = str(x).strip() | |
| return [s] if s else [] | |
| def _format_value(v: Any) -> Any: | |
| if isinstance(v, list): | |
| return "; ".join([str(x) for x in v if str(x).strip()]) | |
| return v | |
| EMPTY_STRINGS = {"", "not_reported", "insufficient_data", "none", "na", "n/a", "null"} | |
| def _is_empty_value(v: Any) -> bool: | |
| if v is None: | |
| return True | |
| if isinstance(v, float) and np.isnan(v): | |
| return True | |
| if isinstance(v, list): | |
| cleaned = [str(x).strip() for x in v if str(x).strip()] | |
| if not cleaned: | |
| return True | |
| # empty if all items are not_reported / similar | |
| return all((c.lower() in EMPTY_STRINGS) for c in cleaned) | |
| s = str(v).strip() | |
| if not s: | |
| return True | |
| return s.lower() in EMPTY_STRINGS | |
| def _json_default(o: Any): | |
| if isinstance(o, np.integer): | |
| return int(o) | |
| if isinstance(o, np.floating): | |
| return float(o) | |
| if isinstance(o, np.ndarray): | |
| return o.tolist() | |
| raise TypeError(f"Object of type {o.__class__.__name__} is not JSON serializable") | |
| def _record_id(file_name: str, chemical: str, endpoint: str) -> str: | |
| chemical = (chemical or "").strip() or "-" | |
| endpoint = (endpoint or "").strip() or "Paper" | |
| return f"{file_name} | {chemical} | {endpoint}" | |
def _module_has_any_data(ext: Dict[str, Any], module_keys: List[str], field_props: Dict[str, Any]) -> bool:
    """True if any of the module's fields carries a non-empty extracted value.

    ``field_props`` is currently unused; it is kept for caller/interface
    stability.
    """
    return any(not _is_empty_value(ext.get(k)) for k in (module_keys or []))
| # ============================= | |
| # Evidence + report helpers | |
| # ============================= | |
| def _make_vertical(records: List[Dict[str, Any]], record_id: str) -> pd.DataFrame: | |
| if not records or not record_id: | |
| return pd.DataFrame(columns=["Field", "Value"]) | |
| row = next((r for r in records if r.get("record_id") == record_id), None) | |
| if not row: | |
| return pd.DataFrame(columns=["Field", "Value"]) | |
| hidden = {"record_id"} | |
| keys = [k for k in row.keys() if k not in hidden] | |
| return pd.DataFrame({"Field": keys, "Value": [row.get(k, "") for k in keys]}) | |
| def _render_evidence(details: List[Dict[str, Any]], file_name: str, allowed_fields: Optional[set] = None, max_items: int = 120) -> str: | |
| if not details or not file_name: | |
| return "" | |
| d = next((x for x in details if x.get("_file") == file_name), None) | |
| if not d: | |
| return "" | |
| ev = d.get("evidence", []) or [] | |
| lines = [] | |
| for e in ev: | |
| field = (e.get("field", "") or "").strip() | |
| if allowed_fields is not None and field and field not in allowed_fields: | |
| continue | |
| quote = (e.get("quote", "") or "").strip() | |
| pages = (e.get("pages", "") or "").strip() | |
| if quote: | |
| if len(quote) > 320: | |
| quote = quote[:320] + "…" | |
| lines.append(f"- **{field}** (pages {pages}): “{quote}”") | |
| if len(lines) >= max_items: | |
| break | |
| header = "### Evidence (grounding)\n" | |
| return header + ("\n".join(lines) if lines else "- (no evidence returned)") | |
| def _overview_df_from_records(records: List[Dict[str, Any]]) -> pd.DataFrame: | |
| if not records: | |
| return pd.DataFrame(columns=["record_id","file","paper_title","chemical","endpoint","risk_stance","risk_confidence"]) | |
| df = pd.DataFrame(records) | |
| cols = ["record_id","file","paper_title","chemical","endpoint","risk_stance","risk_confidence"] | |
| cols = [c for c in cols if c in df.columns] | |
| return df[cols].copy() if cols else df.head(50) | |
| def _risk_badge(risk: str) -> str: | |
| r = (risk or "").strip().lower() | |
| if r == "acceptable": | |
| bg = "#e7f7ed"; fg = "#0f5132" | |
| elif r == "acceptable_with_uncertainty": | |
| bg = "#fff3cd"; fg = "#664d03" | |
| elif r == "not_acceptable": | |
| bg = "#f8d7da"; fg = "#842029" | |
| else: | |
| bg = "#e2e3e5"; fg = "#41464b" | |
| label = risk if risk else "unknown" | |
| return f'<span style="background:{bg};color:{fg};padding:4px 10px;border-radius:999px;font-weight:600;font-size:12px;">{label}</span>' | |
| def _safe_str(x: Any) -> str: | |
| if x is None: | |
| return "" | |
| if isinstance(x, float) and np.isnan(x): | |
| return "" | |
| return str(x) | |
def render_summary_card(record_id: str, records: List[Dict[str, Any]]) -> str:
    """Render the Executive Summary HTML card for the selected record.

    Shows the risk badge + confidence, file/chemical/endpoint metadata, and
    clipped Key Findings / Dose Metrics / Conclusion / Risk Summary sections.
    Returns a placeholder card when there is no record to show.
    """
    # Placeholder before any extraction has run.
    if not record_id or not records:
        return "<div style='border:1px solid #eee;padding:14px;border-radius:10px;'><b>Executive Summary</b><div style='margin-top:8px;color:#666;'>Run extraction to view results.</div></div>"
    row = next((r for r in records if r.get("record_id") == record_id), None)
    # Placeholder when the selected id no longer matches a record.
    if not row:
        return "<div style='border:1px solid #eee;padding:14px;border-radius:10px;'><b>Executive Summary</b><div style='margin-top:8px;color:#666;'>Select a record.</div></div>"
    title = _safe_str(row.get("paper_title", "")).strip() or "Untitled paper"
    file_name = _safe_str(row.get("file", ""))
    chemical = _safe_str(row.get("chemical", "-"))
    endpoint = _safe_str(row.get("endpoint", "Paper"))
    risk = _safe_str(row.get("risk_stance", ""))
    conf = row.get("risk_confidence", "")
    # Confidence may arrive as a number or a string; fall back to raw text
    # when it cannot be formatted as a float.
    try:
        conf_txt = f"{float(conf):.2f}" if conf != "" else ""
    except Exception:
        conf_txt = _safe_str(conf)
    key_findings = _safe_str(row.get("key_findings", "")).strip()
    dose_metrics = _safe_str(row.get("dose_metrics", "")).strip()
    conclusion = _safe_str(row.get("conclusion", "")).strip()
    risk_summary = _safe_str(row.get("risk_summary", "")).strip()
    # Keep compact
    def _clip(s: str, n: int = 380) -> str:
        # Truncate long section text with an ellipsis so the card stays readable.
        s = s.strip()
        if len(s) <= n:
            return s
        return s[:n] + "…"
    return f"""
<div style="border:1px solid #eaeaea;padding:14px;border-radius:12px;">
<div style="display:flex;align-items:center;justify-content:space-between;gap:12px;flex-wrap:wrap;">
<div style="font-weight:700;font-size:16px;">Executive Summary</div>
<div>{_risk_badge(risk)} <span style="margin-left:10px;color:#666;font-size:12px;">confidence: {conf_txt}</span></div>
</div>
<div style="margin-top:10px;">
<div style="font-weight:650;">{title}</div>
<div style="color:#666;font-size:12px;margin-top:4px;">
<span><b>File:</b> {file_name}</span> •
<span><b>Chemical:</b> {chemical}</span> •
<span><b>Endpoint:</b> {endpoint}</span>
</div>
</div>
<div style="margin-top:12px;display:grid;grid-template-columns:1fr;gap:10px;">
<div>
<div style="font-weight:650;margin-bottom:4px;">Key Findings</div>
<div style="color:#222;">{_clip(key_findings) if key_findings else "<span style='color:#666'>(not reported)</span>"}</div>
</div>
<div>
<div style="font-weight:650;margin-bottom:4px;">Dose Metrics</div>
<div style="color:#222;">{_clip(dose_metrics) if dose_metrics else "<span style='color:#666'>(not reported)</span>"}</div>
</div>
<div>
<div style="font-weight:650;margin-bottom:4px;">Conclusion</div>
<div style="color:#222;">{_clip(conclusion) if conclusion else "<span style='color:#666'>(not reported)</span>"}</div>
</div>
<div>
<div style="font-weight:650;margin-bottom:4px;">Risk Summary</div>
<div style="color:#222;">{_clip(risk_summary) if risk_summary else "<span style='color:#666'>(not reported)</span>"}</div>
</div>
</div>
</div>
"""
| # ============================= | |
| # Main extraction handler | |
| # ============================= | |
def run_extraction(
    files,
    api_key,
    model,
    selected_endpoints,
    field_spec,
    vocab_json,
    max_pages,
    chunk_chars,
    max_context_chars,
    admin_mode
):
    """Run the full PDF → structured-extraction pipeline for the Extract tab.

    Per file: read pages, chunk, rank chunks against regulatory queries
    (lexical + optional embedding hybrid), send the best-ranked context to
    the model for schema-constrained extraction, then flatten the result
    into table rows — one row per paper for single-chemical papers, or
    chemical × endpoint rows for multi-chemical papers (empty endpoint
    modules are skipped).

    Returns the 11-tuple of Gradio outputs: summary HTML, overview df,
    CSV path, details-JSON path, prefilled template path, status text,
    record-dropdown update, records list, paper details, vertical df,
    evidence markdown. Every early-return fills the same 11 slots.
    """
    # --- Guard: no files uploaded. ---
    if not files:
        return (
            "<div style='border:1px solid #eee;padding:14px;border-radius:10px;'><b>Executive Summary</b><div style='margin-top:8px;color:#666;'>Upload PDFs to run extraction.</div></div>",
            pd.DataFrame(), None, None, None, "Upload one or more PDFs.",
            gr.update(choices=[], value=None),
            [], [], pd.DataFrame(columns=["Field","Value"]), ""
        )
    # --- Guard: controlled vocabulary must be valid JSON. ---
    try:
        vocab = json.loads(vocab_json or DEFAULT_CONTROLLED_VOCAB_JSON)
    except Exception as e:
        return (
            "<div style='border:1px solid #eee;padding:14px;border-radius:10px;'><b>Executive Summary</b><div style='margin-top:8px;color:#b00;'>Invalid vocab JSON.</div></div>",
            pd.DataFrame(), None, None, None, f"Controlled vocab JSON invalid: {e}",
            gr.update(choices=[], value=None),
            [], [], pd.DataFrame(columns=["Field","Value"]), ""
        )
    field_props, field_instr = parse_field_spec(field_spec or "")
    # --- Guard: at least one extraction column must be defined. ---
    if not field_props:
        return (
            "<div style='border:1px solid #eee;padding:14px;border-radius:10px;'><b>Executive Summary</b><div style='margin-top:8px;color:#b00;'>No columns defined.</div></div>",
            pd.DataFrame(), None, None, None, "No extraction fields are defined. (Check selected endpoints or admin field spec.)",
            gr.update(choices=[], value=None),
            [], [], pd.DataFrame(columns=["Field","Value"]), ""
        )
    schema = build_extraction_schema(field_props, vocab)
    # Admin mode treats every field as a single "Custom" module; otherwise
    # fields are grouped by the endpoint module that contributed them.
    if admin_mode:
        field_key_to_module = {k: "Custom" for k in field_props.keys()}
        module_to_keys: Dict[str, List[str]] = {"Custom": list(field_props.keys())}
        endpoint_modules_for_rows = ["Custom"]
    else:
        _, field_key_to_module, module_to_keys = build_rows_from_endpoints(selected_endpoints or [])
        endpoint_modules_for_rows = list(selected_endpoints or []) or ["Core"]
    # --- Guard: API key / client construction. ---
    try:
        client = get_openai_client(api_key)
    except Exception as e:
        return (
            "<div style='border:1px solid #eee;padding:14px;border-radius:10px;'><b>Executive Summary</b><div style='margin-top:8px;color:#b00;'>Missing API key.</div></div>",
            pd.DataFrame(), None, None, None, str(e),
            gr.update(choices=[], value=None),
            [], [], pd.DataFrame(columns=["Field","Value"]), ""
        )
    paper_details: List[Dict[str, Any]] = []
    output_rows: List[Dict[str, Any]] = []
    nlp_diagnostics: List[Dict[str, Any]] = []
    # Artifacts (CSV/JSON) are written to a fresh temp dir per run.
    tmpdir = Path(tempfile.mkdtemp(prefix="tox_extract_"))
    for f in files:
        pdf_path = f.name
        filename = os.path.basename(pdf_path)
        pages, page_count = extract_pages_from_pdf(pdf_path, max_pages=int(max_pages))
        if _text_based_pdf_warning(pages):
            # Scanned / image-only PDF: synthesize an "insufficient data"
            # result instead of calling the model.
            ex = {
                "_file": filename,
                "_pages_in_pdf": page_count,
                "paper_title": "",
                "risk_stance": "insufficient_data",
                "risk_confidence": 0.0,
                "risk_summary": "No extractable text found. This app supports text-based PDFs only (not scanned images).",
                "extracted": {k: ([] if field_props[k].get("type") == "array" else "") for k in field_props.keys()},
                "evidence": []
            }
            nlp_diagnostics.append(
                {
                    "file": filename,
                    "ranking_method": "unavailable_no_text",
                    "selected_indices": [],
                    "coverage_by_query_family": {},
                    "coverage_score": 0.0,
                }
            )
        else:
            chunks = chunk_pages(pages, target_chars=int(chunk_chars))
            # Seed queries cover risk-conclusion language and chemical identity.
            base_queries = [
                "regulatory acceptability risk hazard concern conclusion uncertainty evidence NOAEL LOAEL BMD",
                "chemical name CAS number",
            ]
            # Use each field's instruction text (or its key) as extra query terms.
            extra_terms = [ins if ins else k for k, ins in field_instr.items()]
            queries, families = expand_regulatory_queries(
                base_queries=base_queries,
                endpoint_modules=selected_endpoints or [],
                frameworks=["FDA CTP", "EPA"],
                extra_terms=extra_terms,
            )
            # Embeddings are best-effort: on any failure the ranker falls
            # back to its lexical-only path (both stay None).
            emb_mat = None
            qemb = None
            try:
                texts = [c.get("text", "") for c in chunks]
                if texts:
                    emb_mat = embed_texts(client, DEFAULT_EMBEDDING_MODEL, texts)
                    qemb = embed_texts(client, DEFAULT_EMBEDDING_MODEL, [" ".join(queries[:20])])[0]
            except Exception:
                emb_mat = None
                qemb = None
            selected, diag = hybrid_rank_text_items(
                items=chunks,
                query=" ".join(queries[:20]),
                families=families,
                top_k=12,
                item_embeddings=emb_mat,
                query_embedding=qemb,
            )
            nlp_diagnostics.append(dict({"file": filename}, **diag))
            # Assemble the model context from evidence spans of the top
            # chunks, stopping at the character budget.
            span_blocks: List[str] = []
            chars = 0
            for c in selected:
                span = extract_evidence_span(c.get("text", ""), " ".join(queries[:20]), page=None, n_sentences=5)
                snippet = span.get("text", "") or c.get("text", "")
                block = f"[pages {c.get('pages','')}]\n{snippet}\n"
                if chars + len(block) > int(max_context_chars):
                    break
                span_blocks.append(block)
                chars += len(block)
            context = "\n".join(span_blocks).strip()
            # If span extraction produced nothing, fall back to raw chunks.
            if not context:
                context = build_context(selected, max_chars=int(max_context_chars))
            ex = openai_structured_extract(
                client=client,
                model=model,
                schema=schema,
                controlled_vocab=vocab,
                field_instructions=field_instr,
                context=context
            )
        # Tag the extraction with its source file (redundant for the
        # no-text branch, which already set these keys).
        ex["_file"] = filename
        ex["_pages_in_pdf"] = page_count
        paper_details.append(ex)
        base = {
            "file": filename,
            "paper_title": ex.get("paper_title", ""),
            "risk_stance": ex.get("risk_stance", ""),
            "risk_confidence": ex.get("risk_confidence", ""),
            "risk_summary": ex.get("risk_summary", ""),
        }
        ext = ex.get("extracted") or {}
        chemicals = _as_list(ext.get("chemicals"))
        if not chemicals:
            chemicals = ["-"]
        # Single-chemical => one-row-per-paper
        if len(chemicals) <= 1:
            chem = chemicals[0]
            row = dict(base)
            row["chemical"] = chem
            row["endpoint"] = "Paper"
            row["record_id"] = _record_id(filename, chem, row["endpoint"])
            for k in field_props.keys():
                row[k] = _format_value(ext.get(k, [] if field_props[k].get("type") == "array" else ""))
            output_rows.append(row)
        # Multi-chemical => chemical–endpoint rows (ONLY non-empty modules)
        else:
            core_keys = [k for k, m in field_key_to_module.items() if m == "Core"] if not admin_mode else []
            # determine which endpoint modules have any data (skip empty ones)
            candidate_modules = [m for m in endpoint_modules_for_rows if m != "Core"]
            non_empty_modules = []
            for m in candidate_modules:
                if _module_has_any_data(ext, module_to_keys.get(m, []), field_props):
                    non_empty_modules.append(m)
            # If everything empty, fall back to a single Paper row (otherwise you get no rows)
            if not non_empty_modules:
                row = dict(base)
                row["chemical"] = "multiple"
                row["endpoint"] = "Paper"
                row["record_id"] = _record_id(filename, row["chemical"], row["endpoint"])
                for k in field_props.keys():
                    row[k] = _format_value(ext.get(k, [] if field_props[k].get("type") == "array" else ""))
                output_rows.append(row)
            else:
                # One row per (chemical, non-empty module); each row only
                # carries that module's fields (all fields in admin mode).
                for chem in chemicals:
                    for module in non_empty_modules:
                        row = dict(base)
                        row["chemical"] = chem
                        row["endpoint"] = module
                        row["record_id"] = _record_id(filename, chem, module)
                        for k in field_props.keys():
                            m = field_key_to_module.get(k, "Custom")
                            include = (m == module) or admin_mode
                            if include:
                                if k == "chemicals":
                                    # The per-row chemical replaces the full list.
                                    row[k] = chem
                                else:
                                    row[k] = _format_value(ext.get(k, [] if field_props[k].get("type") == "array" else ""))
                        output_rows.append(row)
    # --- Persist artifacts and build UI outputs. ---
    df = pd.DataFrame(output_rows)
    records = df.to_dict("records")
    csv_path = tmpdir / "extraction_table.csv"
    json_path = tmpdir / "extraction_details.json"
    df.to_csv(csv_path, index=False)
    details_payload = {
        "papers": paper_details,
        "toxra_extensions": {
            "nlp_diagnostics": nlp_diagnostics,
            "regulatory_gap_assessment": {},
            "risk_calculation_refs": [],
        },
    }
    json_path.write_text(json.dumps(details_payload, indent=2, default=_json_default), encoding="utf-8")
    prefilled_template_path = export_prefilled_cancer_risk_template(records)
    # Default the record picker to the first record and pre-render its views.
    choices = [r.get("record_id") for r in records if r.get("record_id")]
    default = choices[0] if choices else None
    vertical = _make_vertical(records, default) if default else pd.DataFrame(columns=["Field","Value"])
    summary_html = render_summary_card(default, records) if default else render_summary_card("", [])
    allowed_fields = None
    file_for_evidence = None
    if default:
        selected_row = next((r for r in records if r.get("record_id") == default), {})
        allowed_fields = set([k for k in selected_row.keys() if k not in {"record_id"}])
        # record_id format is "file | chemical | endpoint"; take the file part.
        file_for_evidence = (default.split(" | ")[0] or "").strip()
    evidence = _render_evidence(paper_details, file_for_evidence, allowed_fields=allowed_fields) if file_for_evidence else ""
    overview = _overview_df_from_records(records)
    status = "✅ Done. Review in the report below and export when ready."
    return (
        summary_html,
        overview,
        str(csv_path),
        str(json_path),
        str(prefilled_template_path),
        status,
        gr.update(choices=choices, value=default),
        records,
        paper_details,
        vertical,
        evidence
    )
| # ============================= | |
| # Review mode handlers | |
| # ============================= | |
def on_pick(record_id: str, records: List[Dict[str, Any]], details: List[Dict[str, Any]]):
    """Refresh the summary card, vertical table, and evidence for a newly selected record."""
    if not record_id:
        return render_summary_card("", []), pd.DataFrame(columns=["Field", "Value"]), ""
    row = next((r for r in (records or []) if r.get("record_id") == record_id), {})
    visible_fields = set(row.keys()) - {"record_id"}
    evidence_md = _render_evidence(details, row.get("file") or "", allowed_fields=visible_fields)
    return render_summary_card(record_id, records), _make_vertical(records, record_id), evidence_md
def toggle_review_mode(is_on: bool):
    """Make the vertical review table editable only while Review mode is on."""
    editable = bool(is_on)
    return gr.update(interactive=editable)
def save_review_changes(record_id: str, vertical_df: Any, records: List[Dict[str, Any]]):
    """Merge edits from the Field/Value review table back into the session records.

    Returns the refreshed overview table, the updated records list, a status
    message, and the re-rendered summary card.
    """
    if not record_id or not records:
        return pd.DataFrame(), records, "Nothing to save.", render_summary_card("", [])
    if isinstance(vertical_df, pd.DataFrame):
        dfv = vertical_df
    else:
        try:
            dfv = pd.DataFrame(vertical_df, columns=["Field", "Value"])
        except Exception:
            return _overview_df_from_records(records), records, "Could not parse edited vertical table.", render_summary_card(record_id, records)
    dfv = dfv.dropna(subset=["Field"])
    updates = {str(rec["Field"]): rec["Value"] for _, rec in dfv.iterrows() if str(rec["Field"]).strip()}
    updated = False
    new_records = []
    for rec in records:
        if rec.get("record_id") == record_id:
            # Copy-on-write merge so earlier session state is not mutated.
            new_records.append({**rec, **updates})
            updated = True
        else:
            new_records.append(rec)
    msg = "Saved changes into session data. Export reviewed CSV to download." if updated else "Record not found."
    return _overview_df_from_records(new_records), new_records, msg, render_summary_card(record_id, new_records)
def export_reviewed_csv(records: List[Dict[str, Any]]):
    """Write the (possibly edited) session records to a temp CSV for download."""
    if not records:
        return None, "No reviewed data to export."
    out_path = Path(tempfile.mkdtemp(prefix="tox_review_")) / "reviewed_extraction_table.csv"
    pd.DataFrame(records).to_csv(out_path, index=False)
    return str(out_path), "Reviewed CSV ready to download."
| # ============================= | |
| # New modules: template, mapping, MCP batch | |
| # ============================= | |
| def _load_extraction_payload(file_obj: Any) -> Tuple[Any, List[Dict[str, Any]], Dict[str, Any]]: | |
| if file_obj is None: | |
| raise ValueError("Upload extraction_details.json first.") | |
| payload = json.loads(Path(file_obj.name).read_text(encoding="utf-8")) | |
| if isinstance(payload, list): | |
| return payload, payload, {} | |
| if isinstance(payload, dict): | |
| papers = payload.get("papers", []) | |
| if not isinstance(papers, list): | |
| raise ValueError("Invalid extraction_details.json format: papers must be a list.") | |
| ext = payload.get("toxra_extensions", {}) | |
| return payload, papers, (ext if isinstance(ext, dict) else {}) | |
| raise ValueError("Unsupported extraction_details.json format.") | |
def export_blank_cancer_risk_template():
    """Create an empty cancer-risk input CSV with the canonical column set."""
    out = Path(tempfile.mkdtemp(prefix="tox_template_")) / "cancer_risk_input_template.csv"
    pd.DataFrame(columns=CANCER_RISK_TEMPLATE_COLUMNS).to_csv(out, index=False)
    return str(out), "Blank cancer risk template ready."
def export_prefilled_cancer_risk_template(records: List[Dict[str, Any]]):
    """Pre-fill the cancer-risk input template from extraction records.

    Emits one template row per unique ``record_id``; exposure/CSF/IUR
    numeric fields are left blank for the user to complete.
    """
    out = Path(tempfile.mkdtemp(prefix="tox_template_prefilled_")) / "cancer_risk_input_template_prefilled.csv"
    if not records:
        pd.DataFrame(columns=CANCER_RISK_TEMPLATE_COLUMNS).to_csv(out, index=False)
        return str(out)
    prefilled: List[Dict[str, Any]] = []
    seen_ids = set()
    for rec in records:
        rid = str(rec.get("record_id", "")).strip()
        if not rid or rid in seen_ids:
            continue
        seen_ids.add(rid)
        route = str(rec.get("exposure_route", "")).strip().lower()
        if route not in {"oral", "inhalation"}:
            # Only routes the calculator understands are carried over.
            route = ""
        first_cas = str(rec.get("cas_numbers", "")).split(";")[0].strip()
        prefilled.append(
            {
                "record_id": rid,
                "chemical_name": str(rec.get("chemical", "")).strip(),
                "casrn": first_cas,
                "route": route,
                "exposure_value": "",
                "exposure_unit": "",
                "body_weight_kg": "",
                "csf_value": "",
                "csf_unit": "",
                "iur_value": "",
                "air_conc_value": "",
                "air_conc_unit": "",
                "source_reference": str(rec.get("file", "")).strip(),
            }
        )
    pd.DataFrame(prefilled, columns=CANCER_RISK_TEMPLATE_COLUMNS).to_csv(out, index=False)
    return str(out)
def run_regulatory_gap_assessment(extraction_json_file, framework: str, override_notes: str):
    """Map an extraction payload onto a regulatory framework and persist gap artifacts."""
    if extraction_json_file is None:
        return pd.DataFrame(), "Upload extraction_details.json first.", None, None, "No input file."
    try:
        payload, _, _ = _load_extraction_payload(extraction_json_file)
        matrix_df, report, report_md = map_extraction_to_framework(
            extraction_payload=payload,
            framework=framework,
            catalog_dir="regulatory_catalog",
            override_notes=override_notes or "",
        )
    except Exception as e:
        return pd.DataFrame(), f"(assessment unavailable: {e})", None, None, str(e)
    # Persist the matrix + reports under a fresh run directory.
    run_dir = make_run_dir(base_dir="runs")
    matrix_path = write_dataframe_csv(run_dir / "regulatory_gap_matrix.csv", matrix_df)
    report_path = write_json(run_dir / "regulatory_gap_report.json", report)
    write_markdown(run_dir / "regulatory_gap_report.md", report_md)
    summary = report.get("summary", {})
    status = f"✅ Gap assessment complete. Covered={summary.get('covered', 0)} | Missing={summary.get('missing', 0)}"
    return matrix_df, "### Regulatory Gap Summary\n" + report_md, str(matrix_path), str(report_path), status
def run_cancer_risk_batch_ui(input_csv_file):
    """Validate a populated cancer-risk CSV, run the MCP batch calculation, and persist results."""
    if input_csv_file is None:
        return pd.DataFrame(), None, None, None, "Upload a populated cancer risk input CSV."
    try:
        df = pd.read_csv(input_csv_file.name)
    except Exception as e:
        return pd.DataFrame(), None, None, None, f"Could not read CSV: {e}"
    missing = [c for c in CANCER_RISK_TEMPLATE_COLUMNS if c not in df.columns]
    if missing:
        return pd.DataFrame(), None, None, None, f"Missing required columns: {missing}"
    run_dir = make_run_dir(base_dir="runs")
    input_rows = df.fillna("").to_dict("records")
    try:
        result = run_batch_cancer_risk(input_rows, run_dir=str(run_dir))
    except MCPClientError as e:
        return pd.DataFrame(), None, None, None, f"MCP server unavailable: {e}"
    except Exception as e:
        return pd.DataFrame(), None, None, None, f"Calculation failed: {e}"
    rows_out = result.get("rows", [])
    out_df = pd.DataFrame(rows_out if isinstance(rows_out, list) else [])
    result_csv_path = write_dataframe_csv(run_dir / "cancer_risk_results.csv", out_df)
    write_json(run_dir / "cancer_risk_results.json", result)
    # Prefer server-reported artifact paths; fall back to conventional names.
    artifacts = result.get("artifacts", {}) if isinstance(result, dict) else {}
    log_path = artifacts.get("log_jsonl", str(run_dir / "cancer_risk_log.jsonl"))
    report_path = artifacts.get("report_md", str(run_dir / "cancer_risk_report.md"))
    summ = result.get("summary", {})
    status = (
        f"✅ Batch complete. total={summ.get('total_rows', 0)} "
        f"ok={summ.get('ok_rows', 0)} error={summ.get('error_rows', 0)}"
    )
    return out_df, str(result_csv_path), str(log_path), str(report_path), status
| # ============================= | |
| # Synthesis tab handler | |
| # ============================= | |
def run_synthesis(api_key, model, extraction_json_file):
    """Produce a cross-paper synthesis from a previously exported extraction JSON."""
    if extraction_json_file is None:
        return "Upload the extraction_details.json from Extract tab first."
    try:
        client = get_openai_client(api_key)
    except Exception as e:
        return str(e)
    raw = json.loads(Path(extraction_json_file.name).read_text(encoding="utf-8"))
    # Accept both the dict format ({"papers": [...]}) and a bare list.
    papers = raw.get("papers", raw) if isinstance(raw, dict) else raw
    if not isinstance(papers, list):
        return "Invalid extraction JSON format for synthesis."
    return openai_synthesize_across_papers(client, model, papers)
| # ============================= | |
| # Admin visibility helpers | |
| # ============================= | |
def set_admin_visibility(is_admin: bool):
    """Show or hide the three admin-only panels together."""
    flag = bool(is_admin)
    return tuple(gr.update(visible=flag) for _ in range(3))
# =============================
# Gradio UI
# =============================
# NOTE(review): indentation/nesting below was reconstructed from a
# whitespace-mangled source; component definitions, arguments and event
# wiring are unchanged. Confirm layout groupings against the rendered app.
with gr.Blocks(title="Toxicology PDF → Grounded Extractor", css=APP_CSS) as demo:
    # Static hero banner; class names are styled by APP_CSS.
    gr.HTML(
        """
        <div class="hero">
          <div class="hero-left">
            <div class="hero-title">TOXRA.AI</div>
            <div class="hero-sub">Grounded toxicology extraction & literature exploration</div>
            <div class="hero-pills">
              <span class="hero-pill">Text-based PDFs only</span>
              <span class="hero-pill">Results-first reporting</span>
              <span class="hero-pill">Admin-configurable extraction</span>
            </div>
          </div>
          <div class="hero-right">
            <span class="hero-status">Production · Beta</span>
          </div>
        </div>
        """
    )

    # --- Cross-tab session state ---
    state_records = gr.State([])      # extraction rows driving the overview table
    state_details = gr.State([])      # per-record evidence/details payloads
    vocab_state = gr.State({})        # controlled-vocabulary dict (admin-editable)
    field_rows_state = gr.State([])   # field-builder rows (admin-editable)
    # Hidden carriers: serialized field spec / vocab JSON consumed by run_extraction.
    field_spec = gr.Textbox(visible=False, interactive=False, lines=8)
    vocab_json = gr.Textbox(visible=False, interactive=False, lines=8)

    with gr.Tab("Extract"):
        with gr.Row(elem_classes="split-row"):
            # Left rail: inputs, runtime settings, admin tools.
            with gr.Column(scale=4, min_width=320, elem_classes="left-rail"):
                with gr.Group(elem_classes="card"):
                    gr.Markdown("Extract setup", elem_classes="section-title")
                    files = gr.File(label="Upload toxicology PDFs", file_types=[".pdf"], file_count="multiple")
                    with gr.Row():
                        api_key = gr.Textbox(label="OpenAI API key (optional if set as OPENAI_API_KEY secret)", type="password")
                        model = gr.Dropdown(label="Model", choices=["gpt-4o-2024-08-06", "gpt-4o", "gpt-4o-mini"], value="gpt-4o-2024-08-06")
                    with gr.Row():
                        # Preset drives the multiselect below via apply_endpoint_preset.
                        endpoint_preset = gr.Dropdown(
                            label="Endpoint preset",
                            choices=list(ENDPOINT_PRESETS.keys()),
                            value="Required – Safety Assessor"
                        )
                        endpoints = gr.Dropdown(
                            label="Endpoints to extract (Core included automatically)",
                            choices=list(ENDPOINT_MODULES.keys()),
                            multiselect=True,
                            value=ENDPOINT_PRESETS["Required – Safety Assessor"]
                        )
                    extract_btn = gr.Button("Run Extraction", variant="primary")
                    status = gr.Textbox(label="Status", interactive=False)
                with gr.Accordion("Advanced runtime settings", open=False, elem_classes="card"):
                    with gr.Row():
                        max_pages = gr.Slider(0, 250, value=0, step=1, label="Max pages to read (0 = all)")
                        chunk_chars = gr.Slider(1200, 9000, value=3200, step=100, label="Chunk size (chars)")
                    max_context_chars = gr.Slider(5000, 45000, value=20000, step=1000, label="Max context sent to GPT (chars)")
                with gr.Accordion("Admin tools (taxonomy + custom columns)", open=False, elem_classes="card"):
                    admin_mode = gr.Checkbox(label="Enable Admin mode", value=False)
                    # These three groups start hidden; set_admin_visibility toggles them.
                    admin_group = gr.Group(visible=False)
                    admin_vocab_group = gr.Group(visible=False)
                    admin_fields_group = gr.Group(visible=False)
                    with admin_group:
                        gr.Markdown("### Admin: Configure extraction taxonomy + custom columns.")
                    with admin_vocab_group:
                        gr.Markdown("### Controlled vocabulary (lists only)")
                        vocab_category = gr.Dropdown(label="Category (lists only)", choices=[], value=None)
                        vocab_search = gr.Textbox(label="Search terms", placeholder="Type to filter (e.g., 471, AMES, comet)", lines=1)
                        with gr.Row():
                            vocab_term_add = gr.Textbox(label="Add term", placeholder="type term and click Add")
                            vocab_add_btn = gr.Button("Add")
                        with gr.Row():
                            vocab_term_remove = gr.Textbox(label="Remove term", placeholder="type exact term and click Remove")
                            vocab_remove_btn = gr.Button("Remove")
                        vocab_apply_btn = gr.Button("Apply full list to category")
                        vocab_reset_btn = gr.Button("Reset vocab to defaults")
                        vocab_terms_df = gr.Dataframe(headers=["term"], label="Terms (full list; edit directly)", interactive=True, wrap=True)
                        vocab_terms_filtered = gr.Dataframe(headers=["term"], label="Filtered preview (read-only)", interactive=False, wrap=True)
                        vocab_status = gr.Textbox(label="Vocab status", interactive=False)
                        with gr.Accordion("Raw vocab JSON (auto-generated)", open=False):
                            vocab_json_admin = gr.Textbox(label="Controlled vocab JSON", lines=12, interactive=False)
                    with admin_fields_group:
                        gr.Markdown("### Custom columns (Field Builder)")
                        gr.Markdown("Tip: Use endpoint selection to start, then tweak fields.")
                        with gr.Row():
                            admin_apply_endpoints_btn = gr.Button("Load selected endpoints into builder (Replace)", variant="secondary")
                            fields_apply_btn = gr.Button("Apply builder table")
                        with gr.Row():
                            field_name_in = gr.Textbox(label="Field name", placeholder="e.g., genotoxicity_result")
                            field_type_in = gr.Dropdown(label="Type", choices=TYPE_CHOICES, value="str")
                        enum_values_in = gr.Textbox(label="Enum values (comma-separated; for enum/list[enum])", placeholder="a,b,c", lines=2)
                        instructions_in = gr.Textbox(label="Instructions", placeholder="Tell the extractor exactly what to pull.", lines=2)
                        add_update_field_btn = gr.Button("Add/Update field")
                        fields_df = gr.Dataframe(
                            label="Fields (edit and click Apply)",
                            headers=["field","type","enum_values","instructions"],
                            interactive=True,
                            wrap=True
                        )
                        fields_status = gr.Textbox(label="Field builder status", interactive=False)
            # Right panel: results, record review, evidence, downloads.
            with gr.Column(scale=7, min_width=480, elem_classes="right-panel"):
                with gr.Tabs(elem_classes="report-tabs"):
                    with gr.Tab("Overview"):
                        with gr.Group(elem_classes="card"):
                            gr.Markdown("Report overview", elem_classes="section-title")
                            summary_card = gr.HTML(render_summary_card("", []))
                        with gr.Group(elem_classes="card"):
                            overview_df = gr.Dataframe(
                                label="Batch Overview",
                                interactive=False,
                                wrap=True,
                                show_row_numbers=True
                            )
                    with gr.Tab("Record"):
                        with gr.Group(elem_classes="card"):
                            record_pick = gr.Dropdown(label="Select record", choices=[], value=None)
                            with gr.Row():
                                review_mode = gr.Checkbox(label="Review mode (enable editing)", value=False)
                                save_btn = gr.Button("Save edits")
                                export_btn = gr.Button("Export reviewed CSV")
                            review_status = gr.Textbox(label="Review status", interactive=False)
                        with gr.Group(elem_classes="card"):
                            vertical_view = gr.Dataframe(
                                headers=["Field", "Value"],
                                interactive=False,
                                wrap=True,
                                show_row_numbers=False,
                                label="Extracted fields (vertical)"
                            )
                    with gr.Tab("Evidence"):
                        with gr.Group(elem_classes="card"):
                            evidence_md = gr.Markdown()
                    with gr.Tab("Exports"):
                        with gr.Group(elem_classes="card"):
                            out_csv = gr.File(label="Download: extraction_table.csv")
                            out_json = gr.File(label="Download: extraction_details.json (evidence + structured data)")
                            risk_template_prefilled = gr.File(label="Download: cancer_risk_input_template_prefilled.csv (record_id linked)")
                            reviewed_csv = gr.File(label="Download: reviewed_extraction_table.csv")

    # --- Wiring ---
    # Admin checkbox shows/hides the three admin groups.
    admin_mode.change(
        fn=set_admin_visibility,
        inputs=[admin_mode],
        outputs=[admin_group, admin_vocab_group, admin_fields_group]
    )
    # Preset selection rewrites the endpoints multiselect.
    endpoint_preset.change(
        fn=apply_endpoint_preset,
        inputs=[endpoint_preset],
        outputs=[endpoints]
    )
    # Endpoint changes keep the field-builder table and hidden spec in sync.
    endpoints.change(
        fn=sync_fields_from_endpoints,
        inputs=[endpoints, admin_mode, field_rows_state, field_spec],
        outputs=[field_rows_state, fields_df, field_spec, status]
    )
    # Main extraction run: fans out to overview, downloads, record picker and evidence.
    extract_btn.click(
        fn=run_extraction,
        inputs=[files, api_key, model, endpoints, field_spec, vocab_json, max_pages, chunk_chars, max_context_chars, admin_mode],
        outputs=[summary_card, overview_df, out_csv, out_json, risk_template_prefilled, status, record_pick, state_records, state_details, vertical_view, evidence_md]
    )
    record_pick.change(
        fn=on_pick,
        inputs=[record_pick, state_records, state_details],
        outputs=[summary_card, vertical_view, evidence_md]
    )
    review_mode.change(fn=toggle_review_mode, inputs=[review_mode], outputs=[vertical_view])
    save_btn.click(
        fn=save_review_changes,
        inputs=[record_pick, vertical_view, state_records],
        outputs=[overview_df, state_records, review_status, summary_card]
    )
    export_btn.click(
        fn=export_reviewed_csv,
        inputs=[state_records],
        outputs=[reviewed_csv, review_status]
    )

    # Admin vocab wiring
    vocab_search.change(fn=vocab_filter_preview, inputs=[vocab_terms_df, vocab_search], outputs=[vocab_terms_filtered])
    vocab_category.change(
        fn=vocab_load_category,
        inputs=[vocab_state, vocab_category, vocab_search],
        outputs=[vocab_terms_df, vocab_terms_filtered, vocab_status]
    )
    vocab_add_btn.click(
        fn=vocab_add_term,
        inputs=[vocab_state, vocab_category, vocab_term_add, vocab_search],
        outputs=[vocab_terms_df, vocab_terms_filtered, vocab_term_add, vocab_status]
    )
    vocab_remove_btn.click(
        fn=vocab_remove_term,
        inputs=[vocab_state, vocab_category, vocab_term_remove, vocab_search],
        outputs=[vocab_terms_df, vocab_terms_filtered, vocab_term_remove, vocab_status]
    )
    # Apply writes the admin JSON view first, then mirrors it into the hidden
    # vocab_json carrier that run_extraction reads.
    vocab_apply_btn.click(
        fn=vocab_apply_df,
        inputs=[vocab_state, vocab_category, vocab_terms_df, vocab_search],
        outputs=[vocab_json_admin, vocab_terms_filtered, vocab_status]
    ).then(
        fn=lambda x: x,
        inputs=[vocab_json_admin],
        outputs=[vocab_json]
    )
    vocab_reset_btn.click(
        fn=vocab_reset_defaults_ui,
        inputs=None,
        outputs=[vocab_state, vocab_category, vocab_terms_df, vocab_terms_filtered, vocab_json_admin, vocab_status, vocab_json]
    )

    # Admin field builder wiring
    admin_apply_endpoints_btn.click(
        fn=admin_apply_endpoints,
        inputs=[endpoints],
        outputs=[field_rows_state, fields_df, field_spec, fields_status]
    )
    add_update_field_btn.click(
        fn=fields_add_or_update,
        inputs=[field_name_in, field_type_in, enum_values_in, instructions_in, field_rows_state],
        outputs=[field_rows_state, fields_df, field_spec, fields_status]
    )
    fields_apply_btn.click(
        fn=fields_apply_df,
        inputs=[field_rows_state, fields_df],
        outputs=[field_rows_state, fields_df, field_spec, fields_status]
    )

    # Init
    def _init_all():
        """Seed vocab state, category dropdown, field builder and status on page load.

        Returns values positionally matched to the demo.load outputs list below
        (note vjson is emitted twice: once for the admin view, once for the
        hidden vocab_json carrier).
        """
        vocab, keys, k0, full_df, filtered_df, vjson, vmsg = vocab_init_state(DEFAULT_CONTROLLED_VOCAB_JSON)
        default_endpoints = ENDPOINT_PRESETS["Required – Safety Assessor"]
        rows, _, _ = build_rows_from_endpoints(default_endpoints)
        fdf = pd.DataFrame(rows, columns=["field","type","enum_values","instructions"])
        fspec = build_spec_from_field_rows(rows)
        return (
            vocab,
            gr.update(choices=keys, value=k0),
            full_df,
            filtered_df,
            vjson,
            vmsg,
            vjson,
            rows,
            fdf,
            fspec,
            "✅ Ready."
        )
    demo.load(
        _init_all,
        inputs=None,
        outputs=[
            vocab_state,
            vocab_category,
            vocab_terms_df,
            vocab_terms_filtered,
            vocab_json_admin,
            vocab_status,
            vocab_json,
            field_rows_state,
            fields_df,
            field_spec,
            status
        ]
    )

    with gr.Tab("Literature Explorer"):
        # Entire tab is built by the imported helper module.
        build_literature_explorer_tab()

    with gr.Tab("Cross-paper Synthesis"):
        with gr.Group(elem_classes="card"):
            gr.Markdown("Upload `extraction_details.json` from Extract tab. Synthesis is based strictly on grounded extractions.")
            api_key2 = gr.Textbox(label="OpenAI API key (optional if set as OPENAI_API_KEY secret)", type="password")
            model2 = gr.Dropdown(label="Model", choices=["gpt-4o-2024-08-06", "gpt-4o", "gpt-4o-mini"], value="gpt-4o-2024-08-06")
            extraction_json_file = gr.File(label="Upload extraction_details.json", file_types=[".json"], file_count="single")
            synth_btn = gr.Button("Synthesize Across Papers")
            synth_md = gr.Markdown()
        synth_btn.click(fn=run_synthesis, inputs=[api_key2, model2, extraction_json_file], outputs=[synth_md])

    with gr.Tab("Regulatory Gap Assessment"):
        with gr.Group(elem_classes="card"):
            gr.Markdown(
                "Run clause-level mapping against regulatory catalogs. "
                "Use `extraction_details.json` from Extract tab."
            )
            with gr.Row():
                reg_extraction_json = gr.File(label="Upload extraction_details.json", file_types=[".json"], file_count="single")
                reg_framework = gr.Dropdown(label="Framework profile", choices=["FDA CTP", "EPA"], value="FDA CTP")
            reg_override_notes = gr.Textbox(
                label="Override notes (optional)",
                lines=2,
                placeholder="Context to include in gap prompts."
            )
            reg_run_btn = gr.Button("Run Regulatory Gap Assessment", variant="primary")
            reg_status = gr.Textbox(label="Status", interactive=False)
            reg_summary_md = gr.Markdown()
            reg_matrix_df = gr.Dataframe(label="Clause-level gap matrix", interactive=False, wrap=True)
            reg_matrix_file = gr.File(label="Download: regulatory_gap_matrix.csv")
            reg_report_file = gr.File(label="Download: regulatory_gap_report.json")
        reg_run_btn.click(
            fn=run_regulatory_gap_assessment,
            inputs=[reg_extraction_json, reg_framework, reg_override_notes],
            outputs=[reg_matrix_df, reg_summary_md, reg_matrix_file, reg_report_file, reg_status]
        )

    with gr.Tab("Cancer Risk Calculator"):
        with gr.Group(elem_classes="card"):
            gr.Markdown(
                "Deterministic FDA/EPA cancer risk calculations routed through a dedicated local MCP server. "
                "Use `record_id` values from extraction outputs for traceability."
            )
            with gr.Row():
                template_btn = gr.Button("Download Blank CSV Template")
                template_file = gr.File(label="Download: cancer_risk_input_template.csv")
            template_status = gr.Textbox(label="Template status", interactive=False)
            template_btn.click(fn=export_blank_cancer_risk_template, inputs=None, outputs=[template_file, template_status])
            risk_input_csv = gr.File(label="Upload populated cancer risk input CSV", file_types=[".csv"], file_count="single")
            risk_run_btn = gr.Button("Run Cancer Risk Batch", variant="primary")
            risk_status = gr.Textbox(label="Status", interactive=False)
            risk_results_df = gr.Dataframe(label="Cancer risk results", interactive=False, wrap=True)
            risk_results_csv = gr.File(label="Download: cancer_risk_results.csv")
            risk_log_file = gr.File(label="Download: cancer_risk_log.jsonl")
            risk_report_file = gr.File(label="Download: cancer_risk_report.md")
        risk_run_btn.click(
            fn=run_cancer_risk_batch_ui,
            inputs=[risk_input_csv],
            outputs=[risk_results_df, risk_results_csv, risk_log_file, risk_report_file, risk_status]
        )
if __name__ == "__main__":
    # Hosting platforms inject PORT; fall back to Gradio's conventional 7860.
    listen_port = int(os.getenv("PORT", "7860"))
    demo.queue().launch(server_name="0.0.0.0", server_port=listen_port)