"""
tools.py
--------
Topic-modelling pipeline: SPECTER-2 → UMAP → HDBSCAN with multi-objective
Bayesian optimisation over UMAP + HDBSCAN parameters (§3.1–§3.6 of the
methodology guide).

No BERTopic wrapper — bare UMAP + HDBSCAN on SPECTER-2 embeddings.
"""
import re
import logging
import pandas as pd
import numpy as np
from itertools import combinations
from typing import Optional
from collections import Counter, defaultdict
from sentence_transformers import SentenceTransformer
from umap import UMAP
from hdbscan import HDBSCAN
from sklearn.metrics import adjusted_rand_score
from sklearn.metrics.pairwise import cosine_similarity
import optuna

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(level=logging.INFO, format="%(levelname)s | %(message)s")
logger = logging.getLogger(__name__)
optuna.logging.set_verbosity(optuna.logging.WARNING)


# ---------------------------------------------------------------------------
# Data Loading (unchanged)
# ---------------------------------------------------------------------------
def load_csv(filepath: str) -> pd.DataFrame:
    """Load a CSV of papers and check it has ``title`` and ``abstract`` columns.

    Header names are whitespace-stripped and lower-cased before the check, so
    headers such as ``"Title"`` or ``" Abstract "`` are accepted.

    Raises:
        ValueError: if a required column is missing.
    """
    df = pd.read_csv(filepath)
    df.columns = df.columns.str.strip().str.lower()
    required = {"title", "abstract"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"CSV missing column(s): {missing}")
    logger.info("Loaded %d rows from '%s'.", len(df), filepath)
    return df


# ---------------------------------------------------------------------------
# §3.1 — Input unit: title + abstract concatenation
# ---------------------------------------------------------------------------
def prepare_documents(df: pd.DataFrame) -> list[str]:
    """One string per paper: ``"<title>. <abstract>"`` (§3.1 input unit).

    Missing cells become empty strings and every cell is cast to ``str`` so
    numeric or mixed-type columns cannot break the concatenation.
    """
    titles = df["title"].fillna("").astype(str)
    abstracts = df["abstract"].fillna("").astype(str)
    docs = (titles + ". " + abstracts).tolist()
    logger.info("Prepared %d title+abstract documents.", len(docs))
    return docs


# ---------------------------------------------------------------------------
# §3.1 — Embed with SPECTER-2 (cached model for speed)
# ---------------------------------------------------------------------------
# Loaded SentenceTransformer models keyed by model name; reused across calls
# within the same process.
_MODEL_CACHE: dict[str, SentenceTransformer] = {}


def embed_documents(
    docs: list[str],
    model_name: str = "allenai/specter2_base",
) -> np.ndarray:
    """Embed *docs* with a SentenceTransformer model (SPECTER-2 by default).

    Not tuned (§3.3). The model is loaded once per *model_name* and kept in
    ``_MODEL_CACHE``.
    """
    model = _MODEL_CACHE.get(model_name)
    if model is None:
        logger.info("Loading %s (first time, will be cached)…", model_name)
        model = _MODEL_CACHE[model_name] = SentenceTransformer(model_name)
    embeddings = model.encode(docs, show_progress_bar=True, batch_size=64)
    logger.info("Embedded %d docs → %s", len(docs), embeddings.shape)
    return embeddings


