NLP_Project / literature_explorer.py
hchevva's picture
Upload 5 files
4bf9d97 verified
import os
import re
import json
import textwrap
from typing import Any, Dict, List, Tuple
import gradio as gr
import numpy as np
import pandas as pd
from pypdf import PdfReader
from openai import OpenAI
from toxra_core.nlp_pipeline import (
expand_regulatory_queries,
extract_evidence_span,
hybrid_rank_text_items,
)
# =============================
# Pilot limits
# =============================
MAX_PDFS = 5
MAX_PAGES_PER_PDF = 20
MAX_CHARS_PER_PAGE_FOR_INDEX = 7000 # cap for cost/stability
DEFAULT_EMBEDDING_MODEL = "text-embedding-3-small"
DEFAULT_SUMMARY_MODEL = "gpt-4o-mini"
# =============================
# Endpoint fallback inference lexicon (Explorer-only)
# =============================
ENDPOINT_HINTS: Dict[str, List[str]] = {
"Genotoxicity (OECD TG)": [
"genotoxic", "mutagen", "clastogen", "ames", "micronucleus", "comet assay",
"chromosomal aberration", "dna damage", "oecd tg 471", "tg471", "oecd tg 473", "tg473",
"oecd tg 476", "tg476", "oecd tg 487", "tg487", "oecd tg 490", "tg490",
"oecd tg 474", "tg474", "oecd tg 475", "tg475", "oecd tg 488", "tg488",
"oecd tg 489", "tg489"
],
"NAMs / In Silico": ["in silico", "qsar", "read-across", "aop", "pbpk", "high-throughput", "omics", "organ-on-chip", "microphysiological"],
"Acute toxicity": ["acute toxicity", "ld50", "lc50", "single dose", "mortality", "lethality"],
"Repeated dose toxicity": ["repeated dose", "subchronic", "chronic", "noael", "loael", "28-day", "90-day", "target organ"],
"Irritation / Sensitization": ["skin irritation", "eye irritation", "draize", "sensitization", "llna", "patch test"],
"Repro / Developmental": ["reproductive toxicity", "fertility", "developmental toxicity", "teratogen", "prenatal", "postnatal"],
"Carcinogenicity": ["carcinogenic", "tumor", "neoplasm", "cancer", "two-year", "bioassay"],
}
# =============================
# Organ inference (automatic only)
# =============================
ORGANS = ["liver", "lung", "kidney", "skin", "gi", "cns", "reproductive", "immune_blood", "mixed", "unknown"]
ORGAN_HINTS: Dict[str, List[str]] = {
"liver": ["liver", "hepatic", "hepatocyte", "hepatotoxic", "bile", "cholest", "alt", "ast"],
"lung": ["lung", "pulmonary", "bronch", "alveol", "airway", "inhalation", "respiratory"],
"kidney": ["kidney", "renal", "nephro", "glomerul", "tubul", "creatinine", "bun"],
"skin": ["skin", "dermal", "epiderm", "cutaneous", "topical"],
"gi": ["gastro", "intestinal", "gut", "colon", "stomach", "oral", "ingestion"],
"cns": ["brain", "cns", "neuro", "neuronal", "glia", "blood-brain", "dopamin", "seroton"],
"reproductive": ["repro", "testis", "ovary", "uterus", "placent", "fetus", "embryo", "sperm", "oocyte"],
"immune_blood": ["immune", "cytok", "inflamm", "blood", "plasma", "serum", "hemat", "lymph", "macrophage"],
}
def infer_organ_label(doc_text: str) -> str:
t = (doc_text or "").lower()
scores = {k: 0 for k in ORGAN_HINTS.keys()}
for organ, hints in ORGAN_HINTS.items():
for h in hints:
if h in t:
scores[organ] += 1
best = sorted(scores.items(), key=lambda x: x[1], reverse=True)
if not best or best[0][1] == 0:
return "unknown"
top_org, top_score = best[0]
if len(best) > 1 and best[1][1] > 0 and (top_score - best[1][1]) <= 1:
return "mixed"
return top_org
# =============================
# Curated enzymes by organ (starter list)
# =============================
ENZYMES_BY_ORGAN: Dict[str, List[str]] = {
"liver": ["CYP1A2","CYP2C9","CYP2C19","CYP2D6","CYP2E1","CYP3A4","CYP3A5","UGT1A1","UGT2B7","SULT1A1","GSTA1","GSTP1","ADH","ALDH","CES1","CES2"],
"lung": ["CYP1A1","CYP1B1","CYP2F1","GSTP1","MPO","ALDH"],
"kidney": ["OAT1","OAT3","OCT2","MATE1","MATE2","GSTP1","GSTA1"],
"skin": ["CYP1A1","GSTP1","UGT1A1","SULT1A1","ESTERASE","CES1","CES2"],
"gi": ["CYP3A4","UGT1A1","UGT2B7","SULT1A1","ABCB1","P-GP","CES1","CES2"],
"cns": ["MAO-A","MAO-B","MAOA","MAOB","COMT","ALDH"],
"reproductive": ["AROMATASE","CYP19A1","HSD17B","CYP17A1","UGT2B7"],
"immune_blood": ["MPO","COX","PTGS1","PTGS2","LOX","ALOX5"],
"mixed": [],
"unknown": [],
}
ENZYME_REGEXES = [
re.compile(r"\bCYP\s?(\d[A-Z]?\d?[A-Z]?\d?)\b", re.IGNORECASE),
re.compile(r"\bUGT\s?(\d[A-Z0-9]+)\b", re.IGNORECASE),
re.compile(r"\bSULT\s?(\d[A-Z0-9]+)\b", re.IGNORECASE),
re.compile(r"\bGST\s?([A-Z0-9]+)\b", re.IGNORECASE),
re.compile(r"\bEC\s?(\d+\.\d+\.\d+\.\d+)\b", re.IGNORECASE),
]
def detect_enzymes(text: str, organ: str) -> List[str]:
t = text or ""
up = t.upper()
base = ENZYMES_BY_ORGAN.get(organ, [])
if organ in ("mixed", "unknown"):
base = ["CYP3A4","CYP2D6","CYP2E1","UGT1A1","SULT1A1","GSTP1","ALDH","ADH"]
out: List[str] = []
for e in base:
if e in up:
out.append(e)
for rx in ENZYME_REGEXES:
for m in rx.finditer(t):
g = (m.group(1) or "").upper()
if not g:
continue
if rx.pattern.lower().startswith(r"\bcyp"):
v = f"CYP{g}"
elif rx.pattern.lower().startswith(r"\bugt"):
v = f"UGT{g}"
elif rx.pattern.lower().startswith(r"\bsult"):
v = f"SULT{g}"
elif rx.pattern.lower().startswith(r"\bgst"):
v = f"GST{g}"
else:
v = f"EC {g}"
if v not in out:
out.append(v)
# normalize P-gp variants
out2 = []
for x in out:
if x in ("P-GP", "PGP", "PGLYCO"):
x = "P-gp"
out2.append(x)
seen = set()
final = []
for x in out2:
k = x.lower()
if k not in seen:
seen.add(k)
final.append(x)
return final
# =============================
# Named pathways (starter lexicon)
# =============================
PATHWAY_TERMS = [
"oxidative stress",
"Nrf2",
"AhR",
"NF-kB",
"p53",
"MAPK",
"PPAR",
"apoptosis",
"DNA damage response",
"mitochondrial dysfunction",
"estrogen receptor",
"androgen receptor",
"inflammation",
"cytokine signaling",
]
def detect_pathways(text: str) -> List[str]:
t = text or ""
tl = t.lower()
out = []
for term in PATHWAY_TERMS:
if term.lower() in tl:
out.append(term)
if re.search(r"\bNF[-\s]?κ?B\b", t, flags=re.IGNORECASE) and "NF-kB" not in out:
out.append("NF-kB")
seen = set()
final = []
for x in out:
k = x.lower()
if k not in seen:
seen.add(k)
final.append(x)
return final
# =============================
# PDF utils
# =============================
def extract_pages(pdf_path: str, max_pages: int) -> Tuple[List[Tuple[int, str]], int]:
reader = PdfReader(pdf_path)
total = len(reader.pages)
n = min(total, max_pages)
pages: List[Tuple[int, str]] = []
for i in range(n):
try:
txt = reader.pages[i].extract_text() or ""
except Exception:
txt = ""
pages.append((i + 1, txt))
return pages, total
def clean_text(t: str) -> str:
t = (t or "").replace("\x00", " ")
t = re.sub(r"\s+", " ", t).strip()
return t
def is_text_based(pages: List[Tuple[int, str]]) -> bool:
joined = " ".join([clean_text(t) for _, t in pages if clean_text(t)])
return len(joined) >= 200
def hard_wrap(s: str, width: int = 110) -> str:
s = (s or "").strip()
if not s:
return ""
return "\n".join(textwrap.fill(line, width=width, break_long_words=True, break_on_hyphens=True)
for line in s.splitlines() if line.strip())
# =============================
# OpenAI helpers
# =============================
def get_client(api_key: str) -> OpenAI:
key = (api_key or "").strip() or os.getenv("OPENAI_API_KEY", "").strip()
if not key:
raise ValueError("Missing OpenAI API key. Provide it here or set OPENAI_API_KEY secret.")
return OpenAI(api_key=key)
def batched(xs: List[Any], n: int) -> List[List[Any]]:
return [xs[i:i+n] for i in range(0, len(xs), n)]
def embed_texts(client: OpenAI, model: str, texts: List[str]) -> np.ndarray:
embs: List[List[float]] = []
for b in batched(texts, 64):
resp = client.embeddings.create(model=model, input=b)
for item in resp.data:
embs.append(item.embedding)
arr = np.array(embs, dtype=np.float32)
norms = np.linalg.norm(arr, axis=1, keepdims=True) + 1e-12
return arr / norms
# =============================
# Endpoint detection
# =============================
def detect_endpoints(text: str) -> List[str]:
t = (text or "").lower()
found: List[str] = []
for ep, hints in ENDPOINT_HINTS.items():
for h in hints:
if h in t:
found.append(ep)
break
return found
# =============================
# Expanded context = 3–5 sentences (PDF lines unreliable)
# =============================
def split_sentences(text: str) -> List[str]:
t = re.sub(r"\s+", " ", (text or "")).strip()
if not t:
return []
parts = re.split(r"(?<=[\.\?\!])\s+", t)
return [p.strip() for p in parts if p.strip()]
def expanded_context(page_text: str, query: str, n_sentences: int = 5) -> str:
sents = split_sentences(page_text)
if not sents:
return ""
q = (query or "").strip().lower()
if not q:
return " ".join(sents[:n_sentences])
qwords = [w for w in re.findall(r"[a-zA-Z0-9\-]+", q) if len(w) >= 3]
hit_i = None
for i, s in enumerate(sents):
sl = s.lower()
if any(w in sl for w in qwords):
hit_i = i
break
if hit_i is None:
return " ".join(sents[:n_sentences])
start = max(0, hit_i - 2)
end = min(len(sents), hit_i + 3)
return " ".join(sents[start:end])
# =============================
# Index state object (stored in gr.State)
# =============================
def empty_index() -> Dict[str, Any]:
return {
"papers": [], # {paper_id, file, organ, pages_indexed, text_based}
"pages": [], # {paper_id, file, page, text, endpoints, enzymes, pathways}
"embeddings": None, # np.ndarray normalized
"embedding_model": None,
"has_embeddings": False,
"enzymes_vocab": [],
"pathways_vocab": [],
}
def build_index(files, api_key: str, embedding_model: str):
if not files:
return empty_index(), pd.DataFrame(), pd.DataFrame(), "Upload PDFs then click Build Search Index.", gr.update(choices=[""], value=""), gr.update(choices=[""], value="")
if len(files) > MAX_PDFS:
return empty_index(), pd.DataFrame(), pd.DataFrame(), f"Upload limit exceeded: max {MAX_PDFS} PDFs for pilot.", gr.update(choices=[""], value=""), gr.update(choices=[""], value="")
idx = empty_index()
papers_rows: List[Dict[str, Any]] = []
page_rows: List[Dict[str, Any]] = []
for f in files:
pdf_path = f.name
filename = os.path.basename(pdf_path)
pages, total = extract_pages(pdf_path, MAX_PAGES_PER_PDF)
text_ok = is_text_based(pages)
doc_text = " ".join([clean_text(t) for _, t in pages if clean_text(t)])
organ = infer_organ_label(doc_text) if text_ok else "unknown"
paper_id = filename
papers_rows.append({
"paper_id": paper_id,
"file": filename,
"organ": organ,
"pages_indexed": min(total, MAX_PAGES_PER_PDF),
"text_based": bool(text_ok),
})
if not text_ok:
continue
for pno, raw in pages:
txt = clean_text(raw)
if not txt:
continue
txt = txt[:MAX_CHARS_PER_PAGE_FOR_INDEX]
eps = detect_endpoints(txt)
enz = detect_enzymes(txt, organ)
pws = detect_pathways(txt)
page_rows.append({
"paper_id": paper_id,
"file": filename,
"page": pno,
"text": txt,
"endpoints": eps,
"enzymes": enz,
"pathways": pws,
})
idx["papers"] = papers_rows
idx["pages"] = page_rows
papers_df = pd.DataFrame(papers_rows, columns=["file","organ","pages_indexed","text_based"])
# ✅ Endpoint correlation: present/absent per paper (cleaner)
endpoint_names = list(ENDPOINT_HINTS.keys())
matrix = []
for p in papers_rows:
if not p.get("text_based"):
continue
pid = p["paper_id"]
p_pages = [r for r in page_rows if r["paper_id"] == pid]
row = {"file": p["file"], "organ": p["organ"]}
for ep in endpoint_names:
present = any(ep in (r.get("endpoints") or []) for r in p_pages)
row[ep] = "present" if present else ""
matrix.append(row)
endpoint_matrix_df = pd.DataFrame(matrix) if matrix else pd.DataFrame(columns=["file","organ"] + endpoint_names)
# vocab lists for filters (computed at indexing time)
enzymes_vocab = sorted({e for r in page_rows for e in (r.get("enzymes") or [])})
pathways_vocab = sorted({p for r in page_rows for p in (r.get("pathways") or [])})
idx["enzymes_vocab"] = enzymes_vocab
idx["pathways_vocab"] = pathways_vocab
# embeddings
status = "✅ Indexed pages locally (no embeddings)."
try:
client = get_client(api_key)
texts = [r["text"] for r in page_rows]
if texts:
em = embed_texts(client, embedding_model or DEFAULT_EMBEDDING_MODEL, texts)
idx["embeddings"] = em
idx["embedding_model"] = embedding_model or DEFAULT_EMBEDDING_MODEL
idx["has_embeddings"] = True
status = f"✅ Indexed {len(papers_rows)} paper(s), {len(texts)} page(s). Embeddings built ({idx['embedding_model']})."
else:
status = "⚠️ No text pages found to index (text-based PDFs only)."
except Exception as e:
status = f"⚠️ Indexed pages, but embeddings unavailable: {e}. You can still run search with fallback ranking."
return (
idx,
papers_df,
endpoint_matrix_df,
status,
gr.update(choices=[""] + enzymes_vocab, value=""),
gr.update(choices=[""] + pathways_vocab, value="")
)
def search(
query: str,
idx: Dict[str, Any],
api_key: str,
embedding_model: str,
summary_model: str,
endpoint_filter: List[str],
organ_filter: str,
enzyme_filter: str,
pathway_filter: str,
top_k: int,
):
query = (query or "").strip()
if not query:
return pd.DataFrame(), "### Grounded mini-summary\n(type a query)", "### Evidence used\n"
if not idx or not idx.get("pages"):
return pd.DataFrame(), "### Grounded mini-summary\n(Build the index first)", "### Evidence used\n"
pages = idx["pages"]
papers = {p["paper_id"]: p for p in (idx.get("papers") or [])}
def passes(r: Dict[str, Any]) -> bool:
if organ_filter and organ_filter != "any":
org = (papers.get(r["paper_id"], {}) or {}).get("organ", "unknown")
if org != organ_filter:
return False
if endpoint_filter:
eps = r.get("endpoints") or []
if not any(e in eps for e in endpoint_filter):
return False
if enzyme_filter:
enz = r.get("enzymes") or []
if enzyme_filter not in enz:
return False
if pathway_filter:
pws = r.get("pathways") or []
if pathway_filter not in pws:
return False
return True
filtered_idx = [i for i, r in enumerate(pages) if passes(r)]
if not filtered_idx:
return pd.DataFrame(), "### Grounded mini-summary\n(No pages match your filters)", "### Evidence used\n"
filtered_pages = [pages[i] for i in filtered_idx]
emb_mat = None
qemb = None
if idx.get("has_embeddings") and idx.get("embeddings") is not None:
try:
client = get_client(api_key)
qemb = embed_texts(client, embedding_model or idx.get("embedding_model") or DEFAULT_EMBEDDING_MODEL, [query])[0]
emb_mat = idx["embeddings"][filtered_idx, :]
except Exception:
emb_mat = None
qemb = None
_, query_families = expand_regulatory_queries(
base_queries=[query],
endpoint_modules=endpoint_filter or [],
frameworks=["FDA CTP", "EPA"],
extra_terms=[],
)
ranked_pages, rank_diag = hybrid_rank_text_items(
items=filtered_pages,
query=query,
families=query_families,
top_k=max(1, int(top_k)),
item_embeddings=emb_mat,
query_embedding=qemb,
)
rows = []
evidence = []
for r in ranked_pages:
pid = r["paper_id"]
org = (papers.get(pid, {}) or {}).get("organ", "unknown")
span = extract_evidence_span(r.get("text", ""), query, page=r.get("page"), n_sentences=5)
ctx = span.get("text", "")
ctx_wrapped = hard_wrap(ctx, width=110)
preview = ctx.strip()
preview = (preview[:220] + "…") if len(preview) > 220 else preview
rows.append({
"file": r.get("file",""),
"page": r.get("page",""),
"score": round(float(r.get("_nlp_rrf_score", 0.0)), 4),
"organ": org,
"endpoints": "; ".join(r.get("endpoints") or []),
"enzymes": "; ".join((r.get("enzymes") or [])[:12]),
"pathways": "; ".join((r.get("pathways") or [])[:12]),
"preview": preview,
})
snippet = (ctx_wrapped.replace("\n", " ")[:360] + "…") if len(ctx_wrapped) > 360 else ctx_wrapped.replace("\n", " ")
evidence.append(f"- **{r.get('file','')}** (p.{r.get('page','')}): {snippet}")
# ✅ Compact table (no long context column)
results_df = pd.DataFrame(rows, columns=["file","page","score","organ","endpoints","enzymes","pathways","preview"])
evidence_md = "### Evidence used\n" + "\n".join(evidence[:8])
# grounded mini-summary
mini_summary = "(mini-summary unavailable)"
try:
client = get_client(api_key)
payload = [{"file": x["file"], "page": x["page"], "preview": x["preview"]} for x in rows[:8]]
system_msg = (
"You are a literature assistant for toxicology researchers. "
"Write ONE neutral paragraph that answers the user's query based ONLY on the evidence excerpts. "
"Cite sources inline as (File p.X). Do not add outside facts."
)
user_msg = "USER QUERY:\n" + query + "\n\nEVIDENCE EXCERPTS:\n" + json.dumps(payload, indent=2)
resp = client.responses.create(
model=summary_model or DEFAULT_SUMMARY_MODEL,
input=[{"role":"system","content":system_msg},{"role":"user","content":user_msg}]
)
mini_summary = resp.output_text.strip()
except Exception as e:
mini_summary = f"(mini-summary unavailable: {e})"
if rank_diag:
mini_summary = (
f"{mini_summary}\n\n"
f"_NLP diagnostics: method={rank_diag.get('ranking_method','')}, "
f"coverage={rank_diag.get('coverage_score', 0.0)}._"
)
mini_md = "### Grounded mini-summary\n" + mini_summary
return results_df, mini_md, evidence_md
def on_select_result(df: pd.DataFrame, idx: dict, query: str, evt: gr.SelectData):
if df is None or df.empty:
return "", "", "", ""
# evt.index may be (row, col) or int depending on gradio version
row_i = evt.index[0] if isinstance(evt.index, (list, tuple)) else int(evt.index)
r = df.iloc[int(row_i)]
file = str(r.get("file", ""))
page = int(r.get("page", 0))
citation = f"{file} p.{page}"
rec = next((x for x in (idx.get("pages", []) or []) if x.get("file")==file and int(x.get("page",0))==page), None)
if not rec:
meta = f"**{citation}**"
return meta, citation, "(page text not found)", ""
span = extract_evidence_span(rec.get("text",""), query, page=page, n_sentences=5)
ctx = hard_wrap(span.get("text", ""), width=110)
full_txt = hard_wrap(rec.get("text",""), width=110)
meta = f"**{citation}** | organ: **{r.get('organ','')}** | score: **{r.get('score','')}**"
return meta, citation, ctx, full_txt
def citation_ready(citation: str):
c = (citation or "").strip()
if not c:
return "Select a result row first."
return f"✅ Citation ready: {c} (copy from the box above)"
# =============================
# Tab plugin (Option A)
# =============================
def build_literature_explorer_tab():
gr.Markdown(
"## Literature Explorer (Pilot)\n"
f"- Limits: **max {MAX_PDFS} PDFs**, **max {MAX_PAGES_PER_PDF} pages/PDF**\n"
"- Text-based PDFs only (not scanned/image PDFs).\n"
"- Search is **page-level**; “3–5 lines” is approximated as **3–5 sentences**.\n"
)
idx_state = gr.State(empty_index())
with gr.Group():
files = gr.File(label="Upload PDFs (Explorer only)", file_types=[".pdf"], file_count="multiple")
with gr.Row():
api_key = gr.Textbox(label="OpenAI API key (Explorer)", type="password")
embedding_model = gr.Dropdown(label="Embedding model", choices=["text-embedding-3-small","text-embedding-3-large"], value=DEFAULT_EMBEDDING_MODEL)
summary_model = gr.Dropdown(label="Mini-summary model", choices=["gpt-4o-mini","gpt-4o","gpt-4o-2024-08-06"], value=DEFAULT_SUMMARY_MODEL)
build_btn = gr.Button("Build Search Index", variant="primary")
index_status = gr.Textbox(label="Index status", interactive=False)
papers_df = gr.Dataframe(label="Indexed papers", interactive=False, wrap=True)
# ✅ Table 2 now present/absent per paper
endpoint_matrix_df = gr.Dataframe(label="Endpoint correlation (present/absent per paper)", interactive=False, wrap=True)
with gr.Group():
gr.Markdown("### Search across indexed papers")
query = gr.Textbox(label="Search query", placeholder="e.g., CYP3A4 oxidative stress and genotoxicity", lines=2)
with gr.Row():
endpoint_filter = gr.Dropdown(label="Endpoint filter (optional)", choices=list(ENDPOINT_HINTS.keys()), multiselect=True, value=[])
organ_filter = gr.Dropdown(label="Organ filter (optional)", choices=["any"] + ORGANS, value="any")
enzyme_filter = gr.Dropdown(label="Enzyme filter (optional)", choices=[""], value="")
pathway_filter = gr.Dropdown(label="Pathway filter (optional)", choices=[""], value="")
top_k = gr.Slider(5, 30, value=12, step=1, label="Top results")
search_btn = gr.Button("Search", variant="secondary")
mini_summary_md = gr.Markdown()
# ✅ Table 3 compact (no long context)
results_df = gr.Dataframe(label="Search results (compact, page-level)", interactive=False, wrap=True)
# ✅ Selected result viewer (context moved out of table)
selected_meta = gr.Markdown()
citation_box = gr.Textbox(label="Citation (copy/paste)", interactive=False)
copy_btn = gr.Button("Copy citation (fills box)", variant="secondary")
copy_status = gr.Textbox(label="Copy status", interactive=False)
selected_context = gr.Textbox(label="Selected result context (3–5 sentences)", lines=6, interactive=False)
with gr.Accordion("Full page text (optional)", open=False):
full_page_text = gr.Textbox(label="Full page text", lines=14, interactive=False)
evidence_md = gr.Markdown()
build_btn.click(
fn=build_index,
inputs=[files, api_key, embedding_model],
outputs=[idx_state, papers_df, endpoint_matrix_df, index_status, enzyme_filter, pathway_filter]
)
search_btn.click(
fn=search,
inputs=[query, idx_state, api_key, embedding_model, summary_model, endpoint_filter, organ_filter, enzyme_filter, pathway_filter, top_k],
outputs=[results_df, mini_summary_md, evidence_md]
)
results_df.select(
fn=on_select_result,
inputs=[results_df, idx_state, query],
outputs=[selected_meta, citation_box, selected_context, full_page_text]
)
copy_btn.click(
fn=citation_ready,
inputs=[citation_box],
outputs=[copy_status]
)