# ---------------------------------------------------------------------------
# Standard library
# ---------------------------------------------------------------------------
import os
import re
import json
import tempfile
from pathlib import Path
from typing import Dict, List, Tuple, Any, Optional

# ---------------------------------------------------------------------------
# Third-party
# ---------------------------------------------------------------------------
import gradio as gr
import numpy as np
import pandas as pd
from pypdf import PdfReader

try:
    from sklearn.feature_extraction.text import TfidfVectorizer
except Exception:  # pragma: no cover - fallback path for minimal runtime
    TfidfVectorizer = None

from openai import OpenAI

# ---------------------------------------------------------------------------
# Project-local
# ---------------------------------------------------------------------------
from literature_explorer import build_literature_explorer_tab
from toxra_core.artifacts import make_run_dir, write_dataframe_csv, write_json, write_markdown
from toxra_core.calculation_client import MCPClientError, run_batch_cancer_risk
from toxra_core.contracts import CANCER_RISK_TEMPLATE_COLUMNS
from toxra_core.nlp_pipeline import extract_evidence_span, expand_regulatory_queries, hybrid_rank_text_items
from toxra_core.regulatory_mapper import map_extraction_to_framework

# =============================
# UI theme
# =============================
# Custom CSS injected into the Gradio app (light theme, IBM Plex Sans).
APP_CSS = """
@import url('https://fonts.googleapis.com/css2?family=IBM+Plex+Sans:wght@400;500;600;700&display=swap');
:root {
  --bg: #f5f7fb;
  --panel: #ffffff;
  --ink: #0f172a;
  --muted: #516079;
  --line: #e2e8f0;
  --accent: #2563eb;
  --accent-2: #0ea5e9;
  --accent-soft: #e6efff;
  --shadow: 0 10px 28px rgba(15, 23, 42, 0.08);
  --radius: 14px;
}
.gradio-container { background: var(--bg); color: var(--ink); font-family: "IBM Plex Sans", ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, "Helvetica Neue", Arial, "Noto Sans", "Apple Color Emoji", "Segoe UI Emoji"; }
.hero { background: linear-gradient(180deg, #edf3ff 0%, #f4f8ff 100%); color: var(--ink); border-radius: 16px; padding: 18px 22px; box-shadow: var(--shadow); border: 1px solid #dbe5f4; display: flex; align-items: center; justify-content: space-between; gap: 16px; flex-wrap: wrap; }
.hero-left { min-width: 240px; }
.hero-right { margin-left: auto; }
.hero-title { font-size: 22px; font-weight: 700; letter-spacing: 0.08em; }
.hero-sub { margin-top: 4px; font-size: 13px; color: #3b4b63; }
.hero-pills { margin-top: 10px; display: flex; gap: 8px; flex-wrap: wrap; }
.hero-pill { background: var(--accent-soft); color: #1e3a8a; border: 1px solid #d6e3f6; border-radius: 999px; padding: 4px 10px; font-size: 11px; font-weight: 600; }
.hero-status { background: #ffffff; color: #334155; border: 1px solid #d9e2ef; border-radius: 999px; padding: 6px 12px; font-size: 12px; font-weight: 600; box-shadow: 0 6px 16px rgba(15, 23, 42, 0.06); }
.split-row { gap: 18px; }
.card { background: var(--panel); border: 1px solid var(--line); border-radius: var(--radius); padding: 16px; box-shadow: var(--shadow); }
.left-rail .card + .card { margin-top: 16px; }
.right-panel .card { margin-bottom: 14px; }
.section-title { font-size: 12px; text-transform: uppercase; letter-spacing: 0.14em; color: var(--muted); margin-bottom: 8px; }
.gradio-container input, .gradio-container textarea, .gradio-container select { border-radius: 10px !important; border-color: var(--line) !important; }
.gradio-container button.primary { background: var(--accent) !important; border-color: var(--accent) !important; }
.gradio-container button.primary:hover { background: #1d4ed8 !important; }
.gradio-container .tab-nav { gap: 8px; }
.gradio-container .tab-nav button { background: var(--panel); border: 1px solid var(--line); border-radius: 999px; padding: 6px 14px; font-size: 12px; color: var(--muted); }
.gradio-container .tab-nav button.selected { background: var(--accent); border-color: var(--accent); color: #ffffff; }
.gradio-container .accordion { border: 1px solid var(--line); border-radius: var(--radius); }
"""