# ---------------------------------------------------------------------------
# §3.2 — Cluster discipline checks
# ---------------------------------------------------------------------------
def check_discipline(labels: np.ndarray, n_docs: int, *,
                     max_mass_frac: float = 0.25,
                     min_cluster_size: int = 5) -> dict:
    """Evaluate the two hard cluster-discipline constraints (§3.2).

    * max-mass: the largest cluster holds at most ``max_mass_frac`` of the
      ``n_docs`` documents;
    * min-size: the smallest cluster has at least ``min_cluster_size`` members.

    Noise (label ``-1``) is never counted as a cluster. With no clusters at all
    both checks fail. The returned dict always carries the same keys:
    ``max_mass_pct``, ``max_mass_ok``, ``min_size``, ``min_size_ok``,
    ``n_clusters``, ``n_noise`` and ``cluster_sizes`` ({id: size}, sorted by id).
    """
    counts = Counter(int(lab) for lab in labels)
    n_noise = counts.pop(-1, 0)
    if not counts:
        return dict(max_mass_pct=0.0, max_mass_ok=False,
                    min_size=0, min_size_ok=False,
                    n_clusters=0, n_noise=n_noise, cluster_sizes={})

    max_mass_pct = max(counts.values()) / n_docs
    smallest = min(counts.values())
    return dict(
        max_mass_pct=round(max_mass_pct, 4),
        max_mass_ok=max_mass_pct <= max_mass_frac,
        min_size=int(smallest),
        min_size_ok=smallest >= min_cluster_size,
        n_clusters=len(counts),
        n_noise=n_noise,
        cluster_sizes={cid: counts[cid] for cid in sorted(counts)},
    )


# ---------------------------------------------------------------------------
# §3.4 — Quality metrics
# ---------------------------------------------------------------------------
def compute_persistence(clusterer: HDBSCAN) -> float:
    """Mean of ``clusterer.cluster_persistence_``; 0.0 if missing or empty.

    Best-effort: a failure reading the attribute is logged and scored 0.0 so
    a single odd trial cannot abort the optimisation.
    """
    try:
        p = getattr(clusterer, "cluster_persistence_", None)
        if p is not None and len(p) > 0:
            return float(np.mean(p))
    except Exception as e:  # best-effort metric, see docstring
        logger.debug("Cluster persistence unavailable: %s", e)
    return 0.0


def per_cluster_persistence(clusterer: HDBSCAN, labels: np.ndarray) -> dict:
    """Map each cluster ID found in *labels* to its persistence score (§8).

    ``cluster_persistence_`` is indexed by HDBSCAN cluster label, so each ID
    is looked up directly (IDs beyond the array get 0.0). Returns ``{}`` when
    the clusterer exposes no persistence scores.
    """
    try:
        p = getattr(clusterer, "cluster_persistence_", None)
        if p is None or len(p) == 0:
            return {}
        cids = sorted({int(lab) for lab in labels if lab != -1})
        return {cid: float(p[cid]) if 0 <= cid < len(p) else 0.0
                for cid in cids}
    except Exception as e:  # best-effort metric, see docstring
        logger.debug("Per-cluster persistence unavailable: %s", e)
        return {}


def compute_dbcv(reduced: np.ndarray, labels: np.ndarray) -> float:
    """Density-Based Cluster Validity index of *labels* on *reduced*.

    Returns -1.0 (the lowest DBCV score) when fewer than two non-noise
    clusters exist, when the computation fails, or when it yields a
    non-finite value — a NaN would otherwise poison the ``max()`` used to
    pick the best trial.
    """
    try:
        from hdbscan.validity import validity_index
        cluster_ids = set(np.asarray(labels).tolist())
        cluster_ids.discard(-1)
        if len(cluster_ids) < 2:
            return -1.0
        score = float(validity_index(reduced.astype(np.float64), labels))
    except Exception as e:
        logger.warning("DBCV failed: %s", e)
        return -1.0
    if not np.isfinite(score):
        logger.warning("DBCV returned non-finite value %s; using -1.0.", score)
        return -1.0
    return score


def _reduce(embeddings: np.ndarray, params: dict,
            random_state: int) -> np.ndarray:
    """UMAP-project *embeddings* with the settings shared by every clustering run."""
    return UMAP(n_neighbors=params["n_neighbors"],
                n_components=params["n_components"],
                min_dist=0.0, metric="cosine",
                random_state=random_state,
                low_memory=True).fit_transform(embeddings)


