""" tools.py Deterministic operations the UI calls. Shared across both pipelines for CSV ingest; embedding-specific operations and TCCM-specific operations are clearly grouped within. No LLM calls here. The agent module is invoked from app.py only. """ from __future__ import annotations import io import csv import json import os import numpy as np import pandas as pd import hdbscan import database as db import embeddings as emb import tccm_engine as tccm # ============================================================ # SHARED: CSV ingest # ============================================================ REQUIRED_COLS = {"title", "abstract"} OPTIONAL_COLS = {"doi", "year", "author_keywords"} # Scopus exports use capitalised, space-separated column names. Map them # to the lowercase snake_case schema the app uses. Detection is by # fingerprint: if the CSV has these specific Scopus columns, rename # silently; otherwise leave alone (user already used the lowercase names). SCOPUS_COLUMN_MAP = { "Title": "title", "Abstract": "abstract", "DOI": "doi", "Year": "year", "Author Keywords": "author_keywords", } def _normalise_columns(df: pd.DataFrame) -> pd.DataFrame: """If the CSV looks like a Scopus export, rename columns to the schema the app expects. Otherwise return unchanged.""" cols = set(df.columns) # Fingerprint: Scopus always exports both 'Title' and 'Abstract' # capitalised. If both are present, treat as Scopus export. if "Title" in cols and "Abstract" in cols: rename_map = {k: v for k, v in SCOPUS_COLUMN_MAP.items() if k in cols} df = df.rename(columns=rename_map) return df def ingest_csv(file_path: str, progress_callback=None) -> dict: """ Load a CSV and upsert papers into Supabase. Required columns: title, abstract (or Scopus-style 'Title', 'Abstract') Optional columns: doi, year, author_keywords (or 'DOI', 'Year', 'Author Keywords' from a Scopus export) Other columns are ignored. progress_callback: optional callable(fraction: float, desc: str) so the UI can render a real progress bar. fraction in [0.0, 1.0]. """ def report(frac, desc): if progress_callback is not None: progress_callback(frac, desc) report(0.05, "Reading CSV file") df = pd.read_csv(file_path, encoding="utf-8-sig") n_total_rows = len(df) report(0.15, f"CSV parsed: {n_total_rows} rows, {len(df.columns)} columns") df = _normalise_columns(df) report(0.20, "Column names normalised") missing = REQUIRED_COLS - set(df.columns) if missing: raise ValueError( f"CSV missing required columns: {missing}. " f"Expected either lowercase ('title','abstract') or " f"Scopus-style ('Title','Abstract')." ) for col in OPTIONAL_COLS: if col not in df.columns: df[col] = "" if col != "year" else None df = df.dropna(subset=["title"]).reset_index(drop=True) if len(df) == 0: raise ValueError("CSV has no rows with a non-empty title.") # Stable paper IDs based on row order df["paper_id"] = [f"P{i+1:04d}" for i in range(len(df))] # Coerce year to int when possible def _year_to_int(v): try: if pd.isna(v): return None return int(v) except (TypeError, ValueError): return None df["year"] = df["year"].apply(_year_to_int) # Replace pandas NaN with empty string in text columns; the database # layer also has a defensive scrubber, but cleaning at source means # downstream consumers (export, embedding) see consistent data. for col in ("doi", "title", "abstract", "author_keywords"): df[col] = df[col].fillna("").astype(str) report(0.25, f"Validated {len(df)} rows; preparing upload") rows = df[ ["paper_id", "doi", "title", "abstract", "year", "author_keywords"] ].to_dict("records") # Upload in chunks of 200 (smaller than db.upsert_papers default 500 # so the user sees movement on the progress bar) chunk_size = 200 n = len(rows) n_chunks = (n + chunk_size - 1) // chunk_size for i in range(0, n, chunk_size): chunk = rows[i:i + chunk_size] db.upsert_papers(chunk) chunk_idx = (i // chunk_size) + 1 # Map chunk progress into the 0.25 -> 0.99 range frac = 0.25 + (0.74 * chunk_idx / n_chunks) report(frac, f"Uploaded batch {chunk_idx}/{n_chunks} to Supabase ({i + len(chunk)}/{n} rows)") report(1.0, f"Done. {n} papers in database.") return { "n_papers_loaded": n, "first_id": rows[0]["paper_id"], "last_id": rows[-1]["paper_id"], } def list_papers() -> list[dict]: return db.get_all_papers() # ============================================================ # EMBEDDING PIPELINE # ============================================================ VECTORS_PATH = "vectors_10d.npy" VECTORS_IDS_PATH = "vectors_paper_ids.json" def embed_papers(progress_callback=None) -> dict: """ Run SPECTER 2 embedding on every paper in Supabase. Save 10D and 2D UMAP reductions; the 2D coords go into Supabase for the scatter plot, the 10D vectors are cached on local disk for the clustering step. """ papers = db.get_all_papers() if not papers: raise ValueError("No papers in database. Ingest a CSV first.") titles = [p["title"] for p in papers] abstracts = [p.get("abstract") or "" for p in papers] paper_ids = [p["paper_id"] for p in papers] vectors_768 = emb.embed_texts(titles, abstracts, progress_callback=progress_callback) vectors_10d = emb.reduce_dimensions(vectors_768, n_components=10) vectors_2d = emb.reduce_dimensions(vectors_768, n_components=2) db.save_2d_coords_bulk([ {"paper_id": pid, "coords": coord.tolist()} for pid, coord in zip(paper_ids, vectors_2d) ]) np.save(VECTORS_PATH, vectors_10d) with open(VECTORS_IDS_PATH, "w") as f: json.dump(paper_ids, f) return { "n_papers": len(papers), "embedding_dim": int(vectors_768.shape[1]), "reduced_dim": int(vectors_10d.shape[1]), "vectors_path": VECTORS_PATH, } def _load_cached_vectors() -> tuple[np.ndarray, list[str]]: if not os.path.exists(VECTORS_PATH): raise RuntimeError( "No embeddings found. Run the Embed step first." ) vectors = np.load(VECTORS_PATH) with open(VECTORS_IDS_PATH) as f: paper_ids = json.load(f) return vectors, paper_ids def hdbscan_grid_sweep(grid: list[dict]) -> list[dict]: vectors, _ = _load_cached_vectors() out = [] for cfg in grid: c = hdbscan.HDBSCAN( min_cluster_size=cfg.get("min_cluster_size", 8), min_samples=cfg.get("min_samples", 3), max_cluster_size=cfg.get("max_cluster_size", 0), metric="euclidean", cluster_selection_method=cfg.get("cluster_selection_method", "eom"), ) labels = c.fit_predict(vectors) n_clusters = int(len(set(labels)) - (1 if -1 in labels else 0)) n_noise = int((labels == -1).sum()) sizes = sorted( [int((labels == cid).sum()) for cid in set(labels) if cid != -1], reverse=True, ) out.append({ "config": cfg, "n_clusters": n_clusters, "n_noise": n_noise, "noise_pct": round(100 * n_noise / max(len(labels), 1), 1), "cluster_sizes": sizes, }) return out def run_clustering(params: dict, notes: str = "") -> int: vectors, paper_ids = _load_cached_vectors() clusterer = hdbscan.HDBSCAN( min_cluster_size=params.get("min_cluster_size", 8), min_samples=params.get("min_samples", 3), max_cluster_size=params.get("max_cluster_size", 0), metric="euclidean", cluster_selection_method=params.get("cluster_selection_method", "eom"), ) labels = clusterer.fit_predict(vectors).tolist() probs = clusterer.probabilities_.tolist() n_clusters = int(len(set(labels)) - (1 if -1 in labels else 0)) n_noise = int(sum(1 for l in labels if l == -1)) run_id = db.create_clustering_run(params, n_clusters, n_noise, notes) db.save_cluster_assignments(run_id, paper_ids, labels, probs) return run_id def top_representative_papers(run_id: int, n: int = 3) -> dict[int, list[dict]]: rows = db.get_cluster_assignments(run_id) by_cluster: dict[int, list[dict]] = {} for r in rows: if r["cluster_id"] == -1: continue by_cluster.setdefault(r["cluster_id"], []).append(r) out = {} for cid, papers in by_cluster.items(): papers_sorted = sorted( papers, key=lambda p: p["membership_prob"], reverse=True ) out[cid] = papers_sorted[:n] return out def upsert_cluster_label(run_id: int, cluster_id: int, label: str, subject: str = "", object_: str = "", phenomenon: str = "", rationale: str = "", top_paper_ids: list[str] | None = None, validated_by_researcher: bool = False, locked: bool = False, researcher_notes: str = "") -> None: db.save_cluster_label( run_id=run_id, cluster_id=cluster_id, label=label, subject=subject, object_=object_, phenomenon=phenomenon, rationale=rationale, top_paper_ids=top_paper_ids, validated_by_researcher=validated_by_researcher, locked=locked, researcher_notes=researcher_notes, ) def list_clustering_runs() -> list[dict]: return db.list_clustering_runs() def cluster_summary_for_run(run_id: int) -> list[dict]: assignments = db.get_cluster_assignments(run_id) labels = {l["cluster_id"]: l for l in db.get_cluster_labels(run_id)} by_cluster: dict[int, list[dict]] = {} for r in assignments: by_cluster.setdefault(r["cluster_id"], []).append(r) summaries = [] for cid, papers in sorted(by_cluster.items()): if cid == -1: summaries.append({ "cluster_id": -1, "label": "(noise)", "n_papers": len(papers), "subject": "", "object": "", "phenomenon": "", "validated": False, "locked": False, "rationale": "Papers HDBSCAN could not place in any cluster", }) continue lab = labels.get(cid, {}) summaries.append({ "cluster_id": cid, "label": lab.get("label", ""), "n_papers": len(papers), "subject": lab.get("subject", ""), "object": lab.get("object", ""), "phenomenon": lab.get("phenomenon", ""), "validated": lab.get("validated_by_researcher", False), "locked": lab.get("locked", False), "rationale": lab.get("rationale", ""), "researcher_notes": lab.get("researcher_notes", ""), }) return summaries def export_clustering_run_csv(run_id: int) -> tuple[str, str]: assignments = db.get_cluster_assignments(run_id) labels = {l["cluster_id"]: l for l in db.get_cluster_labels(run_id)} paper_lookup = {p["paper_id"]: p for p in db.get_all_papers()} pbuf = io.StringIO() pwriter = csv.writer(pbuf) pwriter.writerow([ "paper_id", "doi", "title", "year", "author_keywords", "cluster_id", "cluster_label", "membership_prob", ]) for r in assignments: cid = r["cluster_id"] lab = labels.get(cid, {}).get("label", "noise" if cid == -1 else "") p = paper_lookup.get(r["paper_id"], {}) pwriter.writerow([ r["paper_id"], p.get("doi", ""), r["title"], r["year"], p.get("author_keywords", ""), cid, lab, r["membership_prob"], ]) cbuf = io.StringIO() cwriter = csv.writer(cbuf) cwriter.writerow([ "cluster_id", "label", "n_papers", "subject", "object", "phenomenon", "validated_by_researcher", "locked", "rationale", "researcher_notes", ]) for s in cluster_summary_for_run(run_id): cwriter.writerow([ s["cluster_id"], s["label"], s["n_papers"], s["subject"], s["object"], s["phenomenon"], s.get("validated", False), s.get("locked", False), s["rationale"], s.get("researcher_notes", ""), ]) return pbuf.getvalue(), cbuf.getvalue() # ============================================================ # TCCM PIPELINE # ============================================================ _compiled_patterns: dict | None = None def _patterns(): global _compiled_patterns if _compiled_patterns is None: _compiled_patterns = tccm.load_patterns() return _compiled_patterns def tccm_pattern_summary() -> dict[str, int]: return tccm.pattern_set_summary(_patterns()) def run_tccm_screening(notes: str = "") -> int: """ Run the deterministic TCCM screener on every paper in Supabase. Persists a tccm_runs row plus tccm_classifications rows. Returns the new run_id. """ papers = db.get_all_papers() if not papers: raise ValueError("No papers in database. Ingest a CSV first.") rows = tccm.screen_corpus(papers, _patterns()) summary = tccm.summarise_run(rows) run_id = db.create_tccm_run( pattern_set_version=tccm.DEFAULT_PATTERN_VERSION, threshold_rule="anti_dominates", n_papers=summary["total"], n_include=summary["INCLUDE"], n_exclude=summary["EXCLUDE"], n_marginal=summary["MARGINAL"], notes=notes, ) db.save_tccm_classifications(run_id, rows) return run_id def list_tccm_runs() -> list[dict]: return db.list_tccm_runs() def get_tccm_run_papers(run_id: int, verdict_filter: str | None = None) -> list[dict]: return db.get_tccm_classifications(run_id, verdict_filter) def save_tccm_marginal_review(run_id: int, paper_id: str, agent_verdict: str, agent_rationale: str, researcher_verdict: str | None, researcher_notes: str = "") -> None: db.save_marginal_review( run_id=run_id, paper_id=paper_id, agent_verdict=agent_verdict, agent_rationale=agent_rationale, researcher_verdict=researcher_verdict, researcher_notes=researcher_notes, ) def export_tccm_run_csv(run_id: int) -> tuple[str, str, str]: rows = db.get_tccm_classifications(run_id) papers_index = {p["paper_id"]: p for p in db.get_all_papers()} primary_csv, signal_csv = tccm.export_run_to_csv(rows, papers_index) summary = {"total": len(rows), "INCLUDE": 0, "EXCLUDE": 0, "MARGINAL": 0} for r in rows: if r["verdict"] in summary: summary[r["verdict"]] += 1 summary_text = ( f"TCCM screening summary (run {run_id})\n" f" pattern set: {tccm.DEFAULT_PATTERN_VERSION}\n" f" threshold rule: anti-signals must equal or exceed primary signals to EXCLUDE\n\n" f" total papers: {summary['total']}\n" f" INCLUDE: {summary['INCLUDE']}\n" f" EXCLUDE: {summary['EXCLUDE']}\n" f" MARGINAL: {summary['MARGINAL']}\n" ) return primary_csv, signal_csv, summary_text # ============================================================ # PDF DOWNLOADER PIPELINE # ============================================================ import pdf_downloader as pdfd def run_pdf_discovery(email: str, include_arxiv: bool = True, include_scihub: bool = False, rate_limit_per_sec: float = 2.0, progress_callback=None) -> dict: """ Query Unpaywall (and optionally arXiv, Sci-Hub) for every paper in the database. Saves results to the pdf_downloads table. """ papers = db.get_all_papers() if not papers: raise ValueError("No papers in database. Ingest a CSV first.") if not email or "@" not in email: raise ValueError("Valid email required for Unpaywall API.") discoveries = pdfd.bulk_discover( papers=papers, email=email, include_arxiv=include_arxiv, include_scihub=include_scihub, rate_limit_per_sec=rate_limit_per_sec, progress_callback=progress_callback, ) db.save_pdf_discoveries(discoveries) n_oa = sum(1 for d in discoveries if d.get("pdf_url")) n_via_unpw = sum(1 for d in discoveries if d.get("source") == "unpaywall") n_via_arxiv = sum(1 for d in discoveries if d.get("source") == "arxiv") n_via_scihub = sum(1 for d in discoveries if d.get("source") == "scihub") return { "n_papers": len(discoveries), "n_pdf_found": n_oa, "n_unpaywall": n_via_unpw, "n_arxiv": n_via_arxiv, "n_scihub": n_via_scihub, "n_no_pdf": len(discoveries) - n_oa, } def run_pdf_download(email: str, skip_existing: bool = True, push_to_supabase: bool = True, rate_limit_per_sec: float = 2.0, progress_callback=None) -> dict: """ Download PDFs for every paper that has a discovered pdf_url. Updates the pdf_downloads table with status. """ if not email or "@" not in email: raise ValueError("Valid email required for downloads.") rows = db.get_pdf_status() candidates = [ {"paper_id": r["paper_id"], "pdf_url": r.get("pdf_url"), "source": r.get("source")} for r in rows if r.get("pdf_url") ] if not candidates: raise ValueError("No discovered PDFs to download. Run discovery first.") results = pdfd.bulk_download( discoveries=candidates, email=email, skip_existing=skip_existing, push_to_supabase=push_to_supabase, rate_limit_per_sec=rate_limit_per_sec, progress_callback=progress_callback, ) db.save_pdf_downloads(results) n_ok = sum(1 for r in results if r.get("downloaded")) n_fail = sum(1 for r in results if not r.get("downloaded")) n_pushed = sum(1 for r in results if r.get("uploaded_to_supabase")) total_mb = sum((r.get("file_size") or 0) for r in results) / (1024 ** 2) return { "n_attempted": len(results), "n_downloaded": n_ok, "n_failed": n_fail, "n_uploaded_to_supabase": n_pushed, "total_size_mb": round(total_mb, 1), } def run_crossref_enrichment(email: str, rate_limit_per_sec: float = 2.0, progress_callback=None) -> dict: """ Fetch Crossref metadata (citation count, abstract, references) for every paper with a DOI. Saves to crossref_metadata table. """ papers = db.get_all_papers() papers_with_doi = [p for p in papers if p.get("doi")] if not papers_with_doi: raise ValueError("No papers with DOIs in database.") results = pdfd.bulk_crossref( papers=papers_with_doi, email=email, rate_limit_per_sec=rate_limit_per_sec, progress_callback=progress_callback, ) db.save_crossref_metadata(results) n_ok = sum(1 for r in results if not r.get("error")) n_with_abstract = sum(1 for r in results if r.get("abstract")) n_with_citations = sum(1 for r in results if r.get("citation_count") is not None) return { "n_attempted": len(results), "n_enriched": n_ok, "n_with_abstract": n_with_abstract, "n_with_citations": n_with_citations, } def list_pdf_status() -> list[dict]: return db.list_pdfs_with_metadata() def export_pdf_zip() -> str: return pdfd.build_pdf_zip("/tmp/all_pdfs.zip")