# =============================
# Defaults
# =============================
# Default controlled vocabulary (JSON). Keys ending in "_enum" are flat lists
# of allowed values; the vocab editor tab only edits list-valued keys.
DEFAULT_CONTROLLED_VOCAB_JSON = """{
  "risk_stance_enum": ["acceptable","acceptable_with_uncertainty","not_acceptable","insufficient_data"],
  "fda_ctp_tier_enum": ["Tier_1_high_priority","Tier_2_moderate_priority","Tier_3_lower_priority","enough data is not available"],
  "approach_enum": ["in_vivo","in_vitro","in_silico","nams","mixed","not_reported"],
  "in_silico_method_enum": [
    "qsar","read_across","molecular_docking","molecular_dynamics","pbpk_pbtK","aop_based","ml_model","other","not_reported"
  ],
  "nams_method_enum": [
    "high_throughput_screening_hts","omics_transcriptomics","omics_proteomics","omics_metabolomics",
    "organ_on_chip","microphysiological_system_mps","3d_tissue_model","in_chemico_assay",
    "in_silico_as_nams","other","not_reported"
  ],
  "exposure_route_enum": ["oral","inhalation","dermal","parenteral","multiple","not_reported"],
  "species_enum": ["human","rat","mouse","rabbit","dog","non_human_primate","cell_line","other","not_reported"],
  "genotoxicity_oecd_tg_in_vitro_enum": [
    "OECD_TG_471_Bacterial Reverse mutation test(AMES test)",
    "OECD_TG_473_In Vitro Mammalian Chromosomal Aberration Test",
    "OECD_TG_476_In Vitro Mammalian Cell Gene Mutation Tests (Hprt & xprt)",
    "OECD_TG_487_In Vitro Mammalian Cell Micronucleus Test",
    "OECD_TG_490_In Vitro Mammalian Cell Gene Mutation Tests (Thymidine Kinase)",
    "not_reported"
  ],
  "genotoxicity_oecd_tg_in_vivo_enum": [
    "OECD_TG_474_In Vivo Mammalian Erythrocyte Micronucleus Test",
    "OECD_TG_475_Mammalian Bone Marrow Chromosomal Aberration Test",
    "OECD_TG_488_Transgenic Rodent Somatic & Germ Cell Gene Mutation Assays",
    "OECD_TG_489_In Vivo Mammalian Alkaline Comet Assay",
    "not_reported"
  ],
  "genotoxicity_result_enum": ["positive","negative","equivocal","not_reported"],
  "binary_result_enum": ["positive","negative","equivocal","not_reported"],
  "carcinogenicity_result_enum": ["carcinogenic","not_carcinogenic","insufficient_data","not_reported"]
}"""
["in_vivo","in_vitro","in_silico","nams","mixed","not_reported"], "in_silico_method_enum": [ "qsar","read_across","molecular_docking","molecular_dynamics","pbpk_pbtK","aop_based","ml_model","other","not_reported" ], "nams_method_enum": [ "high_throughput_screening_hts","omics_transcriptomics","omics_proteomics","omics_metabolomics", "organ_on_chip","microphysiological_system_mps","3d_tissue_model","in_chemico_assay", "in_silico_as_nams","other","not_reported" ], "exposure_route_enum": ["oral","inhalation","dermal","parenteral","multiple","not_reported"], "species_enum": ["human","rat","mouse","rabbit","dog","non_human_primate","cell_line","other","not_reported"], "genotoxicity_oecd_tg_in_vitro_enum": [ "OECD_TG_471_Bacterial Reverse mutation test(AMES test)", "OECD_TG_473_In Vitro Mammalian Chromosomal Aberration Test", "OECD_TG_476_In Vitro Mammalian Cell Gene Mutation Tests (Hprt & xprt)", "OECD_TG_487_In Vitro Mammalian Cell Micronucleus Test", "OECD_TG_490_In Vitro Mammalian Cell Gene Mutation Tests (Thymidine Kinase)", "not_reported" ], "genotoxicity_oecd_tg_in_vivo_enum": [ "OECD_TG_474_In Vivo Mammalian Erythrocyte Micronucleus Test", "OECD_TG_475_Mammalian Bone Marrow Chromosomal Aberration Test", "OECD_TG_488_Transgenic Rodent Somatic & Germ Cell Gene Mutation Assays", "OECD_TG_489_In Vivo Mammalian Alkaline Comet Assay", "not_reported" ], "genotoxicity_result_enum": ["positive","negative","equivocal","not_reported"], "binary_result_enum": ["positive","negative","equivocal","not_reported"], "carcinogenicity_result_enum": ["carcinogenic","not_carcinogenic","insufficient_data","not_reported"] }""" # ============================= # Endpoint modules (what users choose) # ============================= PRESET_CORE = [ {"field": "chemicals", "type": "list[str]", "enum_values": "", "instructions": "List chemical(s) studied. 
If multiple, include each separately."}, {"field": "cas_numbers", "type": "list[str]", "enum_values": "", "instructions": "Extract CAS number(s) mentioned (may be multiple)."}, {"field": "study_type", "type": "enum", "enum_values": "in_vivo,in_vitro,epidemiology,in_silico,review,methodology,other,not_reported", "instructions": "Choose best match."}, {"field": "exposure_route", "type": "enum", "enum_values": "oral,inhalation,dermal,parenteral,multiple,not_reported", "instructions": "Choose best match."}, {"field": "species", "type": "enum", "enum_values": "human,rat,mouse,rabbit,dog,non_human_primate,cell_line,other,not_reported", "instructions": "Choose best match."}, {"field": "dose_metrics", "type": "list[str]", "enum_values": "", "instructions": "Capture NOAEL/LOAEL/BMD/BMDL/LD50/LC50 etc with units and route if available."}, {"field": "key_findings", "type": "str", "enum_values": "", "instructions": "2–4 short sentences summarizing major findings. Grounded to text."}, {"field": "conclusion", "type": "str", "enum_values": "", "instructions": "Paper's conclusion about safety/risk (grounded)."}, ] PRESET_NAMS_INSILICO = [ {"field": "approach", "type": "enum", "enum_values": "in_vivo,in_vitro,in_silico,nams,mixed,not_reported", "instructions": "Identify if results are in silico or NAMs; use mixed if multiple."}, {"field": "in_silico_methods", "type": "list[enum]", "enum_values": "qsar,read_across,molecular_docking,molecular_dynamics,pbpk_pbtK,aop_based,ml_model,other,not_reported", "instructions": "If in_silico, list methods used (multiple allowed)."}, {"field": "nams_methods", "type": "list[enum]", "enum_values": "high_throughput_screening_hts,omics_transcriptomics,omics_proteomics,omics_metabolomics,organ_on_chip,microphysiological_system_mps,3d_tissue_model,in_chemico_assay,in_silico_as_nams,other,not_reported", "instructions": "If NAMs, list methods used (multiple allowed)."}, {"field": "nams_or_insilico_key_results", "type": "str", "enum_values": "", 
"instructions": "Summarize in silico / NAMs results and key metrics (grounded)."}, ] PRESET_GENOTOX_OECD = [ { "field": "genotox_oecd_tg_in_vitro", "type": "list[enum]", "enum_values": "OECD_TG_471_Bacterial Reverse mutation test(AMES test),OECD_TG_473_In Vitro Mammalian Chromosomal Aberration Test,OECD_TG_476_In Vitro Mammalian Cell Gene Mutation Tests (Hprt & xprt),OECD_TG_487_In Vitro Mammalian Cell Micronucleus Test,OECD_TG_490_In Vitro Mammalian Cell Gene Mutation Tests (Thymidine Kinase),not_reported", "instructions": "Select all in vitro OECD TGs explicitly reported (or clearly described). If none, use not_reported." }, { "field": "genotox_oecd_tg_in_vivo", "type": "list[enum]", "enum_values": "OECD_TG_474_In Vivo Mammalian Erythrocyte Micronucleus Test,OECD_TG_475_Mammalian Bone Marrow Chromosomal Aberration Test,OECD_TG_488_Transgenic Rodent Somatic & Germ Cell Gene Mutation Assays,OECD_TG_489_In Vivo Mammalian Alkaline Comet Assay,not_reported", "instructions": "Select all in vivo OECD TGs explicitly reported (or clearly described). If none, use not_reported." }, { "field": "fda_ctp_carcinogenicity_tier", "type": "enum", "enum_values": "Tier_1_high_priority,Tier_2_moderate_priority,Tier_3_lower_priority,enough data is not available", "instructions": "Assign FDA CTP carcinogenicity/genotoxicity tier based strictly on provided evidence. If decision cannot be made from excerpts, use exactly: enough data is not available." }, {"field": "genotoxicity_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Classify overall genotoxicity outcome as reported. 
If unclear, not_reported."}, {"field": "genotoxicity_result_notes", "type": "str", "enum_values": "", "instructions": "Short explanation grounded to text + test context (e.g., AMES, micronucleus)."}, ] PRESET_ACUTE_TOX = [ {"field": "acute_toxicity_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "If acute toxicity is assessed, classify as positive/negative/equivocal; otherwise not_reported."}, {"field": "acute_toxicity_key_metrics", "type": "list[str]", "enum_values": "", "instructions": "Extract LD50/LC50/EC50/IC50 etc with units/route/species if available."}, {"field": "acute_toxicity_notes", "type": "str", "enum_values": "", "instructions": "Grounded summary of acute toxicity findings."}, ] PRESET_REPEATED_DOSE = [ {"field": "repeated_dose_noael_loael", "type": "list[str]", "enum_values": "", "instructions": "Extract NOAEL/LOAEL (and study duration) with units/route if available."}, {"field": "repeated_dose_target_organs", "type": "list[str]", "enum_values": "", "instructions": "List target organs/critical effects explicitly reported."}, {"field": "repeated_dose_notes", "type": "str", "enum_values": "", "instructions": "Grounded summary of repeated-dose toxicity conclusions."}, ] PRESET_IRR_SENS = [ {"field": "skin_irritation_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Skin irritation outcome (as reported)."}, {"field": "eye_irritation_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Eye irritation outcome (as reported)."}, {"field": "skin_sensitization_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Skin sensitization outcome (as reported)."}, {"field": "irritation_sensitization_notes", "type": "str", "enum_values": "", "instructions": "Grounded notes including method/model if stated."}, ] PRESET_REPRO_DEV = [ {"field": "reproductive_toxicity_result", 
"type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Reproductive toxicity outcome (as reported)."}, {"field": "developmental_toxicity_result", "type": "enum", "enum_values": "positive,negative,equivocal,not_reported", "instructions": "Developmental toxicity outcome (as reported)."}, {"field": "repro_dev_notes", "type": "str", "enum_values": "", "instructions": "Grounded notes including endpoints and study design if stated."}, ] PRESET_CARCINOGENICITY = [ {"field": "carcinogenicity_result", "type": "enum", "enum_values": "carcinogenic,not_carcinogenic,insufficient_data,not_reported", "instructions": "As reported. If evidence insufficient, insufficient_data."}, {"field": "carcinogenicity_notes", "type": "str", "enum_values": "", "instructions": "Grounded notes including species, duration, tumor findings if stated."}, ] ENDPOINT_MODULES: Dict[str, List[Dict[str, Any]]] = { "Genotoxicity (OECD TG)": PRESET_GENOTOX_OECD, "NAMs / In Silico": PRESET_NAMS_INSILICO, "Acute toxicity": PRESET_ACUTE_TOX, "Repeated dose toxicity": PRESET_REPEATED_DOSE, "Irritation / Sensitization": PRESET_IRR_SENS, "Repro / Developmental": PRESET_REPRO_DEV, "Carcinogenicity": PRESET_CARCINOGENICITY, } # Endpoint presets (requested) ENDPOINT_PRESETS: Dict[str, List[str]] = { "Required – Safety Assessor": [ "Genotoxicity (OECD TG)", "Repeated dose toxicity", "Irritation / Sensitization", "Repro / Developmental", "Acute toxicity", ], "Core only (fast)": [], "Screening – NAMs + Genotox": ["NAMs / In Silico", "Genotoxicity (OECD TG)"], "Full – All endpoints": list(ENDPOINT_MODULES.keys()), } ENDPOINT_QUERY_HINTS: Dict[str, List[str]] = { "Genotoxicity (OECD TG)": ["genotoxicity", "mutagenicity", "AMES", "micronucleus", "comet assay", "chromosomal aberration", "OECD TG 471 473 476 487 490 474 489", "carcinogenicity tiering", "FDA CTP tier"], "NAMs / In Silico": ["in silico", "QSAR", "read-across", "AOP", "PBPK", "high-throughput", "omics", "organ-on-chip", 
"microphysiological"], "Acute toxicity": ["acute toxicity", "LD50", "LC50", "single dose", "lethality", "mortality"], "Repeated dose toxicity": ["repeated dose", "subchronic", "chronic", "NOAEL", "LOAEL", "target organ", "90-day", "28-day"], "Irritation / Sensitization": ["skin irritation", "eye irritation", "sensitization", "LLNA", "Draize"], "Repro / Developmental": ["reproductive toxicity", "fertility", "developmental toxicity", "teratogenic", "prenatal", "postnatal"], "Carcinogenicity": ["carcinogenicity", "tumor", "neoplasm", "cancer", "two-year bioassay"], } # ============================= # PDF extraction (text-based PDFs only) # ============================= def extract_pages_from_pdf(pdf_path: str, max_pages: int = 0) -> Tuple[List[Tuple[int, str]], int]: reader = PdfReader(pdf_path) page_count = len(reader.pages) pages_to_read = page_count if (max_pages is None or max_pages <= 0) else min(page_count, int(max_pages)) pages: List[Tuple[int, str]] = [] for i in range(pages_to_read): try: t = reader.pages[i].extract_text() or "" except Exception: t = "" pages.append((i + 1, t or "")) return pages, page_count def clean_text(t: str) -> str: t = t or "" t = t.replace("\x00", " ") t = re.sub(r"\s+", " ", t).strip() return t def chunk_pages(pages: List[Tuple[int, str]], target_chars: int = 3000) -> List[Dict[str, Any]]: chunks = [] buf = [] start_page = None cur_len = 0 for pno, txt in pages: txt = clean_text(txt) if not txt: continue if start_page is None: start_page = pno if cur_len + len(txt) + 1 > target_chars and buf: end_page = pno - 1 end_page = end_page if end_page >= start_page else start_page chunks.append({"pages": f"{start_page}-{end_page}", "text": " ".join(buf)}) buf = [txt] start_page = pno cur_len = len(txt) else: buf.append(txt) cur_len += len(txt) + 1 if buf and start_page is not None: end_page = pages[-1][0] if pages else start_page chunks.append({"pages": f"{start_page}-{end_page}", "text": " ".join(buf)}) return chunks def 
def _text_based_pdf_warning(pages: List[Tuple[int, str]]) -> bool:
    """Return True when the PDF yielded almost no extractable text (< 200
    chars total) — likely a scanned/image-only PDF that would need OCR."""
    joined = " ".join([clean_text(t) for _, t in pages if clean_text(t)])
    return len(joined.strip()) < 200


# =============================
# Lightweight retrieval (TF-IDF)
# =============================
def select_relevant_chunks(
    chunks: List[Dict[str, Any]], queries: List[str], top_per_query: int = 2, max_chunks: int = 12
) -> List[Dict[str, Any]]:
    """Rank chunks against each query and return the union of top hits.

    Uses TF-IDF cosine similarity when scikit-learn is available, otherwise a
    keyword-overlap fallback. Selection order follows query order; duplicates
    are kept once; at most ``max_chunks`` chunks are returned. If nothing
    scores, the first ``max_chunks`` chunks are returned as a safety net.

    Fixes vs. previous version:
    - Fallback token regex was ``r"[a-zA-Z0-9\\-]+"``, which put a literal
      backslash inside the character class; corrected to ``[a-zA-Z0-9-]``.
    - ``TfidfVectorizer`` is now looked up defensively via ``globals()`` so
      the fallback also engages if the name is unbound, not only when None.
    """
    texts = [c["text"] for c in chunks]
    if not texts:
        return []
    vec_cls = globals().get("TfidfVectorizer")
    if vec_cls is None:
        # Fallback: count how many query tokens (len >= 3) appear per chunk.
        selected_idx: List[int] = []
        for q in queries:
            q_tokens = {w for w in re.findall(r"[a-zA-Z0-9-]+", (q or "").lower()) if len(w) >= 3}
            scored = []
            for i, t in enumerate(texts):
                tl = t.lower()
                scored.append((sum(1 for tok in q_tokens if tok in tl), i))
            scored.sort(key=lambda x: x[0], reverse=True)
            for _, i in scored[:top_per_query]:
                if i not in selected_idx:
                    selected_idx.append(i)
        if not selected_idx:
            selected_idx = list(range(min(len(chunks), max_chunks)))
        return [chunks[i] for i in selected_idx[:max_chunks]]
    vectorizer = vec_cls(stop_words="english", ngram_range=(1, 2), max_features=20000)
    X = vectorizer.fit_transform(texts)
    selected_idx: List[int] = []
    for q in queries:
        q = (q or "").strip()
        if not q:
            continue
        qv = vectorizer.transform([q])
        # Cosine similarity (TF-IDF rows are L2-normalized by default).
        sims = (X @ qv.T).toarray().ravel()
        idx = np.argsort(sims)[::-1]
        for i in idx[:top_per_query]:
            if i not in selected_idx:
                selected_idx.append(i)
    if not selected_idx:
        selected_idx = list(range(min(len(chunks), max_chunks)))
    return [chunks[i] for i in selected_idx[:max_chunks]]


def build_context(selected_chunks: List[Dict[str, Any]], max_chars: int = 20000) -> str:
    """Concatenate chunks as "[pages X-Y]\\n<text>" blocks, stopping before the
    first chunk that would push the total past ``max_chars``."""
    parts = []
    total = 0
    for c in selected_chunks:
        block = f"[pages {c['pages']}]\n{c['text']}\n"
        if total + len(block) > max_chars:
            break
        parts.append(block)
        total += len(block)
    return "\n".join(parts).strip()


# =============================
# Spec -> JSON schema
# =============================
re.sub(r"[^\w\s-]", "", name) name = re.sub(r"[\s-]+", "_", name).lower() return name[:80] if name else "field" def parse_field_spec(spec: str) -> Tuple[Dict[str, Any], Dict[str, str]]: props: Dict[str, Any] = {} instr: Dict[str, str] = {} for raw_line in (spec or "").splitlines(): line = raw_line.strip() if not line or line.startswith("#"): continue parts = [p.strip() for p in line.split("|")] if len(parts) < 2: continue field_name = parts[0] ftype = parts[1] finstr = parts[2] if len(parts) >= 3 else "" key = slugify_field(field_name) instr[key] = finstr schema: Dict[str, Any] = {"type": "string"} if ftype == "str": schema = {"type": "string"} elif ftype == "num": schema = {"type": "number"} elif ftype == "bool": schema = {"type": "boolean"} elif ftype.startswith("list[enum[") and ftype.endswith("]]"): inside = ftype[len("list[enum["):-2].strip() vals = [v.strip() for v in inside.split(",") if v.strip()] schema = {"type": "array", "items": {"type": "string", "enum": vals}} elif ftype.startswith("list[str]"): schema = {"type": "array", "items": {"type": "string"}} elif ftype.startswith("list[num]"): schema = {"type": "array", "items": {"type": "number"}} elif ftype.startswith("enum[") and ftype.endswith("]"): inside = ftype[len("enum["):-1].strip() vals = [v.strip() for v in inside.split(",") if v.strip()] schema = {"type": "string", "enum": vals} else: schema = {"type": "string"} props[key] = schema return props, instr def build_extraction_schema(field_props: Dict[str, Any], vocab: Dict[str, Any]) -> Dict[str, Any]: risk_enum = vocab.get("risk_stance_enum", ["acceptable","acceptable_with_uncertainty","not_acceptable","insufficient_data"]) all_field_keys = list(field_props.keys()) return { "type": "object", "additionalProperties": False, "properties": { "paper_title": {"type": "string"}, "risk_stance": {"type": "string", "enum": risk_enum}, "risk_confidence": {"type": "number", "minimum": 0, "maximum": 1}, "risk_summary": {"type": "string"}, "extracted": { "type": 
"object", "additionalProperties": False, "properties": field_props, "required": all_field_keys }, "evidence": { "type": "array", "items": { "type": "object", "additionalProperties": False, "properties": { "field": {"type": "string"}, "quote": {"type": "string"}, "pages": {"type": "string"} }, "required": ["field", "quote", "pages"] } } }, "required": ["paper_title","risk_stance","risk_confidence","risk_summary","extracted","evidence"] } # ============================= # OpenAI client + extraction # ============================= def get_openai_client(api_key: str) -> OpenAI: key = (api_key or "").strip() or os.getenv("OPENAI_API_KEY", "").strip() if not key: raise ValueError("Missing OpenAI API key. Provide it in the UI or set OPENAI_API_KEY secret in Hugging Face.") return OpenAI(api_key=key) def openai_structured_extract( client: OpenAI, model: str, schema: Dict[str, Any], controlled_vocab: Dict[str, Any], field_instructions: Dict[str, str], context: str ) -> Dict[str, Any]: field_instr_lines = [f"- {k}: {v if v else '(no extra instructions)'}" for k, v in field_instructions.items()] vocab_text = json.dumps(controlled_vocab, indent=2) has_fda_tier_field = "fda_ctp_carcinogenicity_tier" in field_instructions system_msg = ( "You are a toxicology research paper data-extraction assistant for an industry safety assessor.\n" "Grounding rules (must follow):\n" "1) Use ONLY the provided excerpts; do NOT invent details.\n" "2) If a value is not explicitly stated, output empty string or empty list, OR the enum value 'not_reported'/'insufficient_data' when applicable.\n" "3) Provide evidence quotes + page ranges for extracted fields.\n" "4) risk_stance is regulatory: acceptable / acceptable_with_uncertainty / not_acceptable / insufficient_data.\n" "5) Prefer controlled vocab terms when applicable.\n" "6) Use an INTERNAL Tree-of-Thought process before finalizing JSON:\n" " - Branch evidence by endpoint/theme.\n" " - Test competing interpretations.\n" " - Prune branches that 
are not directly supported by excerpts.\n" " - Select the most evidence-grounded branch only.\n" " - Do NOT output reasoning traces; output JSON only.\n" "7) If the FDA CTP tier field is requested but evidence is insufficient, output exactly: 'enough data is not available'.\n" ) user_msg = ( "CONTROLLED VOCAB (JSON):\n" f"{vocab_text}\n\n" "TREE-OF-THOUGHT EXECUTION FRAMEWORK (internal only, do not output):\n" "A) Build evidence map: claims -> quotes -> page ranges.\n" "B) Generate candidate interpretations per endpoint.\n" "C) Eliminate candidates lacking direct quote support.\n" "D) Select final grounded interpretation and populate schema fields.\n" "E) For uncertain fields, use explicit fallback values from enum/instructions.\n\n" "FIELD INSTRUCTIONS:\n" + "\n".join(field_instr_lines) + "\n\n" "EXCERPTS (with page ranges):\n" f"{context}\n\n" + ( "IMPORTANT: `fda_ctp_carcinogenicity_tier` must be one of " "[Tier_1_high_priority, Tier_2_moderate_priority, Tier_3_lower_priority, enough data is not available].\n" if has_fda_tier_field else "" ) ) resp = client.responses.create( model=model, input=[ {"role": "system", "content": system_msg}, {"role": "user", "content": user_msg} ], text={ "format": { "type": "json_schema", "name": "tox_extraction", "schema": schema, "strict": True } } ) return json.loads(resp.output_text) def openai_synthesize_across_papers(client: OpenAI, model: str, rows: List[Dict[str, Any]]) -> str: system_msg = ( "You are a senior toxicology safety assessor summarizing multiple papers.\n" "Create a concise synthesis: consensus, disagreements, data gaps, and actionable next steps.\n" "Base strictly on the provided extracted JSON (which is evidence-backed).\n" ) user_msg = "EXTRACTED_ROWS_JSON:\n" + json.dumps(rows, indent=2) resp = client.responses.create(model=model, input=[{"role":"system","content":system_msg},{"role":"user","content":user_msg}]) return resp.output_text # ============================= # Controlled vocab editor helpers (lists 
only) + search filter # ============================= def _filter_terms_df(df: pd.DataFrame, query: str) -> pd.DataFrame: if df is None or df.empty: return pd.DataFrame(columns=["term"]) q = (query or "").strip().lower() if not q: return df[["term"]].copy() mask = df["term"].astype(str).str.lower().str.contains(q, na=False) return df.loc[mask, ["term"]].copy() def vocab_init_state(vocab_json: str): try: vocab = json.loads(vocab_json or DEFAULT_CONTROLLED_VOCAB_JSON) except Exception: vocab = json.loads(DEFAULT_CONTROLLED_VOCAB_JSON) list_keys = sorted([k for k, v in vocab.items() if isinstance(v, list)]) default_key = list_keys[0] if list_keys else None terms = vocab.get(default_key, []) if default_key else [] full_df = pd.DataFrame({"term": terms}) filtered_df = _filter_terms_df(full_df, "") return vocab, list_keys, default_key, full_df, filtered_df, json.dumps(vocab, indent=2), "✅ Vocab loaded." def vocab_reset_defaults_ui(): vocab, keys, k0, full_df, filtered_df, vjson, msg = vocab_init_state(DEFAULT_CONTROLLED_VOCAB_JSON) return vocab, gr.update(choices=keys, value=k0), full_df, filtered_df, vjson, msg, vjson def vocab_load_category(vocab_state: Dict[str, Any], category: str, search: str): if not category or category not in vocab_state: empty = pd.DataFrame(columns=["term"]) return empty, empty, "Select a category." terms = vocab_state.get(category, []) if not isinstance(terms, list): empty = pd.DataFrame(columns=["term"]) return empty, empty, "This category is not a list." full = pd.DataFrame({"term": terms}) filtered = _filter_terms_df(full, search) return full, filtered, f"Editing: {category}" def vocab_add_term(vocab_state: Dict[str, Any], category: str, term: str, search: str): term = (term or "").strip() if not term: return gr.update(), gr.update(), "", "Enter a term to add." if not category or category not in vocab_state or not isinstance(vocab_state.get(category), list): return gr.update(), gr.update(), "", "Pick a list category first." 
if term not in vocab_state[category]: vocab_state[category].append(term) full = pd.DataFrame({"term": vocab_state[category]}) filtered = _filter_terms_df(full, search) return full, filtered, "", f"Added: {term}" def vocab_remove_term(vocab_state: Dict[str, Any], category: str, term: str, search: str): term = (term or "").strip() if not term: return gr.update(), gr.update(), "", "Enter a term to remove." if not category or category not in vocab_state or not isinstance(vocab_state.get(category), list): return gr.update(), gr.update(), "", "Pick a list category first." vocab_state[category] = [t for t in vocab_state[category] if t != term] full = pd.DataFrame({"term": vocab_state[category]}) filtered = _filter_terms_df(full, search) return full, filtered, "", f"Removed: {term}" def vocab_apply_df(vocab_state: Dict[str, Any], category: str, terms_df: Any, search: str): if not category or category not in vocab_state or not isinstance(vocab_state.get(category), list): return json.dumps(vocab_state, indent=2), pd.DataFrame(columns=["term"]), "Pick a list category first." try: df = terms_df if isinstance(terms_df, pd.DataFrame) else pd.DataFrame(terms_df, columns=["term"]) except Exception: return json.dumps(vocab_state, indent=2), pd.DataFrame(columns=["term"]), "Could not parse terms table." terms = [] for t in df.get("term", []).tolist(): t = (str(t) if t is not None else "").strip() if t and t not in terms: terms.append(t) vocab_state[category] = terms vjson = json.dumps(vocab_state, indent=2) filtered = _filter_terms_df(pd.DataFrame({"term": terms}), search) return vjson, filtered, f"✅ Applied {len(terms)} terms to {category}." 
def vocab_filter_preview(terms_df, search): try: df = terms_df if isinstance(terms_df, pd.DataFrame) else pd.DataFrame(terms_df, columns=["term"]) except Exception: df = pd.DataFrame(columns=["term"]) return _filter_terms_df(df, search) # ============================= # Field mapping from endpoints # ============================= TYPE_CHOICES = ["str", "num", "bool", "list[str]", "list[num]", "enum", "list[enum]"] def build_spec_from_field_rows(rows: List[Dict[str, Any]]) -> str: lines = [ "# One field per line: Field Name | type | instructions", "# types: str, num, bool, list[str], list[num], enum[a,b,c], list[enum[a,b,c]]", "" ] for r in rows: field = str(r.get("field","")).strip() ftype = str(r.get("type","")).strip() enums = str(r.get("enum_values","")).strip() instr = str(r.get("instructions","")).strip() if not field or not ftype: continue if ftype == "enum": vals = [v.strip() for v in enums.split(",") if v.strip()] type_str = f"enum[{','.join(vals)}]" if vals else "str" elif ftype == "list[enum]": vals = [v.strip() for v in enums.split(",") if v.strip()] type_str = f"list[enum[{','.join(vals)}]]" if vals else "list[str]" else: type_str = ftype lines.append(f"{field} | {type_str} | {instr}") return "\n".join(lines).strip() + "\n" def build_rows_from_endpoints(selected_endpoints: List[str]) -> Tuple[List[Dict[str, Any]], Dict[str, str], Dict[str, List[str]]]: selected_endpoints = selected_endpoints or [] rows: List[Dict[str, Any]] = [] field_key_to_module: Dict[str, str] = {} module_to_keys: Dict[str, List[str]] = {} for r in PRESET_CORE: rows.append(dict(r)) k = slugify_field(r["field"]) field_key_to_module[k] = "Core" module_to_keys.setdefault("Core", []).append(k) for module in selected_endpoints: preset = ENDPOINT_MODULES.get(module) if not preset: continue for r in preset: rows.append(dict(r)) k = slugify_field(r["field"]) field_key_to_module[k] = module module_to_keys.setdefault(module, []).append(k) seen = set() deduped: List[Dict[str, Any]] = [] for r 
in rows: k = str(r.get("field","")).strip().lower() if not k or k in seen: continue seen.add(k) deduped.append(r) # Rebuild module_to_keys to match deduped dedup_keys = set([slugify_field(r["field"]) for r in deduped]) module_to_keys = {m: [k for k in ks if k in dedup_keys] for m, ks in module_to_keys.items()} return deduped, field_key_to_module, module_to_keys def apply_endpoint_preset(preset_name: str): vals = ENDPOINT_PRESETS.get(preset_name, []) return gr.update(value=vals) def sync_fields_from_endpoints(selected_endpoints: List[str], admin_mode: bool, current_rows: List[Dict[str, Any]], current_spec: str): if admin_mode: df = pd.DataFrame(current_rows or [], columns=["field","type","enum_values","instructions"]) return current_rows, df, current_spec, "Admin mode: endpoint selection will not overwrite custom columns." rows, _, _ = build_rows_from_endpoints(selected_endpoints or []) df = pd.DataFrame(rows, columns=["field","type","enum_values","instructions"]) spec = build_spec_from_field_rows(rows) return rows, df, spec, "✅ Columns updated from selected endpoints." def admin_apply_endpoints(selected_endpoints: List[str]): rows, _, _ = build_rows_from_endpoints(selected_endpoints or []) df = pd.DataFrame(rows, columns=["field","type","enum_values","instructions"]) spec = build_spec_from_field_rows(rows) return rows, df, spec, "✅ Loaded selected endpoints into the builder (Replace)." def fields_add_or_update(field_name: str, ftype: str, enum_values: str, instructions: str, field_rows: List[Dict[str, Any]]): field_name = (field_name or "").strip() ftype = (ftype or "").strip() enum_values = (enum_values or "").strip() instructions = (instructions or "").strip() if not field_name or not ftype: df = pd.DataFrame(field_rows, columns=["field","type","enum_values","instructions"]) return field_rows, df, build_spec_from_field_rows(field_rows), "Field name and type are required." 
updated = False for r in field_rows: if str(r.get("field","")).strip().lower() == field_name.lower(): r["type"] = ftype r["enum_values"] = enum_values r["instructions"] = instructions updated = True break if not updated: field_rows.append({"field": field_name, "type": ftype, "enum_values": enum_values, "instructions": instructions}) df = pd.DataFrame(field_rows, columns=["field","type","enum_values","instructions"]) return field_rows, df, build_spec_from_field_rows(field_rows), ("Updated field." if updated else "Added field.") def fields_apply_df(field_rows: List[Dict[str, Any]], df_in: Any): try: df = df_in if isinstance(df_in, pd.DataFrame) else pd.DataFrame(df_in, columns=["field","type","enum_values","instructions"]) except Exception: df = pd.DataFrame(field_rows, columns=["field","type","enum_values","instructions"]) return field_rows, df, build_spec_from_field_rows(field_rows), "Could not parse builder table." cleaned = [] seen = set() for _, r in df.iterrows(): field = str(r.get("field","")).strip() ftype = str(r.get("type","")).strip() enums = str(r.get("enum_values","")).strip() instr = str(r.get("instructions","")).strip() if not field or not ftype: continue k = field.lower() if k in seen: continue seen.add(k) cleaned.append({"field": field, "type": ftype, "enum_values": enums, "instructions": instr}) df2 = pd.DataFrame(cleaned, columns=["field","type","enum_values","instructions"]) spec = build_spec_from_field_rows(cleaned) return cleaned, df2, spec, f"✅ Applied builder table ({len(cleaned)} fields)." 
# ============================= # Row building + “non-empty module” logic # ============================= def _as_list(x) -> List[str]: if x is None: return [] if isinstance(x, list): out = [] for v in x: s = str(v).strip() if s: out.append(s) return out s = str(x).strip() return [s] if s else [] def _format_value(v: Any) -> Any: if isinstance(v, list): return "; ".join([str(x) for x in v if str(x).strip()]) return v EMPTY_STRINGS = {"", "not_reported", "insufficient_data", "none", "na", "n/a", "null"} def _is_empty_value(v: Any) -> bool: if v is None: return True if isinstance(v, float) and np.isnan(v): return True if isinstance(v, list): cleaned = [str(x).strip() for x in v if str(x).strip()] if not cleaned: return True # empty if all items are not_reported / similar return all((c.lower() in EMPTY_STRINGS) for c in cleaned) s = str(v).strip() if not s: return True return s.lower() in EMPTY_STRINGS def _json_default(o: Any): if isinstance(o, np.integer): return int(o) if isinstance(o, np.floating): return float(o) if isinstance(o, np.ndarray): return o.tolist() raise TypeError(f"Object of type {o.__class__.__name__} is not JSON serializable") def _record_id(file_name: str, chemical: str, endpoint: str) -> str: chemical = (chemical or "").strip() or "-" endpoint = (endpoint or "").strip() or "Paper" return f"{file_name} | {chemical} | {endpoint}" def _module_has_any_data(ext: Dict[str, Any], module_keys: List[str], field_props: Dict[str, Any]) -> bool: for k in (module_keys or []): v = ext.get(k, None) if not _is_empty_value(v): return True return False # ============================= # Evidence + report helpers # ============================= def _make_vertical(records: List[Dict[str, Any]], record_id: str) -> pd.DataFrame: if not records or not record_id: return pd.DataFrame(columns=["Field", "Value"]) row = next((r for r in records if r.get("record_id") == record_id), None) if not row: return pd.DataFrame(columns=["Field", "Value"]) hidden = {"record_id"} keys = 
[k for k in row.keys() if k not in hidden] return pd.DataFrame({"Field": keys, "Value": [row.get(k, "") for k in keys]}) def _render_evidence(details: List[Dict[str, Any]], file_name: str, allowed_fields: Optional[set] = None, max_items: int = 120) -> str: if not details or not file_name: return "" d = next((x for x in details if x.get("_file") == file_name), None) if not d: return "" ev = d.get("evidence", []) or [] lines = [] for e in ev: field = (e.get("field", "") or "").strip() if allowed_fields is not None and field and field not in allowed_fields: continue quote = (e.get("quote", "") or "").strip() pages = (e.get("pages", "") or "").strip() if quote: if len(quote) > 320: quote = quote[:320] + "…" lines.append(f"- **{field}** (pages {pages}): “{quote}”") if len(lines) >= max_items: break header = "### Evidence (grounding)\n" return header + ("\n".join(lines) if lines else "- (no evidence returned)") def _overview_df_from_records(records: List[Dict[str, Any]]) -> pd.DataFrame: if not records: return pd.DataFrame(columns=["record_id","file","paper_title","chemical","endpoint","risk_stance","risk_confidence"]) df = pd.DataFrame(records) cols = ["record_id","file","paper_title","chemical","endpoint","risk_stance","risk_confidence"] cols = [c for c in cols if c in df.columns] return df[cols].copy() if cols else df.head(50) def _risk_badge(risk: str) -> str: r = (risk or "").strip().lower() if r == "acceptable": bg = "#e7f7ed"; fg = "#0f5132" elif r == "acceptable_with_uncertainty": bg = "#fff3cd"; fg = "#664d03" elif r == "not_acceptable": bg = "#f8d7da"; fg = "#842029" else: bg = "#e2e3e5"; fg = "#41464b" label = risk if risk else "unknown" return f'{label}' def _safe_str(x: Any) -> str: if x is None: return "" if isinstance(x, float) and np.isnan(x): return "" return str(x) def render_summary_card(record_id: str, records: List[Dict[str, Any]]) -> str: if not record_id or not records: return "