def _make_clusterer(params: dict, **extra) -> HDBSCAN:
    """Build an HDBSCAN clusterer from a trial parameter dict (plus *extra* kwargs)."""
    return HDBSCAN(min_cluster_size=params["min_cluster_size"],
                   min_samples=params["min_samples"],
                   metric="euclidean",
                   cluster_selection_method=params["csm"],
                   cluster_selection_epsilon=params["cse"],
                   **extra)


def compute_stability(embeddings: np.ndarray, params: dict,
                      n_seeds: int = 3) -> float:
    """Cluster-recurrence stability via pairwise ARI across seeds (§3.4).

    Re-runs UMAP (seeds 1, 8, 15, …) + HDBSCAN *n_seeds* times with *params*
    and returns the mean adjusted Rand index over all label pairs (noise is
    treated as one more label). Uses 3 seeds by default for speed (spec
    allows 3–5). Returns 0.0 when fewer than two seeds are requested.
    """
    all_labels = [
        _make_clusterer(params).fit_predict(
            _reduce(embeddings, params, random_state=s * 7 + 1))
        for s in range(n_seeds)
    ]
    aris = [adjusted_rand_score(a, b) for a, b in combinations(all_labels, 2)]
    return float(np.mean(aris)) if aris else 0.0


# ---------------------------------------------------------------------------
# §3.4 — Bayesian optimisation objective
# ---------------------------------------------------------------------------
def _objective(trial, embeddings, n_docs):
    """Single Optuna trial — returns (persistence, dbcv, stability_placeholder).

    Search space (§3.4): UMAP ``n_neighbors`` / ``n_components``; HDBSCAN
    ``min_cluster_size`` (about 1–5 % of the corpus, floors 5 / 20),
    ``min_samples`` (1..min_cluster_size), selection method ``csm`` and
    selection epsilon ``cse``.

    Parameters, discipline report and labels are stored as trial user attrs.
    Trials violating a hard discipline constraint (§3.2) return the worst
    scores ``(0.0, -1.0, 0.0)``. Stability is expensive, so every passing
    trial reports the constant 0.5 placeholder; real stability is computed
    only for the selected winner.
    """
    # Suggestion order is kept stable so seeded sampling stays reproducible.
    n_neighbors = trial.suggest_categorical("n_neighbors", [5, 10, 15, 30, 50])
    n_components = trial.suggest_int("n_components", 5, 10)
    mcs = trial.suggest_int(
        "min_cluster_size",
        max(5, int(0.01 * n_docs)),
        max(20, int(0.05 * n_docs)),
    )
    ms = trial.suggest_int("min_samples", 1, mcs)
    csm = trial.suggest_categorical("csm", ["eom", "leaf"])
    cse = trial.suggest_float("cse", 0.0, 0.3, step=0.05)

    params = dict(n_neighbors=n_neighbors, n_components=n_components,
                  min_cluster_size=mcs, min_samples=ms, csm=csm, cse=cse)

    red = _reduce(embeddings, params, random_state=42)
    h = _make_clusterer(params, allow_single_cluster=False,
                        gen_min_span_tree=True)
    labels = h.fit_predict(red)
    disc = check_discipline(labels, n_docs)

    trial.set_user_attr("params", params)
    trial.set_user_attr("discipline", disc)
    trial.set_user_attr("labels", labels.tolist())

    # Hard-constraint violation → worst scores
    if not disc["max_mass_ok"] or not disc["min_size_ok"]:
        trial.set_user_attr("pass", False)
        return 0.0, -1.0, 0.0

    trial.set_user_attr("pass", True)
    pers = compute_persistence(h)
    dbcv = compute_dbcv(red, labels)
    trial.set_user_attr("persistence", pers)
    trial.set_user_attr("dbcv", dbcv)
    return pers, dbcv, 0.5  # stability computed only for winner


