NLP_Project / app.py
hchevva's picture
Upload app.py
40a8012 verified
import os
import re
import json
import tempfile
from pathlib import Path
from typing import Dict, List, Tuple, Any, Optional
import gradio as gr
import numpy as np
import pandas as pd
from pypdf import PdfReader
try:
from sklearn.feature_extraction.text import TfidfVectorizer
except Exception: # pragma: no cover - fallback path for minimal runtime
TfidfVectorizer = None
from openai import OpenAI
from literature_explorer import build_literature_explorer_tab
from toxra_core.artifacts import make_run_dir, write_dataframe_csv, write_json, write_markdown
from toxra_core.calculation_client import MCPClientError, run_batch_cancer_risk
from toxra_core.contracts import CANCER_RISK_TEMPLATE_COLUMNS
from toxra_core.nlp_pipeline import extract_evidence_span, expand_regulatory_queries, hybrid_rank_text_items
from toxra_core.regulatory_mapper import map_extraction_to_framework
# =============================
# UI theme
# =============================
# Custom stylesheet for the UI: colour/shape tokens in :root, the "hero"
# banner, card layout, and overrides for Gradio inputs, primary buttons,
# tab navigation and accordions.
# NOTE(review): presumably passed to gr.Blocks(css=...) — the call site is not
# visible in this part of the file.
APP_CSS = """
@import url('https://fonts.googleapis.com/css2?family=IBM+Plex+Sans:wght@400;500;600;700&display=swap');
:root {
--bg: #f5f7fb;
--panel: #ffffff;
--ink: #0f172a;
--muted: #516079;
--line: #e2e8f0;
--accent: #2563eb;
--accent-2: #0ea5e9;
--accent-soft: #e6efff;
--shadow: 0 10px 28px rgba(15, 23, 42, 0.08);
--radius: 14px;
}
.gradio-container {
background: var(--bg);
color: var(--ink);
font-family: "IBM Plex Sans", ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, "Helvetica Neue", Arial, "Noto Sans", "Apple Color Emoji", "Segoe UI Emoji";
}
.hero {
background: linear-gradient(180deg, #edf3ff 0%, #f4f8ff 100%);
color: var(--ink);
border-radius: 16px;
padding: 18px 22px;
box-shadow: var(--shadow);
border: 1px solid #dbe5f4;
display: flex;
align-items: center;
justify-content: space-between;
gap: 16px;
flex-wrap: wrap;
}
.hero-left { min-width: 240px; }
.hero-right { margin-left: auto; }
.hero-title { font-size: 22px; font-weight: 700; letter-spacing: 0.08em; }
.hero-sub { margin-top: 4px; font-size: 13px; color: #3b4b63; }
.hero-pills { margin-top: 10px; display: flex; gap: 8px; flex-wrap: wrap; }
.hero-pill {
background: var(--accent-soft);
color: #1e3a8a;
border: 1px solid #d6e3f6;
border-radius: 999px;
padding: 4px 10px;
font-size: 11px;
font-weight: 600;
}
.hero-status {
background: #ffffff;
color: #334155;
border: 1px solid #d9e2ef;
border-radius: 999px;
padding: 6px 12px;
font-size: 12px;
font-weight: 600;
box-shadow: 0 6px 16px rgba(15, 23, 42, 0.06);
}
.split-row { gap: 18px; }
.card {
background: var(--panel);
border: 1px solid var(--line);
border-radius: var(--radius);
padding: 16px;
box-shadow: var(--shadow);
}
.left-rail .card + .card { margin-top: 16px; }
.right-panel .card { margin-bottom: 14px; }
.section-title {
font-size: 12px;
text-transform: uppercase;
letter-spacing: 0.14em;
color: var(--muted);
margin-bottom: 8px;
}
.gradio-container input,
.gradio-container textarea,
.gradio-container select {
border-radius: 10px !important;
border-color: var(--line) !important;
}
.gradio-container button.primary {
background: var(--accent) !important;
border-color: var(--accent) !important;
}
.gradio-container button.primary:hover { background: #1d4ed8 !important; }
.gradio-container .tab-nav { gap: 8px; }
.gradio-container .tab-nav button {
background: var(--panel);
border: 1px solid var(--line);
border-radius: 999px;
padding: 6px 14px;
font-size: 12px;
color: var(--muted);
}
.gradio-container .tab-nav button.selected {
background: var(--accent);
border-color: var(--accent);
color: #ffffff;
}
.gradio-container .accordion {
border: 1px solid var(--line);
border-radius: var(--radius);
}
"""
# =============================
# Defaults
# =============================
# Default controlled vocabulary, stored as a JSON string mapping an enum name
# to its list of allowed terms. vocab_init_state() parses it and falls back to
# it when user-supplied vocab JSON cannot be parsed.
# NOTE(review): "pbpk_pbtK" has mixed case (presumably PBPK/PBTK); it is kept
# verbatim because PRESET_NAMS_INSILICO uses the identical value.
DEFAULT_CONTROLLED_VOCAB_JSON = """{
"risk_stance_enum": ["acceptable","acceptable_with_uncertainty","not_acceptable","insufficient_data"],
"fda_ctp_tier_enum": ["Tier_1_high_priority","Tier_2_moderate_priority","Tier_3_lower_priority","enough data is not available"],
"approach_enum": ["in_vivo","in_vitro","in_silico","nams","mixed","not_reported"],
"in_silico_method_enum": [
"qsar","read_across","molecular_docking","molecular_dynamics","pbpk_pbtK","aop_based","ml_model","other","not_reported"
],
"nams_method_enum": [
"high_throughput_screening_hts","omics_transcriptomics","omics_proteomics","omics_metabolomics",
"organ_on_chip","microphysiological_system_mps","3d_tissue_model","in_chemico_assay",
"in_silico_as_nams","other","not_reported"
],
"exposure_route_enum": ["oral","inhalation","dermal","parenteral","multiple","not_reported"],
"species_enum": ["human","rat","mouse","rabbit","dog","non_human_primate","cell_line","other","not_reported"],
"genotoxicity_oecd_tg_in_vitro_enum": [
"OECD_TG_471_Bacterial Reverse mutation test(AMES test)",
"OECD_TG_473_In Vitro Mammalian Chromosomal Aberration Test",
"OECD_TG_476_In Vitro Mammalian Cell Gene Mutation Tests (Hprt & xprt)",
"OECD_TG_487_In Vitro Mammalian Cell Micronucleus Test",
"OECD_TG_490_In Vitro Mammalian Cell Gene Mutation Tests (Thymidine Kinase)",
"not_reported"
],
"genotoxicity_oecd_tg_in_vivo_enum": [
"OECD_TG_474_In Vivo Mammalian Erythrocyte Micronucleus Test",
"OECD_TG_475_Mammalian Bone Marrow Chromosomal Aberration Test",
"OECD_TG_488_Transgenic Rodent Somatic & Germ Cell Gene Mutation Assays",
"OECD_TG_489_In Vivo Mammalian Alkaline Comet Assay",
"not_reported"
],
"genotoxicity_result_enum": ["positive","negative","equivocal","not_reported"],
"binary_result_enum": ["positive","negative","equivocal","not_reported"],
"carcinogenicity_result_enum": ["carcinogenic","not_carcinogenic","insufficient_data","not_reported"]
}"""
# =============================
# Endpoint modules (what users choose)
# =============================
# Core field rows that build_rows_from_endpoints() always includes, whatever
# endpoint modules are selected. Each row has the keys field / type /
# enum_values (comma-separated string) / instructions.
PRESET_CORE = [
    {"field": "chemicals", "type": "list[str]", "enum_values": "", "instructions": "List chemical(s) studied. If multiple, include each separately."},
    {"field": "cas_numbers", "type": "list[str]", "enum_values": "", "instructions": "Extract CAS number(s) mentioned (may be multiple)."},
    {"field": "study_type", "type": "enum", "enum_values": "in_vivo,in_vitro,epidemiology,in_silico,review,methodology,other,not_reported", "instructions": "Choose best match."},
    {"field": "exposure_route", "type": "enum", "enum_values": "oral,inhalation,dermal,parenteral,multiple,not_reported", "instructions": "Choose best match."},
    {"field": "species", "type": "enum", "enum_values": "human,rat,mouse,rabbit,dog,non_human_primate,cell_line,other,not_reported", "instructions": "Choose best match."},
    {"field": "dose_metrics", "type": "list[str]", "enum_values": "", "instructions": "Capture NOAEL/LOAEL/BMD/BMDL/LD50/LC50 etc with units and route if available."},
    {"field": "key_findings", "type": "str", "enum_values": "", "instructions": "2–4 short sentences summarizing major findings. Grounded to text."},
    {"field": "conclusion", "type": "str", "enum_values": "", "instructions": "Paper's conclusion about safety/risk (grounded)."},
]
# Field rows for the "NAMs / In Silico" endpoint module (approach, methods
# used and a summary of key results).
PRESET_NAMS_INSILICO = [
    {"field": "approach", "type": "enum", "enum_values": "in_vivo,in_vitro,in_silico,nams,mixed,not_reported", "instructions": "Identify if results are in silico or NAMs; use mixed if multiple."},
    {"field": "in_silico_methods", "type": "list[enum]", "enum_values": "qsar,read_across,molecular_docking,molecular_dynamics,pbpk_pbtK,aop_based,ml_model,other,not_reported", "instructions": "If in_silico, list methods used (multiple allowed)."},
    {"field": "nams_methods", "type": "list[enum]", "enum_values": "high_throughput_screening_hts,omics_transcriptomics,omics_proteomics,omics_metabolomics,organ_on_chip,microphysiological_system_mps,3d_tissue_model,in_chemico_assay,in_silico_as_nams,other,not_reported", "instructions": "If NAMs, list methods used (multiple allowed)."},
    {"field": "nams_or_insilico_key_results", "type": "str", "enum_values": "", "instructions": "Summarize in silico / NAMs results and key metrics (grounded)."},
]
# Field rows for the "Genotoxicity (OECD TG)" endpoint module: in vitro and
# in vivo OECD test guidelines, the FDA CTP tier, and the overall result.
# The tier's no-data value is the literal phrase "enough data is not available"
# (also enforced in the prompt built by openai_structured_extract).
PRESET_GENOTOX_OECD = [
    {
        "field": "genotox_oecd_tg_in_vitro",
        "type": "list[enum]",
        "enum_values": "OECD_TG_471_Bacterial Reverse mutation test(AMES test),OECD_TG_473_In Vitro Mammalian Chromosomal Aberration Test,OECD_TG_476_In Vitro Mammalian Cell Gene Mutation Tests (Hprt & xprt),OECD_TG_487_In Vitro Mammalian Cell Micronucleus Test,OECD_TG_490_In Vitro Mammalian Cell Gene Mutation Tests (Thymidine Kinase),not_reported",
        "instructions": "Select all in vitro OECD TGs explicitly reported (or clearly described). If none, use not_reported."
    },
    {
        "field": "genotox_oecd_tg_in_vivo",
        "type": "list[enum]",
        "enum_values": "OECD_TG_474_In Vivo Mammalian Erythrocyte Micronucleus Test,OECD_TG_475_Mammalian Bone Marrow Chromosomal Aberration Test,OECD_TG_488_Transgenic Rodent Somatic & Germ Cell Gene Mutation Assays,OECD_TG_489_In Vivo Mammalian Alkaline Comet Assay,not_reported",
        "instructions": "Select all in vivo OECD TGs explicitly reported (or clearly described). If none, use not_reported."
    },
    {
        "field": "fda_ctp_carcinogenicity_tier",
        "type": "enum",
        "enum_values": "Tier_1_high_priority,Tier_2_moderate_priority,Tier_3_lower_priority,enough data is not available",
        "instructions": "Assign FDA CTP carcinogenicity/genotoxicity tier based strictly on provided evidence. If decision cannot be made from excerpts, use exactly: enough data is not available."
    },
    {"field": "genotoxicity_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Classify overall genotoxicity outcome as reported. If unclear, not_reported."},
    {"field": "genotoxicity_result_notes", "type": "str", "enum_values": "", "instructions": "Short explanation grounded to text + test context (e.g., AMES, micronucleus)."},
]
# Field rows for the "Acute toxicity" endpoint module.
PRESET_ACUTE_TOX = [
    {"field": "acute_toxicity_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "If acute toxicity is assessed, classify as positive/negative/equivocal; otherwise not_reported."},
    {"field": "acute_toxicity_key_metrics", "type": "list[str]", "enum_values": "", "instructions": "Extract LD50/LC50/EC50/IC50 etc with units/route/species if available."},
    {"field": "acute_toxicity_notes", "type": "str", "enum_values": "", "instructions": "Grounded summary of acute toxicity findings."},
]
# Field rows for the "Repeated dose toxicity" endpoint module.
PRESET_REPEATED_DOSE = [
    {"field": "repeated_dose_noael_loael", "type": "list[str]", "enum_values": "", "instructions": "Extract NOAEL/LOAEL (and study duration) with units/route if available."},
    {"field": "repeated_dose_target_organs", "type": "list[str]", "enum_values": "", "instructions": "List target organs/critical effects explicitly reported."},
    {"field": "repeated_dose_notes", "type": "str", "enum_values": "", "instructions": "Grounded summary of repeated-dose toxicity conclusions."},
]
# Field rows for the "Irritation / Sensitization" endpoint module.
PRESET_IRR_SENS = [
    {"field": "skin_irritation_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Skin irritation outcome (as reported)."},
    {"field": "eye_irritation_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Eye irritation outcome (as reported)."},
    {"field": "skin_sensitization_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Skin sensitization outcome (as reported)."},
    {"field": "irritation_sensitization_notes", "type": "str", "enum_values": "", "instructions": "Grounded notes including method/model if stated."},
]
# Field rows for the "Repro / Developmental" endpoint module.
PRESET_REPRO_DEV = [
    {"field": "reproductive_toxicity_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Reproductive toxicity outcome (as reported)."},
    {"field": "developmental_toxicity_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Developmental toxicity outcome (as reported)."},
    {"field": "repro_dev_notes", "type": "str", "enum_values": "", "instructions": "Grounded notes including endpoints and study design if stated."},
]
# Field rows for the "Carcinogenicity" endpoint module.
PRESET_CARCINOGENICITY = [
    {"field": "carcinogenicity_result", "type": "enum", "enum_values": "carcinogenic,not_carcinogenic,insufficient_data,not_reported", "instructions": "As reported. If evidence insufficient, insufficient_data."},
    {"field": "carcinogenicity_notes", "type": "str", "enum_values": "", "instructions": "Grounded notes including species, duration, tumor findings if stated."},
]
# Maps the endpoint module name shown to users onto its preset field rows.
# build_rows_from_endpoints() looks modules up here; unknown names are skipped.
ENDPOINT_MODULES: Dict[str, List[Dict[str, Any]]] = {
    "Genotoxicity (OECD TG)": PRESET_GENOTOX_OECD,
    "NAMs / In Silico": PRESET_NAMS_INSILICO,
    "Acute toxicity": PRESET_ACUTE_TOX,
    "Repeated dose toxicity": PRESET_REPEATED_DOSE,
    "Irritation / Sensitization": PRESET_IRR_SENS,
    "Repro / Developmental": PRESET_REPRO_DEV,
    "Carcinogenicity": PRESET_CARCINOGENICITY,
}
# Endpoint presets (requested): named bundles of ENDPOINT_MODULES keys.
# apply_endpoint_preset() pushes the chosen bundle into the endpoint selector;
# "Core only (fast)" selects no modules, so only PRESET_CORE fields remain.
ENDPOINT_PRESETS: Dict[str, List[str]] = {
    "Required – Safety Assessor": [
        "Genotoxicity (OECD TG)",
        "Repeated dose toxicity",
        "Irritation / Sensitization",
        "Repro / Developmental",
        "Acute toxicity",
    ],
    "Core only (fast)": [],
    "Screening – NAMs + Genotox": ["NAMs / In Silico", "Genotoxicity (OECD TG)"],
    "Full – All endpoints": list(ENDPOINT_MODULES.keys()),
}
# Search phrases per endpoint module.
# NOTE(review): presumably fed to select_relevant_chunks() as retrieval
# queries — the call site is not visible in this part of the file.
# The genotoxicity TG hint lists every guideline number that appears in
# PRESET_GENOTOX_OECD (in vitro 471/473/476/487/490, in vivo 474/475/488/489);
# 475 and 488 were previously missing.
ENDPOINT_QUERY_HINTS: Dict[str, List[str]] = {
    "Genotoxicity (OECD TG)": ["genotoxicity", "mutagenicity", "AMES", "micronucleus", "comet assay", "chromosomal aberration", "OECD TG 471 473 476 487 490 474 475 488 489", "carcinogenicity tiering", "FDA CTP tier"],
    "NAMs / In Silico": ["in silico", "QSAR", "read-across", "AOP", "PBPK", "high-throughput", "omics", "organ-on-chip", "microphysiological"],
    "Acute toxicity": ["acute toxicity", "LD50", "LC50", "single dose", "lethality", "mortality"],
    "Repeated dose toxicity": ["repeated dose", "subchronic", "chronic", "NOAEL", "LOAEL", "target organ", "90-day", "28-day"],
    "Irritation / Sensitization": ["skin irritation", "eye irritation", "sensitization", "LLNA", "Draize"],
    "Repro / Developmental": ["reproductive toxicity", "fertility", "developmental toxicity", "teratogenic", "prenatal", "postnatal"],
    "Carcinogenicity": ["carcinogenicity", "tumor", "neoplasm", "cancer", "two-year bioassay"],
}
# =============================
# PDF extraction (text-based PDFs only)
# =============================
def extract_pages_from_pdf(pdf_path: str, max_pages: int = 0) -> Tuple[List[Tuple[int, str]], int]:
    """Read per-page text from a PDF.

    Args:
        pdf_path: Path handed to ``PdfReader``.
        max_pages: Upper bound on pages to read; ``None`` or a value <= 0
            means "read every page".

    Returns:
        ``(pages, page_count)`` where ``pages`` is a list of
        ``(1-based page number, text)`` tuples and ``page_count`` is the
        total number of pages in the document (not just those read).
    """
    reader = PdfReader(pdf_path)
    page_count = len(reader.pages)

    if max_pages is None or max_pages <= 0:
        limit = page_count
    else:
        limit = min(page_count, int(max_pages))

    pages: List[Tuple[int, str]] = []
    for index in range(limit):
        try:
            text = reader.pages[index].extract_text() or ""
        except Exception:
            # Best effort: a page that fails to extract keeps its slot with no text.
            text = ""
        pages.append((index + 1, text))
    return pages, page_count
def clean_text(t: str) -> str:
    """Normalise extracted text.

    NUL characters become spaces, every whitespace run collapses to a single
    space, and leading/trailing whitespace is removed. ``None`` is treated as
    an empty string.
    """
    without_nul = (t or "").replace("\x00", " ")
    return re.sub(r"\s+", " ", without_nul).strip()
def chunk_pages(pages: List[Tuple[int, str]], target_chars: int = 3000) -> List[Dict[str, Any]]:
    """Group consecutive page texts into chunks of roughly ``target_chars``.

    Pages are cleaned with ``clean_text``; pages that are empty after cleaning
    are skipped. A page is never split: when adding it would push the current
    chunk past ``target_chars`` the chunk is closed and the page starts a new
    one (so a single oversized page becomes its own chunk).

    Args:
        pages: ``(page number, text)`` tuples in reading order.
        target_chars: Soft size limit per chunk, in characters.

    Returns:
        A list of ``{"pages": "<first>-<last>", "text": ...}`` dicts. The page
        range names the first and last page that actually contributed text to
        the chunk; skipped (empty) pages are never reported as part of a
        range.
    """
    chunks: List[Dict[str, Any]] = []
    buf: List[str] = []
    first_page = None
    last_page = None
    cur_len = 0

    for pno, raw in pages:
        txt = clean_text(raw)
        if not txt:
            continue
        if buf and cur_len + len(txt) + 1 > target_chars:
            # Close the running chunk using the last page that contributed
            # text, not ``pno - 1`` (which may be a skipped, empty page).
            chunks.append({"pages": f"{first_page}-{last_page}", "text": " ".join(buf)})
            buf = [txt]
            first_page = pno
            cur_len = len(txt)
        else:
            if not buf:
                first_page = pno
            buf.append(txt)
            cur_len += len(txt) + 1
        last_page = pno

    if buf:
        # Same rule for the tail: trailing empty pages do not extend the range.
        chunks.append({"pages": f"{first_page}-{last_page}", "text": " ".join(buf)})
    return chunks
def _text_based_pdf_warning(pages: List[Tuple[int, str]]) -> bool:
    """Return True when all pages together hold fewer than 200 characters of cleaned text.

    NOTE(review): presumably used to warn that the PDF has no usable text
    layer — confirm against the caller.
    """
    cleaned_pages = (clean_text(text) for _, text in pages)
    combined = " ".join(piece for piece in cleaned_pages if piece)
    return len(combined.strip()) < 200
# =============================
# Lightweight retrieval (TF-IDF)
# =============================
def _keyword_overlap_scores(lowered_texts: List[str], query: str) -> List[float]:
    """Score each text by how many distinct query tokens (3+ characters) it contains."""
    # Hyphen is literal at the end of the class; the previous pattern
    # ``[a-zA-Z0-9\\-]`` also admitted a literal backslash by accident.
    tokens = {w for w in re.findall(r"[a-zA-Z0-9-]+", query.lower()) if len(w) >= 3}
    return [float(sum(1 for tok in tokens if tok in text)) for text in lowered_texts]


def select_relevant_chunks(
    chunks: List[Dict[str, Any]],
    queries: List[str],
    top_per_query: int = 2,
    max_chunks: int = 12
) -> List[Dict[str, Any]]:
    """Pick the chunks most relevant to the given queries.

    Each non-blank query contributes up to ``top_per_query`` chunks, ranked by
    TF-IDF similarity when scikit-learn is available and by keyword overlap
    otherwise. Only chunks with a strictly positive score are eligible, so a
    query that matches nothing no longer pulls in arbitrary chunks.

    Args:
        chunks: Dicts with at least a ``"text"`` key.
        queries: Search phrases; blank or ``None`` entries are ignored.
        top_per_query: Maximum chunks contributed by one query.
        max_chunks: Maximum chunks returned overall.

    Returns:
        Selected chunk dicts in order of first selection. When no query
        matches anything, the first ``max_chunks`` chunks are returned.
    """
    texts = [c["text"] for c in chunks]
    if not texts:
        return []

    vectorizer = None
    matrix = None
    if TfidfVectorizer is not None:
        try:
            vectorizer = TfidfVectorizer(stop_words="english", ngram_range=(1, 2), max_features=20000)
            matrix = vectorizer.fit_transform(texts)
        except ValueError:
            # scikit-learn raises ValueError for an empty vocabulary (e.g. the
            # texts contain only stop words); use keyword overlap instead.
            vectorizer = None
            matrix = None
    lowered_texts = [t.lower() for t in texts] if matrix is None else []

    selected_idx: List[int] = []
    seen = set()
    for raw_query in queries or []:
        query = (raw_query or "").strip()
        if not query:
            continue
        if matrix is not None:
            scores = (matrix @ vectorizer.transform([query]).T).toarray().ravel().tolist()
        else:
            scores = _keyword_overlap_scores(lowered_texts, query)
        # Stable sort: among equal scores the earlier chunk wins.
        ranked = sorted(range(len(texts)), key=scores.__getitem__, reverse=True)
        for i in ranked[:top_per_query]:
            if scores[i] > 0 and i not in seen:
                seen.add(i)
                selected_idx.append(i)

    if not selected_idx:
        selected_idx = list(range(min(len(chunks), max_chunks)))
    return [chunks[i] for i in selected_idx[:max_chunks]]
def build_context(selected_chunks: List[Dict[str, Any]], max_chars: int = 20000) -> str:
    """Concatenate chunks into one prompt context, labelled by page range.

    Each chunk is rendered as ``"[pages <range>]\\n<text>\\n"`` and chunks are
    appended in order until the next one would exceed ``max_chars``.

    If the very first chunk is already larger than ``max_chars`` it is
    truncated to fit rather than dropped, so a non-empty selection never
    yields an empty context (unless ``max_chars`` is not positive).

    Args:
        selected_chunks: Dicts with ``"pages"`` and ``"text"`` keys.
        max_chars: Character budget for the rendered blocks.

    Returns:
        The blocks joined by newlines, stripped of surrounding whitespace.
    """
    parts: List[str] = []
    total = 0
    for c in selected_chunks:
        block = f"[pages {c['pages']}]\n{c['text']}\n"
        if total + len(block) > max_chars:
            if not parts:
                # Keep as much of the top chunk as the budget allows.
                parts.append(block[:max(int(max_chars), 0)])
            break
        parts.append(block)
        total += len(block)
    return "\n".join(parts).strip()
# =============================
# Spec -> JSON schema
# =============================
def slugify_field(name: str) -> str:
    """Turn a display name into a lowercase, underscore-separated key.

    Characters other than word characters, whitespace and hyphens are
    dropped; runs of whitespace/hyphens become a single underscore. The
    result is capped at 80 characters, and ``"field"`` is returned when
    nothing is left.
    """
    trimmed = (name or "").strip()
    kept = re.sub(r"[^\w\s-]", "", trimmed)
    slug = re.sub(r"[\s-]+", "_", kept).lower()
    if not slug:
        return "field"
    return slug[:80]
def _schema_for_type(ftype: str) -> Dict[str, Any]:
    """Translate one spec type token into a JSON-schema fragment (unknown types map to string)."""

    def _enum_values(inner: str) -> List[str]:
        return [v.strip() for v in inner.split(",") if v.strip()]

    if ftype == "num":
        return {"type": "number"}
    if ftype == "bool":
        return {"type": "boolean"}
    if ftype.startswith("list[enum[") and ftype.endswith("]]"):
        vals = _enum_values(ftype[len("list[enum["):-2])
        # An empty enum is not a usable schema; degrade to a free-text list.
        items: Dict[str, Any] = {"type": "string", "enum": vals} if vals else {"type": "string"}
        return {"type": "array", "items": items}
    if ftype.startswith("list[str]"):
        return {"type": "array", "items": {"type": "string"}}
    if ftype.startswith("list[num]"):
        return {"type": "array", "items": {"type": "number"}}
    if ftype.startswith("enum[") and ftype.endswith("]"):
        vals = _enum_values(ftype[len("enum["):-1])
        return {"type": "string", "enum": vals} if vals else {"type": "string"}
    return {"type": "string"}


def parse_field_spec(spec: str) -> Tuple[Dict[str, Any], Dict[str, str]]:
    """Parse a ``Field Name | type | instructions`` spec into schema properties.

    Blank lines, lines starting with ``#`` and lines without a ``|`` are
    ignored. Only the first two ``|`` separate columns, so instructions may
    themselves contain ``|``.

    Args:
        spec: Multi-line spec text (``None`` is treated as empty).

    Returns:
        ``(props, instr)`` where ``props`` maps each slugified field key to
        its JSON-schema fragment and ``instr`` maps the same key to the
        instruction text (empty string when absent). A later line with the
        same key overrides an earlier one.
    """
    props: Dict[str, Any] = {}
    instr: Dict[str, str] = {}
    for raw_line in (spec or "").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        # maxsplit=2 keeps any further "|" inside the instructions column.
        parts = [p.strip() for p in line.split("|", 2)]
        if len(parts) < 2:
            continue
        key = slugify_field(parts[0])
        instr[key] = parts[2] if len(parts) >= 3 else ""
        props[key] = _schema_for_type(parts[1])
    return props, instr
def build_extraction_schema(field_props: Dict[str, Any], vocab: Dict[str, Any]) -> Dict[str, Any]:
    """Build the JSON schema for one paper's structured extraction.

    Args:
        field_props: JSON-schema fragments keyed by field key; every key is
            marked required inside the ``extracted`` object.
        vocab: Controlled vocabulary; ``risk_stance_enum`` supplies the
            allowed ``risk_stance`` values.

    Returns:
        An object schema with ``paper_title``, ``risk_stance``,
        ``risk_confidence`` (0..1), ``risk_summary``, ``extracted`` and
        ``evidence`` (items with ``field``, ``quote``, ``pages``), all
        required and with ``additionalProperties`` disabled.
    """
    default_risk_enum = ["acceptable", "acceptable_with_uncertainty", "not_acceptable", "insufficient_data"]
    risk_enum = vocab.get("risk_stance_enum") if isinstance(vocab, dict) else None
    # The vocab editor can leave this list empty (or the JSON can hold a
    # non-list); an empty enum would make the schema unusable, so fall back.
    if not isinstance(risk_enum, list) or not risk_enum:
        risk_enum = default_risk_enum

    evidence_item = {
        "type": "object",
        "additionalProperties": False,
        "properties": {
            "field": {"type": "string"},
            "quote": {"type": "string"},
            "pages": {"type": "string"}
        },
        "required": ["field", "quote", "pages"]
    }
    extracted = {
        "type": "object",
        "additionalProperties": False,
        "properties": field_props,
        "required": list(field_props.keys())
    }
    return {
        "type": "object",
        "additionalProperties": False,
        "properties": {
            "paper_title": {"type": "string"},
            "risk_stance": {"type": "string", "enum": risk_enum},
            "risk_confidence": {"type": "number", "minimum": 0, "maximum": 1},
            "risk_summary": {"type": "string"},
            "extracted": extracted,
            "evidence": {"type": "array", "items": evidence_item}
        },
        "required": ["paper_title", "risk_stance", "risk_confidence", "risk_summary", "extracted", "evidence"]
    }
# =============================
# OpenAI client + extraction
# =============================
def get_openai_client(api_key: str) -> OpenAI:
    """Create an OpenAI client from the given key or the environment.

    The explicit ``api_key`` wins; when it is blank the ``OPENAI_API_KEY``
    environment variable is used.

    Raises:
        ValueError: If neither source provides a non-blank key.
    """
    key = (api_key or "").strip()
    if not key:
        key = os.getenv("OPENAI_API_KEY", "").strip()
    if not key:
        raise ValueError("Missing OpenAI API key. Provide it in the UI or set OPENAI_API_KEY secret in Hugging Face.")
    return OpenAI(api_key=key)
def openai_structured_extract(
    client: OpenAI,
    model: str,
    schema: Dict[str, Any],
    controlled_vocab: Dict[str, Any],
    field_instructions: Dict[str, str],
    context: str
) -> Dict[str, Any]:
    """Run one schema-constrained extraction over the given excerpts.

    Builds a system message (grounding rules) and a user message (controlled
    vocab, per-field instructions, excerpts), sends both through
    ``client.responses.create`` with a strict ``json_schema`` text format, and
    parses the response text as JSON.

    Args:
        client: OpenAI client used for the request.
        model: Model name passed through to the API.
        schema: JSON schema the response must follow.
        controlled_vocab: Vocabulary dict, embedded in the prompt as JSON.
        field_instructions: Field key -> instruction text; fields without
            text are listed as "(no extra instructions)".
        context: Excerpt text (with page-range labels) to extract from.

    Returns:
        The parsed JSON object from ``resp.output_text``.

    Note:
        API errors and ``json.loads`` failures are not caught here and
        propagate to the caller.
    """
    field_instr_lines = [f"- {k}: {v if v else '(no extra instructions)'}" for k, v in field_instructions.items()]
    vocab_text = json.dumps(controlled_vocab, indent=2)
    # The explicit tier reminder below is appended only when that field is requested.
    has_fda_tier_field = "fda_ctp_carcinogenicity_tier" in field_instructions
    system_msg = (
        "You are a toxicology research paper data-extraction assistant for an industry safety assessor.\n"
        "Grounding rules (must follow):\n"
        "1) Use ONLY the provided excerpts; do NOT invent details.\n"
        "2) If a value is not explicitly stated, output empty string or empty list, OR the enum value 'not_reported'/'insufficient_data' when applicable.\n"
        "3) Provide evidence quotes + page ranges for extracted fields.\n"
        "4) risk_stance is regulatory: acceptable / acceptable_with_uncertainty / not_acceptable / insufficient_data.\n"
        "5) Prefer controlled vocab terms when applicable.\n"
        "6) Use an INTERNAL Tree-of-Thought process before finalizing JSON:\n"
        " - Branch evidence by endpoint/theme.\n"
        " - Test competing interpretations.\n"
        " - Prune branches that are not directly supported by excerpts.\n"
        " - Select the most evidence-grounded branch only.\n"
        " - Do NOT output reasoning traces; output JSON only.\n"
        "7) If the FDA CTP tier field is requested but evidence is insufficient, output exactly: 'enough data is not available'.\n"
    )
    # Adjacent string literals are joined first; the "+" operators then splice
    # in the field instruction lines and the optional tier reminder.
    user_msg = (
        "CONTROLLED VOCAB (JSON):\n"
        f"{vocab_text}\n\n"
        "TREE-OF-THOUGHT EXECUTION FRAMEWORK (internal only, do not output):\n"
        "A) Build evidence map: claims -> quotes -> page ranges.\n"
        "B) Generate candidate interpretations per endpoint.\n"
        "C) Eliminate candidates lacking direct quote support.\n"
        "D) Select final grounded interpretation and populate schema fields.\n"
        "E) For uncertain fields, use explicit fallback values from enum/instructions.\n\n"
        "FIELD INSTRUCTIONS:\n"
        + "\n".join(field_instr_lines)
        + "\n\n"
        "EXCERPTS (with page ranges):\n"
        f"{context}\n\n"
        + (
            "IMPORTANT: `fda_ctp_carcinogenicity_tier` must be one of "
            "[Tier_1_high_priority, Tier_2_moderate_priority, Tier_3_lower_priority, enough data is not available].\n"
            if has_fda_tier_field else ""
        )
    )
    # Strict json_schema format: the response text is expected to match `schema`.
    resp = client.responses.create(
        model=model,
        input=[
            {"role": "system", "content": system_msg},
            {"role": "user", "content": user_msg}
        ],
        text={
            "format": {
                "type": "json_schema",
                "name": "tox_extraction",
                "schema": schema,
                "strict": True
            }
        }
    )
    return json.loads(resp.output_text)
def openai_synthesize_across_papers(client: OpenAI, model: str, rows: List[Dict[str, Any]]) -> str:
    """Ask the model for a cross-paper synthesis of already-extracted rows.

    The rows are serialised with ``json.dumps(indent=2)`` into the user
    message; the response text is returned unchanged.
    """
    instruction_lines = [
        "You are a senior toxicology safety assessor summarizing multiple papers.\n",
        "Create a concise synthesis: consensus, disagreements, data gaps, and actionable next steps.\n",
        "Base strictly on the provided extracted JSON (which is evidence-backed).\n",
    ]
    messages = [
        {"role": "system", "content": "".join(instruction_lines)},
        {"role": "user", "content": "EXTRACTED_ROWS_JSON:\n" + json.dumps(rows, indent=2)},
    ]
    response = client.responses.create(model=model, input=messages)
    return response.output_text
# =============================
# Controlled vocab editor helpers (lists only) + search filter
# =============================
def _filter_terms_df(df: pd.DataFrame, query: str) -> pd.DataFrame:
    """Return the ``term`` column filtered by a case-insensitive substring.

    The query is matched literally (not as a regular expression), so search
    text such as ``"(ames"`` or ``"a.b"`` is safe.

    Args:
        df: Frame with a ``term`` column; ``None``, an empty frame or a frame
            without that column yields an empty result.
        query: Search text; blank means "no filter".

    Returns:
        A new single-column (``term``) DataFrame.
    """
    if df is None or df.empty or "term" not in df.columns:
        return pd.DataFrame(columns=["term"])
    q = (query or "").strip().lower()
    if not q:
        return df[["term"]].copy()
    # regex=False: user input must not be interpreted as a pattern.
    mask = df["term"].astype(str).str.lower().str.contains(q, regex=False, na=False)
    return df.loc[mask, ["term"]].copy()
def vocab_init_state(vocab_json: str):
    """Build the initial state for the controlled-vocab editor.

    Args:
        vocab_json: Vocabulary as JSON text. Blank input, unparsable JSON, or
            JSON whose top level is not an object all fall back to
            ``DEFAULT_CONTROLLED_VOCAB_JSON``.

    Returns:
        ``(vocab, list_keys, default_key, full_df, filtered_df, vocab_json,
        message)``: the parsed vocab dict, the sorted names of its list-valued
        categories, the first of those names (or ``None``), the terms of that
        category as a DataFrame, the same terms with an empty search applied,
        the vocab re-serialised with ``indent=2``, and a status message.
    """
    try:
        vocab = json.loads(vocab_json or DEFAULT_CONTROLLED_VOCAB_JSON)
    except (TypeError, ValueError, RecursionError):
        vocab = None
    # Valid JSON that is not an object (e.g. "[]" or "3") has no categories
    # to edit and would break the ``.items()`` call below.
    if not isinstance(vocab, dict):
        vocab = json.loads(DEFAULT_CONTROLLED_VOCAB_JSON)

    list_keys = sorted(k for k, v in vocab.items() if isinstance(v, list))
    default_key = list_keys[0] if list_keys else None
    terms = vocab.get(default_key, []) if default_key else []
    full_df = pd.DataFrame({"term": terms})
    filtered_df = _filter_terms_df(full_df, "")
    return vocab, list_keys, default_key, full_df, filtered_df, json.dumps(vocab, indent=2), "✅ Vocab loaded."
def vocab_reset_defaults_ui():
    """Reset the vocab editor to the built-in defaults.

    Returns the default vocab, a dropdown update (choices + first category),
    the full and filtered term tables, the vocab JSON, the status message,
    and the vocab JSON a second time.
    """
    state = vocab_init_state(DEFAULT_CONTROLLED_VOCAB_JSON)
    vocab, categories, first_category, full_df, filtered_df, vocab_json, status = state
    category_update = gr.update(choices=categories, value=first_category)
    return vocab, category_update, full_df, filtered_df, vocab_json, status, vocab_json
def vocab_load_category(vocab_state: Dict[str, Any], category: str, search: str):
    """Load one vocab category into the editor tables.

    Returns ``(full_df, filtered_df, message)``. When no category is given,
    the category is unknown, or its value is not a list, both tables are the
    same empty ``term`` frame and the message explains why.
    """

    def _blank(message: str):
        blank = pd.DataFrame(columns=["term"])
        return blank, blank, message

    if not category or category not in vocab_state:
        return _blank("Select a category.")

    terms = vocab_state.get(category, [])
    if not isinstance(terms, list):
        return _blank("This category is not a list.")

    full = pd.DataFrame({"term": terms})
    return full, _filter_terms_df(full, search), f"Editing: {category}"
def vocab_add_term(vocab_state: Dict[str, Any], category: str, term: str, search: str):
    """Append a term to a list category (in place) unless it is already there.

    Returns ``(full_df, filtered_df, "", message)``; the empty string is the
    third output. On invalid input the two tables are no-op ``gr.update()``
    values.
    """
    new_term = (term or "").strip()
    if not new_term:
        return gr.update(), gr.update(), "", "Enter a term to add."

    is_list_category = (
        bool(category)
        and category in vocab_state
        and isinstance(vocab_state.get(category), list)
    )
    if not is_list_category:
        return gr.update(), gr.update(), "", "Pick a list category first."

    current_terms = vocab_state[category]
    if new_term not in current_terms:
        current_terms.append(new_term)

    full = pd.DataFrame({"term": current_terms})
    return full, _filter_terms_df(full, search), "", f"Added: {new_term}"
def vocab_remove_term(vocab_state: Dict[str, Any], category: str, term: str, search: str):
    """Remove every exact occurrence of a term from a list category.

    The category's list inside ``vocab_state`` is replaced with the filtered
    list. Returns ``(full_df, filtered_df, "", message)``; on invalid input
    the two tables are no-op ``gr.update()`` values.
    """
    unwanted = (term or "").strip()
    if not unwanted:
        return gr.update(), gr.update(), "", "Enter a term to remove."

    is_list_category = (
        bool(category)
        and category in vocab_state
        and isinstance(vocab_state.get(category), list)
    )
    if not is_list_category:
        return gr.update(), gr.update(), "", "Pick a list category first."

    remaining = [existing for existing in vocab_state[category] if existing != unwanted]
    vocab_state[category] = remaining

    full = pd.DataFrame({"term": remaining})
    return full, _filter_terms_df(full, search), "", f"Removed: {unwanted}"
def vocab_apply_df(vocab_state: Dict[str, Any], category: str, terms_df: Any, search: str):
    """Replace a list category with the terms from the edited table.

    Terms are stripped and de-duplicated in order. Blank cells are skipped,
    including cells that arrive as ``None``/NaN (which would otherwise be
    stored as the literal terms "None"/"nan").

    Args:
        vocab_state: Vocab dict, updated in place.
        category: Name of a list-valued category in ``vocab_state``.
        terms_df: DataFrame, or table data convertible to a single ``term``
            column.
        search: Current search text, applied to the returned preview.

    Returns:
        ``(vocab_json, filtered_df, message)``.
    """
    if not category or category not in vocab_state or not isinstance(vocab_state.get(category), list):
        return json.dumps(vocab_state, indent=2), pd.DataFrame(columns=["term"]), "Pick a list category first."
    try:
        df = terms_df if isinstance(terms_df, pd.DataFrame) else pd.DataFrame(terms_df, columns=["term"])
    except Exception:
        return json.dumps(vocab_state, indent=2), pd.DataFrame(columns=["term"]), "Could not parse terms table."

    # A frame without a "term" column simply contributes no terms.
    raw_terms = df["term"].tolist() if "term" in df.columns else []
    terms: List[str] = []
    seen = set()
    for raw in raw_terms:
        if pd.api.types.is_scalar(raw) and pd.isna(raw):
            continue
        text = str(raw).strip()
        if text and text not in seen:
            seen.add(text)
            terms.append(text)

    vocab_state[category] = terms
    vjson = json.dumps(vocab_state, indent=2)
    filtered = _filter_terms_df(pd.DataFrame({"term": terms}), search)
    return vjson, filtered, f"✅ Applied {len(terms)} terms to {category}."
def vocab_filter_preview(terms_df, search):
    """Return the terms table filtered by the search text, without changing any state.

    Table data that cannot be turned into a ``term`` DataFrame is treated as
    an empty table.
    """
    if isinstance(terms_df, pd.DataFrame):
        table = terms_df
    else:
        try:
            table = pd.DataFrame(terms_df, columns=["term"])
        except Exception:
            table = pd.DataFrame(columns=["term"])
    return _filter_terms_df(table, search)
# =============================
# Field mapping from endpoints
# =============================
# Field types understood by build_spec_from_field_rows(): "enum" and
# "list[enum]" take their allowed values from the row's enum_values string.
# NOTE(review): presumably the choices offered by the builder's type selector —
# the UI wiring is not visible in this part of the file.
TYPE_CHOICES = ["str", "num", "bool", "list[str]", "list[num]", "enum", "list[enum]"]
def build_spec_from_field_rows(rows: List[Dict[str, Any]]) -> str:
    """Render field rows as the line-based spec text.

    Each row becomes ``"<field> | <type> | <instructions>"``. ``enum`` and
    ``list[enum]`` rows inline their comma-separated ``enum_values``; without
    values they degrade to ``str`` / ``list[str]``. Rows missing a field name
    or type are skipped.

    Because the format is one field per line, whitespace runs (including
    newlines) inside the field name and instructions are collapsed to single
    spaces. ``None``/NaN cell values are treated as empty rather than being
    rendered as "None"/"nan".

    Args:
        rows: Dicts with ``field``, ``type``, ``enum_values`` and
            ``instructions`` keys (``None`` is treated as no rows).

    Returns:
        The spec text: two header comment lines, a blank line, one line per
        field, and a trailing newline.
    """

    def _text(value: Any) -> str:
        # NaN is the only float that differs from itself.
        if value is None or (isinstance(value, float) and value != value):
            return ""
        return str(value).strip()

    def _one_line(value: Any) -> str:
        return " ".join(_text(value).split())

    lines = [
        "# One field per line: Field Name | type | instructions",
        "# types: str, num, bool, list[str], list[num], enum[a,b,c], list[enum[a,b,c]]",
        ""
    ]
    for r in rows or []:
        field = _one_line(r.get("field", ""))
        ftype = _text(r.get("type", ""))
        instr = _one_line(r.get("instructions", ""))
        if not field or not ftype:
            continue
        vals = [v.strip() for v in _text(r.get("enum_values", "")).split(",") if v.strip()]
        if ftype == "enum":
            type_str = f"enum[{','.join(vals)}]" if vals else "str"
        elif ftype == "list[enum]":
            type_str = f"list[enum[{','.join(vals)}]]" if vals else "list[str]"
        else:
            type_str = ftype
        lines.append(f"{field} | {type_str} | {instr}")
    return "\n".join(lines).strip() + "\n"
def build_rows_from_endpoints(selected_endpoints: List[str]) -> Tuple[List[Dict[str, Any]], Dict[str, str], Dict[str, List[str]]]:
    """Assemble field rows for the core preset plus the selected modules.

    Returns:
        ``(rows, field_key_to_module, module_to_keys)``:
        copies of the preset rows with duplicate field names removed (first
        occurrence wins, compared case-insensitively), a map from slugified
        field key to the module that registered it last (``"Core"`` for core
        fields), and a map from module name to its field keys restricted to
        keys that survived de-duplication. Unknown module names are ignored.
    """
    collected: List[Dict[str, Any]] = []
    field_key_to_module: Dict[str, str] = {}
    keys_by_module: Dict[str, List[str]] = {}

    def _register(preset_rows: List[Dict[str, Any]], owner: str) -> None:
        for preset_row in preset_rows:
            collected.append(dict(preset_row))
            key = slugify_field(preset_row["field"])
            field_key_to_module[key] = owner
            keys_by_module.setdefault(owner, []).append(key)

    _register(PRESET_CORE, "Core")
    for module in selected_endpoints or []:
        preset = ENDPOINT_MODULES.get(module)
        if preset:
            _register(preset, module)

    seen_names = set()
    deduped: List[Dict[str, Any]] = []
    for row in collected:
        name_key = str(row.get("field", "")).strip().lower()
        if name_key and name_key not in seen_names:
            seen_names.add(name_key)
            deduped.append(row)

    # Keep module_to_keys in step with the rows that survived de-duplication.
    surviving = {slugify_field(row["field"]) for row in deduped}
    module_to_keys = {
        module: [key for key in keys if key in surviving]
        for module, keys in keys_by_module.items()
    }
    return deduped, field_key_to_module, module_to_keys
def apply_endpoint_preset(preset_name: str):
    """Return a Gradio update selecting the modules of the named preset (none if the name is unknown)."""
    return gr.update(value=ENDPOINT_PRESETS.get(preset_name, []))
def sync_fields_from_endpoints(selected_endpoints: List[str], admin_mode: bool, current_rows: List[Dict[str, Any]], current_spec: str):
    """Refresh the field rows when the endpoint selection changes.

    In admin mode the current rows and spec are passed through untouched so
    custom columns are not overwritten. Otherwise rows and spec are rebuilt
    from the selected endpoints.

    Returns:
        ``(rows, rows_df, spec, message)``.
    """
    columns = ["field", "type", "enum_values", "instructions"]

    if not admin_mode:
        rows, _, _ = build_rows_from_endpoints(selected_endpoints or [])
        table = pd.DataFrame(rows, columns=columns)
        return rows, table, build_spec_from_field_rows(rows), "✅ Columns updated from selected endpoints."

    table = pd.DataFrame(current_rows or [], columns=columns)
    return current_rows, table, current_spec, "Admin mode: endpoint selection will not overwrite custom columns."
def admin_apply_endpoints(selected_endpoints: List[str]):
    """Replace the builder contents with the rows for the selected endpoints.

    Returns:
        ``(rows, rows_df, spec, message)``.
    """
    rows = build_rows_from_endpoints(selected_endpoints or [])[0]
    table = pd.DataFrame(rows, columns=["field", "type", "enum_values", "instructions"])
    spec_text = build_spec_from_field_rows(rows)
    return rows, table, spec_text, "✅ Loaded selected endpoints into the builder (Replace)."
def fields_add_or_update(field_name: str, ftype: str, enum_values: str, instructions: str, field_rows: List[Dict[str, Any]]):
    """Add a field row, or update the row with the same name (case-insensitive).

    ``field_rows`` is modified in place and also returned.

    Returns:
        ``(field_rows, rows_df, spec, message)``; when the name or type is
        blank nothing changes and the message says both are required.
    """
    columns = ["field", "type", "enum_values", "instructions"]
    name = (field_name or "").strip()
    kind = (ftype or "").strip()
    enums = (enum_values or "").strip()
    notes = (instructions or "").strip()

    def _result(message: str):
        table = pd.DataFrame(field_rows, columns=columns)
        return field_rows, table, build_spec_from_field_rows(field_rows), message

    if not (name and kind):
        return _result("Field name and type are required.")

    wanted = name.lower()
    existing = next(
        (row for row in field_rows if str(row.get("field", "")).strip().lower() == wanted),
        None,
    )
    if existing is None:
        field_rows.append({"field": name, "type": kind, "enum_values": enums, "instructions": notes})
        return _result("Added field.")

    existing["type"] = kind
    existing["enum_values"] = enums
    existing["instructions"] = notes
    return _result("Updated field.")
def fields_apply_df(field_rows: List[Dict[str, Any]], df_in: Any):
    """Rebuild the field rows from the (possibly hand-edited) builder table.

    Rows without a field name or type are dropped, and only the first
    occurrence of each field name (case-insensitive) is kept.

    Returns ``(rows, dataframe, spec, status_message)``. If the table cannot
    be turned into a DataFrame, the previous ``field_rows`` are returned
    unchanged together with an error message.
    """
    columns = ["field", "type", "enum_values", "instructions"]

    def _cell(value: Any) -> str:
        # Blank grid cells can arrive as None or NaN; str() would turn them
        # into the literal text "None"/"nan" and create bogus fields.
        if value is None:
            return ""
        if isinstance(value, float) and np.isnan(value):
            return ""
        return str(value).strip()

    try:
        df = df_in if isinstance(df_in, pd.DataFrame) else pd.DataFrame(df_in, columns=columns)
    except Exception:
        # Best-effort UI handler: any malformed table falls back to the last good rows.
        df = pd.DataFrame(field_rows, columns=columns)
        return field_rows, df, build_spec_from_field_rows(field_rows), "Could not parse builder table."

    cleaned: List[Dict[str, Any]] = []
    seen = set()
    for _, r in df.iterrows():
        field = _cell(r.get("field", ""))
        ftype = _cell(r.get("type", ""))
        if not field or not ftype:
            continue
        key = field.lower()
        if key in seen:
            continue
        seen.add(key)
        cleaned.append(
            {
                "field": field,
                "type": ftype,
                "enum_values": _cell(r.get("enum_values", "")),
                "instructions": _cell(r.get("instructions", "")),
            }
        )

    df2 = pd.DataFrame(cleaned, columns=columns)
    spec = build_spec_from_field_rows(cleaned)
    return cleaned, df2, spec, f"✅ Applied builder table ({len(cleaned)} fields)."
# =============================
# Row building + “non-empty module” logic
# =============================
def _as_list(x) -> List[str]:
    """Normalise ``x`` to a list of non-blank, stripped strings.

    ``None`` gives ``[]``; a list is converted item by item; any other value
    is treated as a single item.
    """
    if x is None:
        return []
    candidates = x if isinstance(x, list) else [x]
    stripped = (str(item).strip() for item in candidates)
    return [text for text in stripped if text]
def _format_value(v: Any) -> Any:
    """Flatten a list into a ``"; "``-joined string; return anything else unchanged.

    List items whose text is blank are skipped; kept items are not stripped.
    """
    if not isinstance(v, list):
        return v
    parts = []
    for item in v:
        text = str(item)
        if text.strip():
            parts.append(text)
    return "; ".join(parts)
# Lower-cased tokens that count as "no data" when they appear as an extracted value.
EMPTY_STRINGS = {"", "not_reported", "insufficient_data", "none", "na", "n/a", "null"}


def _is_empty_value(v: Any) -> bool:
    """Return True when ``v`` carries no usable data.

    Empty means: ``None``, a float NaN, a list whose non-blank items are all
    placeholder tokens from ``EMPTY_STRINGS``, or a scalar whose stripped,
    lower-cased text is one of those tokens.
    """
    if v is None or (isinstance(v, float) and np.isnan(v)):
        return True

    if isinstance(v, list):
        tokens = (str(item).strip().lower() for item in v)
        # Blank items are ignored; a list with nothing left is empty too.
        return all(tok in EMPTY_STRINGS for tok in tokens if tok)

    return str(v).strip().lower() in EMPTY_STRINGS
def _json_default(o: Any):
    """``default=`` hook for ``json.dumps`` that converts NumPy values.

    NumPy booleans, integers and floats become the matching Python scalar and
    arrays become nested lists.

    Raises:
        TypeError: for any other type, which is what ``json`` expects from a
            ``default`` hook that cannot handle the object.
    """
    # np.bool_ is neither np.integer nor np.floating, so it needs its own branch.
    if isinstance(o, np.bool_):
        return bool(o)
    if isinstance(o, np.integer):
        return int(o)
    if isinstance(o, np.floating):
        return float(o)
    if isinstance(o, np.ndarray):
        return o.tolist()
    raise TypeError(f"Object of type {o.__class__.__name__} is not JSON serializable")
def _record_id(file_name: str, chemical: str, endpoint: str) -> str:
    """Compose the ``"file | chemical | endpoint"`` identifier of an output row.

    A blank chemical becomes ``"-"`` and a blank endpoint becomes ``"Paper"``.
    """
    chem_part = (chemical or "").strip()
    if not chem_part:
        chem_part = "-"
    endpoint_part = (endpoint or "").strip()
    if not endpoint_part:
        endpoint_part = "Paper"
    return " | ".join((f"{file_name}", chem_part, endpoint_part))
def _module_has_any_data(ext: Dict[str, Any], module_keys: List[str], field_props: Dict[str, Any]) -> bool:
    """Return True if at least one of ``module_keys`` holds a non-empty value in ``ext``.

    ``field_props`` is accepted for signature compatibility and is not read.
    """
    return any(
        not _is_empty_value(ext.get(key, None))
        for key in (module_keys or [])
    )
# =============================
# Evidence + report helpers
# =============================
def _make_vertical(records: List[Dict[str, Any]], record_id: str) -> pd.DataFrame:
    """Pivot one record into a two-column ``Field`` / ``Value`` table.

    The ``record_id`` key itself is left out. An empty table with the same
    columns is returned when the inputs are empty or no record matches.
    """
    match = None
    if records and record_id:
        for candidate in records:
            if candidate.get("record_id") == record_id:
                match = candidate
                break

    if not match:
        return pd.DataFrame(columns=["Field", "Value"])

    visible = [name for name in match.keys() if name != "record_id"]
    values = [match.get(name, "") for name in visible]
    return pd.DataFrame({"Field": visible, "Value": values})
def _render_evidence(details: List[Dict[str, Any]], file_name: str, allowed_fields: Optional[set] = None, max_items: int = 120) -> str:
    """Render the evidence quotes of one paper as a Markdown bullet list.

    Args:
        details: per-paper dicts; the one whose ``"_file"`` equals
            ``file_name`` is rendered.
        file_name: file to look up.
        allowed_fields: when given, evidence for a named field outside this
            set is skipped (entries with no field name are kept).
        max_items: maximum number of bullets to emit.

    Returns:
        ``""`` when there is nothing to look up or the paper is not found;
        otherwise a header followed by the bullets, or a "no evidence"
        placeholder line.
    """
    if not details or not file_name:
        return ""
    paper = next((x for x in details if x.get("_file") == file_name), None)
    if not paper:
        return ""

    def _text(value: Any) -> str:
        # Evidence values are not guaranteed to be strings (a page can be an
        # int or a list of pages); coerce instead of failing on .strip().
        if value is None:
            return ""
        if isinstance(value, (list, tuple)):
            return ", ".join(s for s in (str(v).strip() for v in value) if s)
        return str(value).strip()

    lines: List[str] = []
    for entry in paper.get("evidence", []) or []:
        if len(lines) >= max_items:
            break
        if not isinstance(entry, dict):
            continue
        field = _text(entry.get("field", ""))
        if allowed_fields is not None and field and field not in allowed_fields:
            continue
        quote = _text(entry.get("quote", ""))
        if not quote:
            continue
        if len(quote) > 320:
            quote = quote[:320] + "…"
        pages = _text(entry.get("pages", ""))
        lines.append(f"- **{field}** (pages {pages}): “{quote}”")

    header = "### Evidence (grounding)\n"
    return header + ("\n".join(lines) if lines else "- (no evidence returned)")
def _overview_df_from_records(records: List[Dict[str, Any]]) -> pd.DataFrame:
    """Build the batch overview table from the output records.

    Only the overview columns that actually occur in the records are kept,
    in a fixed order. If none of them occur, the first 50 rows of the full
    table are returned instead.
    """
    wanted = ["record_id", "file", "paper_title", "chemical", "endpoint", "risk_stance", "risk_confidence"]
    if not records:
        return pd.DataFrame(columns=wanted)

    frame = pd.DataFrame(records)
    present = [name for name in wanted if name in frame.columns]
    if not present:
        return frame.head(50)
    return frame[present].copy()
def _risk_badge(risk: str) -> str:
    """Return an HTML pill for a risk stance.

    The colour depends on the stripped, lower-cased stance; unrecognised or
    empty stances get the neutral grey style. The label is HTML-escaped
    because the stance originates from extracted or user-edited text.
    """
    import html  # local import: keeps this block self-contained

    palette = {
        "acceptable": ("#e7f7ed", "#0f5132"),
        "acceptable_with_uncertainty": ("#fff3cd", "#664d03"),
        "not_acceptable": ("#f8d7da", "#842029"),
    }
    bg, fg = palette.get((risk or "").strip().lower(), ("#e2e3e5", "#41464b"))
    label = html.escape(str(risk)) if risk else "unknown"
    return f'<span style="background:{bg};color:{fg};padding:4px 10px;border-radius:999px;font-weight:600;font-size:12px;">{label}</span>'


def _safe_str(x: Any) -> str:
    """Convert ``x`` to text, mapping ``None`` and float NaN to ``""``."""
    if x is None:
        return ""
    if isinstance(x, float) and np.isnan(x):
        return ""
    return str(x)


def render_summary_card(record_id: str, records: List[Dict[str, Any]]) -> str:
    """Render the "Executive Summary" HTML card for one record.

    A placeholder card is returned when there is nothing to show (no
    ``record_id`` / no ``records``) or when no record matches.

    Every value taken from the record is HTML-escaped before it is placed in
    the markup: the text comes from extraction output or manual edits and
    must not be interpreted as HTML. Long narrative fields are clipped to
    380 characters before escaping so an entity is never cut in half.
    """
    import html  # local import: keeps this block self-contained

    def _placeholder(message: str) -> str:
        return (
            "<div style='border:1px solid #eee;padding:14px;border-radius:10px;'><b>Executive Summary</b>"
            f"<div style='margin-top:8px;color:#666;'>{message}</div></div>"
        )

    if not record_id or not records:
        return _placeholder("Run extraction to view results.")
    row = next((r for r in records if r.get("record_id") == record_id), None)
    if not row:
        return _placeholder("Select a record.")

    def _clip(s: str, n: int = 380) -> str:
        s = s.strip()
        return s if len(s) <= n else s[:n] + "…"

    def _section(heading: str, key: str) -> str:
        text = _safe_str(row.get(key, "")).strip()
        body = html.escape(_clip(text)) if text else "<span style='color:#666'>(not reported)</span>"
        return (
            "<div>\n"
            f'<div style="font-weight:650;margin-bottom:4px;">{heading}</div>\n'
            f'<div style="color:#222;">{body}</div>\n'
            "</div>"
        )

    title = html.escape(_safe_str(row.get("paper_title", "")).strip() or "Untitled paper")
    file_name = html.escape(_safe_str(row.get("file", "")))
    chemical = html.escape(_safe_str(row.get("chemical", "-")))
    endpoint = html.escape(_safe_str(row.get("endpoint", "Paper")))
    risk = _safe_str(row.get("risk_stance", ""))  # escaped inside _risk_badge

    conf = row.get("risk_confidence", "")
    if _safe_str(conf).strip() == "":
        # Covers "", None and NaN (missing cell after a DataFrame round-trip).
        conf_txt = ""
    else:
        try:
            conf_txt = f"{float(conf):.2f}"
        except (TypeError, ValueError):
            conf_txt = html.escape(_safe_str(conf))

    sections = "\n".join(
        _section(heading, key)
        for heading, key in (
            ("Key Findings", "key_findings"),
            ("Dose Metrics", "dose_metrics"),
            ("Conclusion", "conclusion"),
            ("Risk Summary", "risk_summary"),
        )
    )

    return f"""
<div style="border:1px solid #eaeaea;padding:14px;border-radius:12px;">
<div style="display:flex;align-items:center;justify-content:space-between;gap:12px;flex-wrap:wrap;">
<div style="font-weight:700;font-size:16px;">Executive Summary</div>
<div>{_risk_badge(risk)} <span style="margin-left:10px;color:#666;font-size:12px;">confidence: {conf_txt}</span></div>
</div>
<div style="margin-top:10px;">
<div style="font-weight:650;">{title}</div>
<div style="color:#666;font-size:12px;margin-top:4px;">
<span><b>File:</b> {file_name}</span> &nbsp; • &nbsp;
<span><b>Chemical:</b> {chemical}</span> &nbsp; • &nbsp;
<span><b>Endpoint:</b> {endpoint}</span>
</div>
</div>
<div style="margin-top:12px;display:grid;grid-template-columns:1fr;gap:10px;">
{sections}
</div>
</div>
"""
# =============================
# Main extraction handler
# =============================
def _extraction_abort_outputs(card_note: str, status_text: str, note_color: str = "#b00"):
    """Build the 11-slot output tuple returned when extraction cannot start."""
    card = (
        "<div style='border:1px solid #eee;padding:14px;border-radius:10px;'><b>Executive Summary</b>"
        f"<div style='margin-top:8px;color:{note_color};'>{card_note}</div></div>"
    )
    return (
        card,
        pd.DataFrame(), None, None, None, status_text,
        gr.update(choices=[], value=None),
        [], [], pd.DataFrame(columns=["Field", "Value"]), ""
    )


def _rows_for_paper(
    base: Dict[str, Any],
    filename: str,
    ext: Dict[str, Any],
    field_props: Dict[str, Any],
    field_key_to_module: Dict[str, str],
    endpoint_modules: List[str],
    module_to_keys: Dict[str, List[str]],
    admin_mode: bool,
) -> List[Dict[str, Any]]:
    """Turn one paper's extracted fields into output table rows."""

    def _value(key: str) -> Any:
        default = [] if field_props[key].get("type") == "array" else ""
        return _format_value(ext.get(key, default))

    def _paper_row(chemical: str) -> Dict[str, Any]:
        row = dict(base)
        row["chemical"] = chemical
        row["endpoint"] = "Paper"
        row["record_id"] = _record_id(filename, chemical, "Paper")
        for key in field_props:
            row[key] = _value(key)
        return row

    chemicals = _as_list(ext.get("chemicals")) or ["-"]

    # Single chemical => one row per paper carrying every field.
    if len(chemicals) <= 1:
        return [_paper_row(chemicals[0])]

    # Multiple chemicals => chemical x endpoint rows, only for modules with data.
    non_empty_modules = [
        m for m in endpoint_modules
        if m != "Core" and _module_has_any_data(ext, module_to_keys.get(m, []), field_props)
    ]
    if not non_empty_modules:
        # Nothing module-specific was extracted: fall back to one paper-level
        # row so the paper does not disappear from the table.
        return [_paper_row("multiple")]

    # Core fields belong on every chemical/endpoint row; without them the
    # per-chemical "chemicals" override below could never apply outside admin mode.
    core_keys = set() if admin_mode else {k for k, m in field_key_to_module.items() if m == "Core"}

    rows: List[Dict[str, Any]] = []
    for chem in chemicals:
        for module in non_empty_modules:
            row = dict(base)
            row["chemical"] = chem
            row["endpoint"] = module
            row["record_id"] = _record_id(filename, chem, module)
            for key in field_props:
                owner = field_key_to_module.get(key, "Custom")
                if not (admin_mode or owner == module or key in core_keys):
                    continue
                # Each row is about one chemical, so narrow the list to it.
                row[key] = chem if key == "chemicals" else _value(key)
            rows.append(row)
    return rows


def run_extraction(
    files,
    api_key,
    model,
    selected_endpoints,
    field_spec,
    vocab_json,
    max_pages,
    chunk_chars,
    max_context_chars,
    admin_mode
):
    """Extract structured fields from the uploaded PDFs and build the report outputs.

    For every file: read the pages, rank text chunks against the expanded
    queries (embeddings are used when available), assemble a context capped
    at ``max_context_chars`` and run the structured extraction. PDFs for which
    ``_text_based_pdf_warning`` is truthy get a placeholder result instead of
    a model call.

    Returns an 11-tuple in the order wired to the Extract button:
    summary card HTML, overview DataFrame, CSV path, details JSON path,
    prefilled cancer-risk template path, status text, record dropdown update,
    records list, per-paper details list, vertical DataFrame, evidence
    Markdown. Validation failures (no files, bad vocab JSON, no fields, no
    client) return the same shape with empty values and an explanatory status.
    """
    if not files:
        return _extraction_abort_outputs(
            "Upload PDFs to run extraction.", "Upload one or more PDFs.", note_color="#666"
        )

    try:
        vocab = json.loads(vocab_json or DEFAULT_CONTROLLED_VOCAB_JSON)
    except (TypeError, ValueError) as e:
        return _extraction_abort_outputs("Invalid vocab JSON.", f"Controlled vocab JSON invalid: {e}")

    field_props, field_instr = parse_field_spec(field_spec or "")
    if not field_props:
        return _extraction_abort_outputs(
            "No columns defined.",
            "No extraction fields are defined. (Check selected endpoints or admin field spec.)",
        )

    schema = build_extraction_schema(field_props, vocab)

    if admin_mode:
        field_key_to_module = {k: "Custom" for k in field_props.keys()}
        module_to_keys: Dict[str, List[str]] = {"Custom": list(field_props.keys())}
        endpoint_modules_for_rows = ["Custom"]
    else:
        _, field_key_to_module, module_to_keys = build_rows_from_endpoints(selected_endpoints or [])
        endpoint_modules_for_rows = list(selected_endpoints or []) or ["Core"]

    try:
        client = get_openai_client(api_key)
    except Exception as e:
        return _extraction_abort_outputs("Missing API key.", str(e))

    paper_details: List[Dict[str, Any]] = []
    output_rows: List[Dict[str, Any]] = []
    nlp_diagnostics: List[Dict[str, Any]] = []
    tmpdir = Path(tempfile.mkdtemp(prefix="tox_extract_"))
    context_budget = int(max_context_chars)

    for f in files:
        pdf_path = f.name
        filename = os.path.basename(pdf_path)
        pages, page_count = extract_pages_from_pdf(pdf_path, max_pages=int(max_pages))

        if _text_based_pdf_warning(pages):
            ex = {
                "_file": filename,
                "_pages_in_pdf": page_count,
                "paper_title": "",
                "risk_stance": "insufficient_data",
                "risk_confidence": 0.0,
                "risk_summary": "No extractable text found. This app supports text-based PDFs only (not scanned images).",
                "extracted": {k: ([] if field_props[k].get("type") == "array" else "") for k in field_props.keys()},
                "evidence": []
            }
            nlp_diagnostics.append(
                {
                    "file": filename,
                    "ranking_method": "unavailable_no_text",
                    "selected_indices": [],
                    "coverage_by_query_family": {},
                    "coverage_score": 0.0,
                }
            )
        else:
            chunks = chunk_pages(pages, target_chars=int(chunk_chars))
            base_queries = [
                "regulatory acceptability risk hazard concern conclusion uncertainty evidence NOAEL LOAEL BMD",
                "chemical name CAS number",
            ]
            extra_terms = [ins if ins else k for k, ins in field_instr.items()]
            queries, families = expand_regulatory_queries(
                base_queries=base_queries,
                endpoint_modules=selected_endpoints or [],
                frameworks=["FDA CTP", "EPA"],
                extra_terms=extra_terms,
            )
            # The same joined query drives embedding, ranking and span extraction.
            query_text = " ".join(queries[:20])

            emb_mat = None
            qemb = None
            try:
                texts = [c.get("text", "") for c in chunks]
                if texts:
                    emb_mat = embed_texts(client, DEFAULT_EMBEDDING_MODEL, texts)
                    qemb = embed_texts(client, DEFAULT_EMBEDDING_MODEL, [query_text])[0]
            except Exception:
                # Deliberate best-effort: without embeddings the ranker is
                # still called, just with item/query embeddings set to None.
                emb_mat = None
                qemb = None

            selected, diag = hybrid_rank_text_items(
                items=chunks,
                query=query_text,
                families=families,
                top_k=12,
                item_embeddings=emb_mat,
                query_embedding=qemb,
            )
            nlp_diagnostics.append(dict({"file": filename}, **diag))

            span_blocks: List[str] = []
            chars = 0
            for c in selected:
                span = extract_evidence_span(c.get("text", ""), query_text, page=None, n_sentences=5)
                snippet = span.get("text", "") or c.get("text", "")
                block = f"[pages {c.get('pages','')}]\n{snippet}\n"
                if chars + len(block) > context_budget:
                    break
                span_blocks.append(block)
                chars += len(block)

            context = "\n".join(span_blocks).strip()
            if not context:
                context = build_context(selected, max_chars=context_budget)

            ex = openai_structured_extract(
                client=client,
                model=model,
                schema=schema,
                controlled_vocab=vocab,
                field_instructions=field_instr,
                context=context
            )
            ex["_file"] = filename
            ex["_pages_in_pdf"] = page_count

        paper_details.append(ex)

        base = {
            "file": filename,
            "paper_title": ex.get("paper_title", ""),
            "risk_stance": ex.get("risk_stance", ""),
            "risk_confidence": ex.get("risk_confidence", ""),
            "risk_summary": ex.get("risk_summary", ""),
        }
        ext = ex.get("extracted") or {}
        output_rows.extend(
            _rows_for_paper(
                base,
                filename,
                ext,
                field_props,
                field_key_to_module,
                endpoint_modules_for_rows,
                module_to_keys,
                bool(admin_mode),
            )
        )

    df = pd.DataFrame(output_rows)
    records = df.to_dict("records")

    csv_path = tmpdir / "extraction_table.csv"
    json_path = tmpdir / "extraction_details.json"
    df.to_csv(csv_path, index=False)

    details_payload = {
        "papers": paper_details,
        "toxra_extensions": {
            "nlp_diagnostics": nlp_diagnostics,
            "regulatory_gap_assessment": {},
            "risk_calculation_refs": [],
        },
    }
    json_path.write_text(json.dumps(details_payload, indent=2, default=_json_default), encoding="utf-8")
    prefilled_template_path = export_prefilled_cancer_risk_template(records)

    choices = [r.get("record_id") for r in records if r.get("record_id")]
    default = choices[0] if choices else None

    if default:
        selected_row = next((r for r in records if r.get("record_id") == default), {})
        vertical = _make_vertical(records, default)
        summary_html = render_summary_card(default, records)
        allowed_fields = set(selected_row) - {"record_id"}
        # Read the file from the row (as on_pick does) rather than splitting
        # the record id, which breaks when a file name contains " | ".
        file_for_evidence = str(selected_row.get("file") or "").strip()
        evidence = (
            _render_evidence(paper_details, file_for_evidence, allowed_fields=allowed_fields)
            if file_for_evidence
            else ""
        )
    else:
        vertical = pd.DataFrame(columns=["Field", "Value"])
        summary_html = render_summary_card("", [])
        evidence = ""

    overview = _overview_df_from_records(records)
    status = "✅ Done. Review in the report below and export when ready."

    return (
        summary_html,
        overview,
        str(csv_path),
        str(json_path),
        str(prefilled_template_path),
        status,
        gr.update(choices=choices, value=default),
        records,
        paper_details,
        vertical,
        evidence
    )
# =============================
# Review mode handlers
# =============================
def on_pick(record_id: str, records: List[Dict[str, Any]], details: List[Dict[str, Any]]):
    """Refresh the summary card, vertical table and evidence for the chosen record.

    Returns ``(summary_html, vertical_dataframe, evidence_markdown)``. With
    no ``record_id`` the placeholder card, an empty table and empty evidence
    are returned.
    """
    if not record_id:
        return render_summary_card("", []), pd.DataFrame(columns=["Field", "Value"]), ""

    picked: Dict[str, Any] = {}
    for candidate in records or []:
        if candidate.get("record_id") == record_id:
            picked = candidate
            break

    source_file = picked.get("file") or ""
    # Evidence is limited to the fields that this record actually shows.
    visible_fields = {name for name in picked.keys() if name != "record_id"}

    card = render_summary_card(record_id, records)
    table = _make_vertical(records, record_id)
    evidence = _render_evidence(details, source_file, allowed_fields=visible_fields)
    return card, table, evidence
def toggle_review_mode(is_on: bool):
    """Return a component update whose ``interactive`` flag mirrors the review checkbox."""
    editable = bool(is_on)
    return gr.update(interactive=editable)
def save_review_changes(record_id: str, vertical_df: Any, records: List[Dict[str, Any]]):
    """Merge the edited vertical table back into the matching record.

    Returns ``(overview_dataframe, records, status_message, summary_html)``.
    The input ``records`` list is not modified; a new list is returned in
    which the matching record is replaced by an updated copy.
    """
    if not record_id or not records:
        return pd.DataFrame(), records, "Nothing to save.", render_summary_card("", [])

    try:
        if isinstance(vertical_df, pd.DataFrame):
            edited = vertical_df
        else:
            edited = pd.DataFrame(vertical_df, columns=["Field", "Value"])
    except Exception:
        return (
            _overview_df_from_records(records),
            records,
            "Could not parse edited vertical table.",
            render_summary_card(record_id, records),
        )

    edited = edited.dropna(subset=["Field"])
    updates: Dict[str, Any] = {}
    for _, entry in edited.iterrows():
        if str(entry["Field"]).strip():
            updates[str(entry["Field"])] = entry["Value"]

    found = False
    merged: List[Dict[str, Any]] = []
    for rec in records:
        if rec.get("record_id") != record_id:
            merged.append(rec)
            continue
        # Existing keys keep their position; brand-new fields are appended.
        merged.append({**rec, **updates})
        found = True

    if found:
        message = "Saved changes into session data. Export reviewed CSV to download."
    else:
        message = "Record not found."
    return _overview_df_from_records(merged), merged, message, render_summary_card(record_id, merged)
def export_reviewed_csv(records: List[Dict[str, Any]]):
    """Write the session records to a CSV in a fresh temporary directory.

    Returns ``(path, status_message)``; ``path`` is ``None`` when there are
    no records to export.
    """
    if not records:
        return None, "No reviewed data to export."

    out_path = Path(tempfile.mkdtemp(prefix="tox_review_")) / "reviewed_extraction_table.csv"
    reviewed = pd.DataFrame(records)
    reviewed.to_csv(out_path, index=False)
    return str(out_path), "Reviewed CSV ready to download."
# =============================
# New modules: template, mapping, MCP batch
# =============================
def _load_extraction_payload(file_obj: Any) -> Tuple[Any, List[Dict[str, Any]], Dict[str, Any]]:
    """Read an uploaded ``extraction_details.json`` and split it into its parts.

    Two layouts are accepted: a bare list of papers, or a dict with a
    ``"papers"`` list and an optional ``"toxra_extensions"`` dict.

    Returns:
        ``(payload, papers, extensions)``; ``extensions`` is ``{}`` for the
        list layout or when the stored value is not a dict.

    Raises:
        ValueError: if no file was given, the top-level JSON value is neither
            a list nor a dict, or ``"papers"`` is not a list.
    """
    if file_obj is None:
        raise ValueError("Upload extraction_details.json first.")

    raw_text = Path(file_obj.name).read_text(encoding="utf-8")
    payload = json.loads(raw_text)

    if isinstance(payload, list):
        return payload, payload, {}
    if not isinstance(payload, dict):
        raise ValueError("Unsupported extraction_details.json format.")

    papers = payload.get("papers", [])
    if not isinstance(papers, list):
        raise ValueError("Invalid extraction_details.json format: papers must be a list.")

    extensions = payload.get("toxra_extensions", {})
    if not isinstance(extensions, dict):
        extensions = {}
    return payload, papers, extensions
def export_blank_cancer_risk_template():
    """Write a header-only cancer risk input CSV to a fresh temporary directory.

    Returns ``(path, status_message)``.
    """
    out_path = Path(tempfile.mkdtemp(prefix="tox_template_")) / "cancer_risk_input_template.csv"
    header_only = pd.DataFrame(columns=CANCER_RISK_TEMPLATE_COLUMNS)
    header_only.to_csv(out_path, index=False)
    return str(out_path), "Blank cancer risk template ready."
def export_prefilled_cancer_risk_template(records: List[Dict[str, Any]]):
    """Write a cancer risk input CSV pre-populated from the extraction records.

    One row is written per distinct non-empty ``record_id``. Only the
    identifying columns are filled (record id, chemical name, first CAS
    number, route when it is exactly ``oral`` or ``inhalation``, source
    file); every numeric input is left blank. With no records the file holds
    just the header row.

    Returns the path of the written file as a string.
    """

    def _text(value: Any) -> str:
        # Records that went through a DataFrame carry NaN for missing cells;
        # str() would write the literal "nan"/"None" into the template.
        if value is None:
            return ""
        if isinstance(value, float) and np.isnan(value):
            return ""
        return str(value).strip()

    out_path = Path(tempfile.mkdtemp(prefix="tox_template_prefilled_")) / "cancer_risk_input_template_prefilled.csv"

    rows: List[Dict[str, Any]] = []
    seen = set()
    for r in records or []:
        rid = _text(r.get("record_id"))
        if not rid or rid in seen:
            continue
        seen.add(rid)

        route = _text(r.get("exposure_route")).lower()
        if route not in {"oral", "inhalation"}:
            route = ""

        # Only the first entry of a ";"-separated CAS list is used.
        casn = _text(r.get("cas_numbers")).split(";")[0].strip()

        rows.append(
            {
                "record_id": rid,
                "chemical_name": _text(r.get("chemical")),
                "casrn": casn,
                "route": route,
                "exposure_value": "",
                "exposure_unit": "",
                "body_weight_kg": "",
                "csf_value": "",
                "csf_unit": "",
                "iur_value": "",
                "air_conc_value": "",
                "air_conc_unit": "",
                "source_reference": _text(r.get("file")),
            }
        )

    # An empty ``rows`` list still yields the header row.
    pd.DataFrame(rows, columns=CANCER_RISK_TEMPLATE_COLUMNS).to_csv(out_path, index=False)
    return str(out_path)
def run_regulatory_gap_assessment(extraction_json_file, framework: str, override_notes: str):
    """Map an extraction payload onto a regulatory framework and save the artifacts.

    Returns ``(matrix_dataframe, summary_markdown, matrix_csv_path,
    report_json_path, status_message)``. When no file is given or the
    mapping fails, the table is empty, both paths are ``None`` and the
    remaining slots carry the explanation.
    """
    if extraction_json_file is None:
        return pd.DataFrame(), "Upload extraction_details.json first.", None, None, "No input file."

    try:
        payload, _, _ = _load_extraction_payload(extraction_json_file)
        matrix_df, report, report_md = map_extraction_to_framework(
            extraction_payload=payload,
            framework=framework,
            catalog_dir="regulatory_catalog",
            override_notes=override_notes or "",
        )
    except Exception as e:
        return pd.DataFrame(), f"(assessment unavailable: {e})", None, None, str(e)

    run_dir = make_run_dir(base_dir="runs")
    matrix_path = write_dataframe_csv(run_dir / "regulatory_gap_matrix.csv", matrix_df)
    report_path = write_json(run_dir / "regulatory_gap_report.json", report)
    write_markdown(run_dir / "regulatory_gap_report.md", report_md)

    covered = report.get("summary", {}).get("covered", 0)
    missing = report.get("summary", {}).get("missing", 0)
    status = f"✅ Gap assessment complete. Covered={covered} | Missing={missing}"
    summary_md = "### Regulatory Gap Summary\n" + report_md
    return matrix_df, summary_md, str(matrix_path), str(report_path), status
def run_cancer_risk_batch_ui(input_csv_file):
    """Run the batch cancer-risk calculation for an uploaded input CSV.

    Returns ``(results_dataframe, results_csv_path, log_path, report_path,
    status_message)``. On any failure (no file, unreadable CSV, missing
    template columns, calculation error) the table is empty, the three paths
    are ``None`` and the status explains the problem.
    """

    def _fail(message: str):
        return pd.DataFrame(), None, None, None, message

    if input_csv_file is None:
        return _fail("Upload a populated cancer risk input CSV.")

    try:
        df = pd.read_csv(input_csv_file.name)
    except Exception as e:
        return _fail(f"Could not read CSV: {e}")

    missing = [c for c in CANCER_RISK_TEMPLATE_COLUMNS if c not in df.columns]
    if missing:
        return _fail(f"Missing required columns: {missing}")

    run_dir = make_run_dir(base_dir="runs")
    rows = df.fillna("").to_dict("records")

    try:
        result = run_batch_cancer_risk(rows, run_dir=str(run_dir))
    except MCPClientError as e:
        return _fail(f"MCP server unavailable: {e}")
    except Exception as e:
        return _fail(f"Calculation failed: {e}")

    # Validate the result shape once, before any .get() call, so an
    # unexpected return value gives empty outputs instead of an AttributeError.
    payload = result if isinstance(result, dict) else {}

    result_rows = payload.get("rows", [])
    if not isinstance(result_rows, list):
        result_rows = []
    out_df = pd.DataFrame(result_rows)

    result_csv_path = write_dataframe_csv(run_dir / "cancer_risk_results.csv", out_df)
    # The raw result is persisted as returned, not the normalised view.
    write_json(run_dir / "cancer_risk_results.json", result)

    artifacts = payload.get("artifacts", {})
    if not isinstance(artifacts, dict):
        artifacts = {}
    log_path = artifacts.get("log_jsonl", str(run_dir / "cancer_risk_log.jsonl"))
    report_path = artifacts.get("report_md", str(run_dir / "cancer_risk_report.md"))

    summ = payload.get("summary", {})
    if not isinstance(summ, dict):
        summ = {}
    status = (
        f"✅ Batch complete. total={summ.get('total_rows', 0)} "
        f"ok={summ.get('ok_rows', 0)} error={summ.get('error_rows', 0)}"
    )
    return out_df, str(result_csv_path), str(log_path), str(report_path), status
# =============================
# Synthesis tab handler
# =============================
def run_synthesis(api_key, model, extraction_json_file):
    """Synthesize findings across the papers of an uploaded extraction JSON.

    Returns the synthesis text, or a plain explanatory message when the
    upload is missing, unreadable, not valid JSON, not in a supported layout,
    or when the OpenAI client cannot be created.
    """
    if extraction_json_file is None:
        return "Upload the extraction_details.json from Extract tab first."

    # Check the upload before creating a client: it is a cheap local step,
    # and a bad file should give a message rather than an unhandled exception.
    try:
        payload = json.loads(Path(extraction_json_file.name).read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:
        # ValueError covers both JSON syntax errors and undecodable bytes.
        return f"Could not read extraction JSON: {e}"

    rows = payload.get("papers", payload) if isinstance(payload, dict) else payload
    if not isinstance(rows, list):
        return "Invalid extraction JSON format for synthesis."

    try:
        client = get_openai_client(api_key)
    except Exception as e:
        return str(e)

    return openai_synthesize_across_papers(client, model, rows)
# =============================
# Admin visibility helpers
# =============================
def set_admin_visibility(is_admin: bool):
    """Return three visibility updates, one per admin group, all set to the admin flag."""
    shown = bool(is_admin)
    return tuple(gr.update(visible=shown) for _ in range(3))
# =============================
# Gradio UI
# =============================
with gr.Blocks(title="Toxicology PDF → Grounded Extractor", css=APP_CSS) as demo:
gr.HTML(
"""
<div class="hero">
<div class="hero-left">
<div class="hero-title">TOXRA.AI</div>
<div class="hero-sub">Grounded toxicology extraction &amp; literature exploration</div>
<div class="hero-pills">
<span class="hero-pill">Text-based PDFs only</span>
<span class="hero-pill">Results-first reporting</span>
<span class="hero-pill">Admin-configurable extraction</span>
</div>
</div>
<div class="hero-right">
<span class="hero-status">Production · Beta</span>
</div>
</div>
"""
)
state_records = gr.State([])
state_details = gr.State([])
vocab_state = gr.State({})
field_rows_state = gr.State([])
field_spec = gr.Textbox(visible=False, interactive=False, lines=8)
vocab_json = gr.Textbox(visible=False, interactive=False, lines=8)
with gr.Tab("Extract"):
with gr.Row(elem_classes="split-row"):
with gr.Column(scale=4, min_width=320, elem_classes="left-rail"):
with gr.Group(elem_classes="card"):
gr.Markdown("Extract setup", elem_classes="section-title")
files = gr.File(label="Upload toxicology PDFs", file_types=[".pdf"], file_count="multiple")
with gr.Row():
api_key = gr.Textbox(label="OpenAI API key (optional if set as OPENAI_API_KEY secret)", type="password")
model = gr.Dropdown(label="Model", choices=["gpt-4o-2024-08-06", "gpt-4o", "gpt-4o-mini"], value="gpt-4o-2024-08-06")
with gr.Row():
endpoint_preset = gr.Dropdown(
label="Endpoint preset",
choices=list(ENDPOINT_PRESETS.keys()),
value="Required – Safety Assessor"
)
endpoints = gr.Dropdown(
label="Endpoints to extract (Core included automatically)",
choices=list(ENDPOINT_MODULES.keys()),
multiselect=True,
value=ENDPOINT_PRESETS["Required – Safety Assessor"]
)
extract_btn = gr.Button("Run Extraction", variant="primary")
status = gr.Textbox(label="Status", interactive=False)
with gr.Accordion("Advanced runtime settings", open=False, elem_classes="card"):
with gr.Row():
max_pages = gr.Slider(0, 250, value=0, step=1, label="Max pages to read (0 = all)")
chunk_chars = gr.Slider(1200, 9000, value=3200, step=100, label="Chunk size (chars)")
max_context_chars = gr.Slider(5000, 45000, value=20000, step=1000, label="Max context sent to GPT (chars)")
with gr.Accordion("Admin tools (taxonomy + custom columns)", open=False, elem_classes="card"):
admin_mode = gr.Checkbox(label="Enable Admin mode", value=False)
admin_group = gr.Group(visible=False)
admin_vocab_group = gr.Group(visible=False)
admin_fields_group = gr.Group(visible=False)
with admin_group:
gr.Markdown("### Admin: Configure extraction taxonomy + custom columns.")
with admin_vocab_group:
gr.Markdown("### Controlled vocabulary (lists only)")
vocab_category = gr.Dropdown(label="Category (lists only)", choices=[], value=None)
vocab_search = gr.Textbox(label="Search terms", placeholder="Type to filter (e.g., 471, AMES, comet)", lines=1)
with gr.Row():
vocab_term_add = gr.Textbox(label="Add term", placeholder="type term and click Add")
vocab_add_btn = gr.Button("Add")
with gr.Row():
vocab_term_remove = gr.Textbox(label="Remove term", placeholder="type exact term and click Remove")
vocab_remove_btn = gr.Button("Remove")
vocab_apply_btn = gr.Button("Apply full list to category")
vocab_reset_btn = gr.Button("Reset vocab to defaults")
vocab_terms_df = gr.Dataframe(headers=["term"], label="Terms (full list; edit directly)", interactive=True, wrap=True)
vocab_terms_filtered = gr.Dataframe(headers=["term"], label="Filtered preview (read-only)", interactive=False, wrap=True)
vocab_status = gr.Textbox(label="Vocab status", interactive=False)
with gr.Accordion("Raw vocab JSON (auto-generated)", open=False):
vocab_json_admin = gr.Textbox(label="Controlled vocab JSON", lines=12, interactive=False)
with admin_fields_group:
gr.Markdown("### Custom columns (Field Builder)")
gr.Markdown("Tip: Use endpoint selection to start, then tweak fields.")
with gr.Row():
admin_apply_endpoints_btn = gr.Button("Load selected endpoints into builder (Replace)", variant="secondary")
fields_apply_btn = gr.Button("Apply builder table")
with gr.Row():
field_name_in = gr.Textbox(label="Field name", placeholder="e.g., genotoxicity_result")
field_type_in = gr.Dropdown(label="Type", choices=TYPE_CHOICES, value="str")
enum_values_in = gr.Textbox(label="Enum values (comma-separated; for enum/list[enum])", placeholder="a,b,c", lines=2)
instructions_in = gr.Textbox(label="Instructions", placeholder="Tell the extractor exactly what to pull.", lines=2)
add_update_field_btn = gr.Button("Add/Update field")
fields_df = gr.Dataframe(
label="Fields (edit and click Apply)",
headers=["field","type","enum_values","instructions"],
interactive=True,
wrap=True
)
fields_status = gr.Textbox(label="Field builder status", interactive=False)
with gr.Column(scale=7, min_width=480, elem_classes="right-panel"):
with gr.Tabs(elem_classes="report-tabs"):
with gr.Tab("Overview"):
with gr.Group(elem_classes="card"):
gr.Markdown("Report overview", elem_classes="section-title")
summary_card = gr.HTML(render_summary_card("", []))
with gr.Group(elem_classes="card"):
overview_df = gr.Dataframe(
label="Batch Overview",
interactive=False,
wrap=True,
show_row_numbers=True
)
with gr.Tab("Record"):
with gr.Group(elem_classes="card"):
record_pick = gr.Dropdown(label="Select record", choices=[], value=None)
with gr.Row():
review_mode = gr.Checkbox(label="Review mode (enable editing)", value=False)
save_btn = gr.Button("Save edits")
export_btn = gr.Button("Export reviewed CSV")
review_status = gr.Textbox(label="Review status", interactive=False)
with gr.Group(elem_classes="card"):
vertical_view = gr.Dataframe(
headers=["Field", "Value"],
interactive=False,
wrap=True,
show_row_numbers=False,
label="Extracted fields (vertical)"
)
with gr.Tab("Evidence"):
with gr.Group(elem_classes="card"):
evidence_md = gr.Markdown()
with gr.Tab("Exports"):
with gr.Group(elem_classes="card"):
out_csv = gr.File(label="Download: extraction_table.csv")
out_json = gr.File(label="Download: extraction_details.json (evidence + structured data)")
risk_template_prefilled = gr.File(label="Download: cancer_risk_input_template_prefilled.csv (record_id linked)")
reviewed_csv = gr.File(label="Download: reviewed_extraction_table.csv")
# --- Wiring ---
admin_mode.change(
fn=set_admin_visibility,
inputs=[admin_mode],
outputs=[admin_group, admin_vocab_group, admin_fields_group]
)
endpoint_preset.change(
fn=apply_endpoint_preset,
inputs=[endpoint_preset],
outputs=[endpoints]
)
endpoints.change(
fn=sync_fields_from_endpoints,
inputs=[endpoints, admin_mode, field_rows_state, field_spec],
outputs=[field_rows_state, fields_df, field_spec, status]
)
extract_btn.click(
fn=run_extraction,
inputs=[files, api_key, model, endpoints, field_spec, vocab_json, max_pages, chunk_chars, max_context_chars, admin_mode],
outputs=[summary_card, overview_df, out_csv, out_json, risk_template_prefilled, status, record_pick, state_records, state_details, vertical_view, evidence_md]
)
record_pick.change(
fn=on_pick,
inputs=[record_pick, state_records, state_details],
outputs=[summary_card, vertical_view, evidence_md]
)
review_mode.change(fn=toggle_review_mode, inputs=[review_mode], outputs=[vertical_view])
save_btn.click(
fn=save_review_changes,
inputs=[record_pick, vertical_view, state_records],
outputs=[overview_df, state_records, review_status, summary_card]
)
export_btn.click(
fn=export_reviewed_csv,
inputs=[state_records],
outputs=[reviewed_csv, review_status]
)
# Admin vocab wiring
vocab_search.change(fn=vocab_filter_preview, inputs=[vocab_terms_df, vocab_search], outputs=[vocab_terms_filtered])
vocab_category.change(
fn=vocab_load_category,
inputs=[vocab_state, vocab_category, vocab_search],
outputs=[vocab_terms_df, vocab_terms_filtered, vocab_status]
)
vocab_add_btn.click(
fn=vocab_add_term,
inputs=[vocab_state, vocab_category, vocab_term_add, vocab_search],
outputs=[vocab_terms_df, vocab_terms_filtered, vocab_term_add, vocab_status]
)
vocab_remove_btn.click(
fn=vocab_remove_term,
inputs=[vocab_state, vocab_category, vocab_term_remove, vocab_search],
outputs=[vocab_terms_df, vocab_terms_filtered, vocab_term_remove, vocab_status]
)
vocab_apply_btn.click(
fn=vocab_apply_df,
inputs=[vocab_state, vocab_category, vocab_terms_df, vocab_search],
outputs=[vocab_json_admin, vocab_terms_filtered, vocab_status]
).then(
fn=lambda x: x,
inputs=[vocab_json_admin],
outputs=[vocab_json]
)
vocab_reset_btn.click(
fn=vocab_reset_defaults_ui,
inputs=None,
outputs=[vocab_state, vocab_category, vocab_terms_df, vocab_terms_filtered, vocab_json_admin, vocab_status, vocab_json]
)
# Admin field builder wiring
admin_apply_endpoints_btn.click(
fn=admin_apply_endpoints,
inputs=[endpoints],
outputs=[field_rows_state, fields_df, field_spec, fields_status]
)
add_update_field_btn.click(
fn=fields_add_or_update,
inputs=[field_name_in, field_type_in, enum_values_in, instructions_in, field_rows_state],
outputs=[field_rows_state, fields_df, field_spec, fields_status]
)
fields_apply_btn.click(
fn=fields_apply_df,
inputs=[field_rows_state, fields_df],
outputs=[field_rows_state, fields_df, field_spec, fields_status]
)
# Init
def _init_all():
    """Compute the start-up values for the vocab editor and the field builder.

    Registered as the ``demo.load`` handler. The returned tuple has eleven
    entries, in the same order as that call's ``outputs`` list:
    ``vocab_state``, ``vocab_category`` (a ``gr.update`` carrying the choices
    and the selected value), ``vocab_terms_df``, ``vocab_terms_filtered``,
    ``vocab_json_admin``, ``vocab_status``, ``vocab_json``,
    ``field_rows_state``, ``fields_df``, ``field_spec`` and ``status``.
    """
    # The vocab initialiser is seeded with the default vocabulary JSON and
    # hands back seven values.
    (
        vocab_obj,
        category_keys,
        first_category,
        terms_full,
        terms_filtered,
        vocab_text,
        vocab_msg,
    ) = vocab_init_state(DEFAULT_CONTROLLED_VOCAB_JSON)

    # The field builder starts from a fixed preset; only the first of the
    # three values returned by build_rows_from_endpoints is used here.
    preset_endpoints = ENDPOINT_PRESETS["Required – Safety Assessor"]
    field_rows, _, _ = build_rows_from_endpoints(preset_endpoints)
    column_order = ["field", "type", "enum_values", "instructions"]
    fields_table = pd.DataFrame(field_rows, columns=column_order)
    spec = build_spec_from_field_rows(field_rows)

    category_update = gr.update(choices=category_keys, value=first_category)

    # `vocab_text` appears twice on purpose: it feeds both vocab_json_admin
    # and vocab_json.
    return (
        vocab_obj,
        category_update,
        terms_full,
        terms_filtered,
        vocab_text,
        vocab_msg,
        vocab_text,
        field_rows,
        fields_table,
        spec,
        "✅ Ready.",
    )
# Run _init_all through the app's load event; its return tuple is positional,
# so this list must stay in the same order as the values it returns.
demo.load(
    fn=_init_all,
    inputs=None,
    outputs=[
        # vocab editor
        vocab_state,
        vocab_category,
        vocab_terms_df,
        vocab_terms_filtered,
        vocab_json_admin,
        vocab_status,
        vocab_json,
        # field builder
        field_rows_state,
        fields_df,
        field_spec,
        # status message
        status,
    ],
)
# "Literature Explorer" tab: the tab body is produced by
# build_literature_explorer_tab(), imported from the literature_explorer module.
with gr.Tab("Literature Explorer"):
build_literature_explorer_tab()
# "Cross-paper Synthesis" tab: collects an API key, a model choice and an uploaded
# JSON file, then renders the handler's result as Markdown.
with gr.Tab("Cross-paper Synthesis"):
with gr.Group(elem_classes="card"):
gr.Markdown("Upload `extraction_details.json` from Extract tab. Synthesis is based strictly on grounded extractions.")
# Password-type textbox; per its label the key may instead come from the
# OPENAI_API_KEY secret (handling of that fallback is not visible here).
api_key2 = gr.Textbox(label="OpenAI API key (optional if set as OPENAI_API_KEY secret)", type="password")
model2 = gr.Dropdown(label="Model", choices=["gpt-4o-2024-08-06", "gpt-4o", "gpt-4o-mini"], value="gpt-4o-2024-08-06")
# Single-file upload restricted to .json.
extraction_json_file = gr.File(label="Upload extraction_details.json", file_types=[".json"], file_count="single")
synth_btn = gr.Button("Synthesize Across Papers")
# Empty Markdown component that receives the synthesis output.
synth_md = gr.Markdown()
# Button click -> run_synthesis(api key, model, uploaded file) -> synth_md.
synth_btn.click(fn=run_synthesis, inputs=[api_key2, model2, extraction_json_file], outputs=[synth_md])
# "Regulatory Gap Assessment" tab: takes an uploaded extraction JSON, a framework
# profile and optional notes, and shows a gap matrix plus two downloadable files.
with gr.Tab("Regulatory Gap Assessment"):
with gr.Group(elem_classes="card"):
gr.Markdown(
"Run clause-level mapping against regulatory catalogs. "
"Use `extraction_details.json` from Extract tab."
)
with gr.Row():
# Single-file upload restricted to .json.
reg_extraction_json = gr.File(label="Upload extraction_details.json", file_types=[".json"], file_count="single")
# Two framework profiles are offered; "FDA CTP" is preselected.
reg_framework = gr.Dropdown(label="Framework profile", choices=["FDA CTP", "EPA"], value="FDA CTP")
reg_override_notes = gr.Textbox(
label="Override notes (optional)",
lines=2,
placeholder="Context to include in gap prompts."
)
reg_run_btn = gr.Button("Run Regulatory Gap Assessment", variant="primary")
# Output components (all read-only for the user).
reg_status = gr.Textbox(label="Status", interactive=False)
reg_summary_md = gr.Markdown()
reg_matrix_df = gr.Dataframe(label="Clause-level gap matrix", interactive=False, wrap=True)
reg_matrix_file = gr.File(label="Download: regulatory_gap_matrix.csv")
reg_report_file = gr.File(label="Download: regulatory_gap_report.json")
# The handler must return five values in this output order:
# matrix dataframe, summary markdown, matrix file, report file, status text.
reg_run_btn.click(
fn=run_regulatory_gap_assessment,
inputs=[reg_extraction_json, reg_framework, reg_override_notes],
outputs=[reg_matrix_df, reg_summary_md, reg_matrix_file, reg_report_file, reg_status]
)
# "Cancer Risk Calculator" tab: offers a blank CSV template for download and runs a
# batch calculation over an uploaded, populated CSV.
with gr.Tab("Cancer Risk Calculator"):
with gr.Group(elem_classes="card"):
gr.Markdown(
"Deterministic FDA/EPA cancer risk calculations routed through a dedicated local MCP server. "
"Use `record_id` values from extraction outputs for traceability."
)
with gr.Row():
template_btn = gr.Button("Download Blank CSV Template")
template_file = gr.File(label="Download: cancer_risk_input_template.csv")
template_status = gr.Textbox(label="Template status", interactive=False)
# Template export takes no inputs; the handler returns (file, status text).
template_btn.click(fn=export_blank_cancer_risk_template, inputs=None, outputs=[template_file, template_status])
# Single-file upload restricted to .csv.
risk_input_csv = gr.File(label="Upload populated cancer risk input CSV", file_types=[".csv"], file_count="single")
risk_run_btn = gr.Button("Run Cancer Risk Batch", variant="primary")
# Output components (all read-only for the user).
risk_status = gr.Textbox(label="Status", interactive=False)
risk_results_df = gr.Dataframe(label="Cancer risk results", interactive=False, wrap=True)
risk_results_csv = gr.File(label="Download: cancer_risk_results.csv")
risk_log_file = gr.File(label="Download: cancer_risk_log.jsonl")
risk_report_file = gr.File(label="Download: cancer_risk_report.md")
# The handler must return five values in this output order:
# results dataframe, results CSV, log file, report file, status text.
risk_run_btn.click(
fn=run_cancer_risk_batch_ui,
inputs=[risk_input_csv],
outputs=[risk_results_df, risk_results_csv, risk_log_file, risk_report_file, risk_status]
)
if __name__ == "__main__":
    # Script entry point: listen on all interfaces, on the port named by the
    # PORT environment variable (7860 when it is unset), with queuing enabled.
    listen_port = int(os.environ.get("PORT", "7860"))
    queued_app = demo.queue()
    queued_app.launch(server_name="0.0.0.0", server_port=listen_port)