Spaces:
Running
Running
| """ | |
| tools.py | |
| -------- | |
| Topic-modelling pipeline: SPECTER-2 → UMAP → HDBSCAN | |
| with multi-objective Bayesian optimisation over UMAP + HDBSCAN | |
| parameters (§3.1–§3.6 of the methodology guide). | |
| No BERTopic wrapper — bare UMAP + HDBSCAN on SPECTER-2 embeddings. | |
| """ | |
| import re | |
| import logging | |
| import pandas as pd | |
| import numpy as np | |
| from typing import Optional | |
| from collections import Counter, defaultdict | |
| from sentence_transformers import SentenceTransformer | |
| from umap import UMAP | |
| from hdbscan import HDBSCAN | |
| from sklearn.metrics import adjusted_rand_score | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| import optuna | |
| # --------------------------------------------------------------------------- | |
| # Logging | |
| # --------------------------------------------------------------------------- | |
| logging.basicConfig(level=logging.INFO, format="%(levelname)s | %(message)s") | |
| logger = logging.getLogger(__name__) | |
| optuna.logging.set_verbosity(optuna.logging.WARNING) | |
| # --------------------------------------------------------------------------- | |
| # Data Loading (unchanged) | |
| # --------------------------------------------------------------------------- | |
| def load_csv(filepath: str) -> pd.DataFrame: | |
| df = pd.read_csv(filepath) | |
| df.columns = df.columns.str.lower() | |
| required = {"title", "abstract"} | |
| missing = required - set(df.columns) | |
| if missing: | |
| raise ValueError(f"CSV missing column(s): {missing}") | |
| logger.info("Loaded %d rows from '%s'.", len(df), filepath) | |
| return df | |
| # --------------------------------------------------------------------------- | |
| # §3.1 — Input unit: title + abstract concatenation | |
| # --------------------------------------------------------------------------- | |
| def prepare_documents(df: pd.DataFrame) -> list[str]: | |
| """One string per paper: title + abstract (§3.1 input unit).""" | |
| docs = (df["title"].fillna("") + ". " + df["abstract"].fillna("")).tolist() | |
| logger.info("Prepared %d title+abstract documents.", len(docs)) | |
| return docs | |
| # --------------------------------------------------------------------------- | |
| # §3.1 — Embed with SPECTER-2 (cached model for speed) | |
| # --------------------------------------------------------------------------- | |
| _MODEL_CACHE = {} | |
| def embed_documents( | |
| docs: list[str], | |
| model_name: str = "allenai/specter2_base", | |
| ) -> np.ndarray: | |
| """Embed with SPECTER-2. Deterministic — no tuning (§3.3).""" | |
| if model_name not in _MODEL_CACHE: | |
| logger.info("Loading %s (first time, will be cached)…", model_name) | |
| _MODEL_CACHE[model_name] = SentenceTransformer(model_name) | |
| model = _MODEL_CACHE[model_name] | |
| embeddings = model.encode(docs, show_progress_bar=True, batch_size=64) | |
| logger.info("Embedded %d docs → %s", len(docs), embeddings.shape) | |
| return embeddings | |
| # --------------------------------------------------------------------------- | |
| # §3.2 — Cluster discipline checks | |
| # --------------------------------------------------------------------------- | |
| def check_discipline(labels: np.ndarray, n_docs: int) -> dict: | |
| """Two hard constraints: max-mass ≤ 25 %, min-size ≥ 5.""" | |
| counts = Counter(int(l) for l in labels) | |
| unique = [l for l in counts if l != -1] | |
| if not unique: | |
| return dict(max_mass_pct=0, max_mass_ok=False, | |
| min_size=0, min_size_ok=False, | |
| n_clusters=0, n_noise=counts.get(-1, 0)) | |
| max_mass_pct = max(counts[l] / n_docs for l in unique) | |
| min_size = min(counts[l] for l in unique) | |
| return dict( | |
| max_mass_pct=round(max_mass_pct, 4), | |
| max_mass_ok=max_mass_pct <= 0.25, | |
| min_size=int(min_size), | |
| min_size_ok=min_size >= 5, | |
| n_clusters=len(unique), | |
| n_noise=counts.get(-1, 0), | |
| cluster_sizes={l: counts[l] for l in sorted(unique)}, | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # §3.4 — Quality metrics | |
| # --------------------------------------------------------------------------- | |
| def compute_persistence(clusterer: HDBSCAN) -> float: | |
| """Average cluster persistence from the condensed tree.""" | |
| try: | |
| p = getattr(clusterer, "cluster_persistence_", None) | |
| if p is not None and len(p) > 0: | |
| return float(np.mean(p)) | |
| except Exception: | |
| pass | |
| return 0.0 | |
| def per_cluster_persistence(clusterer: HDBSCAN, labels: np.ndarray) -> dict: | |
| """Map each cluster ID to its persistence score (§8).""" | |
| try: | |
| p = getattr(clusterer, "cluster_persistence_", None) | |
| if p is None or len(p) == 0: | |
| return {} | |
| unique = sorted(set(int(l) for l in labels if l != -1)) | |
| return {cid: float(p[i]) if i < len(p) else 0.0 | |
| for i, cid in enumerate(unique)} | |
| except Exception: | |
| return {} | |
| def compute_dbcv(reduced: np.ndarray, labels: np.ndarray) -> float: | |
| """Density-Based Cluster Validity index.""" | |
| try: | |
| from hdbscan.validity import validity_index | |
| ul = set(labels); ul.discard(-1) | |
| if len(ul) < 2: | |
| return -1.0 | |
| return float(validity_index(reduced.astype(np.float64), labels)) | |
| except Exception as e: | |
| logger.warning("DBCV failed: %s", e) | |
| return -1.0 | |
| def compute_stability(embeddings: np.ndarray, params: dict, | |
| n_seeds: int = 3) -> float: | |
| """Cluster-recurrence stability via pairwise ARI across seeds (§3.4). | |
| Uses 3 seeds by default for speed (spec allows 3–5).""" | |
| all_labels = [] | |
| for s in range(n_seeds): | |
| u = UMAP(n_neighbors=params["n_neighbors"], | |
| n_components=params["n_components"], | |
| min_dist=0.0, metric="cosine", | |
| random_state=s * 7 + 1, low_memory=True) | |
| red = u.fit_transform(embeddings) | |
| h = HDBSCAN(min_cluster_size=params["min_cluster_size"], | |
| min_samples=params["min_samples"], | |
| metric="euclidean", | |
| cluster_selection_method=params["csm"], | |
| cluster_selection_epsilon=params["cse"]) | |
| all_labels.append(h.fit_predict(red)) | |
| aris = [] | |
| for i in range(len(all_labels)): | |
| for j in range(i + 1, len(all_labels)): | |
| aris.append(adjusted_rand_score(all_labels[i], all_labels[j])) | |
| return float(np.mean(aris)) if aris else 0.0 | |
| # --------------------------------------------------------------------------- | |
| # §3.4 — Bayesian optimisation objective | |
| # --------------------------------------------------------------------------- | |
| def _objective(trial, embeddings, n_docs): | |
| """Single Optuna trial — returns (persistence, dbcv, stability_placeholder).""" | |
| n_neighbors = trial.suggest_categorical("n_neighbors", [5, 10, 15, 30, 50]) | |
| n_components = trial.suggest_int("n_components", 5, 10) | |
| mcs = trial.suggest_int( | |
| "min_cluster_size", | |
| max(5, int(0.01 * n_docs)), | |
| max(20, int(0.05 * n_docs)), | |
| ) | |
| ms = trial.suggest_int("min_samples", 1, mcs) | |
| csm = trial.suggest_categorical("csm", ["eom", "leaf"]) | |
| cse = trial.suggest_float("cse", 0.0, 0.3, step=0.05) | |
| params = dict(n_neighbors=n_neighbors, n_components=n_components, | |
| min_cluster_size=mcs, min_samples=ms, csm=csm, cse=cse) | |
| u = UMAP(n_neighbors=n_neighbors, n_components=n_components, | |
| min_dist=0.0, metric="cosine", random_state=42, | |
| low_memory=True) | |
| red = u.fit_transform(embeddings) | |
| h = HDBSCAN(min_cluster_size=mcs, min_samples=ms, metric="euclidean", | |
| cluster_selection_method=csm, | |
| cluster_selection_epsilon=cse, | |
| allow_single_cluster=False, gen_min_span_tree=True) | |
| labels = h.fit_predict(red) | |
| disc = check_discipline(labels, n_docs) | |
| trial.set_user_attr("params", params) | |
| trial.set_user_attr("discipline", disc) | |
| trial.set_user_attr("labels", labels.tolist()) | |
| # Hard-constraint violation → worst scores | |
| if not disc["max_mass_ok"] or not disc["min_size_ok"]: | |
| trial.set_user_attr("pass", False) | |
| return 0.0, -1.0, 0.0 | |
| trial.set_user_attr("pass", True) | |
| pers = compute_persistence(h) | |
| dbcv = compute_dbcv(red, labels) | |
| trial.set_user_attr("persistence", pers) | |
| trial.set_user_attr("dbcv", dbcv) | |
| return pers, dbcv, 0.5 # stability computed only for winner | |
| # --------------------------------------------------------------------------- | |
| # §3.4 — Run the full Bayesian loop | |
| # --------------------------------------------------------------------------- | |
| def run_bayesian_optimisation( | |
| embeddings: np.ndarray, | |
| n_trials: int = 50, | |
| progress_callback=None, | |
| ) -> dict: | |
| n_docs = len(embeddings) | |
| study = optuna.create_study( | |
| directions=["maximize", "maximize", "maximize"], | |
| sampler=optuna.samplers.TPESampler(seed=42, multivariate=True), | |
| study_name="specter2_umap_hdbscan", | |
| ) | |
| trial_log = [] | |
| def _cb(study, trial): | |
| d = trial.user_attrs.get("discipline", {}) | |
| entry = dict( | |
| trial=trial.number, | |
| params=trial.user_attrs.get("params", {}), | |
| discipline_pass=trial.user_attrs.get("pass", False), | |
| persistence=trial.user_attrs.get("persistence", 0.0), | |
| dbcv=trial.user_attrs.get("dbcv", -1.0), | |
| n_clusters=d.get("n_clusters", 0), | |
| max_mass_pct=d.get("max_mass_pct", 0.0), | |
| min_size=d.get("min_size", 0), | |
| n_noise=d.get("n_noise", 0), | |
| values=list(trial.values) if trial.values else [], | |
| ) | |
| trial_log.append(entry) | |
| if progress_callback: | |
| progress_callback(trial.number + 1, n_trials, entry) | |
| for i in range(n_trials): | |
| study.optimize( | |
| lambda t: _objective(t, embeddings, n_docs), | |
| n_trials=1, callbacks=[_cb], show_progress_bar=False, | |
| ) | |
| # §3.6 convergence: 3 consecutive passing within 5 % of best | |
| passing = [e for e in trial_log if e["discipline_pass"]] | |
| if len(passing) >= 3 and i >= 9: # allow early stop after 10 trials | |
| best_p = max(e["persistence"] for e in passing) | |
| if best_p > 0: | |
| last3 = passing[-3:] | |
| if all(abs(e["persistence"] - best_p) / best_p < 0.05 | |
| for e in last3): | |
| logger.info("Converged at trial %d.", i + 1) | |
| break | |
| # Select best passing trial (max persistence, then DBCV) | |
| passing_trials = [t for t in study.trials | |
| if t.user_attrs.get("pass", False)] | |
| if passing_trials: | |
| best = max(passing_trials, key=lambda t: (t.values[0], t.values[1])) | |
| else: | |
| logger.warning("No trial passed discipline — using last trial.") | |
| best = study.trials[-1] | |
| bp = best.user_attrs["params"] | |
| labels = np.array(best.user_attrs["labels"]) | |
| stability = compute_stability(embeddings, bp, n_seeds=3) | |
| return dict( | |
| best_params=bp, best_labels=labels, | |
| best_trial=best.number, | |
| persistence=best.user_attrs.get("persistence", 0.0), | |
| dbcv=best.user_attrs.get("dbcv", -1.0), | |
| stability=stability, | |
| discipline=best.user_attrs.get("discipline", {}), | |
| trial_log=trial_log, | |
| n_trials_run=len(trial_log), | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # §3.1 — 2-D UMAP for visualisation | |
| # --------------------------------------------------------------------------- | |
| def compute_2d_umap(embeddings: np.ndarray, seed: int = 42) -> np.ndarray: | |
| return UMAP(n_neighbors=15, n_components=2, min_dist=0.1, | |
| metric="cosine", random_state=seed, | |
| low_memory=True).fit_transform(embeddings) | |
| # --------------------------------------------------------------------------- | |
| # §3.1 — TF-IDF keyphrase extraction per cluster (3–5 phrases) | |
| # Fast alternative to KeyBERT — no extra model download needed. | |
| # --------------------------------------------------------------------------- | |
| def extract_keyphrases(docs: list[str], labels: np.ndarray, | |
| top_n: int = 5) -> dict: | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| cluster_docs = defaultdict(list) | |
| for doc, lab in zip(docs, labels): | |
| if lab != -1: | |
| cluster_docs[int(lab)].append(doc) | |
| out = {} | |
| for cid, cdocs in cluster_docs.items(): | |
| if len(cdocs) < 2: | |
| out[cid] = [] | |
| continue | |
| try: | |
| tfidf = TfidfVectorizer( | |
| stop_words="english", max_features=200, | |
| ngram_range=(1, 3), max_df=0.9, min_df=1) | |
| X = tfidf.fit_transform(cdocs) | |
| terms = tfidf.get_feature_names_out() | |
| scores = X.sum(axis=0).A1 | |
| top_idx = scores.argsort()[::-1][:top_n] | |
| out[cid] = [(terms[i], float(scores[i])) for i in top_idx] | |
| except Exception as e: | |
| logger.warning("Keyphrase extraction cluster %d: %s", cid, e) | |
| out[cid] = [] | |
| return out | |
| # --------------------------------------------------------------------------- | |
| # §3.1 — Strong / weak member counts via HDBSCAN probabilities | |
| # --------------------------------------------------------------------------- | |
| def strong_weak_members(labels: np.ndarray, | |
| probabilities: np.ndarray) -> dict: | |
| mem = defaultdict(lambda: {"strong": 0, "weak": 0}) | |
| for lab, prob in zip(labels, probabilities): | |
| if lab == -1: | |
| continue | |
| cid = int(lab) | |
| if prob >= 0.5: | |
| mem[cid]["strong"] += 1 | |
| else: | |
| mem[cid]["weak"] += 1 | |
| return dict(mem) | |
| # --------------------------------------------------------------------------- | |
| # §3.2 — Outlier reduction: reassign noise to nearest cluster (≤ 25 %) | |
| # --------------------------------------------------------------------------- | |
| def outlier_reduction(labels: np.ndarray, reduced: np.ndarray, | |
| n_docs: int) -> np.ndarray: | |
| labels = labels.copy() | |
| cap = int(0.25 * n_docs) | |
| cdocs = defaultdict(list) | |
| for i, l in enumerate(labels): | |
| if l != -1: | |
| cdocs[int(l)].append(i) | |
| if not cdocs: | |
| return labels | |
| cids = list(cdocs.keys()) | |
| centroids = np.vstack([reduced[cdocs[c]].mean(axis=0) for c in cids]) | |
| noise = [i for i, l in enumerate(labels) if l == -1] | |
| moved = 0 | |
| for idx in noise: | |
| dists = np.linalg.norm(centroids - reduced[idx], axis=1) | |
| for best in np.argsort(dists): | |
| tgt = cids[best] | |
| if len(cdocs[tgt]) < cap: | |
| labels[idx] = tgt | |
| cdocs[tgt].append(idx) | |
| moved += 1 | |
| break | |
| logger.info("Outlier reduction: %d / %d noise reassigned.", moved, len(noise)) | |
| return labels | |
| # --------------------------------------------------------------------------- | |
| # Representative docs (top-3 by centroid proximity) | |
| # --------------------------------------------------------------------------- | |
| def get_representative_docs(labels, embeddings, docs, top_n=3): | |
| cdocs = defaultdict(list) | |
| for i, l in enumerate(labels): | |
| if l != -1: | |
| cdocs[int(l)].append(i) | |
| out = {} | |
| for cid, idxs in cdocs.items(): | |
| ce = embeddings[idxs].mean(axis=0).reshape(1, -1) | |
| sims = cosine_similarity(ce, embeddings[idxs])[0] | |
| top = np.argsort(sims)[-top_n:][::-1] | |
| out[cid] = [docs[idxs[t]] for t in top] | |
| return out | |
| # --------------------------------------------------------------------------- | |
| # §9 — RQ2 / RQ3 mismatch table | |
| # --------------------------------------------------------------------------- | |
| def build_mismatch_table(keyphrases: dict, cluster_labels: dict) -> list: | |
| """Compare cluster keyphrases against assigned labels to flag mismatches. | |
| Returns rows for a mismatch table (§9).""" | |
| rows = [] | |
| for cid in sorted(keyphrases.keys()): | |
| kps = keyphrases.get(cid, []) | |
| kp_terms = [k[0] if isinstance(k, tuple) else k for k in kps[:5]] | |
| label = cluster_labels.get(cid, "") | |
| # Check overlap between label words and keyphrase terms | |
| label_words = set(label.lower().split()) | |
| kp_words = set(" ".join(kp_terms).lower().split()) | |
| overlap = label_words & kp_words | |
| noise = {"the","and","for","with","using","based","from","in","of","a","to"} | |
| overlap -= noise | |
| match_pct = len(overlap) / max(len(label_words - noise), 1) | |
| status = "MATCH" if match_pct >= 0.3 else "MISMATCH" | |
| rows.append(dict( | |
| cluster=cid, label=label, | |
| keyphrases=", ".join(kp_terms), | |
| overlap=", ".join(overlap) if overlap else "—", | |
| match_pct=round(match_pct * 100), | |
| status=status, | |
| )) | |
| return rows | |
| # --------------------------------------------------------------------------- | |
| # High-level pipeline entry point | |
| # --------------------------------------------------------------------------- | |
| def run_topic_modeling(filepath: str, n_trials: int = 50, | |
| progress_callback=None) -> dict: | |
| # 1. Load | |
| df = load_csv(filepath) | |
| docs = prepare_documents(df) | |
| n_docs = len(docs) | |
| # 2. Embed (deterministic) | |
| embeddings = embed_documents(docs) | |
| # 3. Bayesian optimisation (§3.4) | |
| opt = run_bayesian_optimisation(embeddings, n_trials, progress_callback) | |
| bp = opt["best_params"] | |
| labels = opt["best_labels"] | |
| # 4. Re-run winner for clusterer object (probabilities) | |
| u = UMAP(n_neighbors=bp["n_neighbors"], n_components=bp["n_components"], | |
| min_dist=0.0, metric="cosine", random_state=42) | |
| red = u.fit_transform(embeddings) | |
| h = HDBSCAN(min_cluster_size=bp["min_cluster_size"], | |
| min_samples=bp["min_samples"], metric="euclidean", | |
| cluster_selection_method=bp["csm"], | |
| cluster_selection_epsilon=bp["cse"], | |
| allow_single_cluster=False, | |
| gen_min_span_tree=True) | |
| h.fit(red) | |
| # Per-cluster persistence (§8) | |
| cluster_pers = per_cluster_persistence(h, labels) | |
| # 5. Outlier reduction (§3.2 — clusters < 5 reassigned) | |
| labels = outlier_reduction(labels, red, n_docs) | |
| # 6. Strong / weak (§3.1) | |
| sw = strong_weak_members(labels, h.probabilities_) | |
| # 7. 2-D UMAP (§3.1) | |
| umap_2d = compute_2d_umap(embeddings) | |
| # 8. KeyBERT keyphrases (§3.1) | |
| keyphrases = extract_keyphrases(docs, labels) | |
| # 9. Rep docs | |
| rep_docs = get_representative_docs(labels, embeddings, docs) | |
| # 10. Final discipline | |
| disc = check_discipline(labels, n_docs) | |
| return dict( | |
| documents=docs, labels=labels.tolist(), | |
| keyphrases=keyphrases, representative_docs=rep_docs, | |
| membership=sw, umap_2d=umap_2d.tolist(), | |
| discipline=disc, best_params=bp, | |
| cluster_persistence=cluster_pers, | |
| metrics=dict(persistence=opt["persistence"], | |
| dbcv=opt["dbcv"], | |
| stability=opt["stability"]), | |
| trial_log=opt["trial_log"], | |
| n_trials_run=opt["n_trials_run"], | |
| best_trial=opt["best_trial"], | |
| n_docs=n_docs, | |
| embeddings=embeddings, | |
| ) | |