# ---------------------------------------------------------------------------
# §3.4 — Run the full Bayesian loop
# ---------------------------------------------------------------------------
def _converged(trial_log: list, window: int = 3, tol: float = 0.05) -> bool:
    """§3.6 convergence test on the trial log.

    True when the last *window* trials are consecutive discipline-passing
    trials whose persistence is each within *tol* (relative) of the best
    passing persistence seen so far.
    """
    recent = trial_log[-window:]
    if len(recent) < window or not all(e["discipline_pass"] for e in recent):
        return False
    best_p = max(e["persistence"] for e in trial_log if e["discipline_pass"])
    if best_p <= 0:
        return False
    return all(abs(e["persistence"] - best_p) / best_p < tol for e in recent)


def run_bayesian_optimisation(
    embeddings: np.ndarray,
    n_trials: int = 50,
    progress_callback=None,
) -> dict:
    """Multi-objective TPE search over UMAP + HDBSCAN parameters (§3.4, §3.6).

    Trials run one at a time so convergence can be checked after each; from
    the 10th trial on the search stops early once :func:`_converged` holds.
    The winner is the discipline-passing trial with the highest persistence
    (DBCV breaks ties); if none passed, the last trial is used and a warning
    is logged. Stability is computed for the winner only.

    *progress_callback*, if given, is called after every trial as
    ``progress_callback(trials_done, n_trials, log_entry)``.

    Returns a dict with ``best_params``, ``best_labels``, ``best_trial``,
    ``persistence``, ``dbcv``, ``stability``, ``discipline``, ``trial_log``
    and ``n_trials_run``.

    Raises:
        ValueError: if *n_trials* is smaller than 1.
    """
    if n_trials < 1:
        raise ValueError(f"n_trials must be >= 1, got {n_trials}")

    n_docs = len(embeddings)
    study = optuna.create_study(
        directions=["maximize", "maximize", "maximize"],
        sampler=optuna.samplers.TPESampler(seed=42, multivariate=True),
        study_name="specter2_umap_hdbscan",
    )
    trial_log: list[dict] = []

    def _cb(study, trial):
        d = trial.user_attrs.get("discipline", {})
        entry = dict(
            trial=trial.number,
            params=trial.user_attrs.get("params", {}),
            discipline_pass=trial.user_attrs.get("pass", False),
            persistence=trial.user_attrs.get("persistence", 0.0),
            dbcv=trial.user_attrs.get("dbcv", -1.0),
            n_clusters=d.get("n_clusters", 0),
            max_mass_pct=d.get("max_mass_pct", 0.0),
            min_size=d.get("min_size", 0),
            n_noise=d.get("n_noise", 0),
            values=list(trial.values) if trial.values else [],
        )
        trial_log.append(entry)
        if progress_callback:
            progress_callback(trial.number + 1, n_trials, entry)

    def objective(trial):
        return _objective(trial, embeddings, n_docs)

    for i in range(n_trials):
        study.optimize(objective, n_trials=1, callbacks=[_cb],
                       show_progress_bar=False)
        # §3.6 convergence; allow early stop only after 10 trials.
        if i >= 9 and _converged(trial_log):
            logger.info("Converged at trial %d.", i + 1)
            break

    # Select best passing trial (max persistence, then DBCV)
    passing_trials = [t for t in study.trials if t.user_attrs.get("pass", False)]
    if passing_trials:
        best = max(passing_trials, key=lambda t: (t.values[0], t.values[1]))
    else:
        logger.warning("No trial passed discipline — using last trial.")
        best = study.trials[-1]

    bp = best.user_attrs["params"]
    labels = np.array(best.user_attrs["labels"])
    stability = compute_stability(embeddings, bp, n_seeds=3)

    return dict(
        best_params=bp,
        best_labels=labels,
        best_trial=best.number,
        persistence=best.user_attrs.get("persistence", 0.0),
        dbcv=best.user_attrs.get("dbcv", -1.0),
        stability=stability,
        discipline=best.user_attrs.get("discipline", {}),
        trial_log=trial_log,
        n_trials_run=len(trial_log),
    )


