diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,1473 +1,1008 @@ +# app.py — TOXRA.AI (Production UI shell + optional private-core loader) +# - Run Assessment: left sidebar inputs, right report output +# - Review & Export: separate tab +# - Literature Search: separate module (literature_explorer.py) +# - Admin: locked (secrets), advanced JSON editors live here +# - Optional: download private toxra_core wheel at runtime using HF_TOKEN (not copied when users duplicate Space) + import os import re import json -import tempfile -from pathlib import Path -from typing import Dict, List, Tuple, Any, Optional +import sys +import time +import hashlib +import textwrap +import subprocess +from typing import Any, Dict, List, Tuple, Optional import gradio as gr import numpy as np import pandas as pd - from pypdf import PdfReader -from sklearn.feature_extraction.text import TfidfVectorizer -from openai import OpenAI -from literature_explorer import build_literature_explorer_tab +# OpenAI is optional if you only run toxra_core; required for fallback extractor. +try: + from openai import OpenAI +except Exception: + OpenAI = None # type: ignore + +# HF Hub is optional; only needed for private-core loader. 
+try: + from huggingface_hub import hf_hub_download +except Exception: + hf_hub_download = None # type: ignore +# Literature explorer tab (Option A split) +try: + from literature_explorer import build_literature_explorer_tab +except Exception: + build_literature_explorer_tab = None # type: ignore # ============================= -# Defaults +# Branding / UI CSS (neutral, production-grade) # ============================= -DEFAULT_CONTROLLED_VOCAB_JSON = """{ - "risk_stance_enum": ["acceptable","acceptable_with_uncertainty","not_acceptable","insufficient_data"], - - "approach_enum": ["in_vivo","in_vitro","in_silico","nams","mixed","not_reported"], - - "in_silico_method_enum": [ - "qsar","read_across","molecular_docking","molecular_dynamics","pbpk_pbtK","aop_based","ml_model","other","not_reported" - ], - "nams_method_enum": [ - "high_throughput_screening_hts","omics_transcriptomics","omics_proteomics","omics_metabolomics", - "organ_on_chip","microphysiological_system_mps","3d_tissue_model","in_chemico_assay", - "in_silico_as_nams","other","not_reported" - ], - - "exposure_route_enum": ["oral","inhalation","dermal","parenteral","multiple","not_reported"], - "species_enum": ["human","rat","mouse","rabbit","dog","non_human_primate","cell_line","other","not_reported"], - - "genotoxicity_oecd_tg_in_vitro_enum": [ - "OECD_TG_471_Bacterial Reverse mutation test(AMES test)", - "OECD_TG_473_In Vitro Mammalian Chromosomal Aberration Test", - "OECD_TG_476_In Vitro Mammalian Cell Gene Mutation Tests (Hprt & xprt)", - "OECD_TG_487_In Vitro Mammalian Cell Micronucleus Test", - "OECD_TG_490_In Vitro Mammalian Cell Gene Mutation Tests (Thymidine Kinase)", - "not_reported" - ], - "genotoxicity_oecd_tg_in_vivo_enum": [ - "OECD_TG_474_In Vivo Mammalian Erythrocyte Micronucleus Test", - "OECD_TG_475_Mammalian Bone Marrow Chromosomal Aberration Test", - "OECD_TG_488_Transgenic Rodent Somatic & Germ Cell Gene Mutation Assays", - "OECD_TG_489_In Vivo Mammalian Alkaline Comet Assay", - 
"not_reported" - ], - - "genotoxicity_result_enum": ["positive","negative","equivocal","not_reported"], - "binary_result_enum": ["positive","negative","equivocal","not_reported"], - "carcinogenicity_result_enum": ["carcinogenic","not_carcinogenic","insufficient_data","not_reported"] -}""" +APP_NAME = "TOXRA.AI" + +TOXRA_CSS = """ +:root { + --bg: #f6f7fb; + --card: #ffffff; + --stroke: rgba(15, 23, 42, 0.10); + --text: rgba(15, 23, 42, 0.92); + --muted: rgba(15, 23, 42, 0.68); + --accent: #2563eb; + --accent2: #0ea5e9; + --shadow: 0 10px 25px rgba(15, 23, 42, 0.06); +} +.gradio-container { background: var(--bg); } +#toxra_header { + display:flex; align-items:center; justify-content:space-between; + padding: 14px 16px; border:1px solid var(--stroke); border-radius: 16px; + background: linear-gradient(90deg, rgba(37,99,235,0.06), rgba(14,165,233,0.04)); + box-shadow: var(--shadow); + margin-bottom: 12px; +} +.toxra_title { font-size: 18px; font-weight: 800; color: var(--text); letter-spacing: 0.2px; } +.toxra_sub { font-size: 12px; color: var(--muted); margin-top: 2px; } +.toxra_pill { + padding: 5px 10px; border-radius: 999px; + border: 1px solid var(--stroke); + background: rgba(255,255,255,0.8); + color: var(--muted); font-size: 12px; +} +.toxra_card { + border: 1px solid var(--stroke); + border-radius: 16px; + background: var(--card); + box-shadow: var(--shadow); + padding: 12px; +} +.toxra_sidebar { position: sticky; top: 12px; } +.toxra_section_title { font-size: 13px; font-weight: 750; color: var(--text); margin: 6px 0 8px; } +.toxra_hint { font-size: 12px; color: var(--muted); } +.toxra_kpi { + display:flex; gap:10px; flex-wrap:wrap; margin-top: 6px; +} +.toxra_kpi span{ + border:1px solid var(--stroke); padding:4px 8px; border-radius:999px; + background: rgba(37,99,235,0.05); color: var(--muted); font-size: 12px; +} +""" # ============================= -# Endpoint modules (what users choose) +# Endpoint family → OECD TG mapping (2-level picker) # 
============================= -PRESET_CORE = [ - {"field": "chemicals", "type": "list[str]", "enum_values": "", "instructions": "List chemical(s) studied. If multiple, include each separately."}, - {"field": "cas_numbers", "type": "list[str]", "enum_values": "", "instructions": "Extract CAS number(s) mentioned (may be multiple)."}, - {"field": "study_type", "type": "enum", "enum_values": "in_vivo,in_vitro,epidemiology,in_silico,review,methodology,other,not_reported", "instructions": "Choose best match."}, - {"field": "exposure_route", "type": "enum", "enum_values": "oral,inhalation,dermal,parenteral,multiple,not_reported", "instructions": "Choose best match."}, - {"field": "species", "type": "enum", "enum_values": "human,rat,mouse,rabbit,dog,non_human_primate,cell_line,other,not_reported", "instructions": "Choose best match."}, - {"field": "dose_metrics", "type": "list[str]", "enum_values": "", "instructions": "Capture NOAEL/LOAEL/BMD/BMDL/LD50/LC50 etc with units and route if available."}, - {"field": "key_findings", "type": "str", "enum_values": "", "instructions": "2–4 short sentences summarizing major findings. 
Grounded to text."}, - {"field": "conclusion", "type": "str", "enum_values": "", "instructions": "Paper's conclusion about safety/risk (grounded)."}, +FAMILIES = [ + "Genotoxicity", + "Repeated dose", + "Carcinogenicity", + "Repro/Developmental", + "Irritation/Sensitization", + "NAMs/In Silico", ] -PRESET_NAMS_INSILICO = [ - {"field": "approach", "type": "enum", "enum_values": "in_vivo,in_vitro,in_silico,nams,mixed,not_reported", "instructions": "Identify if results are in silico or NAMs; use mixed if multiple."}, - {"field": "in_silico_methods", "type": "list[enum]", "enum_values": "qsar,read_across,molecular_docking,molecular_dynamics,pbpk_pbtK,aop_based,ml_model,other,not_reported", "instructions": "If in_silico, list methods used (multiple allowed)."}, - {"field": "nams_methods", "type": "list[enum]", "enum_values": "high_throughput_screening_hts,omics_transcriptomics,omics_proteomics,omics_metabolomics,organ_on_chip,microphysiological_system_mps,3d_tissue_model,in_chemico_assay,in_silico_as_nams,other,not_reported", "instructions": "If NAMs, list methods used (multiple allowed)."}, - {"field": "nams_or_insilico_key_results", "type": "str", "enum_values": "", "instructions": "Summarize in silico / NAMs results and key metrics (grounded)."}, -] +OECD_TG_BY_FAMILY = { + "Genotoxicity": [ + "OECD TG 471 (AMES)", + "OECD TG 473 (In Vitro Chromosomal Aberration)", + "OECD TG 476 (In Vitro Gene Mutation: Hprt/xprt)", + "OECD TG 487 (In Vitro Micronucleus)", + "OECD TG 490 (In Vitro Gene Mutation: TK)", + "OECD TG 474 (In Vivo Micronucleus)", + "OECD TG 475 (In Vivo Chromosomal Aberration)", + "OECD TG 488 (Transgenic Rodent Gene Mutation)", + "OECD TG 489 (In Vivo Comet Assay)", + ], +} -PRESET_GENOTOX_OECD = [ - { - "field": "genotox_oecd_tg_in_vitro", - "type": "list[enum]", - "enum_values": "OECD_TG_471_Bacterial Reverse mutation test(AMES test),OECD_TG_473_In Vitro Mammalian Chromosomal Aberration Test,OECD_TG_476_In Vitro Mammalian Cell Gene Mutation Tests (Hprt 
& xprt),OECD_TG_487_In Vitro Mammalian Cell Micronucleus Test,OECD_TG_490_In Vitro Mammalian Cell Gene Mutation Tests (Thymidine Kinase),not_reported", - "instructions": "Select all in vitro OECD TGs explicitly reported (or clearly described). If none, use not_reported." - }, - { - "field": "genotox_oecd_tg_in_vivo", - "type": "list[enum]", - "enum_values": "OECD_TG_474_In Vivo Mammalian Erythrocyte Micronucleus Test,OECD_TG_475_Mammalian Bone Marrow Chromosomal Aberration Test,OECD_TG_488_Transgenic Rodent Somatic & Germ Cell Gene Mutation Assays,OECD_TG_489_In Vivo Mammalian Alkaline Comet Assay,not_reported", - "instructions": "Select all in vivo OECD TGs explicitly reported (or clearly described). If none, use not_reported." - }, - {"field": "genotoxicity_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Classify overall genotoxicity outcome as reported. If unclear, not_reported."}, - {"field": "genotoxicity_result_notes", "type": "str", "enum_values": "", "instructions": "Short explanation grounded to text + test context (e.g., AMES, micronucleus)."}, -] +# default stance scale (your requested regulatory phrasing) +RISK_STANCE_ENUM = ["acceptable", "acceptable_with_uncertainty", "not_acceptable", "insufficient_data"] -PRESET_ACUTE_TOX = [ - {"field": "acute_toxicity_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "If acute toxicity is assessed, classify as positive/negative/equivocal; otherwise not_reported."}, - {"field": "acute_toxicity_key_metrics", "type": "list[str]", "enum_values": "", "instructions": "Extract LD50/LC50/EC50/IC50 etc with units/route/species if available."}, - {"field": "acute_toxicity_notes", "type": "str", "enum_values": "", "instructions": "Grounded summary of acute toxicity findings."}, -] +# ============================= +# Admin lock (Space secret) +# ============================= +ADMIN_PASSWORD_ENV = os.getenv("ADMIN_PASSWORD", 
"").strip() # set in HF Space Secrets +def check_admin_password(pw: str) -> bool: + if not ADMIN_PASSWORD_ENV: + return False + return (pw or "").strip() == ADMIN_PASSWORD_ENV -PRESET_REPEATED_DOSE = [ - {"field": "repeated_dose_noael_loael", "type": "list[str]", "enum_values": "", "instructions": "Extract NOAEL/LOAEL (and study duration) with units/route if available."}, - {"field": "repeated_dose_target_organs", "type": "list[str]", "enum_values": "", "instructions": "List target organs/critical effects explicitly reported."}, - {"field": "repeated_dose_notes", "type": "str", "enum_values": "", "instructions": "Grounded summary of repeated-dose toxicity conclusions."}, -] +# ============================= +# Pilot limits (can be tuned) +# ============================= +MAX_PAGES_DEFAULT = 20 +MAX_CONTEXT_CHARS_DEFAULT = 20000 -PRESET_IRR_SENS = [ - {"field": "skin_irritation_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Skin irritation outcome (as reported)."}, - {"field": "eye_irritation_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Eye irritation outcome (as reported)."}, - {"field": "skin_sensitization_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Skin sensitization outcome (as reported)."}, - {"field": "irritation_sensitization_notes", "type": "str", "enum_values": "", "instructions": "Grounded notes including method/model if stated."}, -] +# ============================= +# Optional: Private core loader (recommended for IP protection) +# ============================= +# Set these as Space Secrets / Variables: +# - HF_TOKEN : token that can read your private/gated core repo +# - TOXRA_CORE_REPO : e.g. "toxra-ai/toxra_core" +# - TOXRA_CORE_FILENAME : e.g. 
"toxra_core-0.1.0-py3-none-any.whl" +# - TOXRA_CORE_REPO_TYPE : "dataset" or "model" (default: dataset) +# - DISABLE_FALLBACK : "1" to prevent running the fallback extractor (stronger protection) +DISABLE_FALLBACK = os.getenv("DISABLE_FALLBACK", "0").strip() == "1" -PRESET_REPRO_DEV = [ - {"field": "reproductive_toxicity_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Reproductive toxicity outcome (as reported)."}, - {"field": "developmental_toxicity_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Developmental toxicity outcome (as reported)."}, - {"field": "repro_dev_notes", "type": "str", "enum_values": "", "instructions": "Grounded notes including endpoints and study design if stated."}, -] +def ensure_private_core_installed() -> Tuple[bool, str]: + repo = os.getenv("TOXRA_CORE_REPO", "").strip() + filename = os.getenv("TOXRA_CORE_FILENAME", "").strip() + repo_type = os.getenv("TOXRA_CORE_REPO_TYPE", "dataset").strip() or "dataset" + token = os.getenv("HF_TOKEN", "").strip() -PRESET_CARCINOGENICITY = [ - {"field": "carcinogenicity_result", "type": "enum", "enum_values": "carcinogenic,not_carcinogenic,insufficient_data,not_reported", "instructions": "As reported. If evidence insufficient, insufficient_data."}, - {"field": "carcinogenicity_notes", "type": "str", "enum_values": "", "instructions": "Grounded notes including species, duration, tumor findings if stated."}, -] + if not repo or not filename: + return False, "Private core not configured (TOXRA_CORE_REPO/TOXRA_CORE_FILENAME not set)." 
-ENDPOINT_MODULES: Dict[str, List[Dict[str, Any]]] = { - "Genotoxicity (OECD TG)": PRESET_GENOTOX_OECD, - "NAMs / In Silico": PRESET_NAMS_INSILICO, - "Acute toxicity": PRESET_ACUTE_TOX, - "Repeated dose toxicity": PRESET_REPEATED_DOSE, - "Irritation / Sensitization": PRESET_IRR_SENS, - "Repro / Developmental": PRESET_REPRO_DEV, - "Carcinogenicity": PRESET_CARCINOGENICITY, -} + if hf_hub_download is None: + return False, "huggingface_hub not installed; cannot load private core." -# Endpoint presets (requested) -ENDPOINT_PRESETS: Dict[str, List[str]] = { - "Required – Safety Assessor": [ - "Genotoxicity (OECD TG)", - "Repeated dose toxicity", - "Irritation / Sensitization", - "Repro / Developmental", - "Acute toxicity", - ], - "Core only (fast)": [], - "Screening – NAMs + Genotox": ["NAMs / In Silico", "Genotoxicity (OECD TG)"], - "Full – All endpoints": list(ENDPOINT_MODULES.keys()), -} + if not token: + return False, "HF_TOKEN missing; cannot download private core." -ENDPOINT_QUERY_HINTS: Dict[str, List[str]] = { - "Genotoxicity (OECD TG)": ["genotoxicity", "mutagenicity", "AMES", "micronucleus", "comet assay", "chromosomal aberration", "OECD TG 471 473 476 487 490 474 489"], - "NAMs / In Silico": ["in silico", "QSAR", "read-across", "AOP", "PBPK", "high-throughput", "omics", "organ-on-chip", "microphysiological"], - "Acute toxicity": ["acute toxicity", "LD50", "LC50", "single dose", "lethality", "mortality"], - "Repeated dose toxicity": ["repeated dose", "subchronic", "chronic", "NOAEL", "LOAEL", "target organ", "90-day", "28-day"], - "Irritation / Sensitization": ["skin irritation", "eye irritation", "sensitization", "LLNA", "Draize"], - "Repro / Developmental": ["reproductive toxicity", "fertility", "developmental toxicity", "teratogenic", "prenatal", "postnatal"], - "Carcinogenicity": ["carcinogenicity", "tumor", "neoplasm", "cancer", "two-year bioassay"], -} + try: + wheel_path = hf_hub_download( + repo_id=repo, + filename=filename, + repo_type=repo_type, + 
token=token, + ) + # install wheel (no deps to keep it fast + deterministic) + subprocess.check_call([sys.executable, "-m", "pip", "install", "--no-deps", "--upgrade", wheel_path]) + return True, f"✅ Private core installed from {repo} ({filename})." + except Exception as e: + return False, f"⚠️ Failed to install private core: {e}" + +def try_import_core(): + try: + import toxra_core # type: ignore + return toxra_core, "✅ toxra_core imported." + except Exception as e: + return None, f"ℹ️ toxra_core not available: {e}" # ============================= -# PDF extraction (text-based PDFs only) +# PDF utilities (text-based only) # ============================= -def extract_pages_from_pdf(pdf_path: str, max_pages: int = 0) -> Tuple[List[Tuple[int, str]], int]: +def extract_pages(pdf_path: str, max_pages: int) -> Tuple[List[Tuple[int, str]], int]: reader = PdfReader(pdf_path) - page_count = len(reader.pages) - pages_to_read = page_count if (max_pages is None or max_pages <= 0) else min(page_count, int(max_pages)) - + total = len(reader.pages) + n = min(total, max_pages) pages: List[Tuple[int, str]] = [] - for i in range(pages_to_read): + for i in range(n): try: - t = reader.pages[i].extract_text() or "" + txt = reader.pages[i].extract_text() or "" except Exception: - t = "" - pages.append((i + 1, t or "")) - return pages, page_count - + txt = "" + pages.append((i + 1, txt)) + return pages, total def clean_text(t: str) -> str: - t = t or "" - t = t.replace("\x00", " ") + t = (t or "").replace("\x00", " ") t = re.sub(r"\s+", " ", t).strip() return t - -def chunk_pages(pages: List[Tuple[int, str]], target_chars: int = 3000) -> List[Dict[str, Any]]: - chunks = [] - buf = [] - start_page = None - cur_len = 0 - - for pno, txt in pages: - txt = clean_text(txt) - if not txt: - continue - if start_page is None: - start_page = pno - - if cur_len + len(txt) + 1 > target_chars and buf: - end_page = pno - 1 - end_page = end_page if end_page >= start_page else start_page - 
chunks.append({"pages": f"{start_page}-{end_page}", "text": " ".join(buf)}) - buf = [txt] - start_page = pno - cur_len = len(txt) - else: - buf.append(txt) - cur_len += len(txt) + 1 - - if buf and start_page is not None: - end_page = pages[-1][0] if pages else start_page - chunks.append({"pages": f"{start_page}-{end_page}", "text": " ".join(buf)}) - - return chunks - - -def _text_based_pdf_warning(pages: List[Tuple[int, str]]) -> bool: +def is_text_based(pages: List[Tuple[int, str]]) -> bool: joined = " ".join([clean_text(t) for _, t in pages if clean_text(t)]) - return len(joined.strip()) < 200 + return len(joined) >= 200 +def sha1_text(s: str) -> str: + return hashlib.sha1((s or "").encode("utf-8", errors="ignore")).hexdigest()[:12] # ============================= -# Lightweight retrieval (TF-IDF) +# Simple organ inference (kept lightweight) # ============================= -def select_relevant_chunks( - chunks: List[Dict[str, Any]], - queries: List[str], - top_per_query: int = 2, - max_chunks: int = 12 -) -> List[Dict[str, Any]]: - texts = [c["text"] for c in chunks] - if not texts: - return [] - - vectorizer = TfidfVectorizer(stop_words="english", ngram_range=(1, 2), max_features=20000) - X = vectorizer.fit_transform(texts) - - selected_idx: List[int] = [] - for q in queries: - q = (q or "").strip() - if not q: - continue - qv = vectorizer.transform([q]) - sims = (X @ qv.T).toarray().ravel() - idx = np.argsort(sims)[::-1] - for i in idx[:top_per_query]: - if i not in selected_idx: - selected_idx.append(i) - - if not selected_idx: - selected_idx = list(range(min(len(chunks), max_chunks))) - - return [chunks[i] for i in selected_idx[:max_chunks]] - - -def build_context(selected_chunks: List[Dict[str, Any]], max_chars: int = 20000) -> str: - parts = [] - total = 0 - for c in selected_chunks: - block = f"[pages {c['pages']}]\n{c['text']}\n" - if total + len(block) > max_chars: - break - parts.append(block) - total += len(block) - return "\n".join(parts).strip() 
+ORGAN_HINTS = { + "liver": ["liver", "hepatic", "hepatocyte", "bile", "alt", "ast"], + "lung": ["lung", "pulmonary", "alveol", "airway", "inhalation", "respiratory"], + "kidney": ["kidney", "renal", "nephro", "glomerul", "creatinine", "bun"], + "skin": ["skin", "dermal", "epiderm", "cutaneous"], + "gi": ["gastro", "intestinal", "gut", "colon", "stomach", "oral", "ingestion"], + "cns": ["brain", "cns", "neuro", "neuronal", "blood-brain"], + "reproductive": ["testis", "ovary", "uterus", "placent", "fetus", "embryo"], + "immune_blood": ["immune", "cytok", "inflamm", "blood", "serum", "hemat"], +} +def infer_organ_label(doc_text: str) -> str: + t = (doc_text or "").lower() + scores = {k: 0 for k in ORGAN_HINTS.keys()} + for organ, hints in ORGAN_HINTS.items(): + for h in hints: + if h in t: + scores[organ] += 1 + best = sorted(scores.items(), key=lambda x: x[1], reverse=True) + if not best or best[0][1] == 0: + return "unknown" + top_org, top_score = best[0] + if len(best) > 1 and best[1][1] > 0 and (top_score - best[1][1]) <= 1: + return "mixed" + return top_org # ============================= -# Spec -> JSON schema +# Admin JSON defaults (kept small; you can expand in Admin) # ============================= -def slugify_field(name: str) -> str: - name = (name or "").strip() - name = re.sub(r"[^\w\s-]", "", name) - name = re.sub(r"[\s-]+", "_", name).lower() - return name[:80] if name else "field" +DEFAULT_CONTROLLED_VOCAB = { + "risk_stance_enum": RISK_STANCE_ENUM, + "genotoxicity_oecd_tg_in_vitro_enum": [ + "OECD_TG_471_Bacterial Reverse mutation test(AMES test)", + "OECD_TG_473_In Vitro Mammalian Chromosomal Aberration Test", + "OECD_TG_476_In Vitro Mammalian Cell Gene Mutation Tests (Hprt & xprt)", + "OECD_TG_487_In Vitro Mammalian Cell Micronucleus Test", + "OECD_TG_490_In Vitro Mammalian Cell Gene Mutation Tests (Thymidine Kinase)", + "not_reported", + ], + "genotoxicity_oecd_tg_in_vivo_enum": [ + "OECD_TG_474_In Vivo Mammalian Erythrocyte Micronucleus Test", + 
"OECD_TG_475_Mammalian Bone Marrow Chromosomal Aberration Test", + "OECD_TG_488_Transgenic Rodent Somatic & Germ Cell Gene Mutation Assays", + "OECD_TG_489_In Vivo Mammalian Alkaline Comet Assay", + "not_reported", + ], + "approach_enum": ["in_vivo", "in_vitro", "in_silico", "nams", "mixed", "not_reported"], +} +# Field spec that drives extraction columns (Admin can edit) +DEFAULT_FIELD_SPEC = [ + {"field": "paper_title", "type": "str", "enum_values": "", "instructions": "Title of the paper/report if stated."}, + {"field": "chemicals", "type": "list[str]", "enum_values": "", "instructions": "Primary chemical(s) studied; include common name + abbreviation if present."}, + {"field": "cas_numbers", "type": "list[str]", "enum_values": "", "instructions": "Any CAS numbers mentioned."}, + {"field": "organ", "type": "enum", "enum_values": "liver,lung,kidney,skin,gi,cns,reproductive,immune_blood,mixed,unknown", "instructions": "Organ label from paper context."}, + {"field": "study_type", "type": "enum", "enum_values": "in_vivo,in_vitro,epidemiology,in_silico,review,methodology,other", "instructions": "Choose the best match."}, + {"field": "approach", "type": "enum", "enum_values": "in_vivo,in_vitro,in_silico,nams,mixed,not_reported", "instructions": "Identify if results are in silico or NAMs; use 'mixed' if multiple."}, + {"field": "genotoxicity_oecd_tg_in_vitro", "type": "list[enum]", "enum_values": "genotoxicity_oecd_tg_in_vitro_enum", "instructions": "If reported, choose matching in vitro OECD TG(s)."}, + {"field": "genotoxicity_oecd_tg_in_vivo", "type": "list[enum]", "enum_values": "genotoxicity_oecd_tg_in_vivo_enum", "instructions": "If reported, choose matching in vivo OECD TG(s)."}, + {"field": "genotoxicity_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Genotoxicity overall result if stated."}, + {"field": "risk_stance", "type": "enum", "enum_values": "risk_stance_enum", "instructions": "acceptable / 
acceptable_with_uncertainty / not_acceptable / insufficient_data."}, + {"field": "risk_confidence", "type": "num", "enum_values": "", "instructions": "0-1 confidence for risk stance (use low if unclear)."}, + {"field": "risk_summary", "type": "str", "enum_values": "", "instructions": "2–4 sentences summarizing the paper’s safety/risk posture (neutral)."}, + {"field": "key_findings", "type": "str", "enum_values": "", "instructions": "3–5 lines of key findings grounded to the text."}, + {"field": "conclusion", "type": "str", "enum_values": "", "instructions": "What does the paper conclude about safety/risk?"}, +] -def parse_field_spec(spec: str) -> Tuple[Dict[str, Any], Dict[str, str]]: - props: Dict[str, Any] = {} - instr: Dict[str, str] = {} +# ============================= +# Fallback extractor (basic) +# - Used only if toxra_core is unavailable. +# - Disable it (DISABLE_FALLBACK=1) once you move pipeline into a private toxra_core wheel. +# ============================= +def get_openai_client(api_key: str) -> OpenAI: + if OpenAI is None: + raise RuntimeError("openai package not available.") + key = (api_key or "").strip() or os.getenv("OPENAI_API_KEY", "").strip() + if not key: + raise ValueError("Missing OpenAI API key. 
Provide it or set OPENAI_API_KEY secret.") + return OpenAI(api_key=key) - for raw_line in (spec or "").splitlines(): - line = raw_line.strip() - if not line or line.startswith("#"): - continue +def parse_admin_json(vocab_json: str, spec_json: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]], str]: + try: + vocab = json.loads(vocab_json) if vocab_json else DEFAULT_CONTROLLED_VOCAB + except Exception as e: + return DEFAULT_CONTROLLED_VOCAB, DEFAULT_FIELD_SPEC, f"⚠️ Vocab JSON parse error: {e}" - parts = [p.strip() for p in line.split("|")] - if len(parts) < 2: - continue + try: + spec = json.loads(spec_json) if spec_json else DEFAULT_FIELD_SPEC + if not isinstance(spec, list): + raise ValueError("Field spec must be a list.") + except Exception as e: + return vocab, DEFAULT_FIELD_SPEC, f"⚠️ Spec JSON parse error: {e}" - field_name = parts[0] - ftype = parts[1] - finstr = parts[2] if len(parts) >= 3 else "" + return vocab, spec, "✅ Admin JSON loaded." - key = slugify_field(field_name) - instr[key] = finstr +def build_schema_from_spec(vocab: Dict[str, Any], spec: List[Dict[str, Any]]) -> Dict[str, Any]: + # Minimal JSON schema for OpenAI response_format json_schema + def field_schema(f: Dict[str, Any]) -> Dict[str, Any]: + ftype = (f.get("type") or "str").strip() + enum_values = (f.get("enum_values") or "").strip() - schema: Dict[str, Any] = {"type": "string"} + # resolve enums that reference vocab keys + enum_list = None + if ftype in ("enum", "list[enum]"): + if enum_values in vocab and isinstance(vocab[enum_values], list): + enum_list = [str(x) for x in vocab[enum_values]] + else: + enum_list = [x.strip() for x in enum_values.split(",") if x.strip()] if ftype == "str": - schema = {"type": "string"} - elif ftype == "num": - schema = {"type": "number"} - elif ftype == "bool": - schema = {"type": "boolean"} - elif ftype.startswith("list[enum[") and ftype.endswith("]]"): - inside = ftype[len("list[enum["):-2].strip() - vals = [v.strip() for v in inside.split(",") if 
v.strip()] - schema = {"type": "array", "items": {"type": "string", "enum": vals}} - elif ftype.startswith("list[str]"): - schema = {"type": "array", "items": {"type": "string"}} - elif ftype.startswith("list[num]"): - schema = {"type": "array", "items": {"type": "number"}} - elif ftype.startswith("enum[") and ftype.endswith("]"): - inside = ftype[len("enum["):-1].strip() - vals = [v.strip() for v in inside.split(",") if v.strip()] - schema = {"type": "string", "enum": vals} - else: - schema = {"type": "string"} - - props[key] = schema - - return props, instr + return {"type": ["string", "null"]} + if ftype == "num": + return {"type": ["number", "null"]} + if ftype == "bool": + return {"type": ["boolean", "null"]} + if ftype == "list[str]": + return {"type": ["array", "null"], "items": {"type": "string"}} + if ftype == "list[num]": + return {"type": ["array", "null"], "items": {"type": "number"}} + if ftype == "enum": + return {"type": ["string", "null"], "enum": enum_list or []} + if ftype == "list[enum]": + return {"type": ["array", "null"], "items": {"type": "string", "enum": enum_list or []}} + return {"type": ["string", "null"]} + + record_props: Dict[str, Any] = { + "file": {"type": "string"}, + "row_mode": {"type": "string", "enum": ["one_row_per_paper", "one_row_per_chemical_endpoint"]}, + "chemical": {"type": ["string", "null"]}, + "endpoint": {"type": ["string", "null"]}, + } + for f in spec: + name = (f.get("field") or "").strip() + if not name: + continue + record_props[name] = field_schema(f) -def build_extraction_schema(field_props: Dict[str, Any], vocab: Dict[str, Any]) -> Dict[str, Any]: - risk_enum = vocab.get("risk_stance_enum", ["acceptable","acceptable_with_uncertainty","not_acceptable","insufficient_data"]) - all_field_keys = list(field_props.keys()) + # Require all properties (OpenAI schema validator wants required list to include all keys) + required_keys = list(record_props.keys()) - return { + schema = { "type": "object", - 
"additionalProperties": False, "properties": { - "paper_title": {"type": "string"}, - "risk_stance": {"type": "string", "enum": risk_enum}, - "risk_confidence": {"type": "number", "minimum": 0, "maximum": 1}, - "risk_summary": {"type": "string"}, - "extracted": { - "type": "object", - "additionalProperties": False, - "properties": field_props, - "required": all_field_keys + "records": { + "type": "array", + "items": { + "type": "object", + "properties": record_props, + "required": required_keys, + "additionalProperties": False, + }, }, "evidence": { "type": "array", "items": { "type": "object", - "additionalProperties": False, "properties": { + "record_index": {"type": "integer"}, "field": {"type": "string"}, + "page": {"type": "integer"}, "quote": {"type": "string"}, - "pages": {"type": "string"} }, - "required": ["field", "quote", "pages"] - } - } + "required": ["record_index", "field", "page", "quote"], + "additionalProperties": False, + }, + }, + "notes": {"type": "string"}, }, - "required": ["paper_title","risk_stance","risk_confidence","risk_summary","extracted","evidence"] + "required": ["records", "evidence", "notes"], + "additionalProperties": False, } + return schema - -# ============================= -# OpenAI client + extraction -# ============================= -def get_openai_client(api_key: str) -> OpenAI: - key = (api_key or "").strip() or os.getenv("OPENAI_API_KEY", "").strip() - if not key: - raise ValueError("Missing OpenAI API key. 
Provide it in the UI or set OPENAI_API_KEY secret in Hugging Face.") - return OpenAI(api_key=key) - - -def openai_structured_extract( - client: OpenAI, - model: str, - schema: Dict[str, Any], - controlled_vocab: Dict[str, Any], - field_instructions: Dict[str, str], - context: str -) -> Dict[str, Any]: - field_instr_lines = [f"- {k}: {v if v else '(no extra instructions)'}" for k, v in field_instructions.items()] - vocab_text = json.dumps(controlled_vocab, indent=2) - - system_msg = ( - "You are a toxicology research paper data-extraction assistant for an industry safety assessor.\n" - "Grounding rules (must follow):\n" - "1) Use ONLY the provided excerpts; do NOT invent details.\n" - "2) If a value is not explicitly stated, output empty string or empty list, OR the enum value 'not_reported'/'insufficient_data' when applicable.\n" - "3) Provide evidence quotes + page ranges for extracted fields.\n" - "4) risk_stance is regulatory: acceptable / acceptable_with_uncertainty / not_acceptable / insufficient_data.\n" - "5) Prefer controlled vocab terms when applicable.\n" - ) - - user_msg = ( - "CONTROLLED VOCAB (JSON):\n" - f"{vocab_text}\n\n" - "FIELD INSTRUCTIONS:\n" - + "\n".join(field_instr_lines) - + "\n\n" - "EXCERPTS (with page ranges):\n" - f"{context}\n" - ) - - resp = client.responses.create( - model=model, - input=[ - {"role": "system", "content": system_msg}, - {"role": "user", "content": user_msg} - ], - text={ - "format": { - "type": "json_schema", - "name": "tox_extraction", - "schema": schema, - "strict": True - } - } - ) - return json.loads(resp.output_text) - - -def openai_synthesize_across_papers(client: OpenAI, model: str, rows: List[Dict[str, Any]]) -> str: - system_msg = ( - "You are a senior toxicology safety assessor summarizing multiple papers.\n" - "Create a concise synthesis: consensus, disagreements, data gaps, and actionable next steps.\n" - "Base strictly on the provided extracted JSON (which is evidence-backed).\n" - ) - user_msg = 
"EXTRACTED_ROWS_JSON:\n" + json.dumps(rows, indent=2) - resp = client.responses.create(model=model, input=[{"role":"system","content":system_msg},{"role":"user","content":user_msg}]) - return resp.output_text - - -# ============================= -# Controlled vocab editor helpers (lists only) + search filter -# ============================= -def _filter_terms_df(df: pd.DataFrame, query: str) -> pd.DataFrame: - if df is None or df.empty: - return pd.DataFrame(columns=["term"]) - q = (query or "").strip().lower() - if not q: - return df[["term"]].copy() - mask = df["term"].astype(str).str.lower().str.contains(q, na=False) - return df.loc[mask, ["term"]].copy() - - -def vocab_init_state(vocab_json: str): - try: - vocab = json.loads(vocab_json or DEFAULT_CONTROLLED_VOCAB_JSON) - except Exception: - vocab = json.loads(DEFAULT_CONTROLLED_VOCAB_JSON) - - list_keys = sorted([k for k, v in vocab.items() if isinstance(v, list)]) - default_key = list_keys[0] if list_keys else None - terms = vocab.get(default_key, []) if default_key else [] - full_df = pd.DataFrame({"term": terms}) - filtered_df = _filter_terms_df(full_df, "") - return vocab, list_keys, default_key, full_df, filtered_df, json.dumps(vocab, indent=2), "✅ Vocab loaded." - - -def vocab_reset_defaults_ui(): - vocab, keys, k0, full_df, filtered_df, vjson, msg = vocab_init_state(DEFAULT_CONTROLLED_VOCAB_JSON) - return vocab, gr.update(choices=keys, value=k0), full_df, filtered_df, vjson, msg, vjson - - -def vocab_load_category(vocab_state: Dict[str, Any], category: str, search: str): - if not category or category not in vocab_state: - empty = pd.DataFrame(columns=["term"]) - return empty, empty, "Select a category." - terms = vocab_state.get(category, []) - if not isinstance(terms, list): - empty = pd.DataFrame(columns=["term"]) - return empty, empty, "This category is not a list." 
- full = pd.DataFrame({"term": terms}) - filtered = _filter_terms_df(full, search) - return full, filtered, f"Editing: {category}" - - -def vocab_add_term(vocab_state: Dict[str, Any], category: str, term: str, search: str): - term = (term or "").strip() - if not term: - return gr.update(), gr.update(), "", "Enter a term to add." - if not category or category not in vocab_state or not isinstance(vocab_state.get(category), list): - return gr.update(), gr.update(), "", "Pick a list category first." - - if term not in vocab_state[category]: - vocab_state[category].append(term) - - full = pd.DataFrame({"term": vocab_state[category]}) - filtered = _filter_terms_df(full, search) - return full, filtered, "", f"Added: {term}" - - -def vocab_remove_term(vocab_state: Dict[str, Any], category: str, term: str, search: str): - term = (term or "").strip() - if not term: - return gr.update(), gr.update(), "", "Enter a term to remove." - if not category or category not in vocab_state or not isinstance(vocab_state.get(category), list): - return gr.update(), gr.update(), "", "Pick a list category first." - - vocab_state[category] = [t for t in vocab_state[category] if t != term] - full = pd.DataFrame({"term": vocab_state[category]}) - filtered = _filter_terms_df(full, search) - return full, filtered, "", f"Removed: {term}" - - -def vocab_apply_df(vocab_state: Dict[str, Any], category: str, terms_df: Any, search: str): - if not category or category not in vocab_state or not isinstance(vocab_state.get(category), list): - return json.dumps(vocab_state, indent=2), pd.DataFrame(columns=["term"]), "Pick a list category first." - - try: - df = terms_df if isinstance(terms_df, pd.DataFrame) else pd.DataFrame(terms_df, columns=["term"]) - except Exception: - return json.dumps(vocab_state, indent=2), pd.DataFrame(columns=["term"]), "Could not parse terms table." 
- - terms = [] - for t in df.get("term", []).tolist(): - t = (str(t) if t is not None else "").strip() - if t and t not in terms: - terms.append(t) - - vocab_state[category] = terms - vjson = json.dumps(vocab_state, indent=2) - filtered = _filter_terms_df(pd.DataFrame({"term": terms}), search) - return vjson, filtered, f"✅ Applied {len(terms)} terms to {category}." - - -def vocab_filter_preview(terms_df, search): - try: - df = terms_df if isinstance(terms_df, pd.DataFrame) else pd.DataFrame(terms_df, columns=["term"]) - except Exception: - df = pd.DataFrame(columns=["term"]) - return _filter_terms_df(df, search) - - -# ============================= -# Field mapping from endpoints -# ============================= -TYPE_CHOICES = ["str", "num", "bool", "list[str]", "list[num]", "enum", "list[enum]"] - - -def build_spec_from_field_rows(rows: List[Dict[str, Any]]) -> str: - lines = [ - "# One field per line: Field Name | type | instructions", - "# types: str, num, bool, list[str], list[num], enum[a,b,c], list[enum[a,b,c]]", - "" - ] - for r in rows: - field = str(r.get("field","")).strip() - ftype = str(r.get("type","")).strip() - enums = str(r.get("enum_values","")).strip() - instr = str(r.get("instructions","")).strip() - - if not field or not ftype: +def build_context_pages(pages: List[Tuple[int, str]], max_context_chars: int) -> str: + # Build a numbered page context with truncation. 
+ parts = [] + used = 0 + for pno, txt in pages: + c = clean_text(txt) + if not c: continue + block = f"[PAGE {pno}]\n{c}\n" + if used + len(block) > max_context_chars: + # try partial + remaining = max(0, max_context_chars - used) + if remaining > 200: + block = block[:remaining] + parts.append(block) + break + parts.append(block) + used += len(block) + return "\n".join(parts) - if ftype == "enum": - vals = [v.strip() for v in enums.split(",") if v.strip()] - type_str = f"enum[{','.join(vals)}]" if vals else "str" - elif ftype == "list[enum]": - vals = [v.strip() for v in enums.split(",") if v.strip()] - type_str = f"list[enum[{','.join(vals)}]]" if vals else "list[str]" - else: - type_str = ftype +def fallback_grounded_extract( + files, + api_key: str, + model: str, + max_pages: int, + max_context_chars: int, + endpoint_families: List[str], + oecd_tgs: List[str], + vocab_json: str, + spec_json: str, +) -> Tuple[Dict[str, Any], str, pd.DataFrame, str, str]: + """ + Returns: + run_state dict, status_text, overview_df, csv_path, details_json_path + """ + vocab, spec, admin_status = parse_admin_json(vocab_json, spec_json) - lines.append(f"{field} | {type_str} | {instr}") + if DISABLE_FALLBACK: + raise RuntimeError("Fallback extractor disabled (DISABLE_FALLBACK=1). 
Install/use toxra_core.") - return "\n".join(lines).strip() + "\n" + client = get_openai_client(api_key) + if not files: + return {"records": [], "evidence": [], "details": []}, "Upload at least one PDF.", pd.DataFrame(), "", "" -def build_rows_from_endpoints(selected_endpoints: List[str]) -> Tuple[List[Dict[str, Any]], Dict[str, str], Dict[str, List[str]]]: - selected_endpoints = selected_endpoints or [] - rows: List[Dict[str, Any]] = [] - field_key_to_module: Dict[str, str] = {} - module_to_keys: Dict[str, List[str]] = {} + records_all: List[Dict[str, Any]] = [] + evidence_all: List[Dict[str, Any]] = [] + details_all: List[Dict[str, Any]] = [] - for r in PRESET_CORE: - rows.append(dict(r)) - k = slugify_field(r["field"]) - field_key_to_module[k] = "Core" - module_to_keys.setdefault("Core", []).append(k) + schema = build_schema_from_spec(vocab, spec) - for module in selected_endpoints: - preset = ENDPOINT_MODULES.get(module) - if not preset: - continue - for r in preset: - rows.append(dict(r)) - k = slugify_field(r["field"]) - field_key_to_module[k] = module - module_to_keys.setdefault(module, []).append(k) + # run each pdf + for f in files: + pdf_path = f.name + filename = os.path.basename(pdf_path) - seen = set() - deduped: List[Dict[str, Any]] = [] - for r in rows: - k = str(r.get("field","")).strip().lower() - if not k or k in seen: + pages, total = extract_pages(pdf_path, max_pages) + if not is_text_based(pages): + # create minimal record with insufficient_data + rec = { + "file": filename, + "row_mode": "one_row_per_paper", + "chemical": None, + "endpoint": None, + } + # fill all spec fields with null / insufficient + for field in [x["field"] for x in spec]: + if field == "risk_stance": + rec[field] = "insufficient_data" + else: + rec[field] = None + records_all.append(rec) + details_all.append({"file": filename, "text_based": False, "pages_indexed": 0, "pages_total": total}) continue - seen.add(k) - deduped.append(r) - - # Rebuild module_to_keys to match 
deduped - dedup_keys = set([slugify_field(r["field"]) for r in deduped]) - module_to_keys = {m: [k for k in ks if k in dedup_keys] for m, ks in module_to_keys.items()} - - return deduped, field_key_to_module, module_to_keys - - -def apply_endpoint_preset(preset_name: str): - vals = ENDPOINT_PRESETS.get(preset_name, []) - return gr.update(value=vals) + doc_text = " ".join([clean_text(t) for _, t in pages if clean_text(t)]) + organ = infer_organ_label(doc_text) -def sync_fields_from_endpoints(selected_endpoints: List[str], admin_mode: bool, current_rows: List[Dict[str, Any]], current_spec: str): - if admin_mode: - df = pd.DataFrame(current_rows or [], columns=["field","type","enum_values","instructions"]) - return current_rows, df, current_spec, "Admin mode: endpoint selection will not overwrite custom columns." - rows, _, _ = build_rows_from_endpoints(selected_endpoints or []) - df = pd.DataFrame(rows, columns=["field","type","enum_values","instructions"]) - spec = build_spec_from_field_rows(rows) - return rows, df, spec, "✅ Columns updated from selected endpoints." + context = build_context_pages(pages, max_context_chars=max_context_chars) + # Guidance: extraction only for selected endpoints + endpoint_guidance = { + "families": endpoint_families or [], + "oecd_tgs": oecd_tgs or [], + } -def admin_apply_endpoints(selected_endpoints: List[str]): - rows, _, _ = build_rows_from_endpoints(selected_endpoints or []) - df = pd.DataFrame(rows, columns=["field","type","enum_values","instructions"]) - spec = build_spec_from_field_rows(rows) - return rows, df, spec, "✅ Loaded selected endpoints into the builder (Replace)." 
- - -def fields_add_or_update(field_name: str, ftype: str, enum_values: str, instructions: str, field_rows: List[Dict[str, Any]]): - field_name = (field_name or "").strip() - ftype = (ftype or "").strip() - enum_values = (enum_values or "").strip() - instructions = (instructions or "").strip() - - if not field_name or not ftype: - df = pd.DataFrame(field_rows, columns=["field","type","enum_values","instructions"]) - return field_rows, df, build_spec_from_field_rows(field_rows), "Field name and type are required." - - updated = False - for r in field_rows: - if str(r.get("field","")).strip().lower() == field_name.lower(): - r["type"] = ftype - r["enum_values"] = enum_values - r["instructions"] = instructions - updated = True - break - - if not updated: - field_rows.append({"field": field_name, "type": ftype, "enum_values": enum_values, "instructions": instructions}) - - df = pd.DataFrame(field_rows, columns=["field","type","enum_values","instructions"]) - return field_rows, df, build_spec_from_field_rows(field_rows), ("Updated field." if updated else "Added field.") + system = ( + "You are a toxicology literature extraction assistant for an industry safety assessor.\n" + "Rules:\n" + "1) Stay strictly grounded to the provided PAGE text. 
If not present, use null or 'not_reported'.\n" + "2) Prefer neutral phrasing.\n" + "3) Decide row_mode:\n" + " - If the document is about a single primary chemical and does not present multiple endpoints per chemical: one_row_per_paper.\n" + " - If multiple chemicals and/or multiple endpoints need separation: one_row_per_chemical_endpoint.\n" + "4) Only extract endpoint-related content for the user-selected endpoint families / OECD TGs.\n" + "5) Provide evidence quotes with page numbers for key fields.\n" + ) + user = ( + f"FILE: {filename}\n" + f"INFERRED_ORGAN (heuristic): {organ}\n\n" + f"USER_SELECTED_ENDPOINTS:\n{json.dumps(endpoint_guidance, indent=2)}\n\n" + f"FIELDS TO EXTRACT:\n{json.dumps(spec, indent=2)}\n\n" + "PAGE TEXT:\n" + f"{context}\n\n" + "Return JSON per the schema." + ) -def fields_apply_df(field_rows: List[Dict[str, Any]], df_in: Any): - try: - df = df_in if isinstance(df_in, pd.DataFrame) else pd.DataFrame(df_in, columns=["field","type","enum_values","instructions"]) - except Exception: - df = pd.DataFrame(field_rows, columns=["field","type","enum_values","instructions"]) - return field_rows, df, build_spec_from_field_rows(field_rows), "Could not parse builder table." 
+ resp = client.responses.create( + model=model, + input=[{"role": "system", "content": system}, {"role": "user", "content": user}], + response_format={ + "type": "json_schema", + "json_schema": { + "name": "toxra_extraction", + "schema": schema, + "strict": True, + }, + }, + ) - cleaned = [] - seen = set() - for _, r in df.iterrows(): - field = str(r.get("field","")).strip() - ftype = str(r.get("type","")).strip() - enums = str(r.get("enum_values","")).strip() - instr = str(r.get("instructions","")).strip() - if not field or not ftype: - continue - k = field.lower() - if k in seen: - continue - seen.add(k) - cleaned.append({"field": field, "type": ftype, "enum_values": enums, "instructions": instr}) + out = resp.output_text.strip() + parsed = json.loads(out) + + # post-process: inject organ if field exists and missing + recs = parsed.get("records", []) + ev = parsed.get("evidence", []) + + # ensure organ field consistency + for r in recs: + if "organ" in r and not r.get("organ"): + r["organ"] = organ + + base_index = len(records_all) + for i, r in enumerate(recs): + records_all.append(r) + for e in ev: + # shift record_index by base_index + try: + e["record_index"] = int(e["record_index"]) + base_index + except Exception: + e["record_index"] = base_index + evidence_all.append(e) + + details_all.append({ + "file": filename, + "text_based": True, + "pages_indexed": min(total, max_pages), + "pages_total": total, + "notes": parsed.get("notes", ""), + "organ_inferred": organ, + }) + + # Build overview table + df = pd.DataFrame(records_all) + # prefer a compact set + overview_cols = [c for c in ["file", "paper_title", "risk_stance", "risk_confidence", "chemical", "endpoint", "row_mode"] if c in df.columns] + if "chemicals" in df.columns and "chemical" not in overview_cols: + overview_cols.append("chemicals") + overview_df = df[overview_cols].copy() if overview_cols else df.head(50) + + # Save CSV and details JSON + ts = int(time.time()) + csv_path = 
f"/tmp/toxra_extraction_{ts}.csv" + details_json_path = f"/tmp/toxra_details_{ts}.json" - df2 = pd.DataFrame(cleaned, columns=["field","type","enum_values","instructions"]) - spec = build_spec_from_field_rows(cleaned) - return cleaned, df2, spec, f"✅ Applied builder table ({len(cleaned)} fields)." + df.to_csv(csv_path, index=False) + with open(details_json_path, "w", encoding="utf-8") as f: + json.dump({"records": records_all, "evidence": evidence_all, "details": details_all}, f, indent=2) + run_state = {"records": records_all, "evidence": evidence_all, "details": details_all, "csv_path": csv_path, "details_path": details_json_path} -# ============================= -# Row building + “non-empty module” logic -# ============================= -def _as_list(x) -> List[str]: - if x is None: - return [] - if isinstance(x, list): - out = [] - for v in x: - s = str(v).strip() - if s: - out.append(s) - return out - s = str(x).strip() - return [s] if s else [] - - -def _format_value(v: Any) -> Any: - if isinstance(v, list): - return "; ".join([str(x) for x in v if str(x).strip()]) - return v - - -EMPTY_STRINGS = {"", "not_reported", "insufficient_data", "none", "na", "n/a", "null"} - - -def _is_empty_value(v: Any) -> bool: - if v is None: - return True - if isinstance(v, float) and np.isnan(v): - return True - if isinstance(v, list): - cleaned = [str(x).strip() for x in v if str(x).strip()] - if not cleaned: - return True - # empty if all items are not_reported / similar - return all((c.lower() in EMPTY_STRINGS) for c in cleaned) - s = str(v).strip() - if not s: - return True - return s.lower() in EMPTY_STRINGS - - -def _record_id(file_name: str, chemical: str, endpoint: str) -> str: - chemical = (chemical or "").strip() or "-" - endpoint = (endpoint or "").strip() or "Paper" - return f"{file_name} | {chemical} | {endpoint}" - - -def _module_has_any_data(ext: Dict[str, Any], module_keys: List[str], field_props: Dict[str, Any]) -> bool: - for k in (module_keys or []): - v = 
ext.get(k, None) - if not _is_empty_value(v): - return True - return False + status = f"✅ Done. Records: {len(records_all)} | Evidence items: {len(evidence_all)} | {admin_status}" + return run_state, status, overview_df, csv_path, details_json_path # ============================= -# Evidence + report helpers +# Report helpers (vertical view + evidence) # ============================= -def _make_vertical(records: List[Dict[str, Any]], record_id: str) -> pd.DataFrame: - if not records or not record_id: - return pd.DataFrame(columns=["Field", "Value"]) - row = next((r for r in records if r.get("record_id") == record_id), None) - if not row: - return pd.DataFrame(columns=["Field", "Value"]) - - hidden = {"record_id"} - keys = [k for k in row.keys() if k not in hidden] - return pd.DataFrame({"Field": keys, "Value": [row.get(k, "") for k in keys]}) - - -def _render_evidence(details: List[Dict[str, Any]], file_name: str, allowed_fields: Optional[set] = None, max_items: int = 120) -> str: - if not details or not file_name: - return "" - d = next((x for x in details if x.get("_file") == file_name), None) - if not d: - return "" - ev = d.get("evidence", []) or [] - lines = [] - for e in ev: - field = (e.get("field", "") or "").strip() - if allowed_fields is not None and field and field not in allowed_fields: - continue - quote = (e.get("quote", "") or "").strip() - pages = (e.get("pages", "") or "").strip() - if quote: - if len(quote) > 320: - quote = quote[:320] + "…" - lines.append(f"- **{field}** (pages {pages}): “{quote}”") - if len(lines) >= max_items: - break - header = "### Evidence (grounding)\n" - return header + ("\n".join(lines) if lines else "- (no evidence returned)") - - -def _overview_df_from_records(records: List[Dict[str, Any]]) -> pd.DataFrame: - if not records: - return pd.DataFrame(columns=["record_id","file","paper_title","chemical","endpoint","risk_stance","risk_confidence"]) - df = pd.DataFrame(records) - cols = 
["record_id","file","paper_title","chemical","endpoint","risk_stance","risk_confidence"] - cols = [c for c in cols if c in df.columns] - return df[cols].copy() if cols else df.head(50) - - -def _risk_badge(risk: str) -> str: - r = (risk or "").strip().lower() - if r == "acceptable": - bg = "#e7f7ed"; fg = "#0f5132" - elif r == "acceptable_with_uncertainty": - bg = "#fff3cd"; fg = "#664d03" - elif r == "not_acceptable": - bg = "#f8d7da"; fg = "#842029" - else: - bg = "#e2e3e5"; fg = "#41464b" - label = risk if risk else "unknown" - return f'{label}' - - -def _safe_str(x: Any) -> str: - if x is None: - return "" - if isinstance(x, float) and np.isnan(x): - return "" - return str(x) - - -def render_summary_card(record_id: str, records: List[Dict[str, Any]]) -> str: - if not record_id or not records: - return "
Executive Summary
Run extraction to view results.
" - - row = next((r for r in records if r.get("record_id") == record_id), None) - if not row: - return "
Executive Summary
Select a record.
" - - title = _safe_str(row.get("paper_title", "")).strip() or "Untitled paper" - file_name = _safe_str(row.get("file", "")) - chemical = _safe_str(row.get("chemical", "-")) - endpoint = _safe_str(row.get("endpoint", "Paper")) - risk = _safe_str(row.get("risk_stance", "")) - conf = row.get("risk_confidence", "") +def to_vertical_df(record: Dict[str, Any]) -> pd.DataFrame: + rows = [] + for k, v in (record or {}).items(): + if isinstance(v, list): + vv = ", ".join([str(x) for x in v]) + else: + vv = "" if v is None else str(v) + rows.append({"Field": k, "Value": vv}) + return pd.DataFrame(rows, columns=["Field", "Value"]) + +def evidence_markdown_for_record(run_state: Dict[str, Any], record_index: int) -> str: + ev = (run_state or {}).get("evidence", []) or [] + items = [x for x in ev if int(x.get("record_index", -1)) == int(record_index)] + if not items: + return "### Evidence used\n(no evidence captured)" + lines = ["### Evidence used"] + for it in items[:40]: + field = it.get("field", "") + page = it.get("page", "") + quote = (it.get("quote", "") or "").strip() + quote = quote[:260] + ("…" if len(quote) > 260 else "") + lines.append(f"- **{field}** (p.{page}): “{quote}”") + return "\n".join(lines) + +def record_choices(run_state: Dict[str, Any]) -> List[str]: + recs = (run_state or {}).get("records", []) or [] + out = [] + for i, r in enumerate(recs): + file = r.get("file", "") + chem = r.get("chemical") or "" + ep = r.get("endpoint") or "" + label = f"{i}: {file}" + if chem or ep: + label += f" | {chem} | {ep}" + out.append(label) + return out + +def parse_choice_index(choice: str) -> int: + m = re.match(r"^\s*(\d+)\s*:", choice or "") + return int(m.group(1)) if m else 0 + +def build_review_df(run_state: Dict[str, Any]) -> pd.DataFrame: + recs = (run_state or {}).get("records", []) or [] + if not recs: + return pd.DataFrame() + df = pd.DataFrame(recs) + # keep readable review columns first + preferred = [c for c in 
["file","paper_title","risk_stance","risk_confidence","row_mode","chemical","endpoint"] if c in df.columns] + rest = [c for c in df.columns if c not in preferred] + return df[preferred + rest].copy() + +def apply_review_edits(df_like: Any, run_state: Dict[str, Any]) -> Tuple[Dict[str, Any], str]: + if run_state is None: + return {"records": [], "evidence": [], "details": []}, "No run state." try: - conf_txt = f"{float(conf):.2f}" if conf != "" else "" - except Exception: - conf_txt = _safe_str(conf) - - key_findings = _safe_str(row.get("key_findings", "")).strip() - dose_metrics = _safe_str(row.get("dose_metrics", "")).strip() - conclusion = _safe_str(row.get("conclusion", "")).strip() - risk_summary = _safe_str(row.get("risk_summary", "")).strip() - - # Keep compact - def _clip(s: str, n: int = 380) -> str: - s = s.strip() - if len(s) <= n: - return s - return s[:n] + "…" - - return f""" -
-
-
Executive Summary
-
{_risk_badge(risk)} confidence: {conf_txt}
-
- -
-
{title}
-
- File: {file_name}   •   - Chemical: {chemical}   •   - Endpoint: {endpoint} -
-
- -
-
-
Key Findings
-
{_clip(key_findings) if key_findings else "(not reported)"}
-
-
-
Dose Metrics
-
{_clip(dose_metrics) if dose_metrics else "(not reported)"}
-
-
-
Conclusion
-
{_clip(conclusion) if conclusion else "(not reported)"}
-
-
-
Risk Summary
-
{_clip(risk_summary) if risk_summary else "(not reported)"}
-
-
-
- """ - + df = df_like if isinstance(df_like, pd.DataFrame) else pd.DataFrame(df_like) + except Exception as e: + return run_state, f"⚠️ Could not parse review table: {e}" + run_state["records"] = df.to_dict(orient="records") + return run_state, f"✅ Saved edits ({len(run_state['records'])} records)." + +def export_reviewed_csv(run_state: Dict[str, Any]) -> Tuple[str, str]: + recs = (run_state or {}).get("records", []) or [] + if not recs: + return "", "No records to export." + df = pd.DataFrame(recs) + ts = int(time.time()) + out_path = f"/tmp/toxra_reviewed_{ts}.csv" + df.to_csv(out_path, index=False) + return out_path, f"✅ Exported reviewed CSV ({len(recs)} records)." # ============================= -# Main extraction handler +# Core pipeline routing (toxra_core if installed; else fallback) # ============================= -def run_extraction( +def run_pipeline( files, - api_key, - model, - selected_endpoints, - field_spec, - vocab_json, - max_pages, - chunk_chars, - max_context_chars, - admin_mode + api_key: str, + model: str, + max_pages: int, + max_context_chars: int, + families: List[str], + tgs: List[str], + vocab_json: str, + spec_json: str, + use_private_core: bool, + core_status_msg: str, ): - if not files: - return ( - "
Executive Summary
Upload PDFs to run extraction.
", - pd.DataFrame(), None, None, "Upload one or more PDFs.", - gr.update(choices=[], value=None), - [], [], pd.DataFrame(columns=["Field","Value"]), "" - ) - - try: - vocab = json.loads(vocab_json or DEFAULT_CONTROLLED_VOCAB_JSON) - except Exception as e: - return ( - "
Executive Summary
Invalid vocab JSON.
", - pd.DataFrame(), None, None, f"Controlled vocab JSON invalid: {e}", - gr.update(choices=[], value=None), - [], [], pd.DataFrame(columns=["Field","Value"]), "" - ) - - field_props, field_instr = parse_field_spec(field_spec or "") - if not field_props: - return ( - "
Executive Summary
No columns defined.
", - pd.DataFrame(), None, None, "No extraction fields are defined. (Check selected endpoints or admin field spec.)", - gr.update(choices=[], value=None), - [], [], pd.DataFrame(columns=["Field","Value"]), "" - ) - - schema = build_extraction_schema(field_props, vocab) - - if admin_mode: - field_key_to_module = {k: "Custom" for k in field_props.keys()} - module_to_keys: Dict[str, List[str]] = {"Custom": list(field_props.keys())} - endpoint_modules_for_rows = ["Custom"] - else: - _, field_key_to_module, module_to_keys = build_rows_from_endpoints(selected_endpoints or []) - endpoint_modules_for_rows = list(selected_endpoints or []) or ["Core"] - - try: - client = get_openai_client(api_key) - except Exception as e: - return ( - "
Executive Summary
Missing API key.
", - pd.DataFrame(), None, None, str(e), - gr.update(choices=[], value=None), - [], [], pd.DataFrame(columns=["Field","Value"]), "" - ) - - paper_details: List[Dict[str, Any]] = [] - output_rows: List[Dict[str, Any]] = [] - - tmpdir = Path(tempfile.mkdtemp(prefix="tox_extract_")) - - for f in files: - pdf_path = f.name - filename = os.path.basename(pdf_path) - - pages, page_count = extract_pages_from_pdf(pdf_path, max_pages=int(max_pages)) - - if _text_based_pdf_warning(pages): - ex = { - "_file": filename, - "_pages_in_pdf": page_count, - "paper_title": "", - "risk_stance": "insufficient_data", - "risk_confidence": 0.0, - "risk_summary": "No extractable text found. This app supports text-based PDFs only (not scanned images).", - "extracted": {k: ([] if field_props[k].get("type") == "array" else "") for k in field_props.keys()}, - "evidence": [] - } + # try toxra_core if requested/available + if use_private_core: + toxra_core, _ = try_import_core() + if toxra_core is None: + if DISABLE_FALLBACK: + return ( + {"records": [], "evidence": [], "details": []}, + f"❌ Private core not available. 
{core_status_msg}", + pd.DataFrame(), + gr.update(value=None), + gr.update(value=None), + gr.update(choices=[], value=None), + pd.DataFrame(), + "### Evidence used\n", + ) else: - chunks = chunk_pages(pages, target_chars=int(chunk_chars)) - - queries = [ - "regulatory acceptability risk hazard concern conclusion uncertainty evidence NOAEL LOAEL BMD", - "chemical name CAS number", - ] - for ep in (selected_endpoints or []): - queries.extend(ENDPOINT_QUERY_HINTS.get(ep, [])) - for k, ins in field_instr.items(): - queries.append(ins if ins else k) - - selected = select_relevant_chunks(chunks, queries, top_per_query=2, max_chunks=12) - context = build_context(selected, max_chars=int(max_context_chars)) - - ex = openai_structured_extract( - client=client, + # Expected interface (implement inside toxra_core): + # toxra_core.run_extraction(files, api_key, model, max_pages, max_context_chars, families, tgs, vocab_json, spec_json) + # returns: run_state(dict), status(str), overview_df(pd.DataFrame), csv_path(str), details_json_path(str) + run_state, status, overview_df, csv_path, details_path = toxra_core.run_extraction( # type: ignore + files=files, + api_key=api_key, model=model, - schema=schema, - controlled_vocab=vocab, - field_instructions=field_instr, - context=context + max_pages=max_pages, + max_context_chars=max_context_chars, + endpoint_families=families, + oecd_tgs=tgs, + vocab_json=vocab_json, + spec_json=spec_json, + ) + choices = record_choices(run_state) + default_choice = choices[0] if choices else None + vdf = to_vertical_df(run_state["records"][0]) if choices else pd.DataFrame(columns=["Field","Value"]) + ev_md = evidence_markdown_for_record(run_state, 0) if choices else "### Evidence used\n" + return ( + run_state, + status, + overview_df, + gr.update(value=csv_path), + gr.update(value=details_path), + gr.update(choices=choices, value=default_choice), + vdf, + ev_md, ) - ex["_file"] = filename - ex["_pages_in_pdf"] = page_count - - 
paper_details.append(ex) - - base = { - "file": filename, - "paper_title": ex.get("paper_title", ""), - "risk_stance": ex.get("risk_stance", ""), - "risk_confidence": ex.get("risk_confidence", ""), - "risk_summary": ex.get("risk_summary", ""), - } - - ext = ex.get("extracted") or {} - chemicals = _as_list(ext.get("chemicals")) - if not chemicals: - chemicals = ["-"] - - # Single-chemical => one-row-per-paper - if len(chemicals) <= 1: - chem = chemicals[0] - row = dict(base) - row["chemical"] = chem - row["endpoint"] = "Paper" - row["record_id"] = _record_id(filename, chem, row["endpoint"]) - for k in field_props.keys(): - row[k] = _format_value(ext.get(k, [] if field_props[k].get("type") == "array" else "")) - output_rows.append(row) - - # Multi-chemical => chemical–endpoint rows (ONLY non-empty modules) - else: - core_keys = [k for k, m in field_key_to_module.items() if m == "Core"] if not admin_mode else [] - - # determine which endpoint modules have any data (skip empty ones) - candidate_modules = [m for m in endpoint_modules_for_rows if m != "Core"] - non_empty_modules = [] - for m in candidate_modules: - if _module_has_any_data(ext, module_to_keys.get(m, []), field_props): - non_empty_modules.append(m) - - # If everything empty, fall back to a single Paper row (otherwise you get no rows) - if not non_empty_modules: - row = dict(base) - row["chemical"] = "multiple" - row["endpoint"] = "Paper" - row["record_id"] = _record_id(filename, row["chemical"], row["endpoint"]) - for k in field_props.keys(): - row[k] = _format_value(ext.get(k, [] if field_props[k].get("type") == "array" else "")) - output_rows.append(row) - else: - for chem in chemicals: - for module in non_empty_modules: - row = dict(base) - row["chemical"] = chem - row["endpoint"] = module - row["record_id"] = _record_id(filename, chem, module) - - for k in field_props.keys(): - m = field_key_to_module.get(k, "Custom") - include = (m == module) or admin_mode - if include: - if k == "chemicals": - row[k] 
= chem - else: - row[k] = _format_value(ext.get(k, [] if field_props[k].get("type") == "array" else "")) - - output_rows.append(row) - - df = pd.DataFrame(output_rows) - records = df.to_dict("records") - - csv_path = tmpdir / "extraction_table.csv" - json_path = tmpdir / "extraction_details.json" - df.to_csv(csv_path, index=False) - json_path.write_text(json.dumps(paper_details, indent=2), encoding="utf-8") - - choices = [r.get("record_id") for r in records if r.get("record_id")] - default = choices[0] if choices else None - vertical = _make_vertical(records, default) if default else pd.DataFrame(columns=["Field","Value"]) - summary_html = render_summary_card(default, records) if default else render_summary_card("", []) - allowed_fields = None - file_for_evidence = None - if default: - selected_row = next((r for r in records if r.get("record_id") == default), {}) - allowed_fields = set([k for k in selected_row.keys() if k not in {"record_id"}]) - file_for_evidence = (default.split(" | ")[0] or "").strip() + # fallback + run_state, status, overview_df, csv_path, details_path = fallback_grounded_extract( + files=files, + api_key=api_key, + model=model, + max_pages=max_pages, + max_context_chars=max_context_chars, + endpoint_families=families, + oecd_tgs=tgs, + vocab_json=vocab_json, + spec_json=spec_json, + ) - evidence = _render_evidence(paper_details, file_for_evidence, allowed_fields=allowed_fields) if file_for_evidence else "" - overview = _overview_df_from_records(records) - status = "✅ Done. Review in the report below and export when ready." 
+ choices = record_choices(run_state) + default_choice = choices[0] if choices else None + vdf = to_vertical_df(run_state["records"][0]) if choices else pd.DataFrame(columns=["Field","Value"]) + ev_md = evidence_markdown_for_record(run_state, 0) if choices else "### Evidence used\n" return ( - summary_html, - overview, - str(csv_path), - str(json_path), + run_state, status, - gr.update(choices=choices, value=default), - records, - paper_details, - vertical, - evidence + overview_df, + gr.update(value=csv_path), + gr.update(value=details_path), + gr.update(choices=choices, value=default_choice), + vdf, + ev_md, ) - -# ============================= -# Review mode handlers -# ============================= -def on_pick(record_id: str, records: List[Dict[str, Any]], details: List[Dict[str, Any]]): - if not record_id: - return render_summary_card("", []), pd.DataFrame(columns=["Field","Value"]), "" - row = next((r for r in (records or []) if r.get("record_id") == record_id), {}) - file_name = (row.get("file") or "") - allowed_fields = set(row.keys()) - {"record_id"} - return render_summary_card(record_id, records), _make_vertical(records, record_id), _render_evidence(details, file_name, allowed_fields=allowed_fields) - - -def toggle_review_mode(is_on: bool): - return gr.update(interactive=bool(is_on)) - - -def save_review_changes(record_id: str, vertical_df: Any, records: List[Dict[str, Any]]): - if not record_id or not records: - return pd.DataFrame(), records, "Nothing to save.", render_summary_card("", []) - - try: - dfv = vertical_df if isinstance(vertical_df, pd.DataFrame) else pd.DataFrame(vertical_df, columns=["Field", "Value"]) - except Exception: - return _overview_df_from_records(records), records, "Could not parse edited vertical table.", render_summary_card(record_id, records) - - dfv = dfv.dropna(subset=["Field"]) - updates = {str(r["Field"]): r["Value"] for _, r in dfv.iterrows() if str(r["Field"]).strip()} - - new_records = [] - updated = False - for r in 
def on_select_record(choice: str, run_state: Dict[str, Any]):
    """Refresh the vertical record view and evidence panel for the picked record.

    Args:
        choice: Value of the "Select record" dropdown; converted to a record
            index by ``parse_choice_index``.
        run_state: Shared run state. Its ``"records"`` list is indexed here and
            the whole state is passed on to ``evidence_markdown_for_record``.

    Returns:
        A ``(vertical_df, evidence_markdown)`` pair. When there is no run state
        or it holds no records, an empty Field/Value frame and the bare
        evidence heading are returned instead.
    """
    records = run_state.get("records") if run_state else None
    if not records:
        return pd.DataFrame(columns=["Field","Value"]), "### Evidence used\n"

    # Clamp into [0, len - 1] so an out-of-range index still resolves to a record.
    position = min(parse_choice_index(choice), len(records) - 1)
    position = max(0, position)
    return to_vertical_df(records[position]), evidence_markdown_for_record(run_state, position)


# =============================
# Admin lock UI handlers
# =============================
def unlock_admin(pw: str):
    """Check the admin password and show or hide the admin panels.

    Args:
        pw: Password typed into the admin password box; validated by
            ``check_admin_password``.

    Returns:
        ``(unlocked, status_message, panel_update, panel2_update)``: the
        unlocked flag, a status string, and one visibility update for each of
        the two admin panels.
    """
    unlocked = bool(check_admin_password(pw))
    message = "✅ Admin unlocked." if unlocked else "❌ Wrong password."
    return unlocked, message, gr.update(visible=unlocked), gr.update(visible=unlocked)


def reset_admin_defaults():
    """Return the default vocab JSON, the default field-spec JSON, and a status message."""
    return (
        json.dumps(DEFAULT_CONTROLLED_VOCAB, indent=2),
        json.dumps(DEFAULT_FIELD_SPEC, indent=2),
        "✅ Reset to defaults.",
    )


# =============================
# Family → TG dependent UI
# =============================
def update_tg_choices(families: List[str]):
    """Rebuild the OECD TG dropdown from the selected endpoint families.

    Args:
        families: Selected endpoint family names; ``None`` is treated as no
            selection. Families absent from ``OECD_TG_BY_FAMILY`` contribute
            no choices.

    Returns:
        A ``gr.update`` with the de-duplicated TG choices in first-seen order,
        an emptied selection, and ``visible=False`` when no TGs apply.
    """
    candidates = [
        tg
        for family in (families or [])
        for tg in OECD_TG_BY_FAMILY.get(family, [])
    ]
    # dict.fromkeys keeps the first occurrence of each key in insertion order,
    # which replaces the manual seen-set bookkeeping.
    unique_tgs = list(dict.fromkeys(candidates))
    return gr.update(choices=unique_tgs, value=[], visible=bool(unique_tgs))


# =============================
# Build app
# =============================
- ) - - state_records = gr.State([]) - state_details = gr.State([]) - vocab_state = gr.State({}) - field_rows_state = gr.State([]) - - field_spec = gr.Textbox(visible=False, interactive=False, lines=8) - vocab_json = gr.Textbox(visible=False, interactive=False, lines=8) +# Attempt to install private core at startup (safe if not configured) +private_core_installed, private_core_status = ensure_private_core_installed() +toxra_core_mod, toxra_core_import_status = try_import_core() + +CORE_STATUS_BANNER = f"{private_core_status} | {toxra_core_import_status}" + +with gr.Blocks(css=TOXRA_CSS, title=APP_NAME, theme=gr.themes.Soft()) as demo: + gr.HTML(f""" +
+
+
{APP_NAME}
+
Grounded toxicology extraction & literature exploration
+
+ Text-based PDFs only + Results-first reporting + Admin-configurable extraction +
+
+
Production • Beta
+
+ """) - with gr.Tab("Extract"): - # --- Run section (simple) --- - with gr.Group(): - files = gr.File(label="Upload toxicology PDFs", file_types=[".pdf"], file_count="multiple") + # Shared states + run_state = gr.State({"records": [], "evidence": [], "details": []}) + admin_unlocked = gr.State(False) - with gr.Row(): - api_key = gr.Textbox(label="OpenAI API key (optional if set as OPENAI_API_KEY secret)", type="password") - model = gr.Dropdown(label="Model", choices=["gpt-4o-2024-08-06", "gpt-4o", "gpt-4o-mini"], value="gpt-4o-2024-08-06") + # Admin JSON stored in hidden state (used by pipeline) + vocab_json_state = gr.State(json.dumps(DEFAULT_CONTROLLED_VOCAB, indent=2)) + spec_json_state = gr.State(json.dumps(DEFAULT_FIELD_SPEC, indent=2)) + with gr.Tabs(): + # ============================= + # TAB 1 — Run Assessment (sidebar + report) + # ============================= + with gr.Tab("Run Assessment"): with gr.Row(): - endpoint_preset = gr.Dropdown( - label="Endpoint preset", - choices=list(ENDPOINT_PRESETS.keys()), - value="Required – Safety Assessor" - ) - endpoints = gr.Dropdown( - label="Endpoints to extract (Core included automatically)", - choices=list(ENDPOINT_MODULES.keys()), - multiselect=True, - value=ENDPOINT_PRESETS["Required – Safety Assessor"] - ) - - extract_btn = gr.Button("Run Extraction", variant="primary") - status = gr.Textbox(label="Status", interactive=False) - - # --- Report (results-first) --- - gr.Markdown("## Report") - summary_card = gr.HTML(render_summary_card("", [])) - - overview_df = gr.Dataframe( - label="Batch Overview", - interactive=False, - wrap=True, - show_row_numbers=True - ) - - with gr.Row(): - out_csv = gr.File(label="Download: extraction_table.csv") - out_json = gr.File(label="Download: extraction_details.json (evidence + structured data)") - - record_pick = gr.Dropdown(label="Select record", choices=[], value=None) - - with gr.Row(): - review_mode = gr.Checkbox(label="Review mode (enable editing)", value=False) - 
save_btn = gr.Button("Save edits") - export_btn = gr.Button("Export reviewed CSV") - - review_status = gr.Textbox(label="Review status", interactive=False) + # Sidebar + with gr.Column(scale=1, elem_classes=["toxra_card", "toxra_sidebar"]): + gr.Markdown("### Run Assessment") + gr.Markdown( + "
Upload PDF(s), select endpoints, and run. " + "Outputs appear on the right as a report.
" + ) + + pdfs = gr.File(label="Upload toxicology PDFs", file_types=[".pdf"], file_count="multiple") + families = gr.Dropdown(label="Endpoint families", choices=FAMILIES, multiselect=True, value=["Genotoxicity"]) + oecd_tgs = gr.Dropdown(label="OECD TGs (optional)", choices=OECD_TG_BY_FAMILY["Genotoxicity"], multiselect=True, value=[], visible=True) + + families.change(update_tg_choices, inputs=[families], outputs=[oecd_tgs]) + + run_btn = gr.Button("Run assessment", variant="primary") + + with gr.Accordion("Connection & Model", open=False): + api_key = gr.Textbox(label="OpenAI API key (optional if set as OPENAI_API_KEY secret)", type="password") + model = gr.Dropdown( + label="Model", + choices=["gpt-4o-2024-08-06", "gpt-4o", "gpt-4o-mini"], + value="gpt-4o-2024-08-06", + ) + + with gr.Accordion("Advanced (limits)", open=False): + max_pages = gr.Slider(1, 50, value=MAX_PAGES_DEFAULT, step=1, label="Max pages per PDF") + max_context_chars = gr.Slider(5000, 60000, value=MAX_CONTEXT_CHARS_DEFAULT, step=1000, label="Max context sent to model (chars)") + use_private_core = gr.Checkbox( + label="Use private toxra_core (recommended for IP protection)", + value=True, + ) + gr.Markdown(f"
{CORE_STATUS_BANNER}
") + + # Report panel + with gr.Column(scale=3, elem_classes=["toxra_card"]): + gr.Markdown("### Report") + status = gr.Textbox(label="Status", interactive=False) + + overview_df = gr.Dataframe(label="Batch overview (compact)", interactive=False, wrap=True) + + with gr.Row(): + out_csv = gr.File(label="Download: extraction_table.csv", interactive=False) + out_details = gr.File(label="Download: extraction_details.json (evidence + structured)", interactive=False) + + gr.Markdown("#### Readable view (vertical) + evidence") + record_pick = gr.Dropdown(label="Select record", choices=[], value=None) + + vertical_df = gr.Dataframe(label="Vertical record view (Field → Value)", interactive=False, wrap=True) + evidence_md = gr.Markdown() + + run_btn.click( + fn=run_pipeline, + inputs=[ + pdfs, + api_key, + model, + max_pages, + max_context_chars, + families, + oecd_tgs, + vocab_json_state, + spec_json_state, + use_private_core, + gr.State(CORE_STATUS_BANNER), + ], + outputs=[ + run_state, + status, + overview_df, + out_csv, + out_details, + record_pick, + vertical_df, + evidence_md, + ], + ) - with gr.Row(): - vertical_view = gr.Dataframe( - headers=["Field", "Value"], - interactive=False, - wrap=True, - show_row_numbers=False, - label="Extracted fields (vertical)" + record_pick.change( + fn=on_select_record, + inputs=[record_pick, run_state], + outputs=[vertical_df, evidence_md], ) - evidence_md = gr.Markdown() - reviewed_csv = gr.File(label="Download: reviewed_extraction_table.csv") + # ============================= + # TAB 2 — Review & Export + # ============================= + with gr.Tab("Review & Export"): + gr.Markdown("### Review & Export") + gr.Markdown("
Edit extracted fields (if needed) and export a reviewed CSV.
") - # --- Advanced runtime settings (collapsed) --- - with gr.Accordion("Advanced runtime settings", open=False): + review_df = gr.Dataframe(label="Editable extracted table", interactive=True, wrap=True) with gr.Row(): - max_pages = gr.Slider(0, 250, value=0, step=1, label="Max pages to read (0 = all)") - chunk_chars = gr.Slider(1200, 9000, value=3200, step=100, label="Chunk size (chars)") - max_context_chars = gr.Slider(5000, 45000, value=20000, step=1000, label="Max context sent to GPT (chars)") - - # --- Admin tools (collapsed) --- - with gr.Accordion("Admin tools (taxonomy + custom columns)", open=False): - admin_mode = gr.Checkbox(label="Enable Admin mode", value=False) - - admin_group = gr.Group(visible=False) - admin_vocab_group = gr.Group(visible=False) - admin_fields_group = gr.Group(visible=False) - - with admin_group: - gr.Markdown("### Admin: Configure extraction taxonomy + custom columns.") + save_review_btn = gr.Button("Save edits", variant="secondary") + export_btn = gr.Button("Export reviewed CSV", variant="primary") + review_status = gr.Textbox(label="Review status", interactive=False) + reviewed_csv = gr.File(label="Download: reviewed.csv", interactive=False) + + def load_review_table(run_state: Dict[str, Any]): + df = build_review_df(run_state) + return df + + # Populate table when tab loads (user can click “Save edits” later) + # Gradio doesn't have tab-load event reliably across versions; use a buttonless trick: + refresh_review_btn = gr.Button("Refresh from latest run", variant="secondary") + refresh_review_btn.click(load_review_table, inputs=[run_state], outputs=[review_df]) + + save_review_btn.click( + fn=apply_review_edits, + inputs=[review_df, run_state], + outputs=[run_state, review_status], + ) - with admin_vocab_group: - gr.Markdown("### Controlled vocabulary (lists only)") - vocab_category = gr.Dropdown(label="Category (lists only)", choices=[], value=None) - vocab_search = gr.Textbox(label="Search terms", placeholder="Type to filter 
(e.g., 471, AMES, comet)", lines=1) + export_btn.click( + fn=export_reviewed_csv, + inputs=[run_state], + outputs=[reviewed_csv, review_status], + ) - with gr.Row(): - vocab_term_add = gr.Textbox(label="Add term", placeholder="type term and click Add") - vocab_add_btn = gr.Button("Add") - with gr.Row(): - vocab_term_remove = gr.Textbox(label="Remove term", placeholder="type exact term and click Remove") - vocab_remove_btn = gr.Button("Remove") - vocab_apply_btn = gr.Button("Apply full list to category") - vocab_reset_btn = gr.Button("Reset vocab to defaults") + # ============================= + # TAB 3 — Literature Search (module) + # ============================= + with gr.Tab("Literature Search"): + if build_literature_explorer_tab is None: + gr.Markdown("⚠️ literature_explorer.py not found. Add it to enable this tab.") + else: + build_literature_explorer_tab() - vocab_terms_df = gr.Dataframe(headers=["term"], label="Terms (full list; edit directly)", interactive=True, wrap=True) - vocab_terms_filtered = gr.Dataframe(headers=["term"], label="Filtered preview (read-only)", interactive=False, wrap=True) - vocab_status = gr.Textbox(label="Vocab status", interactive=False) + # ============================= + # TAB 4 — Admin (locked) + # ============================= + with gr.Tab("Admin"): + gr.Markdown("### Admin (locked)") + gr.Markdown("
Admin controls: controlled vocabulary & extraction field spec.
") - with gr.Accordion("Raw vocab JSON (auto-generated)", open=False): - vocab_json_admin = gr.Textbox(label="Controlled vocab JSON", lines=12, interactive=False) + admin_pw = gr.Textbox(label="Admin password", type="password") + unlock_btn = gr.Button("Unlock admin", variant="primary") + admin_msg = gr.Textbox(label="Admin status", interactive=False) - with admin_fields_group: - gr.Markdown("### Custom columns (Field Builder)") - gr.Markdown("Tip: Use endpoint selection to start, then tweak fields.") + admin_panel = gr.Column(visible=False) + admin_panel2 = gr.Column(visible=False) - with gr.Row(): - admin_apply_endpoints_btn = gr.Button("Load selected endpoints into builder (Replace)", variant="secondary") - fields_apply_btn = gr.Button("Apply builder table") + with admin_panel: + gr.Markdown("#### Controlled Vocabulary (JSON)") + vocab_json = gr.Code(label="Controlled vocab JSON", language="json") + gr.Markdown("#### Field Spec (JSON)") + spec_json = gr.Code(label="Extraction field spec JSON", language="json") with gr.Row(): - field_name_in = gr.Textbox(label="Field name", placeholder="e.g., genotoxicity_result") - field_type_in = gr.Dropdown(label="Type", choices=TYPE_CHOICES, value="str") - - enum_values_in = gr.Textbox(label="Enum values (comma-separated; for enum/list[enum])", placeholder="a,b,c", lines=2) - instructions_in = gr.Textbox(label="Instructions", placeholder="Tell the extractor exactly what to pull.", lines=2) + save_admin_btn = gr.Button("Save admin config", variant="secondary") + reset_admin_btn = gr.Button("Reset to defaults", variant="secondary") + admin_save_status = gr.Textbox(label="Config status", interactive=False) - add_update_field_btn = gr.Button("Add/Update field") + with admin_panel2: + gr.Markdown("
Tip: keep this tab for admins only; normal users should not edit schemas.
") - fields_df = gr.Dataframe( - label="Fields (edit and click Apply)", - headers=["field","type","enum_values","instructions"], - interactive=True, - wrap=True - ) - - fields_status = gr.Textbox(label="Field builder status", interactive=False) - - # --- Wiring --- - admin_mode.change( - fn=set_admin_visibility, - inputs=[admin_mode], - outputs=[admin_group, admin_vocab_group, admin_fields_group] - ) - - endpoint_preset.change( - fn=apply_endpoint_preset, - inputs=[endpoint_preset], - outputs=[endpoints] - ) - - endpoints.change( - fn=sync_fields_from_endpoints, - inputs=[endpoints, admin_mode, field_rows_state, field_spec], - outputs=[field_rows_state, fields_df, field_spec, status] - ) - - extract_btn.click( - fn=run_extraction, - inputs=[files, api_key, model, endpoints, field_spec, vocab_json, max_pages, chunk_chars, max_context_chars, admin_mode], - outputs=[summary_card, overview_df, out_csv, out_json, status, record_pick, state_records, state_details, vertical_view, evidence_md] - ) - - record_pick.change( - fn=on_pick, - inputs=[record_pick, state_records, state_details], - outputs=[summary_card, vertical_view, evidence_md] - ) - - review_mode.change(fn=toggle_review_mode, inputs=[review_mode], outputs=[vertical_view]) - - save_btn.click( - fn=save_review_changes, - inputs=[record_pick, vertical_view, state_records], - outputs=[overview_df, state_records, review_status, summary_card] - ) - - export_btn.click( - fn=export_reviewed_csv, - inputs=[state_records], - outputs=[reviewed_csv, review_status] - ) - - # Admin vocab wiring - vocab_search.change(fn=vocab_filter_preview, inputs=[vocab_terms_df, vocab_search], outputs=[vocab_terms_filtered]) - - vocab_category.change( - fn=vocab_load_category, - inputs=[vocab_state, vocab_category, vocab_search], - outputs=[vocab_terms_df, vocab_terms_filtered, vocab_status] - ) - - vocab_add_btn.click( - fn=vocab_add_term, - inputs=[vocab_state, vocab_category, vocab_term_add, vocab_search], - outputs=[vocab_terms_df, 
vocab_terms_filtered, vocab_term_add, vocab_status] - ) - - vocab_remove_btn.click( - fn=vocab_remove_term, - inputs=[vocab_state, vocab_category, vocab_term_remove, vocab_search], - outputs=[vocab_terms_df, vocab_terms_filtered, vocab_term_remove, vocab_status] - ) - - vocab_apply_btn.click( - fn=vocab_apply_df, - inputs=[vocab_state, vocab_category, vocab_terms_df, vocab_search], - outputs=[vocab_json_admin, vocab_terms_filtered, vocab_status] - ).then( - fn=lambda x: x, - inputs=[vocab_json_admin], - outputs=[vocab_json] - ) - - vocab_reset_btn.click( - fn=vocab_reset_defaults_ui, - inputs=None, - outputs=[vocab_state, vocab_category, vocab_terms_df, vocab_terms_filtered, vocab_json_admin, vocab_status, vocab_json] - ) - - # Admin field builder wiring - admin_apply_endpoints_btn.click( - fn=admin_apply_endpoints, - inputs=[endpoints], - outputs=[field_rows_state, fields_df, field_spec, fields_status] - ) - - add_update_field_btn.click( - fn=fields_add_or_update, - inputs=[field_name_in, field_type_in, enum_values_in, instructions_in, field_rows_state], - outputs=[field_rows_state, fields_df, field_spec, fields_status] - ) - - fields_apply_btn.click( - fn=fields_apply_df, - inputs=[field_rows_state, fields_df], - outputs=[field_rows_state, fields_df, field_spec, fields_status] - ) - - # Init - def _init_all(): - vocab, keys, k0, full_df, filtered_df, vjson, vmsg = vocab_init_state(DEFAULT_CONTROLLED_VOCAB_JSON) + unlock_btn.click( + fn=unlock_admin, + inputs=[admin_pw], + outputs=[admin_unlocked, admin_msg, admin_panel, admin_panel2], + ) - default_endpoints = ENDPOINT_PRESETS["Required – Safety Assessor"] - rows, _, _ = build_rows_from_endpoints(default_endpoints) - fdf = pd.DataFrame(rows, columns=["field","type","enum_values","instructions"]) - fspec = build_spec_from_field_rows(rows) + # initialize JSON editors from state + def init_admin_editors(vocab_state: str, spec_state: str): + return vocab_state, spec_state - return ( - vocab, - gr.update(choices=keys, 
value=k0), - full_df, - filtered_df, - vjson, - vmsg, - vjson, - rows, - fdf, - fspec, - "✅ Ready." + gr.Button("Load current config", variant="secondary").click( + fn=init_admin_editors, + inputs=[vocab_json_state, spec_json_state], + outputs=[vocab_json, spec_json], ) - demo.load( - _init_all, - inputs=None, - outputs=[ - vocab_state, - vocab_category, - vocab_terms_df, - vocab_terms_filtered, - vocab_json_admin, - vocab_status, - vocab_json, - field_rows_state, - fields_df, - field_spec, - status - ] - ) + def save_admin_config(vocab_text: str, spec_text: str, is_admin: bool): + if not is_admin: + return gr.update(), gr.update(), "❌ Admin locked." + # validate parse + try: + json.loads(vocab_text or "{}") + json.loads(spec_text or "[]") + except Exception as e: + return gr.update(), gr.update(), f"⚠️ JSON invalid: {e}" + return vocab_text, spec_text, "✅ Saved (used for next runs)." + + save_admin_btn.click( + fn=save_admin_config, + inputs=[vocab_json, spec_json, admin_unlocked], + outputs=[vocab_json_state, spec_json_state, admin_save_status], + ) - with gr.Tab("Literature Explorer"): - build_literature_explorer_tab() + reset_admin_btn.click( + fn=reset_admin_defaults, + inputs=[], + outputs=[vocab_json, spec_json, admin_save_status], + ) - with gr.Tab("Cross-paper Synthesis"): - gr.Markdown("Upload `extraction_details.json` from Extract tab. 
Synthesis is based strictly on grounded extractions.") - api_key2 = gr.Textbox(label="OpenAI API key (optional if set as OPENAI_API_KEY secret)", type="password") - model2 = gr.Dropdown(label="Model", choices=["gpt-4o-2024-08-06", "gpt-4o", "gpt-4o-mini"], value="gpt-4o-2024-08-06") - extraction_json_file = gr.File(label="Upload extraction_details.json", file_types=[".json"], file_count="single") - synth_btn = gr.Button("Synthesize Across Papers") - synth_md = gr.Markdown() - synth_btn.click(fn=run_synthesis, inputs=[api_key2, model2, extraction_json_file], outputs=[synth_md]) +# Run if __name__ == "__main__": - port = int(os.environ.get("PORT", "7860")) - demo.queue().launch(server_name="0.0.0.0", server_port=port) \ No newline at end of file + demo.launch() \ No newline at end of file