import os import re import json import textwrap from typing import Any, Dict, List, Tuple import gradio as gr import numpy as np import pandas as pd from pypdf import PdfReader from openai import OpenAI from toxra_core.nlp_pipeline import ( expand_regulatory_queries, extract_evidence_span, hybrid_rank_text_items, ) # ============================= # Pilot limits # ============================= MAX_PDFS = 5 MAX_PAGES_PER_PDF = 20 MAX_CHARS_PER_PAGE_FOR_INDEX = 7000 # cap for cost/stability DEFAULT_EMBEDDING_MODEL = "text-embedding-3-small" DEFAULT_SUMMARY_MODEL = "gpt-4o-mini" # ============================= # Endpoint fallback inference lexicon (Explorer-only) # ============================= ENDPOINT_HINTS: Dict[str, List[str]] = { "Genotoxicity (OECD TG)": [ "genotoxic", "mutagen", "clastogen", "ames", "micronucleus", "comet assay", "chromosomal aberration", "dna damage", "oecd tg 471", "tg471", "oecd tg 473", "tg473", "oecd tg 476", "tg476", "oecd tg 487", "tg487", "oecd tg 490", "tg490", "oecd tg 474", "tg474", "oecd tg 475", "tg475", "oecd tg 488", "tg488", "oecd tg 489", "tg489" ], "NAMs / In Silico": ["in silico", "qsar", "read-across", "aop", "pbpk", "high-throughput", "omics", "organ-on-chip", "microphysiological"], "Acute toxicity": ["acute toxicity", "ld50", "lc50", "single dose", "mortality", "lethality"], "Repeated dose toxicity": ["repeated dose", "subchronic", "chronic", "noael", "loael", "28-day", "90-day", "target organ"], "Irritation / Sensitization": ["skin irritation", "eye irritation", "draize", "sensitization", "llna", "patch test"], "Repro / Developmental": ["reproductive toxicity", "fertility", "developmental toxicity", "teratogen", "prenatal", "postnatal"], "Carcinogenicity": ["carcinogenic", "tumor", "neoplasm", "cancer", "two-year", "bioassay"], } # ============================= # Organ inference (automatic only) # ============================= ORGANS = ["liver", "lung", "kidney", "skin", "gi", "cns", "reproductive", "immune_blood", "mixed", "unknown"] ORGAN_HINTS: Dict[str, List[str]] = { "liver": ["liver", "hepatic", "hepatocyte", "hepatotoxic", "bile", "cholest", "alt", "ast"], "lung": ["lung", "pulmonary", "bronch", "alveol", "airway", "inhalation", "respiratory"], "kidney": ["kidney", "renal", "nephro", "glomerul", "tubul", "creatinine", "bun"], "skin": ["skin", "dermal", "epiderm", "cutaneous", "topical"], "gi": ["gastro", "intestinal", "gut", "colon", "stomach", "oral", "ingestion"], "cns": ["brain", "cns", "neuro", "neuronal", "glia", "blood-brain", "dopamin", "seroton"], "reproductive": ["repro", "testis", "ovary", "uterus", "placent", "fetus", "embryo", "sperm", "oocyte"], "immune_blood": ["immune", "cytok", "inflamm", "blood", "plasma", "serum", "hemat", "lymph", "macrophage"], } def infer_organ_label(doc_text: str) -> str: t = (doc_text or "").lower() scores = {k: 0 for k in ORGAN_HINTS.keys()} for organ, hints in ORGAN_HINTS.items(): for h in hints: if h in t: scores[organ] += 1 best = sorted(scores.items(), key=lambda x: x[1], reverse=True) if not best or best[0][1] == 0: return "unknown" top_org, top_score = best[0] if len(best) > 1 and best[1][1] > 0 and (top_score - best[1][1]) <= 1: return "mixed" return top_org # ============================= # Curated enzymes by organ (starter list) # ============================= ENZYMES_BY_ORGAN: Dict[str, List[str]] = { "liver": ["CYP1A2","CYP2C9","CYP2C19","CYP2D6","CYP2E1","CYP3A4","CYP3A5","UGT1A1","UGT2B7","SULT1A1","GSTA1","GSTP1","ADH","ALDH","CES1","CES2"], "lung": ["CYP1A1","CYP1B1","CYP2F1","GSTP1","MPO","ALDH"], "kidney": ["OAT1","OAT3","OCT2","MATE1","MATE2","GSTP1","GSTA1"], "skin": ["CYP1A1","GSTP1","UGT1A1","SULT1A1","ESTERASE","CES1","CES2"], "gi": ["CYP3A4","UGT1A1","UGT2B7","SULT1A1","ABCB1","P-GP","CES1","CES2"], "cns": ["MAO-A","MAO-B","MAOA","MAOB","COMT","ALDH"], "reproductive": ["AROMATASE","CYP19A1","HSD17B","CYP17A1","UGT2B7"], "immune_blood": ["MPO","COX","PTGS1","PTGS2","LOX","ALOX5"], "mixed": [], "unknown": [], } ENZYME_REGEXES = [ re.compile(r"\bCYP\s?(\d[A-Z]?\d?[A-Z]?\d?)\b", re.IGNORECASE), re.compile(r"\bUGT\s?(\d[A-Z0-9]+)\b", re.IGNORECASE), re.compile(r"\bSULT\s?(\d[A-Z0-9]+)\b", re.IGNORECASE), re.compile(r"\bGST\s?([A-Z0-9]+)\b", re.IGNORECASE), re.compile(r"\bEC\s?(\d+\.\d+\.\d+\.\d+)\b", re.IGNORECASE), ] def detect_enzymes(text: str, organ: str) -> List[str]: t = text or "" up = t.upper() base = ENZYMES_BY_ORGAN.get(organ, []) if organ in ("mixed", "unknown"): base = ["CYP3A4","CYP2D6","CYP2E1","UGT1A1","SULT1A1","GSTP1","ALDH","ADH"] out: List[str] = [] for e in base: if e in up: out.append(e) for rx in ENZYME_REGEXES: for m in rx.finditer(t): g = (m.group(1) or "").upper() if not g: continue if rx.pattern.lower().startswith(r"\bcyp"): v = f"CYP{g}" elif rx.pattern.lower().startswith(r"\bugt"): v = f"UGT{g}" elif rx.pattern.lower().startswith(r"\bsult"): v = f"SULT{g}" elif rx.pattern.lower().startswith(r"\bgst"): v = f"GST{g}" else: v = f"EC {g}" if v not in out: out.append(v) # normalize P-gp variants out2 = [] for x in out: if x in ("P-GP", "PGP", "PGLYCO"): x = "P-gp" out2.append(x) seen = set() final = [] for x in out2: k = x.lower() if k not in seen: seen.add(k) final.append(x) return final # ============================= # Named pathways (starter lexicon) # ============================= PATHWAY_TERMS = [ "oxidative stress", "Nrf2", "AhR", "NF-kB", "p53", "MAPK", "PPAR", "apoptosis", "DNA damage response", "mitochondrial dysfunction", "estrogen receptor", "androgen receptor", "inflammation", "cytokine signaling", ] def detect_pathways(text: str) -> List[str]: t = text or "" tl = t.lower() out = [] for term in PATHWAY_TERMS: if term.lower() in tl: out.append(term) if re.search(r"\bNF[-\s]?κ?B\b", t, flags=re.IGNORECASE) and "NF-kB" not in out: out.append("NF-kB") seen = set() final = [] for x in out: k = x.lower() if k not in seen: seen.add(k) final.append(x) return final # ============================= # PDF utils # ============================= def extract_pages(pdf_path: str, max_pages: int) -> Tuple[List[Tuple[int, str]], int]: reader = PdfReader(pdf_path) total = len(reader.pages) n = min(total, max_pages) pages: List[Tuple[int, str]] = [] for i in range(n): try: txt = reader.pages[i].extract_text() or "" except Exception: txt = "" pages.append((i + 1, txt)) return pages, total def clean_text(t: str) -> str: t = (t or "").replace("\x00", " ") t = re.sub(r"\s+", " ", t).strip() return t def is_text_based(pages: List[Tuple[int, str]]) -> bool: joined = " ".join([clean_text(t) for _, t in pages if clean_text(t)]) return len(joined) >= 200 def hard_wrap(s: str, width: int = 110) -> str: s = (s or "").strip() if not s: return "" return "\n".join(textwrap.fill(line, width=width, break_long_words=True, break_on_hyphens=True) for line in s.splitlines() if line.strip()) # ============================= # OpenAI helpers # ============================= def get_client(api_key: str) -> OpenAI: key = (api_key or "").strip() or os.getenv("OPENAI_API_KEY", "").strip() if not key: raise ValueError("Missing OpenAI API key. Provide it here or set OPENAI_API_KEY secret.") return OpenAI(api_key=key) def batched(xs: List[Any], n: int) -> List[List[Any]]: return [xs[i:i+n] for i in range(0, len(xs), n)] def embed_texts(client: OpenAI, model: str, texts: List[str]) -> np.ndarray: embs: List[List[float]] = [] for b in batched(texts, 64): resp = client.embeddings.create(model=model, input=b) for item in resp.data: embs.append(item.embedding) arr = np.array(embs, dtype=np.float32) norms = np.linalg.norm(arr, axis=1, keepdims=True) + 1e-12 return arr / norms # ============================= # Endpoint detection # ============================= def detect_endpoints(text: str) -> List[str]: t = (text or "").lower() found: List[str] = [] for ep, hints in ENDPOINT_HINTS.items(): for h in hints: if h in t: found.append(ep) break return found # ============================= # Expanded context = 3–5 sentences (PDF lines unreliable) # ============================= def split_sentences(text: str) -> List[str]: t = re.sub(r"\s+", " ", (text or "")).strip() if not t: return [] parts = re.split(r"(?<=[\.\?\!])\s+", t) return [p.strip() for p in parts if p.strip()] def expanded_context(page_text: str, query: str, n_sentences: int = 5) -> str: sents = split_sentences(page_text) if not sents: return "" q = (query or "").strip().lower() if not q: return " ".join(sents[:n_sentences]) qwords = [w for w in re.findall(r"[a-zA-Z0-9\-]+", q) if len(w) >= 3] hit_i = None for i, s in enumerate(sents): sl = s.lower() if any(w in sl for w in qwords): hit_i = i break if hit_i is None: return " ".join(sents[:n_sentences]) start = max(0, hit_i - 2) end = min(len(sents), hit_i + 3) return " ".join(sents[start:end]) # ============================= # Index state object (stored in gr.State) # ============================= def empty_index() -> Dict[str, Any]: return { "papers": [], # {paper_id, file, organ, pages_indexed, text_based} "pages": [], # {paper_id, file, page, text, endpoints, enzymes, pathways} "embeddings": None, # np.ndarray normalized "embedding_model": None, "has_embeddings": False, "enzymes_vocab": [], "pathways_vocab": [], } def build_index(files, api_key: str, embedding_model: str): if not files: return empty_index(), pd.DataFrame(), pd.DataFrame(), "Upload PDFs then click Build Search Index.", gr.update(choices=[""], value=""), gr.update(choices=[""], value="") if len(files) > MAX_PDFS: return empty_index(), pd.DataFrame(), pd.DataFrame(), f"Upload limit exceeded: max {MAX_PDFS} PDFs for pilot.", gr.update(choices=[""], value=""), gr.update(choices=[""], value="") idx = empty_index() papers_rows: List[Dict[str, Any]] = [] page_rows: List[Dict[str, Any]] = [] for f in files: pdf_path = f.name filename = os.path.basename(pdf_path) pages, total = extract_pages(pdf_path, MAX_PAGES_PER_PDF) text_ok = is_text_based(pages) doc_text = " ".join([clean_text(t) for _, t in pages if clean_text(t)]) organ = infer_organ_label(doc_text) if text_ok else "unknown" paper_id = filename papers_rows.append({ "paper_id": paper_id, "file": filename, "organ": organ, "pages_indexed": min(total, MAX_PAGES_PER_PDF), "text_based": bool(text_ok), }) if not text_ok: continue for pno, raw in pages: txt = clean_text(raw) if not txt: continue txt = txt[:MAX_CHARS_PER_PAGE_FOR_INDEX] eps = detect_endpoints(txt) enz = detect_enzymes(txt, organ) pws = detect_pathways(txt) page_rows.append({ "paper_id": paper_id, "file": filename, "page": pno, "text": txt, "endpoints": eps, "enzymes": enz, "pathways": pws, }) idx["papers"] = papers_rows idx["pages"] = page_rows papers_df = pd.DataFrame(papers_rows, columns=["file","organ","pages_indexed","text_based"]) # ✅ Endpoint correlation: present/absent per paper (cleaner) endpoint_names = list(ENDPOINT_HINTS.keys()) matrix = [] for p in papers_rows: if not p.get("text_based"): continue pid = p["paper_id"] p_pages = [r for r in page_rows if r["paper_id"] == pid] row = {"file": p["file"], "organ": p["organ"]} for ep in endpoint_names: present = any(ep in (r.get("endpoints") or []) for r in p_pages) row[ep] = "present" if present else "" matrix.append(row) endpoint_matrix_df = pd.DataFrame(matrix) if matrix else pd.DataFrame(columns=["file","organ"] + endpoint_names) # vocab lists for filters (computed at indexing time) enzymes_vocab = sorted({e for r in page_rows for e in (r.get("enzymes") or [])}) pathways_vocab = sorted({p for r in page_rows for p in (r.get("pathways") or [])}) idx["enzymes_vocab"] = enzymes_vocab idx["pathways_vocab"] = pathways_vocab # embeddings status = "✅ Indexed pages locally (no embeddings)." try: client = get_client(api_key) texts = [r["text"] for r in page_rows] if texts: em = embed_texts(client, embedding_model or DEFAULT_EMBEDDING_MODEL, texts) idx["embeddings"] = em idx["embedding_model"] = embedding_model or DEFAULT_EMBEDDING_MODEL idx["has_embeddings"] = True status = f"✅ Indexed {len(papers_rows)} paper(s), {len(texts)} page(s). Embeddings built ({idx['embedding_model']})." else: status = "⚠️ No text pages found to index (text-based PDFs only)." except Exception as e: status = f"⚠️ Indexed pages, but embeddings unavailable: {e}. You can still run search with fallback ranking." return ( idx, papers_df, endpoint_matrix_df, status, gr.update(choices=[""] + enzymes_vocab, value=""), gr.update(choices=[""] + pathways_vocab, value="") ) def search( query: str, idx: Dict[str, Any], api_key: str, embedding_model: str, summary_model: str, endpoint_filter: List[str], organ_filter: str, enzyme_filter: str, pathway_filter: str, top_k: int, ): query = (query or "").strip() if not query: return pd.DataFrame(), "### Grounded mini-summary\n(type a query)", "### Evidence used\n" if not idx or not idx.get("pages"): return pd.DataFrame(), "### Grounded mini-summary\n(Build the index first)", "### Evidence used\n" pages = idx["pages"] papers = {p["paper_id"]: p for p in (idx.get("papers") or [])} def passes(r: Dict[str, Any]) -> bool: if organ_filter and organ_filter != "any": org = (papers.get(r["paper_id"], {}) or {}).get("organ", "unknown") if org != organ_filter: return False if endpoint_filter: eps = r.get("endpoints") or [] if not any(e in eps for e in endpoint_filter): return False if enzyme_filter: enz = r.get("enzymes") or [] if enzyme_filter not in enz: return False if pathway_filter: pws = r.get("pathways") or [] if pathway_filter not in pws: return False return True filtered_idx = [i for i, r in enumerate(pages) if passes(r)] if not filtered_idx: return pd.DataFrame(), "### Grounded mini-summary\n(No pages match your filters)", "### Evidence used\n" filtered_pages = [pages[i] for i in filtered_idx] emb_mat = None qemb = None if idx.get("has_embeddings") and idx.get("embeddings") is not None: try: client = get_client(api_key) qemb = embed_texts(client, embedding_model or idx.get("embedding_model") or DEFAULT_EMBEDDING_MODEL, [query])[0] emb_mat = idx["embeddings"][filtered_idx, :] except Exception: emb_mat = None qemb = None _, query_families = expand_regulatory_queries( base_queries=[query], endpoint_modules=endpoint_filter or [], frameworks=["FDA CTP", "EPA"], extra_terms=[], ) ranked_pages, rank_diag = hybrid_rank_text_items( items=filtered_pages, query=query, families=query_families, top_k=max(1, int(top_k)), item_embeddings=emb_mat, query_embedding=qemb, ) rows = [] evidence = [] for r in ranked_pages: pid = r["paper_id"] org = (papers.get(pid, {}) or {}).get("organ", "unknown") span = extract_evidence_span(r.get("text", ""), query, page=r.get("page"), n_sentences=5) ctx = span.get("text", "") ctx_wrapped = hard_wrap(ctx, width=110) preview = ctx.strip() preview = (preview[:220] + "…") if len(preview) > 220 else preview rows.append({ "file": r.get("file",""), "page": r.get("page",""), "score": round(float(r.get("_nlp_rrf_score", 0.0)), 4), "organ": org, "endpoints": "; ".join(r.get("endpoints") or []), "enzymes": "; ".join((r.get("enzymes") or [])[:12]), "pathways": "; ".join((r.get("pathways") or [])[:12]), "preview": preview, }) snippet = (ctx_wrapped.replace("\n", " ")[:360] + "…") if len(ctx_wrapped) > 360 else ctx_wrapped.replace("\n", " ") evidence.append(f"- **{r.get('file','')}** (p.{r.get('page','')}): {snippet}") # ✅ Compact table (no long context column) results_df = pd.DataFrame(rows, columns=["file","page","score","organ","endpoints","enzymes","pathways","preview"]) evidence_md = "### Evidence used\n" + "\n".join(evidence[:8]) # grounded mini-summary mini_summary = "(mini-summary unavailable)" try: client = get_client(api_key) payload = [{"file": x["file"], "page": x["page"], "preview": x["preview"]} for x in rows[:8]] system_msg = ( "You are a literature assistant for toxicology researchers. " "Write ONE neutral paragraph that answers the user's query based ONLY on the evidence excerpts. " "Cite sources inline as (File p.X). Do not add outside facts." ) user_msg = "USER QUERY:\n" + query + "\n\nEVIDENCE EXCERPTS:\n" + json.dumps(payload, indent=2) resp = client.responses.create( model=summary_model or DEFAULT_SUMMARY_MODEL, input=[{"role":"system","content":system_msg},{"role":"user","content":user_msg}] ) mini_summary = resp.output_text.strip() except Exception as e: mini_summary = f"(mini-summary unavailable: {e})" if rank_diag: mini_summary = ( f"{mini_summary}\n\n" f"_NLP diagnostics: method={rank_diag.get('ranking_method','')}, " f"coverage={rank_diag.get('coverage_score', 0.0)}._" ) mini_md = "### Grounded mini-summary\n" + mini_summary return results_df, mini_md, evidence_md def on_select_result(df: pd.DataFrame, idx: dict, query: str, evt: gr.SelectData): if df is None or df.empty: return "", "", "", "" # evt.index may be (row, col) or int depending on gradio version row_i = evt.index[0] if isinstance(evt.index, (list, tuple)) else int(evt.index) r = df.iloc[int(row_i)] file = str(r.get("file", "")) page = int(r.get("page", 0)) citation = f"{file} p.{page}" rec = next((x for x in (idx.get("pages", []) or []) if x.get("file")==file and int(x.get("page",0))==page), None) if not rec: meta = f"**{citation}**" return meta, citation, "(page text not found)", "" span = extract_evidence_span(rec.get("text",""), query, page=page, n_sentences=5) ctx = hard_wrap(span.get("text", ""), width=110) full_txt = hard_wrap(rec.get("text",""), width=110) meta = f"**{citation}** | organ: **{r.get('organ','')}** | score: **{r.get('score','')}**" return meta, citation, ctx, full_txt def citation_ready(citation: str): c = (citation or "").strip() if not c: return "Select a result row first." return f"✅ Citation ready: {c} (copy from the box above)" # ============================= # Tab plugin (Option A) # ============================= def build_literature_explorer_tab(): gr.Markdown( "## Literature Explorer (Pilot)\n" f"- Limits: **max {MAX_PDFS} PDFs**, **max {MAX_PAGES_PER_PDF} pages/PDF**\n" "- Text-based PDFs only (not scanned/image PDFs).\n" "- Search is **page-level**; “3–5 lines” is approximated as **3–5 sentences**.\n" ) idx_state = gr.State(empty_index()) with gr.Group(): files = gr.File(label="Upload PDFs (Explorer only)", file_types=[".pdf"], file_count="multiple") with gr.Row(): api_key = gr.Textbox(label="OpenAI API key (Explorer)", type="password") embedding_model = gr.Dropdown(label="Embedding model", choices=["text-embedding-3-small","text-embedding-3-large"], value=DEFAULT_EMBEDDING_MODEL) summary_model = gr.Dropdown(label="Mini-summary model", choices=["gpt-4o-mini","gpt-4o","gpt-4o-2024-08-06"], value=DEFAULT_SUMMARY_MODEL) build_btn = gr.Button("Build Search Index", variant="primary") index_status = gr.Textbox(label="Index status", interactive=False) papers_df = gr.Dataframe(label="Indexed papers", interactive=False, wrap=True) # ✅ Table 2 now present/absent per paper endpoint_matrix_df = gr.Dataframe(label="Endpoint correlation (present/absent per paper)", interactive=False, wrap=True) with gr.Group(): gr.Markdown("### Search across indexed papers") query = gr.Textbox(label="Search query", placeholder="e.g., CYP3A4 oxidative stress and genotoxicity", lines=2) with gr.Row(): endpoint_filter = gr.Dropdown(label="Endpoint filter (optional)", choices=list(ENDPOINT_HINTS.keys()), multiselect=True, value=[]) organ_filter = gr.Dropdown(label="Organ filter (optional)", choices=["any"] + ORGANS, value="any") enzyme_filter = gr.Dropdown(label="Enzyme filter (optional)", choices=[""], value="") pathway_filter = gr.Dropdown(label="Pathway filter (optional)", choices=[""], value="") top_k = gr.Slider(5, 30, value=12, step=1, label="Top results") search_btn = gr.Button("Search", variant="secondary") mini_summary_md = gr.Markdown() # ✅ Table 3 compact (no long context) results_df = gr.Dataframe(label="Search results (compact, page-level)", interactive=False, wrap=True) # ✅ Selected result viewer (context moved out of table) selected_meta = gr.Markdown() citation_box = gr.Textbox(label="Citation (copy/paste)", interactive=False) copy_btn = gr.Button("Copy citation (fills box)", variant="secondary") copy_status = gr.Textbox(label="Copy status", interactive=False) selected_context = gr.Textbox(label="Selected result context (3–5 sentences)", lines=6, interactive=False) with gr.Accordion("Full page text (optional)", open=False): full_page_text = gr.Textbox(label="Full page text", lines=14, interactive=False) evidence_md = gr.Markdown() build_btn.click( fn=build_index, inputs=[files, api_key, embedding_model], outputs=[idx_state, papers_df, endpoint_matrix_df, index_status, enzyme_filter, pathway_filter] ) search_btn.click( fn=search, inputs=[query, idx_state, api_key, embedding_model, summary_model, endpoint_filter, organ_filter, enzyme_filter, pathway_filter, top_k], outputs=[results_df, mini_summary_md, evidence_md] ) results_df.select( fn=on_select_result, inputs=[results_df, idx_state, query], outputs=[selected_meta, citation_box, selected_context, full_page_text] ) copy_btn.click( fn=citation_ready, inputs=[citation_box], outputs=[copy_status] )