# ---------------------------------------------------------------------------
# §3.1 — 2-D UMAP for visualisation
# ---------------------------------------------------------------------------
def compute_2d_umap(embeddings: np.ndarray, seed: int = 42) -> np.ndarray:
    """Seeded 2-D cosine UMAP projection of *embeddings* for plotting."""
    return UMAP(n_neighbors=15, n_components=2, min_dist=0.1,
                metric="cosine", random_state=seed,
                low_memory=True).fit_transform(embeddings)


# ---------------------------------------------------------------------------
# §3.1 — TF-IDF keyphrase extraction per cluster (3–5 phrases)
# Fast alternative to KeyBERT — no extra model download needed.
# ---------------------------------------------------------------------------
def extract_keyphrases(docs: list[str], labels: np.ndarray,
                       top_n: int = 5) -> dict:
    """Top-``top_n`` TF-IDF keyphrases (1–3-grams) for every non-noise cluster.

    A ``TfidfVectorizer`` (English stop words, at most 200 features) is fitted
    separately on each cluster's documents. A term's score is its TF-IDF
    summed over those documents, so scores rank how heavily the cluster uses
    a term, not how distinctive it is relative to other clusters.

    No ``max_df`` ceiling is applied. Within a single cluster, such a ceiling
    would discard the terms shared by nearly every member, which is exactly
    the cluster-defining vocabulary.

    Returns:
        ``{cluster_id: [(phrase, score), ...]}``. The list is empty for
        clusters with fewer than two documents, or when no vocabulary
        survives stop-word removal.
    """
    from sklearn.feature_extraction.text import TfidfVectorizer

    cluster_docs = defaultdict(list)
    for doc, lab in zip(docs, labels):
        if lab != -1:
            cluster_docs[int(lab)].append(doc)

    out = {}
    for cid, cdocs in cluster_docs.items():
        if len(cdocs) < 2:
            out[cid] = []
            continue
        tfidf = TfidfVectorizer(stop_words="english", max_features=200,
                                ngram_range=(1, 3), min_df=1)
        try:
            X = tfidf.fit_transform(cdocs)
        except ValueError as e:  # e.g. empty vocabulary after stop words
            logger.warning("Keyphrase extraction cluster %d: %s", cid, e)
            out[cid] = []
            continue
        terms = tfidf.get_feature_names_out()
        scores = np.asarray(X.sum(axis=0)).ravel()
        top_idx = scores.argsort()[::-1][:top_n]
        out[cid] = [(str(terms[i]), float(scores[i])) for i in top_idx]
    return out


# ---------------------------------------------------------------------------
# §3.1 — Strong / weak member counts via HDBSCAN probabilities
# ---------------------------------------------------------------------------
def strong_weak_members(labels: np.ndarray, probabilities: np.ndarray,
                        threshold: float = 0.5) -> dict:
    """Count strong / weak members per cluster from membership probabilities.

    A member is "strong" when its probability is ``>= threshold`` and "weak"
    otherwise. Noise points (label ``-1``) are skipped.

    Returns ``{cluster_id: {"strong": int, "weak": int}}``.
    """
    mem = defaultdict(lambda: {"strong": 0, "weak": 0})
    for lab, prob in zip(labels, probabilities):
        if lab == -1:
            continue
        mem[int(lab)]["strong" if prob >= threshold else "weak"] += 1
    return dict(mem)


