Spaces:
Sleeping
Sleeping
| """ | |
| tools.py | |
| Deterministic operations the UI calls. CSV ingest is shared by every | |
| pipeline; embedding-, TCCM- and PDF-downloader-specific operations are | |
| grouped in clearly labelled sections below. | |
| No LLM calls here. The agent module is invoked from app.py only. | |
| """ | |
| from __future__ import annotations | |
| import io | |
| import csv | |
| import json | |
| import os | |
| import numpy as np | |
| import pandas as pd | |
| import hdbscan | |
| import database as db | |
| import embeddings as emb | |
| import tccm_engine as tccm | |
| # ============================================================ | |
| # SHARED: CSV ingest | |
| # ============================================================ | |
# Columns every ingested CSV must provide (after any Scopus renaming).
REQUIRED_COLS = {"title", "abstract"}

# Columns used when present; ingest_csv fills them with placeholders
# when absent.
OPTIONAL_COLS = {"doi", "year", "author_keywords"}

# Scopus exports use capitalised, space-separated headers. This table maps
# each of them onto the lowercase snake_case schema the app uses. It is
# only applied when _normalise_columns decides the file is a Scopus export;
# files that already use the lowercase names are left alone.
SCOPUS_COLUMN_MAP = {
    "Title": "title",
    "Abstract": "abstract",
    "DOI": "doi",
    "Year": "year",
    "Author Keywords": "author_keywords",
}
def _normalise_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Rename Scopus-style headers to the app's lowercase schema.

    A frame counts as a Scopus export when it carries both the capitalised
    'Title' and 'Abstract' headers. In that case every SCOPUS_COLUMN_MAP
    header that is present gets renamed. Any other frame is returned
    untouched.
    """
    present = set(df.columns)
    if not {"Title", "Abstract"} <= present:
        # Not a Scopus fingerprint: assume the lowercase names are in use.
        return df
    renames = {
        scopus_name: app_name
        for scopus_name, app_name in SCOPUS_COLUMN_MAP.items()
        if scopus_name in present
    }
    return df.rename(columns=renames)
def _year_to_int(v):
    """Return one raw year cell as a Python int, or None if missing/unparseable."""
    try:
        if pd.isna(v):
            return None
        return int(v)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers int(float("inf")); read_csv parses "inf" as a float.
        return None


def _coerce_year_column(values: pd.Series) -> pd.Series:
    """Coerce a year column to Python ints / None in an object-dtype Series.

    Series.apply re-infers the result dtype, so a mix such as [2020, None]
    comes back as float64 [2020.0, NaN] and the int coercion is silently
    undone. Building the Series with dtype=object keeps real ints and real
    None values, which then flow through to_dict("records") unchanged.
    """
    return pd.Series(
        [_year_to_int(v) for v in values], index=values.index, dtype=object
    )


def ingest_csv(file_path: str, progress_callback=None) -> dict:
    """
    Load a CSV and upsert papers into Supabase.

    Required columns: title, abstract (or Scopus-style 'Title', 'Abstract')
    Optional columns: doi, year, author_keywords (or 'DOI', 'Year',
    'Author Keywords' from a Scopus export)
    Other columns are ignored.

    progress_callback: optional callable(fraction: float, desc: str) so the
    UI can render a real progress bar. fraction in [0.0, 1.0].

    Returns a dict with n_papers_loaded, first_id and last_id.

    Raises ValueError when a required column is missing or when no row has
    a title.
    """
    def report(frac, desc):
        if progress_callback is not None:
            progress_callback(frac, desc)

    report(0.05, "Reading CSV file")
    # utf-8-sig transparently drops a leading byte-order mark if present.
    df = pd.read_csv(file_path, encoding="utf-8-sig")
    n_total_rows = len(df)
    report(0.15, f"CSV parsed: {n_total_rows} rows, {len(df.columns)} columns")

    df = _normalise_columns(df)
    report(0.20, "Column names normalised")

    missing = REQUIRED_COLS - set(df.columns)
    if missing:
        raise ValueError(
            f"CSV missing required columns: {missing}. "
            f"Expected either lowercase ('title','abstract') or "
            f"Scopus-style ('Title','Abstract')."
        )
    # Absent optional columns get placeholders so later code can index
    # them unconditionally.
    for col in OPTIONAL_COLS:
        if col not in df.columns:
            df[col] = "" if col != "year" else None

    df = df.dropna(subset=["title"]).reset_index(drop=True)
    if len(df) == 0:
        raise ValueError("CSV has no rows with a non-empty title.")

    # Stable paper IDs based on row order (after untitled rows are dropped)
    df["paper_id"] = [f"P{i+1:04d}" for i in range(len(df))]

    # Coerce year to int when possible. Series.apply is deliberately not
    # used: it would turn a column with any missing year back into floats.
    df["year"] = _coerce_year_column(df["year"])

    # Replace pandas NaN with empty string in text columns; the database
    # layer also has a defensive scrubber, but cleaning at source means
    # downstream consumers (export, embedding) see consistent data.
    for col in ("doi", "title", "abstract", "author_keywords"):
        df[col] = df[col].fillna("").astype(str)

    report(0.25, f"Validated {len(df)} rows; preparing upload")
    rows = df[
        ["paper_id", "doi", "title", "abstract", "year", "author_keywords"]
    ].to_dict("records")

    # Upload in chunks of 200 (smaller than db.upsert_papers default 500
    # so the user sees movement on the progress bar)
    chunk_size = 200
    n = len(rows)
    n_chunks = (n + chunk_size - 1) // chunk_size
    for i in range(0, n, chunk_size):
        chunk = rows[i:i + chunk_size]
        db.upsert_papers(chunk)
        chunk_idx = (i // chunk_size) + 1
        # Map chunk progress into the 0.25 -> 0.99 range
        frac = 0.25 + (0.74 * chunk_idx / n_chunks)
        report(frac, f"Uploaded batch {chunk_idx}/{n_chunks} to Supabase ({i + len(chunk)}/{n} rows)")

    report(1.0, f"Done. {n} papers in database.")
    return {
        "n_papers_loaded": n,
        "first_id": rows[0]["paper_id"],
        "last_id": rows[-1]["paper_id"],
    }
def list_papers() -> list[dict]:
    """Return every paper row currently stored in the database."""
    papers = db.get_all_papers()
    return papers
| # ============================================================ | |
| # EMBEDDING PIPELINE | |
| # ============================================================ | |
# Local-disk cache written by embed_papers and read by the clustering steps:
# the 10-D reduced vectors, plus the paper IDs in the order they were embedded.
VECTORS_PATH = "vectors_10d.npy"
VECTORS_IDS_PATH = "vectors_paper_ids.json"
def embed_papers(progress_callback=None) -> dict:
    """
    Embed every paper in Supabase and cache the reduced vectors.

    Titles and abstracts go through emb.embed_texts. The result is then
    reduced twice with emb.reduce_dimensions:
      * a 2-D projection, whose coordinates are written back to Supabase
        for the scatter plot;
      * a 10-D projection, saved to VECTORS_PATH, with the matching paper
        IDs saved to VECTORS_IDS_PATH, for the clustering step.

    progress_callback is forwarded unchanged to emb.embed_texts.

    Raises ValueError if the database holds no papers.
    """
    papers = db.get_all_papers()
    if not papers:
        raise ValueError("No papers in database. Ingest a CSV first.")

    titles = [paper["title"] for paper in papers]
    abstracts = [paper.get("abstract") or "" for paper in papers]
    paper_ids = [paper["paper_id"] for paper in papers]

    full_vectors = emb.embed_texts(
        titles, abstracts, progress_callback=progress_callback
    )
    cluster_vectors = emb.reduce_dimensions(full_vectors, n_components=10)
    plot_vectors = emb.reduce_dimensions(full_vectors, n_components=2)

    coord_rows = [
        {"paper_id": pid, "coords": xy.tolist()}
        for pid, xy in zip(paper_ids, plot_vectors)
    ]
    db.save_2d_coords_bulk(coord_rows)

    np.save(VECTORS_PATH, cluster_vectors)
    with open(VECTORS_IDS_PATH, "w") as fh:
        json.dump(paper_ids, fh)

    return {
        "n_papers": len(papers),
        "embedding_dim": int(full_vectors.shape[1]),
        "reduced_dim": int(cluster_vectors.shape[1]),
        "vectors_path": VECTORS_PATH,
    }
def _load_cached_vectors() -> tuple[np.ndarray, list[str]]:
    """
    Load the 10-D vectors and paper IDs that embed_papers cached on disk.

    Returns (vectors, paper_ids); callers pair row i of vectors with
    paper_ids[i].

    Raises RuntimeError in two cases:
      * either cache file is missing;
      * the two files disagree on the number of papers, e.g. if
        embed_papers was interrupted between writing them. Failing here
        stops mismatched IDs and labels from reaching the database.
    """
    # Both files are required; checking only the vectors file used to let
    # a missing IDs file surface as a bare FileNotFoundError.
    if not (os.path.exists(VECTORS_PATH) and os.path.exists(VECTORS_IDS_PATH)):
        raise RuntimeError(
            "No embeddings found. Run the Embed step first."
        )
    vectors = np.load(VECTORS_PATH)
    with open(VECTORS_IDS_PATH) as f:
        paper_ids = json.load(f)
    if len(vectors) != len(paper_ids):
        raise RuntimeError(
            f"Embedding cache is inconsistent: {len(vectors)} vectors but "
            f"{len(paper_ids)} paper IDs. Re-run the Embed step."
        )
    return vectors, paper_ids
def hdbscan_grid_sweep(grid: list[dict]) -> list[dict]:
    """
    Fit HDBSCAN once per configuration in *grid* on the cached 10-D vectors.

    Each config dict may set the following keys; the metric is always
    euclidean:
      * min_cluster_size (default 8)
      * min_samples (default 3)
      * max_cluster_size (default 0)
      * cluster_selection_method (default "eom")

    Nothing is written to the database.

    Returns one dict per config. Each holds the config itself, the number
    of clusters, the noise count and percentage, and the cluster sizes
    sorted largest first.
    """
    vectors, _ = _load_cached_vectors()
    results = []
    for cfg in grid:
        labels = hdbscan.HDBSCAN(
            min_cluster_size=cfg.get("min_cluster_size", 8),
            min_samples=cfg.get("min_samples", 3),
            max_cluster_size=cfg.get("max_cluster_size", 0),
            metric="euclidean",
            cluster_selection_method=cfg.get("cluster_selection_method", "eom"),
        ).fit_predict(vectors)

        # HDBSCAN marks unassigned points with label -1.
        cluster_ids = set(labels) - {-1}
        noise_count = int((labels == -1).sum())
        sizes = sorted(
            (int((labels == cid).sum()) for cid in cluster_ids),
            reverse=True,
        )
        results.append({
            "config": cfg,
            "n_clusters": len(cluster_ids),
            "n_noise": noise_count,
            "noise_pct": round(100 * noise_count / max(len(labels), 1), 1),
            "cluster_sizes": sizes,
        })
    return results
def run_clustering(params: dict, notes: str = "") -> int:
    """
    Cluster the cached 10-D vectors with HDBSCAN and persist the outcome.

    params accepts the same keys and defaults as a hdbscan_grid_sweep
    config. The function then:
      * records a new run with db.create_clustering_run;
      * stores each paper's label and membership probability with
        db.save_cluster_assignments;
      * returns the new run id.
    """
    vectors, paper_ids = _load_cached_vectors()
    model = hdbscan.HDBSCAN(
        min_cluster_size=params.get("min_cluster_size", 8),
        min_samples=params.get("min_samples", 3),
        max_cluster_size=params.get("max_cluster_size", 0),
        metric="euclidean",
        cluster_selection_method=params.get("cluster_selection_method", "eom"),
    )
    labels = model.fit_predict(vectors).tolist()
    probs = model.probabilities_.tolist()

    # Label -1 is HDBSCAN's noise bucket; it is not counted as a cluster.
    n_clusters = len({lab for lab in labels if lab != -1})
    n_noise = labels.count(-1)

    run_id = db.create_clustering_run(params, n_clusters, n_noise, notes)
    db.save_cluster_assignments(run_id, paper_ids, labels, probs)
    return run_id
def top_representative_papers(run_id: int, n: int = 3) -> dict[int, list[dict]]:
    """
    Return, for each non-noise cluster of *run_id*, its top *n* papers.

    Assignment rows are ranked by membership_prob, highest first. Rows in
    the noise bucket (cluster_id == -1) are skipped. Keys of the result are
    cluster ids.
    """
    grouped: dict[int, list[dict]] = {}
    for row in db.get_cluster_assignments(run_id):
        cid = row["cluster_id"]
        if cid != -1:
            grouped.setdefault(cid, []).append(row)
    return {
        cid: sorted(members, key=lambda m: m["membership_prob"], reverse=True)[:n]
        for cid, members in grouped.items()
    }
def upsert_cluster_label(run_id: int, cluster_id: int, label: str,
                         subject: str = "", object_: str = "",
                         phenomenon: str = "", rationale: str = "",
                         top_paper_ids: list[str] | None = None,
                         validated_by_researcher: bool = False,
                         locked: bool = False,
                         researcher_notes: str = "") -> None:
    """
    Store a cluster's label and descriptive fields for one clustering run.

    Every argument is forwarded unchanged, by keyword, to
    db.save_cluster_label.
    """
    db.save_cluster_label(
        run_id=run_id,
        cluster_id=cluster_id,
        label=label,
        subject=subject,
        object_=object_,
        phenomenon=phenomenon,
        rationale=rationale,
        top_paper_ids=top_paper_ids,
        validated_by_researcher=validated_by_researcher,
        locked=locked,
        researcher_notes=researcher_notes,
    )
def list_clustering_runs() -> list[dict]:
    """Return the clustering runs recorded in the database."""
    runs = db.list_clustering_runs()
    return runs
def _summarise_clusters(assignments: list[dict],
                        label_rows: list[dict]) -> list[dict]:
    """Build one summary dict per cluster, ordered by cluster_id.

    Every dict carries the same keys. The noise bucket (cluster_id -1) gets
    fixed placeholder values instead of a researcher label.
    """
    labels = {row["cluster_id"]: row for row in label_rows}
    by_cluster: dict[int, list[dict]] = {}
    for r in assignments:
        by_cluster.setdefault(r["cluster_id"], []).append(r)

    summaries = []
    for cid, papers in sorted(by_cluster.items()):
        if cid == -1:
            summaries.append({
                "cluster_id": -1, "label": "(noise)",
                "n_papers": len(papers),
                "subject": "", "object": "", "phenomenon": "",
                "validated": False, "locked": False,
                "rationale": "Papers HDBSCAN could not place in any cluster",
                # Present on every row so table/CSV consumers see one schema.
                "researcher_notes": "",
            })
            continue
        lab = labels.get(cid, {})
        summaries.append({
            "cluster_id": cid,
            "label": lab.get("label", ""),
            "n_papers": len(papers),
            "subject": lab.get("subject", ""),
            "object": lab.get("object", ""),
            "phenomenon": lab.get("phenomenon", ""),
            "validated": lab.get("validated_by_researcher", False),
            "locked": lab.get("locked", False),
            "rationale": lab.get("rationale", ""),
            "researcher_notes": lab.get("researcher_notes", ""),
        })
    return summaries


def cluster_summary_for_run(run_id: int) -> list[dict]:
    """
    Summarise every cluster (including noise) of a clustering run.

    Combines the run's assignments (for paper counts) with any saved
    cluster labels. Returns a list of dicts sorted by cluster_id, all with
    the same keys.
    """
    assignments = db.get_cluster_assignments(run_id)
    label_rows = db.get_cluster_labels(run_id)
    return _summarise_clusters(assignments, label_rows)
def export_clustering_run_csv(run_id: int) -> tuple[str, str]:
    """
    Render a clustering run as two CSV strings: (papers_csv, clusters_csv).

    papers_csv has one row per cluster assignment. DOI and author keywords
    are looked up from the papers table. The cluster label falls back to
    "noise" for cluster -1 and to "" for unlabelled clusters.

    clusters_csv has one row per cluster, taken from cluster_summary_for_run.
    """
    assignments = db.get_cluster_assignments(run_id)
    label_by_cluster = {row["cluster_id"]: row for row in db.get_cluster_labels(run_id)}
    paper_by_id = {paper["paper_id"]: paper for paper in db.get_all_papers()}

    paper_rows = [[
        "paper_id", "doi", "title", "year", "author_keywords",
        "cluster_id", "cluster_label", "membership_prob",
    ]]
    for row in assignments:
        cid = row["cluster_id"]
        fallback = "noise" if cid == -1 else ""
        cluster_label = label_by_cluster.get(cid, {}).get("label", fallback)
        paper = paper_by_id.get(row["paper_id"], {})
        paper_rows.append([
            row["paper_id"], paper.get("doi", ""), row["title"], row["year"],
            paper.get("author_keywords", ""),
            cid, cluster_label, row["membership_prob"],
        ])
    papers_out = io.StringIO()
    csv.writer(papers_out).writerows(paper_rows)

    cluster_rows = [[
        "cluster_id", "label", "n_papers", "subject", "object",
        "phenomenon", "validated_by_researcher", "locked", "rationale",
        "researcher_notes",
    ]]
    for summary in cluster_summary_for_run(run_id):
        cluster_rows.append([
            summary["cluster_id"], summary["label"], summary["n_papers"],
            summary["subject"], summary["object"], summary["phenomenon"],
            summary.get("validated", False), summary.get("locked", False),
            summary["rationale"], summary.get("researcher_notes", ""),
        ])
    clusters_out = io.StringIO()
    csv.writer(clusters_out).writerows(cluster_rows)

    return papers_out.getvalue(), clusters_out.getvalue()
| # ============================================================ | |
| # TCCM PIPELINE | |
| # ============================================================ | |
# Compiled TCCM pattern set; populated lazily by _patterns() and then reused.
_compiled_patterns: dict | None = None


def _patterns():
    """Return the compiled TCCM pattern set, loading it on first use."""
    global _compiled_patterns
    if _compiled_patterns is not None:
        return _compiled_patterns
    _compiled_patterns = tccm.load_patterns()
    return _compiled_patterns
def tccm_pattern_summary() -> dict[str, int]:
    """Summarise the loaded TCCM pattern set via tccm.pattern_set_summary."""
    compiled = _patterns()
    return tccm.pattern_set_summary(compiled)
def run_tccm_screening(notes: str = "") -> int:
    """
    Screen every paper in Supabase with the deterministic TCCM patterns.

    Records one tccm_runs row (with INCLUDE / EXCLUDE / MARGINAL counts
    from tccm.summarise_run) and the per-paper classifications, then
    returns the new run id.

    Raises ValueError if the database holds no papers.
    """
    papers = db.get_all_papers()
    if not papers:
        raise ValueError("No papers in database. Ingest a CSV first.")

    classifications = tccm.screen_corpus(papers, _patterns())
    counts = tccm.summarise_run(classifications)

    new_run_id = db.create_tccm_run(
        pattern_set_version=tccm.DEFAULT_PATTERN_VERSION,
        threshold_rule="anti_dominates",
        n_papers=counts["total"],
        n_include=counts["INCLUDE"],
        n_exclude=counts["EXCLUDE"],
        n_marginal=counts["MARGINAL"],
        notes=notes,
    )
    db.save_tccm_classifications(new_run_id, classifications)
    return new_run_id
def list_tccm_runs() -> list[dict]:
    """Return the TCCM screening runs recorded in the database."""
    runs = db.list_tccm_runs()
    return runs
def get_tccm_run_papers(run_id: int,
                        verdict_filter: str | None = None) -> list[dict]:
    """
    Return the stored per-paper classifications of a TCCM run.

    verdict_filter is passed straight to db.get_tccm_classifications
    (presumably None means "all verdicts"; confirm in database.py).
    """
    classifications = db.get_tccm_classifications(run_id, verdict_filter)
    return classifications
def save_tccm_marginal_review(run_id: int, paper_id: str,
                              agent_verdict: str, agent_rationale: str,
                              researcher_verdict: str | None,
                              researcher_notes: str = "") -> None:
    """
    Persist the agent's and the researcher's review of one MARGINAL paper.

    All arguments are forwarded, by keyword, to db.save_marginal_review.
    """
    db.save_marginal_review(
        run_id=run_id,
        paper_id=paper_id,
        agent_verdict=agent_verdict,
        agent_rationale=agent_rationale,
        researcher_verdict=researcher_verdict,
        researcher_notes=researcher_notes,
    )
def export_tccm_run_csv(run_id: int) -> tuple[str, str, str]:
    """
    Export a TCCM run as (primary_csv, signal_csv, summary_text).

    The two CSV strings come from tccm.export_run_to_csv, using the run's
    stored classifications and the current papers table. The plain-text
    summary tallies the INCLUDE / EXCLUDE / MARGINAL verdicts.

    NOTE(review): the summary prints tccm.DEFAULT_PATTERN_VERSION at export
    time, not the pattern_set_version recorded when the run was created.
    The two may differ if DEFAULT_PATTERN_VERSION has changed since then.
    """
    classifications = db.get_tccm_classifications(run_id)
    papers_by_id = {paper["paper_id"]: paper for paper in db.get_all_papers()}
    primary_csv, signal_csv = tccm.export_run_to_csv(classifications, papers_by_id)

    counts = {"total": len(classifications), "INCLUDE": 0, "EXCLUDE": 0, "MARGINAL": 0}
    for verdict in (row["verdict"] for row in classifications):
        # Unknown verdict strings are ignored rather than counted.
        if verdict in counts:
            counts[verdict] += 1

    summary_text = (
        f"TCCM screening summary (run {run_id})\n"
        f" pattern set: {tccm.DEFAULT_PATTERN_VERSION}\n"
        f" threshold rule: anti-signals must equal or exceed primary signals to EXCLUDE\n\n"
        f" total papers: {counts['total']}\n"
        f" INCLUDE: {counts['INCLUDE']}\n"
        f" EXCLUDE: {counts['EXCLUDE']}\n"
        f" MARGINAL: {counts['MARGINAL']}\n"
    )
    return primary_csv, signal_csv, summary_text
| # ============================================================ | |
| # PDF DOWNLOADER PIPELINE | |
| # ============================================================ | |
| import pdf_downloader as pdfd | |
def run_pdf_discovery(email: str, include_arxiv: bool = True,
                      include_scihub: bool = False,
                      rate_limit_per_sec: float = 2.0,
                      progress_callback=None) -> dict:
    """
    Look up a PDF location for every paper in the database.

    Unpaywall is always queried; arXiv and Sci-Hub are queried when their
    flags are set. Results from pdfd.bulk_discover are saved via
    db.save_pdf_discoveries, and counts per source are returned.

    Raises ValueError if there are no papers, or if email is empty or has
    no '@' (Unpaywall requires a contact email).
    """
    papers = db.get_all_papers()
    if not papers:
        raise ValueError("No papers in database. Ingest a CSV first.")
    if not email or "@" not in email:
        raise ValueError("Valid email required for Unpaywall API.")

    discoveries = pdfd.bulk_discover(
        papers=papers, email=email,
        include_arxiv=include_arxiv, include_scihub=include_scihub,
        rate_limit_per_sec=rate_limit_per_sec,
        progress_callback=progress_callback,
    )
    db.save_pdf_discoveries(discoveries)

    n_found = n_unpaywall = n_arxiv = n_scihub = 0
    for found in discoveries:
        if found.get("pdf_url"):
            n_found += 1
        source = found.get("source")
        if source == "unpaywall":
            n_unpaywall += 1
        elif source == "arxiv":
            n_arxiv += 1
        elif source == "scihub":
            n_scihub += 1

    return {
        "n_papers": len(discoveries),
        "n_pdf_found": n_found,
        "n_unpaywall": n_unpaywall,
        "n_arxiv": n_arxiv,
        "n_scihub": n_scihub,
        "n_no_pdf": len(discoveries) - n_found,
    }
def run_pdf_download(email: str, skip_existing: bool = True,
                     push_to_supabase: bool = True,
                     rate_limit_per_sec: float = 2.0,
                     progress_callback=None) -> dict:
    """
    Download every PDF whose URL an earlier discovery run found.

    Candidates are the db.get_pdf_status() rows with a truthy pdf_url.
    Per-paper outcomes from pdfd.bulk_download are saved with
    db.save_pdf_downloads and summarised in the returned dict.
    total_size_mb sums file_size (presumably bytes) across results.

    Raises ValueError if email is empty or has no '@', or if no discovered
    URLs exist yet.
    """
    if not email or "@" not in email:
        raise ValueError("Valid email required for downloads.")

    candidates = []
    for status in db.get_pdf_status():
        if not status.get("pdf_url"):
            continue
        candidates.append({
            "paper_id": status["paper_id"],
            "pdf_url": status.get("pdf_url"),
            "source": status.get("source"),
        })
    if not candidates:
        raise ValueError("No discovered PDFs to download. Run discovery first.")

    results = pdfd.bulk_download(
        discoveries=candidates, email=email,
        skip_existing=skip_existing, push_to_supabase=push_to_supabase,
        rate_limit_per_sec=rate_limit_per_sec,
        progress_callback=progress_callback,
    )
    db.save_pdf_downloads(results)

    n_ok = n_fail = n_pushed = 0
    total_bytes = 0
    for outcome in results:
        if outcome.get("downloaded"):
            n_ok += 1
        else:
            n_fail += 1
        if outcome.get("uploaded_to_supabase"):
            n_pushed += 1
        total_bytes += outcome.get("file_size") or 0

    return {
        "n_attempted": len(results),
        "n_downloaded": n_ok,
        "n_failed": n_fail,
        "n_uploaded_to_supabase": n_pushed,
        "total_size_mb": round(total_bytes / (1024 ** 2), 1),
    }
def run_crossref_enrichment(email: str,
                            rate_limit_per_sec: float = 2.0,
                            progress_callback=None) -> dict:
    """
    Fetch Crossref metadata for every paper that has a DOI.

    The metadata covers citation count, abstract and references. Results
    from pdfd.bulk_crossref are saved via db.save_crossref_metadata, and
    success / abstract / citation counts are returned.

    Raises ValueError if no paper has a DOI.

    NOTE(review): unlike run_pdf_discovery / run_pdf_download, email is
    not validated here; confirm whether bulk_crossref tolerates an empty
    email.
    """
    papers_with_doi = [paper for paper in db.get_all_papers() if paper.get("doi")]
    if not papers_with_doi:
        raise ValueError("No papers with DOIs in database.")

    results = pdfd.bulk_crossref(
        papers=papers_with_doi, email=email,
        rate_limit_per_sec=rate_limit_per_sec,
        progress_callback=progress_callback,
    )
    db.save_crossref_metadata(results)

    n_ok = n_with_abstract = n_with_citations = 0
    for record in results:
        if not record.get("error"):
            n_ok += 1
        if record.get("abstract"):
            n_with_abstract += 1
        if record.get("citation_count") is not None:
            n_with_citations += 1

    return {
        "n_attempted": len(results),
        "n_enriched": n_ok,
        "n_with_abstract": n_with_abstract,
        "n_with_citations": n_with_citations,
    }
def list_pdf_status() -> list[dict]:
    """Return the PDF status rows as provided by db.list_pdfs_with_metadata."""
    rows = db.list_pdfs_with_metadata()
    return rows
def export_pdf_zip(out_path: str = "/tmp/all_pdfs.zip") -> str:
    """
    Bundle the downloaded PDFs into a zip archive via pdfd.build_pdf_zip.

    out_path: where the archive is written. It defaults to the previous
    fixed location. Callers handling several sessions at once can pass a
    unique path so concurrent exports do not overwrite each other.

    Returns whatever pdfd.build_pdf_zip returns (presumably the archive
    path; confirm against pdf_downloader).
    """
    return pdfd.build_pdf_zip(out_path)