# TOPICMODELLING / tools.py
# Milind Kamat
# Commit 614dd95: Add PDF Downloader workbench
"""
tools.py
Deterministic (non-LLM) operations called by the UI. The CSV-ingest helpers
are shared by both analysis pipelines; embedding-specific, TCCM-specific and
PDF-downloader operations are grouped in their own sections below.
No LLM calls are made here; the agent module is invoked from app.py only.
"""
from __future__ import annotations
import io
import csv
import json
import os
import numpy as np
import pandas as pd
import hdbscan
import database as db
import embeddings as emb
import tccm_engine as tccm
# ============================================================
# SHARED: CSV ingest
# ============================================================
# Columns every ingested CSV must provide (after Scopus normalisation).
REQUIRED_COLS: set[str] = {"title", "abstract"}
# Columns used when present; ingest_csv back-fills them when absent.
OPTIONAL_COLS: set[str] = {"doi", "year", "author_keywords"}

# Scopus exports use capitalised, space-separated headers. This table maps
# them onto the lowercase snake_case schema used throughout the app. It is
# applied only when a CSV is fingerprinted as a Scopus export (see
# _normalise_columns); CSVs already using the lowercase names are untouched.
SCOPUS_COLUMN_MAP: dict[str, str] = dict([
    ("Title", "title"),
    ("Abstract", "abstract"),
    ("DOI", "doi"),
    ("Year", "year"),
    ("Author Keywords", "author_keywords"),
])
def _normalise_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Rename Scopus-style headers to the app's lowercase schema.

    A CSV is treated as a Scopus export when it has both a capitalised
    'Title' and 'Abstract' column; in that case every SCOPUS_COLUMN_MAP key
    present in the CSV is renamed. Any other CSV is returned as-is (the same
    object, not a copy).
    """
    present = set(df.columns)
    if not {"Title", "Abstract"}.issubset(present):
        return df
    renames = {
        scopus_name: app_name
        for scopus_name, app_name in SCOPUS_COLUMN_MAP.items()
        if scopus_name in present
    }
    return df.rename(columns=renames)
def ingest_csv(file_path: str, progress_callback=None) -> dict:
    """
    Load a CSV and upsert papers into Supabase.

    Required columns: title, abstract (or Scopus-style 'Title', 'Abstract')
    Optional columns: doi, year, author_keywords (or 'DOI', 'Year',
    'Author Keywords' from a Scopus export)
    Other columns are ignored.

    Rows without a title are dropped. Each remaining row gets a positional
    paper_id ('P0001', 'P0002', ...) in file order. Years are coerced to
    int, or None when missing or unparseable; doi/title/abstract/
    author_keywords are coerced to str with missing values as "".

    progress_callback: optional callable(fraction: float, desc: str) so the
    UI can render a real progress bar. fraction in [0.0, 1.0].

    Returns {'n_papers_loaded', 'first_id', 'last_id'}.
    Raises ValueError if a required column is missing or no row has a title.
    """

    def report(frac, desc):
        if progress_callback is not None:
            progress_callback(frac, desc)

    report(0.05, "Reading CSV file")
    # utf-8-sig drops a leading byte-order mark if present, which would
    # otherwise end up glued to the first column name.
    df = pd.read_csv(file_path, encoding="utf-8-sig")
    n_total_rows = len(df)
    report(0.15, f"CSV parsed: {n_total_rows} rows, {len(df.columns)} columns")

    df = _normalise_columns(df)
    report(0.20, "Column names normalised")

    missing = REQUIRED_COLS - set(df.columns)
    if missing:
        raise ValueError(
            f"CSV missing required columns: {missing}. "
            f"Expected either lowercase ('title','abstract') or "
            f"Scopus-style ('Title','Abstract')."
        )
    for col in OPTIONAL_COLS:
        if col not in df.columns:
            df[col] = "" if col != "year" else None

    df = df.dropna(subset=["title"]).reset_index(drop=True)
    if len(df) == 0:
        raise ValueError("CSV has no rows with a non-empty title.")

    # Stable paper IDs based on row order.
    # NOTE(review): IDs are positional and this function only upserts, so
    # re-ingesting a shorter CSV presumably leaves the previous upload's
    # higher-numbered papers in the table — confirm against db.upsert_papers.
    df["paper_id"] = [f"P{i+1:04d}" for i in range(len(df))]

    def _year_to_int(v):
        try:
            if pd.isna(v):
                return None
            return int(v)
        except (TypeError, ValueError, OverflowError):
            # OverflowError covers int(float("inf")), which would otherwise
            # abort the whole ingest over a single bad cell.
            return None

    # Build the column from a plain list with dtype=object so ints stay
    # Python ints and missing years stay None. Series.apply re-infers the
    # dtype: one missing year makes pandas store the whole column as
    # float64, so every year would be upserted as e.g. 2019.0 and every
    # None as NaN.
    df["year"] = pd.Series(
        [_year_to_int(v) for v in df["year"]], index=df.index, dtype=object
    )

    # Replace pandas NaN with empty string in text columns; the database
    # layer also has a defensive scrubber, but cleaning at source means
    # downstream consumers (export, embedding) see consistent data.
    for col in ("doi", "title", "abstract", "author_keywords"):
        df[col] = df[col].fillna("").astype(str)
    report(0.25, f"Validated {len(df)} rows; preparing upload")

    rows = df[
        ["paper_id", "doi", "title", "abstract", "year", "author_keywords"]
    ].to_dict("records")

    # Upload in chunks of 200 (smaller than db.upsert_papers default 500
    # so the user sees movement on the progress bar)
    chunk_size = 200
    n = len(rows)
    n_chunks = (n + chunk_size - 1) // chunk_size
    for i in range(0, n, chunk_size):
        chunk = rows[i:i + chunk_size]
        db.upsert_papers(chunk)
        chunk_idx = (i // chunk_size) + 1
        # Map chunk progress into the 0.25 -> 0.99 range
        frac = 0.25 + (0.74 * chunk_idx / n_chunks)
        report(frac, f"Uploaded batch {chunk_idx}/{n_chunks} to Supabase ({i + len(chunk)}/{n} rows)")

    report(1.0, f"Done. {n} papers in database.")
    return {
        "n_papers_loaded": n,
        "first_id": rows[0]["paper_id"],
        "last_id": rows[-1]["paper_id"],
    }
def list_papers() -> list[dict]:
    """Return every paper row currently stored in the database."""
    papers = db.get_all_papers()
    return papers
# ============================================================
# EMBEDDING PIPELINE
# ============================================================
# Local-disk cache written by embed_papers and read back for clustering:
# the 10D reduced vectors (NumPy .npy) and, in the same row order, the
# paper IDs they belong to (JSON list). Both are relative to the CWD.
VECTORS_PATH: str = "vectors_10d.npy"
VECTORS_IDS_PATH: str = "vectors_paper_ids.json"
def embed_papers(progress_callback=None) -> dict:
    """
    Embed every paper in Supabase and store two dimensionality reductions.

    The full embeddings from emb.embed_texts (title + abstract) are reduced
    to 10 and to 2 components. The 2D coordinates are saved to Supabase
    for the scatter plot; the 10D vectors and their paper IDs are cached on
    local disk (VECTORS_PATH / VECTORS_IDS_PATH) for the clustering step.

    progress_callback is forwarded to emb.embed_texts only.

    Returns n_papers, embedding_dim, reduced_dim and vectors_path.
    Raises ValueError when the database holds no papers.
    """
    papers = db.get_all_papers()
    if not papers:
        raise ValueError("No papers in database. Ingest a CSV first.")

    titles = [paper["title"] for paper in papers]
    # A missing or None abstract is embedded as an empty string.
    abstracts = [paper.get("abstract") or "" for paper in papers]
    paper_ids = [paper["paper_id"] for paper in papers]

    full_vectors = emb.embed_texts(
        titles, abstracts, progress_callback=progress_callback
    )
    reduced_10d = emb.reduce_dimensions(full_vectors, n_components=10)
    reduced_2d = emb.reduce_dimensions(full_vectors, n_components=2)

    coord_rows = [
        {"paper_id": pid, "coords": xy.tolist()}
        for pid, xy in zip(paper_ids, reduced_2d)
    ]
    db.save_2d_coords_bulk(coord_rows)

    # _load_cached_vectors reads both files back; the ID list keeps the
    # vector rows aligned with paper IDs.
    np.save(VECTORS_PATH, reduced_10d)
    with open(VECTORS_IDS_PATH, "w") as ids_file:
        json.dump(paper_ids, ids_file)

    return {
        "n_papers": len(papers),
        "embedding_dim": int(full_vectors.shape[1]),
        "reduced_dim": int(reduced_10d.shape[1]),
        "vectors_path": VECTORS_PATH,
    }
def _load_cached_vectors() -> tuple[np.ndarray, list[str]]:
    """Load the 10D vectors and paper IDs cached on disk by embed_papers.

    Returns:
        (vectors, paper_ids), where row i of ``vectors`` was written for
        ``paper_ids[i]``.

    Raises:
        RuntimeError: if either cache file is missing, or if the two files
            disagree on how many papers they describe.
    """
    # Both files are required. Checking only the vectors file let a missing
    # IDs file surface as a bare FileNotFoundError instead of this message.
    if not (os.path.exists(VECTORS_PATH) and os.path.exists(VECTORS_IDS_PATH)):
        raise RuntimeError(
            "No embeddings found. Run the Embed step first."
        )
    vectors = np.load(VECTORS_PATH)
    with open(VECTORS_IDS_PATH) as f:
        paper_ids = json.load(f)
    # A length mismatch (e.g. one file left over from an interrupted run)
    # could misalign cluster labels with paper IDs in run_clustering.
    if len(vectors) != len(paper_ids):
        raise RuntimeError(
            f"Embedding cache is inconsistent: {len(vectors)} vectors but "
            f"{len(paper_ids)} paper IDs. Run the Embed step again."
        )
    return vectors, paper_ids
def hdbscan_grid_sweep(grid: list[dict]) -> list[dict]:
    """Fit HDBSCAN once per config in *grid* on the cached 10D vectors.

    Nothing is persisted; each config is only scored so the researcher can
    pick one for run_clustering. Missing config keys fall back to
    min_cluster_size=8, min_samples=3, max_cluster_size=0 and
    cluster_selection_method="eom" (metric is always euclidean).

    Returns one dict per config, in order, with the config itself,
    n_clusters, n_noise, noise_pct (percentage, 1 dp) and cluster_sizes
    (non-noise cluster sizes, largest first).
    """
    vectors, _paper_ids = _load_cached_vectors()
    results = []
    for cfg in grid:
        model = hdbscan.HDBSCAN(
            min_cluster_size=cfg.get("min_cluster_size", 8),
            min_samples=cfg.get("min_samples", 3),
            max_cluster_size=cfg.get("max_cluster_size", 0),
            metric="euclidean",
            cluster_selection_method=cfg.get("cluster_selection_method", "eom"),
        )
        labels = model.fit_predict(vectors)
        noise = int(np.count_nonzero(labels == -1))
        # One pass over the labels gives every cluster's size at once.
        cluster_ids, counts = np.unique(labels, return_counts=True)
        sizes = sorted(
            (int(count) for cid, count in zip(cluster_ids, counts) if cid != -1),
            reverse=True,
        )
        results.append({
            "config": cfg,
            "n_clusters": len(sizes),
            "n_noise": noise,
            "noise_pct": round(100 * noise / max(len(labels), 1), 1),
            "cluster_sizes": sizes,
        })
    return results
def run_clustering(params: dict, notes: str = "") -> int:
    """Cluster the cached 10D vectors with HDBSCAN and persist the result.

    *params* may set min_cluster_size (default 8), min_samples (3),
    max_cluster_size (0) and cluster_selection_method ("eom"); the metric
    is always euclidean. Creates a clustering-run record (params, cluster
    count, noise count, notes), saves each paper's label and membership
    probability against it, and returns the new run id.
    """
    vectors, paper_ids = _load_cached_vectors()
    model = hdbscan.HDBSCAN(
        min_cluster_size=params.get("min_cluster_size", 8),
        min_samples=params.get("min_samples", 3),
        max_cluster_size=params.get("max_cluster_size", 0),
        metric="euclidean",
        cluster_selection_method=params.get("cluster_selection_method", "eom"),
    )
    labels = model.fit_predict(vectors).tolist()
    probs = model.probabilities_.tolist()

    distinct = set(labels)
    # Label -1 is HDBSCAN's noise marker, not a cluster.
    n_clusters = len(distinct) - (1 if -1 in distinct else 0)
    n_noise = labels.count(-1)

    run_id = db.create_clustering_run(params, n_clusters, n_noise, notes)
    db.save_cluster_assignments(run_id, paper_ids, labels, probs)
    return run_id
def top_representative_papers(run_id: int, n: int = 3) -> dict[int, list[dict]]:
    """Return, per non-noise cluster, the *n* assignment rows with the
    highest membership_prob (highest first).

    Noise rows (cluster_id == -1) are skipped. Clusters appear in the order
    they are first seen in the run's assignments.
    """
    members_by_cluster: dict[int, list[dict]] = {}
    for row in db.get_cluster_assignments(run_id):
        cid = row["cluster_id"]
        if cid != -1:
            members_by_cluster.setdefault(cid, []).append(row)

    def by_prob(row):
        return row["membership_prob"]

    return {
        cid: sorted(members, key=by_prob, reverse=True)[:n]
        for cid, members in members_by_cluster.items()
    }
def upsert_cluster_label(run_id: int, cluster_id: int, label: str,
                         subject: str = "", object_: str = "",
                         phenomenon: str = "", rationale: str = "",
                         top_paper_ids: list[str] | None = None,
                         validated_by_researcher: bool = False,
                         locked: bool = False,
                         researcher_notes: str = "") -> None:
    """Persist the label record for one cluster of a clustering run.

    Every argument is forwarded unchanged to db.save_cluster_label.
    ``object_`` has a trailing underscore because ``object`` is a builtin.
    """
    descriptive = dict(
        subject=subject, object_=object_, phenomenon=phenomenon,
        rationale=rationale, top_paper_ids=top_paper_ids,
    )
    review_state = dict(
        validated_by_researcher=validated_by_researcher,
        locked=locked,
        researcher_notes=researcher_notes,
    )
    db.save_cluster_label(
        run_id=run_id, cluster_id=cluster_id, label=label,
        **descriptive, **review_state,
    )
def list_clustering_runs() -> list[dict]:
    """Return all stored clustering runs as provided by the database layer."""
    runs = db.list_clustering_runs()
    return runs
def cluster_summary_for_run(run_id: int) -> list[dict]:
    """Summarise every cluster of a clustering run, one dict per cluster.

    Rows are ordered by cluster_id, so the HDBSCAN noise bucket (-1), when
    present, comes first. Every row has the same keys: cluster_id, label,
    n_papers, subject, object, phenomenon, validated, locked, rationale and
    researcher_notes. Clusters without a saved label get empty/False values.
    """
    assignments = db.get_cluster_assignments(run_id)
    labels = {lab["cluster_id"]: lab for lab in db.get_cluster_labels(run_id)}

    by_cluster: dict[int, list[dict]] = {}
    for r in assignments:
        by_cluster.setdefault(r["cluster_id"], []).append(r)

    summaries = []
    for cid, papers in sorted(by_cluster.items()):
        if cid == -1:
            summaries.append({
                "cluster_id": -1, "label": "(noise)",
                "n_papers": len(papers),
                "subject": "", "object": "", "phenomenon": "",
                "validated": False, "locked": False,
                "rationale": "Papers HDBSCAN could not place in any cluster",
                # Present on every row so consumers never need a
                # special case for the noise bucket.
                "researcher_notes": "",
            })
            continue
        lab = labels.get(cid, {})
        summaries.append({
            "cluster_id": cid,
            "label": lab.get("label", ""),
            "n_papers": len(papers),
            "subject": lab.get("subject", ""),
            "object": lab.get("object", ""),
            "phenomenon": lab.get("phenomenon", ""),
            "validated": lab.get("validated_by_researcher", False),
            "locked": lab.get("locked", False),
            "rationale": lab.get("rationale", ""),
            "researcher_notes": lab.get("researcher_notes", ""),
        })
    return summaries
def export_clustering_run_csv(run_id: int) -> tuple[str, str]:
    """Render a clustering run as two CSV strings: (papers_csv, clusters_csv).

    papers_csv has one row per cluster assignment. doi and author_keywords
    come from the papers table; title, year and membership_prob come from
    the assignment row. Noise papers without a saved label get the label
    "noise". clusters_csv has one row per entry of cluster_summary_for_run.
    """
    assignments = db.get_cluster_assignments(run_id)
    label_rows = {row["cluster_id"]: row for row in db.get_cluster_labels(run_id)}
    papers_by_id = {paper["paper_id"]: paper for paper in db.get_all_papers()}

    papers_out = io.StringIO()
    papers_csv = csv.writer(papers_out)
    papers_csv.writerow([
        "paper_id", "doi", "title", "year", "author_keywords",
        "cluster_id", "cluster_label", "membership_prob",
    ])
    for a in assignments:
        cluster_id = a["cluster_id"]
        fallback_label = "noise" if cluster_id == -1 else ""
        cluster_label = label_rows.get(cluster_id, {}).get("label", fallback_label)
        paper = papers_by_id.get(a["paper_id"], {})
        papers_csv.writerow([
            a["paper_id"], paper.get("doi", ""), a["title"], a["year"],
            paper.get("author_keywords", ""),
            cluster_id, cluster_label, a["membership_prob"],
        ])

    clusters_out = io.StringIO()
    clusters_csv = csv.writer(clusters_out)
    clusters_csv.writerow([
        "cluster_id", "label", "n_papers", "subject", "object",
        "phenomenon", "validated_by_researcher", "locked", "rationale",
        "researcher_notes",
    ])
    for summary in cluster_summary_for_run(run_id):
        clusters_csv.writerow([
            summary["cluster_id"], summary["label"], summary["n_papers"],
            summary["subject"], summary["object"], summary["phenomenon"],
            summary.get("validated", False), summary.get("locked", False),
            summary["rationale"], summary.get("researcher_notes", ""),
        ])

    return papers_out.getvalue(), clusters_out.getvalue()
# ============================================================
# TCCM PIPELINE
# ============================================================
# Memoised result of tccm.load_patterns(); filled on first _patterns() call.
_compiled_patterns: dict | None = None


def _patterns():
    """Return the TCCM pattern set, loading it only on first use.

    Later calls reuse the object cached in the module-level
    _compiled_patterns.
    """
    global _compiled_patterns
    if _compiled_patterns is not None:
        return _compiled_patterns
    _compiled_patterns = tccm.load_patterns()
    return _compiled_patterns
def tccm_pattern_summary() -> dict[str, int]:
    """Return tccm.pattern_set_summary for the (lazily loaded) pattern set."""
    patterns = _patterns()
    return tccm.pattern_set_summary(patterns)
def run_tccm_screening(notes: str = "") -> int:
    """
    Screen every paper in Supabase with the deterministic TCCM screener.

    Records a tccm_runs row (pattern-set version, threshold rule, verdict
    counts, notes), saves the per-paper tccm_classifications rows against
    it, and returns the new run_id.

    Raises ValueError when the database holds no papers.
    """
    papers = db.get_all_papers()
    if not papers:
        raise ValueError("No papers in database. Ingest a CSV first.")

    classified = tccm.screen_corpus(papers, _patterns())
    counts = tccm.summarise_run(classified)

    new_run_id = db.create_tccm_run(
        pattern_set_version=tccm.DEFAULT_PATTERN_VERSION,
        threshold_rule="anti_dominates",
        n_papers=counts["total"],
        n_include=counts["INCLUDE"],
        n_exclude=counts["EXCLUDE"],
        n_marginal=counts["MARGINAL"],
        notes=notes,
    )
    db.save_tccm_classifications(new_run_id, classified)
    return new_run_id
def list_tccm_runs() -> list[dict]:
    """Return all stored TCCM screening runs from the database layer."""
    runs = db.list_tccm_runs()
    return runs
def get_tccm_run_papers(run_id: int,
                        verdict_filter: str | None = None) -> list[dict]:
    """Return the classification rows of a TCCM run.

    *verdict_filter* is passed straight to db.get_tccm_classifications;
    None presumably means "no filter" — confirm in the database layer.
    """
    classifications = db.get_tccm_classifications(run_id, verdict_filter)
    return classifications
def save_tccm_marginal_review(run_id: int, paper_id: str,
                              agent_verdict: str, agent_rationale: str,
                              researcher_verdict: str | None,
                              researcher_notes: str = "") -> None:
    """Store the agent's and researcher's review of one MARGINAL paper.

    All arguments are forwarded unchanged to db.save_marginal_review.
    """
    review = {
        "run_id": run_id,
        "paper_id": paper_id,
        "agent_verdict": agent_verdict,
        "agent_rationale": agent_rationale,
        "researcher_verdict": researcher_verdict,
        "researcher_notes": researcher_notes,
    }
    db.save_marginal_review(**review)
def export_tccm_run_csv(run_id: int) -> tuple[str, str, str]:
    """Export a TCCM run as (primary_csv, signal_csv, summary_text).

    Both CSVs come from tccm.export_run_to_csv, given the run's
    classification rows and every paper keyed by paper_id. summary_text is
    a plain-text report of the verdict counts.
    """
    classifications = db.get_tccm_classifications(run_id)
    papers_by_id = {paper["paper_id"]: paper for paper in db.get_all_papers()}
    primary_csv, signal_csv = tccm.export_run_to_csv(classifications, papers_by_id)

    counts = {"total": len(classifications), "INCLUDE": 0, "EXCLUDE": 0, "MARGINAL": 0}
    for verdict in (row["verdict"] for row in classifications):
        # Unknown verdict strings are ignored rather than counted.
        if verdict in counts:
            counts[verdict] += 1

    # NOTE(review): the "pattern set" line reports the current
    # tccm.DEFAULT_PATTERN_VERSION, not necessarily the version stored for
    # this run by run_tccm_screening — confirm whether the stored value
    # should be used for older runs.
    summary_text = (
        f"TCCM screening summary (run {run_id})\n"
        f" pattern set: {tccm.DEFAULT_PATTERN_VERSION}\n"
        f" threshold rule: anti-signals must equal or exceed primary signals to EXCLUDE\n\n"
        f" total papers: {counts['total']}\n"
        f" INCLUDE: {counts['INCLUDE']}\n"
        f" EXCLUDE: {counts['EXCLUDE']}\n"
        f" MARGINAL: {counts['MARGINAL']}\n"
    )
    return primary_csv, signal_csv, summary_text
# ============================================================
# PDF DOWNLOADER PIPELINE
# ============================================================
import pdf_downloader as pdfd
def run_pdf_discovery(email: str, include_arxiv: bool = True,
                      include_scihub: bool = False,
                      rate_limit_per_sec: float = 2.0,
                      progress_callback=None) -> dict:
    """
    Query Unpaywall (and optionally arXiv, Sci-Hub) for every paper in
    the database and save the results to the pdf_downloads table.

    email: contact address for the Unpaywall API; surrounding whitespace is
        stripped before validation.
    include_arxiv / include_scihub: enable the extra sources (Sci-Hub is
        opt-in and off by default).
    rate_limit_per_sec, progress_callback: forwarded unchanged to
        pdf_downloader.bulk_discover.

    Returns counts: n_papers, n_pdf_found, n_no_pdf, and per-source hits
    (n_unpaywall, n_arxiv, n_scihub) taken from each result's 'source'.
    Raises ValueError for a missing/invalid email or an empty database.
    """
    # Validate the cheap argument before fetching the whole papers table,
    # and strip it so a pasted address with a trailing space or newline is
    # not forwarded to Unpaywall with the whitespace attached.
    email = (email or "").strip()
    if not email or "@" not in email:
        raise ValueError("Valid email required for Unpaywall API.")
    papers = db.get_all_papers()
    if not papers:
        raise ValueError("No papers in database. Ingest a CSV first.")

    discoveries = pdfd.bulk_discover(
        papers=papers, email=email,
        include_arxiv=include_arxiv, include_scihub=include_scihub,
        rate_limit_per_sec=rate_limit_per_sec,
        progress_callback=progress_callback,
    )
    db.save_pdf_discoveries(discoveries)

    # One pass over the results for all counters.
    n_found = 0
    per_source = {"unpaywall": 0, "arxiv": 0, "scihub": 0}
    for d in discoveries:
        if d.get("pdf_url"):
            n_found += 1
        source = d.get("source")
        if source in per_source:
            per_source[source] += 1

    return {
        "n_papers": len(discoveries),
        "n_pdf_found": n_found,
        "n_unpaywall": per_source["unpaywall"],
        "n_arxiv": per_source["arxiv"],
        "n_scihub": per_source["scihub"],
        "n_no_pdf": len(discoveries) - n_found,
    }
def run_pdf_download(email: str, skip_existing: bool = True,
                     push_to_supabase: bool = True,
                     rate_limit_per_sec: float = 2.0,
                     progress_callback=None) -> dict:
    """
    Download PDFs for every paper that has a discovered pdf_url and save
    each attempt's outcome to the pdf_downloads table.

    email: contact address forwarded to the downloader; surrounding
        whitespace is stripped before validation.
    skip_existing, push_to_supabase, rate_limit_per_sec, progress_callback:
        forwarded unchanged to pdf_downloader.bulk_download.

    Returns counts: n_attempted, n_downloaded, n_failed,
    n_uploaded_to_supabase, and total_size_mb (sum of every result's
    file_size, in MiB, 1 dp).
    Raises ValueError for a missing/invalid email or when no pdf_url has
    been discovered yet.
    """
    # Strip so a pasted address with a trailing space/newline is not
    # forwarded with the whitespace attached (matches run_pdf_discovery).
    email = (email or "").strip()
    if not email or "@" not in email:
        raise ValueError("Valid email required for downloads.")

    rows = db.get_pdf_status()
    candidates = [
        {"paper_id": r["paper_id"],
         "pdf_url": r.get("pdf_url"),
         "source": r.get("source")}
        for r in rows if r.get("pdf_url")
    ]
    if not candidates:
        raise ValueError("No discovered PDFs to download. Run discovery first.")

    results = pdfd.bulk_download(
        discoveries=candidates, email=email,
        skip_existing=skip_existing, push_to_supabase=push_to_supabase,
        rate_limit_per_sec=rate_limit_per_sec,
        progress_callback=progress_callback,
    )
    db.save_pdf_downloads(results)

    n_ok = sum(1 for r in results if r.get("downloaded"))
    n_pushed = sum(1 for r in results if r.get("uploaded_to_supabase"))
    total_mb = sum((r.get("file_size") or 0) for r in results) / (1024 ** 2)
    return {
        "n_attempted": len(results),
        "n_downloaded": n_ok,
        # Every result without a truthy "downloaded" flag is a failure.
        "n_failed": len(results) - n_ok,
        "n_uploaded_to_supabase": n_pushed,
        "total_size_mb": round(total_mb, 1),
    }
def run_crossref_enrichment(email: str,
                            rate_limit_per_sec: float = 2.0,
                            progress_callback=None) -> dict:
    """
    Fetch Crossref metadata (citation count, abstract, references) for every
    paper with a non-empty DOI and save it to the crossref_metadata table.

    email, rate_limit_per_sec and progress_callback are forwarded unchanged
    to pdf_downloader.bulk_crossref. NOTE(review): unlike the discovery and
    download steps, *email* is not validated here.

    Returns n_attempted, n_enriched (results without an 'error'),
    n_with_abstract and n_with_citations.
    Raises ValueError when no paper has a DOI.
    """
    with_doi = [paper for paper in db.get_all_papers() if paper.get("doi")]
    if not with_doi:
        raise ValueError("No papers with DOIs in database.")

    results = pdfd.bulk_crossref(
        papers=with_doi, email=email,
        rate_limit_per_sec=rate_limit_per_sec,
        progress_callback=progress_callback,
    )
    db.save_crossref_metadata(results)

    return {
        "n_attempted": len(results),
        "n_enriched": sum(not r.get("error") for r in results),
        "n_with_abstract": sum(bool(r.get("abstract")) for r in results),
        "n_with_citations": sum(r.get("citation_count") is not None for r in results),
    }
def list_pdf_status() -> list[dict]:
    """Return PDF download rows with their metadata, as given by the DB layer."""
    status_rows = db.list_pdfs_with_metadata()
    return status_rows
def export_pdf_zip(output_path: str | None = None) -> str:
    """Bundle the downloaded PDFs into a zip via pdf_downloader.build_pdf_zip.

    output_path: where to write the archive. Defaults to ``all_pdfs.zip``
        in the platform temp directory (tempfile.gettempdir(), normally
        ``/tmp`` on Linux) instead of a hard-coded ``/tmp`` path that does
        not exist on Windows.

    Returns the value of pdfd.build_pdf_zip — presumably the archive path
    (TODO confirm in pdf_downloader).
    """
    if output_path is None:
        import tempfile  # only needed to build the default location

        output_path = os.path.join(tempfile.gettempdir(), "all_pdfs.zip")
    return pdfd.build_pdf_zip(output_path)