# ---------------------------------------------------------------------------
# §3.2 — Outlier reduction: reassign noise to nearest cluster (≤ 25 %)
# ---------------------------------------------------------------------------
def outlier_reduction(labels: np.ndarray, reduced: np.ndarray, n_docs: int,
                      max_mass: float = 0.25) -> np.ndarray:
    """Reassign noise points to the nearest cluster centroid, respecting a cap.

    Centroids are the per-cluster means of *reduced*, computed once before
    any reassignment. Each noise point goes to the closest cluster (Euclidean
    distance in *reduced*) whose current size is below
    ``int(max_mass * n_docs)``. If every cluster is full, the point stays
    noise. The input array is not modified; a relabelled copy is returned.
    """
    labels = labels.copy()
    cap = int(max_mass * n_docs)

    cdocs = defaultdict(list)
    for i, lab in enumerate(labels):
        if lab != -1:
            cdocs[int(lab)].append(i)
    if not cdocs:
        return labels

    cids = list(cdocs)
    centroids = np.vstack([reduced[cdocs[c]].mean(axis=0) for c in cids])
    noise = [i for i, lab in enumerate(labels) if lab == -1]

    moved = 0
    for idx in noise:
        dists = np.linalg.norm(centroids - reduced[idx], axis=1)
        for best in np.argsort(dists):
            tgt = cids[best]
            if len(cdocs[tgt]) < cap:
                labels[idx] = tgt
                cdocs[tgt].append(idx)
                moved += 1
                break
    logger.info("Outlier reduction: %d / %d noise reassigned.",
                moved, len(noise))
    return labels


# ---------------------------------------------------------------------------
# Representative docs (top-3 by centroid proximity)
# ---------------------------------------------------------------------------
def get_representative_docs(labels, embeddings, docs, top_n=3):
    """Return ``{cluster_id: [doc, ...]}``: the *top_n* docs of each cluster
    most cosine-similar to the cluster's mean embedding, closest first.
    Noise (label ``-1``) is skipped."""
    cdocs = defaultdict(list)
    for i, lab in enumerate(labels):
        if lab != -1:
            cdocs[int(lab)].append(i)
    out = {}
    for cid, idxs in cdocs.items():
        ce = embeddings[idxs].mean(axis=0).reshape(1, -1)
        sims = cosine_similarity(ce, embeddings[idxs])[0]
        top = np.argsort(sims)[-top_n:][::-1]
        out[cid] = [docs[idxs[t]] for t in top]
    return out


# ---------------------------------------------------------------------------
# §9 — RQ2 / RQ3 mismatch table
# ---------------------------------------------------------------------------
# Word tokenizer for label / keyphrase comparison: punctuation is ignored so
# "Networks:" and "networks" compare equal.
_WORD_RE = re.compile(r"\w+")
# Filler words ignored when measuring label/keyphrase overlap.
_LABEL_STOPWORDS = frozenset({"the", "and", "for", "with", "using", "based",
                              "from", "in", "of", "a", "to"})


def _words(text: str) -> set[str]:
    """Lower-cased word tokens of *text*, punctuation stripped."""
    return set(_WORD_RE.findall(text.lower()))


def build_mismatch_table(keyphrases: dict, cluster_labels: dict) -> list:
    """Compare cluster keyphrases against assigned labels to flag mismatches.

    For each cluster (sorted by ID), the first five keyphrases are compared
    with the label on a word level, ignoring case, punctuation and
    ``_LABEL_STOPWORDS``. ``match_pct`` is the share of the label's remaining
    words that occur in the keyphrases. A cluster is "MATCH" at >= 30 %, else
    "MISMATCH". Keyphrases may be ``(term, score)`` pairs (tuples or lists)
    or bare strings. A missing or empty label counts as a mismatch.

    Returns rows for a mismatch table (§9).
    """
    rows = []
    for cid in sorted(keyphrases):
        kp_terms = [k[0] if isinstance(k, (tuple, list)) else k
                    for k in keyphrases[cid][:5]]
        label = cluster_labels.get(cid) or ""

        label_words = _words(label) - _LABEL_STOPWORDS
        overlap = label_words & _words(" ".join(kp_terms))
        match_pct = len(overlap) / max(len(label_words), 1)

        rows.append(dict(
            cluster=cid,
            label=label,
            keyphrases=", ".join(kp_terms),
            # sorted → deterministic output regardless of set ordering
            overlap=", ".join(sorted(overlap)) if overlap else "—",
            match_pct=round(match_pct * 100),
            status="MATCH" if match_pct >= 0.3 else "MISMATCH",
        ))
    return rows


# ---------------------------------------------------------------------------
# High-level pipeline entry point
# ---------------------------------------------------------------------------
def _refit_winner(embeddings: np.ndarray, bp: dict):
    """Re-run the winning UMAP + HDBSCAN configuration.

    The UMAP/HDBSCAN arguments mirror those used for each optimisation trial
    (seed 42, ``low_memory=True``, no single cluster, min-span tree), so the
    re-fit reproduces the winning trial's space. Returns
    ``(reduced, fitted_clusterer)``.
    """
    red = UMAP(n_neighbors=bp["n_neighbors"], n_components=bp["n_components"],
               min_dist=0.0, metric="cosine", random_state=42,
               low_memory=True).fit_transform(embeddings)
    h = HDBSCAN(min_cluster_size=bp["min_cluster_size"],
                min_samples=bp["min_samples"], metric="euclidean",
                cluster_selection_method=bp["csm"],
                cluster_selection_epsilon=bp["cse"],
                allow_single_cluster=False, gen_min_span_tree=True)
    h.fit(red)
    return red, h


def run_topic_modeling(filepath: str, n_trials: int = 50,
                       progress_callback=None) -> dict:
    """Run the full pipeline on a CSV of papers and return all artefacts.

    Steps:
      1. load and build title+abstract documents;
      2. embed;
      3. optimise UMAP + HDBSCAN;
      4. re-fit the winner;
      5. reassign noise;
      6. count strong/weak members;
      7. build a 2-D map;
      8. extract keyphrases;
      9. pick representative docs;
      10. run a final discipline check.

    If the re-fit does not reproduce the winning trial's labels, the re-fit
    labels are used (with a warning). This keeps labels, the reduced space
    and membership probabilities mutually consistent. ``metrics`` always
    report the winning trial's values.
    """
    # 1. Load
    df = load_csv(filepath)
    docs = prepare_documents(df)
    n_docs = len(docs)

    # 2. Embed (deterministic)
    embeddings = embed_documents(docs)

    # 3. Bayesian optimisation (§3.4)
    opt = run_bayesian_optimisation(embeddings, n_trials, progress_callback)
    bp = opt["best_params"]
    labels = opt["best_labels"]

    # 4. Re-run winner for the reduced space and clusterer (probabilities)
    red, h = _refit_winner(embeddings, bp)
    refit_labels = np.asarray(h.labels_)
    if not np.array_equal(refit_labels, labels):
        logger.warning(
            "Re-fit of the winning configuration did not reproduce the trial "
            "labels; using the re-fit labels so labels, reduced coordinates "
            "and membership probabilities stay aligned.")
        labels = refit_labels

    # Per-cluster persistence (§8)
    cluster_pers = per_cluster_persistence(h, labels)

    # 5. Outlier reduction (§3.2 — noise points reassigned to nearest cluster)
    labels = outlier_reduction(labels, red, n_docs)

    # 6. Strong / weak (§3.1)
    sw = strong_weak_members(labels, h.probabilities_)

    # 7. 2-D UMAP (§3.1)
    umap_2d = compute_2d_umap(embeddings)

    # 8. TF-IDF keyphrases (§3.1)
    keyphrases = extract_keyphrases(docs, labels)

    # 9. Rep docs
    rep_docs = get_representative_docs(labels, embeddings, docs)

    # 10. Final discipline
    disc = check_discipline(labels, n_docs)

    return dict(
        documents=docs,
        labels=labels.tolist(),
        keyphrases=keyphrases,
        representative_docs=rep_docs,
        membership=sw,
        umap_2d=umap_2d.tolist(),
        discipline=disc,
        best_params=bp,
        cluster_persistence=cluster_pers,
        metrics=dict(persistence=opt["persistence"],
                     dbcv=opt["dbcv"],
                     stability=opt["stability"]),
        trial_log=opt["trial_log"],
        n_trials_run=opt["n_trials_run"],
        best_trial=opt["best_trial"],
        n_docs=n_docs,
        embeddings=embeddings,
    )