| """ |
| tools.py |
| -------- |
| Topic-modelling pipeline: SPECTER-2 → UMAP → HDBSCAN |
| with multi-objective Bayesian optimisation over UMAP + HDBSCAN |
| parameters (§3.1–§3.6 of the methodology guide). |
| |
| No BERTopic wrapper — bare UMAP + HDBSCAN on SPECTER-2 embeddings. |
| """ |
|
|
| import re |
| import logging |
| import pandas as pd |
| import numpy as np |
| from typing import Optional |
| from collections import Counter, defaultdict |
|
|
| from sentence_transformers import SentenceTransformer |
| from umap import UMAP |
| from hdbscan import HDBSCAN |
| from sklearn.metrics import adjusted_rand_score |
| from sklearn.metrics.pairwise import cosine_similarity |
| import optuna |
|
|
| |
| |
| |
# Module logging: INFO-level "LEVEL | message" output for this pipeline, and
# Optuna restricted to warnings so per-trial chatter does not flood the log.
# NOTE(review): basicConfig runs at import time and configures the *root*
# logger (only if it has no handlers yet) for any program importing this file.
logging.basicConfig(format="%(levelname)s | %(message)s", level=logging.INFO)
logger = logging.getLogger(name=__name__)
optuna.logging.set_verbosity(optuna.logging.WARNING)
|
|
|
|
| |
| |
| |
def load_csv(filepath: str) -> pd.DataFrame:
    """Read a CSV of papers and normalise its column names.

    Column names are stripped of surrounding whitespace and lower-cased, so
    headers such as ``" Title"`` or ``"ABSTRACT "`` satisfy the required
    ``title`` / ``abstract`` columns.

    Raises:
        ValueError: if the ``title`` or ``abstract`` column is missing.
    """
    df = pd.read_csv(filepath)
    # Spreadsheet exports often carry stray spaces around header names.
    df.columns = df.columns.str.strip().str.lower()
    required = {"title", "abstract"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"CSV missing column(s): {missing}")
    logger.info("Loaded %d rows from '%s'.", len(df), filepath)
    return df
|
|
|
|
| |
| |
| |
def prepare_documents(df: pd.DataFrame) -> list[str]:
    """Build one string per paper: title + abstract (§3.1 input unit).

    Missing cells become empty strings, and every value is coerced to ``str``
    so non-text cells (e.g. a numeric title) cannot break the concatenation.
    The ``". "`` separator is inserted only when both the title and the
    abstract are non-empty. A paper with only one of the two fields is
    represented by that field alone, with no dangling separator.
    """
    titles = df["title"].fillna("").astype(str).str.strip()
    abstracts = df["abstract"].fillna("").astype(str).str.strip()
    docs = [
        f"{title}. {abstract}" if title and abstract else (title or abstract)
        for title, abstract in zip(titles, abstracts)
    ]
    logger.info("Prepared %d title+abstract documents.", len(docs))
    return docs
|
|
|
|
| |
| |
| |
# Loaded SentenceTransformer models, keyed by model name, reused across calls
# within this process.
_MODEL_CACHE = {}


def embed_documents(
    docs: list[str],
    model_name: str = "allenai/specter2_base",
) -> np.ndarray:
    """Encode ``docs`` with a SPECTER-2 sentence-transformer (§3.3, no tuning).

    The model for ``model_name`` is loaded on first use and cached in
    ``_MODEL_CACHE``. Encoding uses batches of 64 with a progress bar.
    Returns the embedding matrix produced by ``model.encode``.
    """
    try:
        model = _MODEL_CACHE[model_name]
    except KeyError:
        logger.info("Loading %s (first time, will be cached)…", model_name)
        model = SentenceTransformer(model_name)
        _MODEL_CACHE[model_name] = model

    vectors = model.encode(docs, batch_size=64, show_progress_bar=True)
    logger.info("Embedded %d docs → %s", len(docs), vectors.shape)
    return vectors
|
|
|
|
| |
| |
| |
def check_discipline(labels: np.ndarray, n_docs: int,
                     max_mass: float = 0.25,
                     min_cluster_docs: int = 5) -> dict:
    """Evaluate the two hard constraints on a clustering.

    * max-mass: the largest cluster holds at most ``max_mass`` of the
      ``n_docs`` documents (default 25 %);
    * min-size: the smallest cluster has at least ``min_cluster_docs``
      members (default 5).

    Noise points (label -1) are counted in ``n_noise`` and are not a cluster.

    Returns a dict with ``max_mass_pct`` (a fraction in [0, 1], despite the
    name), ``max_mass_ok``, ``min_size``, ``min_size_ok``, ``n_clusters``,
    ``n_noise`` and ``cluster_sizes`` (cluster ID -> size, sorted by ID).
    The same keys are present when there are no clusters at all. In that
    case both constraints fail and ``cluster_sizes`` is empty.
    """
    counts = Counter(int(l) for l in labels)
    n_noise = counts.pop(-1, 0)  # what remains in `counts` are real clusters

    if not counts:
        return dict(max_mass_pct=0.0, max_mass_ok=False,
                    min_size=0, min_size_ok=False,
                    n_clusters=0, n_noise=n_noise, cluster_sizes={})

    max_mass_pct = max(counts.values()) / n_docs
    min_size = min(counts.values())

    return dict(
        max_mass_pct=round(max_mass_pct, 4),
        max_mass_ok=max_mass_pct <= max_mass,
        min_size=int(min_size),
        min_size_ok=min_size >= min_cluster_docs,
        n_clusters=len(counts),
        n_noise=n_noise,
        cluster_sizes={cid: counts[cid] for cid in sorted(counts)},
    )
|
|
|
|
| |
| |
| |
def compute_persistence(clusterer: HDBSCAN) -> float:
    """Return the mean of ``clusterer.cluster_persistence_``.

    Returns 0.0 when the attribute is absent or empty. Any exception raised
    while reading or averaging it is swallowed and also yields 0.0, so this
    call never raises.
    """
    try:
        scores = getattr(clusterer, "cluster_persistence_", None)
        if scores is None or len(scores) == 0:
            return 0.0
        return float(np.mean(scores))
    except Exception:
        return 0.0
|
|
|
|
def per_cluster_persistence(clusterer: HDBSCAN, labels: np.ndarray) -> dict:
    """Map each cluster ID in ``labels`` to its persistence score (§8).

    ``cluster_persistence_`` holds one score per cluster, indexed by cluster
    label, so each ID is looked up directly. It is not looked up by its rank
    among the observed labels; the two only coincide when the labels are
    exactly 0..k-1. IDs without a corresponding entry map to 0.0. Noise (-1)
    is skipped.

    Returns {} when the clusterer has no persistence scores. Any exception
    while reading them also yields {}.
    """
    try:
        scores = getattr(clusterer, "cluster_persistence_", None)
        if scores is None or len(scores) == 0:
            return {}
        cluster_ids = sorted({int(l) for l in labels if l != -1})
        return {cid: float(scores[cid]) if 0 <= cid < len(scores) else 0.0
                for cid in cluster_ids}
    except Exception:
        return {}
|
|
|
|
def compute_dbcv(reduced: np.ndarray, labels: np.ndarray) -> float:
    """Density-Based Cluster Validity (DBCV) index of a clustering.

    Uses ``hdbscan.validity.validity_index`` on ``reduced`` (cast to
    float64). Returns -1.0, the lowest possible DBCV, in any of these cases:
    fewer than two non-noise clusters, ``validity_index`` raising, or
    ``validity_index`` returning a non-finite value (it may do so on
    degenerate input). The last guard matters because the score is used as
    an Optuna objective, and Optuna marks a trial as failed when an
    objective value is NaN.
    """
    try:
        from hdbscan.validity import validity_index
        cluster_ids = set(labels)
        cluster_ids.discard(-1)
        if len(cluster_ids) < 2:
            return -1.0
        score = float(validity_index(reduced.astype(np.float64), labels))
    except Exception as e:
        logger.warning("DBCV failed: %s", e)
        return -1.0
    if not np.isfinite(score):
        logger.warning("DBCV returned non-finite value %s; using -1.0.", score)
        return -1.0
    return score
|
|
|
|
def compute_stability(embeddings: np.ndarray, params: dict,
                      n_seeds: int = 3) -> float:
    """Cluster-recurrence stability: mean pairwise ARI across seeds (§3.4).

    Re-runs UMAP (random_state = 1, 8, 15, …) followed by HDBSCAN with the
    given ``params`` once per seed. It then averages ``adjusted_rand_score``
    over every unordered pair of runs. Noise (-1) is passed to ARI as an
    ordinary label value. Returns 0.0 when fewer than two seeds are run.
    Defaults to 3 seeds for speed (spec allows 3–5).
    """
    def _cluster_once(seed: int) -> np.ndarray:
        reducer = UMAP(n_neighbors=params["n_neighbors"],
                       n_components=params["n_components"],
                       min_dist=0.0, metric="cosine",
                       random_state=seed, low_memory=True)
        projected = reducer.fit_transform(embeddings)
        clusterer = HDBSCAN(min_cluster_size=params["min_cluster_size"],
                            min_samples=params["min_samples"],
                            metric="euclidean",
                            cluster_selection_method=params["csm"],
                            cluster_selection_epsilon=params["cse"])
        return clusterer.fit_predict(projected)

    runs = [_cluster_once(7 * k + 1) for k in range(n_seeds)]
    pair_scores = [adjusted_rand_score(runs[a], runs[b])
                   for a in range(len(runs))
                   for b in range(a + 1, len(runs))]
    if not pair_scores:
        return 0.0
    return float(np.mean(pair_scores))
|
|
|
|
| |
| |
| |
def _objective(trial, embeddings, n_docs):
    """Run one Optuna trial of UMAP → HDBSCAN and score it.

    Samples UMAP/HDBSCAN parameters and clusters ``embeddings``. It stores
    ``params``, ``discipline``, ``labels`` and ``pass`` as trial user
    attributes and returns three objective values. Trials failing the
    discipline check return ``(0.0, -1.0, 0.0)``. Passing trials return
    ``(persistence, dbcv, 0.5)``, where the third value is a constant
    placeholder; they also store ``persistence`` and ``dbcv``.
    """
    # NOTE: keep the suggest_* calls in this order. The sampled sequence may
    # depend on it, and min_samples' range depends on min_cluster_size.
    n_neighbors = trial.suggest_categorical("n_neighbors", [5, 10, 15, 30, 50])
    n_components = trial.suggest_int("n_components", 5, 10)
    mcs_low = max(5, int(0.01 * n_docs))
    mcs_high = max(20, int(0.05 * n_docs))
    min_cluster_size = trial.suggest_int("min_cluster_size", mcs_low, mcs_high)
    min_samples = trial.suggest_int("min_samples", 1, min_cluster_size)
    selection_method = trial.suggest_categorical("csm", ["eom", "leaf"])
    selection_eps = trial.suggest_float("cse", 0.0, 0.3, step=0.05)

    params = {
        "n_neighbors": n_neighbors,
        "n_components": n_components,
        "min_cluster_size": min_cluster_size,
        "min_samples": min_samples,
        "csm": selection_method,
        "cse": selection_eps,
    }

    reducer = UMAP(n_neighbors=n_neighbors, n_components=n_components,
                   min_dist=0.0, metric="cosine", random_state=42,
                   low_memory=True)
    reduced = reducer.fit_transform(embeddings)

    clusterer = HDBSCAN(min_cluster_size=min_cluster_size,
                        min_samples=min_samples, metric="euclidean",
                        cluster_selection_method=selection_method,
                        cluster_selection_epsilon=selection_eps,
                        allow_single_cluster=False, gen_min_span_tree=True)
    labels = clusterer.fit_predict(reduced)

    discipline = check_discipline(labels, n_docs)
    trial.set_user_attr("params", params)
    trial.set_user_attr("discipline", discipline)
    trial.set_user_attr("labels", labels.tolist())

    passed = discipline["max_mass_ok"] and discipline["min_size_ok"]
    trial.set_user_attr("pass", passed)
    if not passed:
        return 0.0, -1.0, 0.0

    persistence = compute_persistence(clusterer)
    dbcv = compute_dbcv(reduced, labels)
    trial.set_user_attr("persistence", persistence)
    trial.set_user_attr("dbcv", dbcv)
    return persistence, dbcv, 0.5
|
|
|
|
| |
| |
| |
def _persistence_plateaued(passing: list[dict], window: int = 3,
                           rel_tol: float = 0.05) -> bool:
    """True if each of the last ``window`` passing-trial log entries has a
    persistence within ``rel_tol`` (relative) of the best one in ``passing``."""
    if len(passing) < window:
        return False
    best_p = max(e["persistence"] for e in passing)
    if not best_p > 0:
        return False
    return all(abs(e["persistence"] - best_p) / best_p < rel_tol
               for e in passing[-window:])


def _select_best_trial(study):
    """Pick the discipline-passing trial with a recorded objective and the
    highest (persistence, DBCV). If there is none, pick the most recent
    trial that stored its parameters."""
    # `values` is None for trials Optuna marked as failed (e.g. a NaN
    # objective), even if the objective had already set pass=True.
    candidates = [t for t in study.trials
                  if t.user_attrs.get("pass", False) and t.values is not None]
    if candidates:
        return max(candidates, key=lambda t: (t.values[0], t.values[1]))
    logger.warning("No trial passed discipline — using last trial.")
    for t in reversed(study.trials):
        if "params" in t.user_attrs:
            return t
    raise RuntimeError("No Optuna trial recorded clustering parameters.")


def run_bayesian_optimisation(
    embeddings: np.ndarray,
    n_trials: int = 50,
    progress_callback=None,
) -> dict:
    """Multi-objective Bayesian search over UMAP + HDBSCAN parameters.

    Runs up to ``n_trials`` Optuna trials with a seeded multivariate TPE
    sampler, maximising the three values returned by ``_objective``. The
    search stops early once at least 10 trials have run and the last three
    discipline-passing trials are each within 5 % of the best persistence.

    The chosen trial is the passing one with the highest (persistence,
    DBCV). If none passed, the most recent trial that stored parameters is
    used. Stability is computed only for the chosen parameters.

    ``progress_callback(done, n_trials, entry)``, if given, is called after
    every trial with that trial's log entry.

    Raises:
        ValueError: if ``n_trials`` is less than 1.
        RuntimeError: if no trial stored any parameters.
    """
    if n_trials < 1:
        raise ValueError(f"n_trials must be >= 1, got {n_trials}")

    n_docs = len(embeddings)
    study = optuna.create_study(
        directions=["maximize", "maximize", "maximize"],
        sampler=optuna.samplers.TPESampler(seed=42, multivariate=True),
        study_name="specter2_umap_hdbscan",
    )
    trial_log = []

    def _cb(study, trial):
        d = trial.user_attrs.get("discipline", {})
        entry = dict(
            trial=trial.number,
            params=trial.user_attrs.get("params", {}),
            discipline_pass=trial.user_attrs.get("pass", False),
            persistence=trial.user_attrs.get("persistence", 0.0),
            dbcv=trial.user_attrs.get("dbcv", -1.0),
            n_clusters=d.get("n_clusters", 0),
            max_mass_pct=d.get("max_mass_pct", 0.0),
            min_size=d.get("min_size", 0),
            n_noise=d.get("n_noise", 0),
            values=list(trial.values) if trial.values else [],
        )
        trial_log.append(entry)
        if progress_callback:
            progress_callback(trial.number + 1, n_trials, entry)

    # One trial per optimize() call so convergence can be checked in between.
    for i in range(n_trials):
        study.optimize(
            lambda t: _objective(t, embeddings, n_docs),
            n_trials=1, callbacks=[_cb], show_progress_bar=False,
        )
        passing = [e for e in trial_log if e["discipline_pass"]]
        # Require at least 10 trials before allowing an early stop.
        if i >= 9 and _persistence_plateaued(passing):
            logger.info("Converged at trial %d.", i + 1)
            break

    best = _select_best_trial(study)
    bp = best.user_attrs["params"]
    labels = np.array(best.user_attrs["labels"])
    stability = compute_stability(embeddings, bp, n_seeds=3)

    return dict(
        best_params=bp, best_labels=labels,
        best_trial=best.number,
        persistence=best.user_attrs.get("persistence", 0.0),
        dbcv=best.user_attrs.get("dbcv", -1.0),
        stability=stability,
        discipline=best.user_attrs.get("discipline", {}),
        trial_log=trial_log,
        n_trials_run=len(trial_log),
    )
|
|
|
|
| |
| |
| |
def compute_2d_umap(embeddings: np.ndarray, seed: int = 42) -> np.ndarray:
    """Return a 2-D UMAP projection of ``embeddings``.

    Uses fixed settings that are independent of the tuned clustering
    parameters: 15 neighbours, min_dist 0.1 and cosine metric. ``seed`` is
    used as UMAP's random_state.
    """
    projector = UMAP(
        n_components=2,
        n_neighbors=15,
        min_dist=0.1,
        metric="cosine",
        low_memory=True,
        random_state=seed,
    )
    coords = projector.fit_transform(embeddings)
    return coords
|
|
|
|
| |
| |
| |
| |
def extract_keyphrases(docs: list[str], labels: np.ndarray,
                       top_n: int = 5) -> dict:
    """Top class-based TF-IDF keyphrases (1–3-grams) for each cluster.

    All documents of a cluster are concatenated into one pseudo-document, and
    a single ``TfidfVectorizer`` (English stop words) is fitted across these
    pseudo-documents. A term therefore scores high for a cluster when it is
    frequent there and rare in the other clusters. Noise (-1) is ignored.

    Returns a dict mapping cluster ID -> list of up to ``top_n``
    ``(term, score)`` pairs in descending score order. Zero-score terms are
    omitted, and clusters appear in order of first appearance in ``labels``.
    If the vectorizer cannot be fitted (e.g. only stop words remain), a
    warning is logged and every cluster maps to [].
    """
    from sklearn.feature_extraction.text import TfidfVectorizer

    cluster_docs = defaultdict(list)
    for doc, lab in zip(docs, labels):
        if lab != -1:
            cluster_docs[int(lab)].append(doc)
    if not cluster_docs:
        return {}

    cids = list(cluster_docs)
    # Why class-based: a vectorizer fitted on one cluster's own documents with
    # a max_df cut drops exactly the terms shared by most members, i.e. the
    # cluster's defining vocabulary, and IDF down-weights whatever is left.
    class_docs = [" ".join(cluster_docs[cid]) for cid in cids]
    try:
        tfidf = TfidfVectorizer(stop_words="english", ngram_range=(1, 3))
        X = tfidf.fit_transform(class_docs)
    except ValueError as e:
        logger.warning("Keyphrase extraction failed: %s", e)
        return {cid: [] for cid in cids}

    terms = tfidf.get_feature_names_out()
    out = {}
    for row, cid in enumerate(cids):
        scores = X[row].toarray().ravel()
        top_idx = np.argsort(scores)[::-1][:max(top_n, 0)]
        out[cid] = [(str(terms[i]), float(scores[i]))
                    for i in top_idx if scores[i] > 0]
    return out
|
|
|
|
| |
| |
| |
def strong_weak_members(labels: np.ndarray,
                        probabilities: np.ndarray,
                        threshold: float = 0.5) -> dict:
    """Count strong and weak members per cluster.

    A point with membership probability >= ``threshold`` (default 0.5) is
    "strong"; any other value, including NaN, counts as "weak". Noise (-1)
    is skipped. Returns a dict mapping cluster ID ->
    ``{"strong": n, "weak": m}``, containing only clusters that occur in
    ``labels``.

    Raises:
        ValueError: if ``labels`` and ``probabilities`` differ in length
            (zip would otherwise silently drop the tail).
    """
    if len(labels) != len(probabilities):
        raise ValueError(
            f"labels ({len(labels)}) and probabilities "
            f"({len(probabilities)}) must have the same length")
    mem = defaultdict(lambda: {"strong": 0, "weak": 0})
    for lab, prob in zip(labels, probabilities):
        if lab == -1:
            continue
        kind = "strong" if prob >= threshold else "weak"
        mem[int(lab)][kind] += 1
    return dict(mem)
|
|
|
|
| |
| |
| |
def outlier_reduction(labels: np.ndarray, reduced: np.ndarray,
                      n_docs: int) -> np.ndarray:
    """Reassign noise points (-1) to the nearest cluster that still has room.

    Cluster centroids are computed once, in ``reduced`` space, from the
    original (pre-reassignment) members. Each noise point, in index order,
    goes to the cluster with the closest centroid (Euclidean distance) whose
    current size is below ``int(0.25 * n_docs)``. No cluster therefore
    grows past that cap through reassignment. A point whose candidate
    clusters are all full stays -1.

    Returns a new array; the input ``labels`` is left untouched.
    """
    result = labels.copy()
    capacity = int(0.25 * n_docs)

    members = defaultdict(list)
    for pos, lab in enumerate(result):
        if lab != -1:
            members[int(lab)].append(pos)
    if not members:
        return result

    cluster_ids = list(members.keys())
    centres = np.vstack(
        [reduced[members[cid]].mean(axis=0) for cid in cluster_ids])
    noise_positions = [pos for pos, lab in enumerate(result) if lab == -1]

    n_moved = 0
    for pos in noise_positions:
        distance = np.linalg.norm(centres - reduced[pos], axis=1)
        for rank in np.argsort(distance):
            target = cluster_ids[rank]
            if len(members[target]) >= capacity:
                continue  # cluster full; try the next-nearest one
            result[pos] = target
            members[target].append(pos)
            n_moved += 1
            break
    logger.info("Outlier reduction: %d / %d noise reassigned.",
                n_moved, len(noise_positions))
    return result
|
|
|
|
| |
| |
| |
def get_representative_docs(labels: np.ndarray, embeddings: np.ndarray,
                            docs: list[str], top_n: int = 3) -> dict:
    """For each cluster, return up to ``top_n`` docs closest to its centroid.

    The centroid is the mean of the cluster members' embeddings, and
    closeness is cosine similarity to it. Noise (-1) is ignored. Returns a
    dict mapping cluster ID -> documents from most to least similar;
    ``top_n`` <= 0 yields empty lists.
    """
    members = defaultdict(list)
    for i, lab in enumerate(labels):
        if lab != -1:
            members[int(lab)].append(i)

    n_keep = max(top_n, 0)
    out = {}
    for cid, idxs in members.items():
        member_emb = embeddings[idxs]
        centroid = member_emb.mean(axis=0).reshape(1, -1)
        sims = cosine_similarity(centroid, member_emb)[0]
        # Descending, then slice from the front: `[-top_n:]` would return
        # every document when top_n == 0.
        top = np.argsort(sims)[::-1][:n_keep]
        out[cid] = [docs[idxs[t]] for t in top]
    return out
|
|
|
|
| |
| |
| |
def build_mismatch_table(keyphrases: dict, cluster_labels: dict) -> list:
    """Compare each cluster's keyphrases with its assigned label (§9).

    For every cluster in ``keyphrases`` (sorted by ID):

    1. Tokenise the label and the first five keyphrases into lower-case word
       sets. Runs of word characters are used, so punctuation such as
       ``"Networks:"`` does not block a match.
    2. Drop common function words.
    3. Compute the share of label words that also occur in the keyphrases.
       A share of at least 30 % is ``"MATCH"``, otherwise ``"MISMATCH"``.

    Keyphrase entries may be plain strings or ``(term, score)`` pairs stored
    as tuples or lists (lists appear e.g. after a JSON round-trip). A
    missing or ``None`` label is treated as the empty string.

    Returns one dict per cluster with keys ``cluster``, ``label``,
    ``keyphrases``, ``overlap`` (alphabetically sorted and comma-separated,
    or "—" if empty), ``match_pct`` (integer percent) and ``status``.
    """
    stop_words = {"the", "and", "for", "with", "using", "based", "from",
                  "in", "of", "a", "to"}
    word_re = re.compile(r"\w+")
    rows = []
    for cid in sorted(keyphrases):
        kps = keyphrases.get(cid) or []
        kp_terms = [str(k[0]) if isinstance(k, (tuple, list)) else str(k)
                    for k in kps[:5]]
        raw_label = cluster_labels.get(cid)
        label = "" if raw_label is None else str(raw_label)

        label_words = set(word_re.findall(label.lower())) - stop_words
        kp_words = set(word_re.findall(" ".join(kp_terms).lower()))
        overlap = label_words & kp_words
        match_pct = len(overlap) / max(len(label_words), 1)
        status = "MATCH" if match_pct >= 0.3 else "MISMATCH"
        rows.append(dict(
            cluster=cid, label=label,
            keyphrases=", ".join(kp_terms),
            overlap=", ".join(sorted(overlap)) if overlap else "—",
            match_pct=round(match_pct * 100),
            status=status,
        ))
    return rows
|
|
|
|
| |
| |
| |
def _refit_best(embeddings: np.ndarray, bp: dict):
    """Refit UMAP + HDBSCAN with the chosen parameters.

    Returns ``(reduced, clusterer)``.
    """
    # Same UMAP settings as the optimisation trials in _objective, including
    # low_memory=True. NOTE(review): the nearest-neighbour search can take a
    # different code path when low_memory differs, so the refit might not
    # reproduce the selected trial.
    reducer = UMAP(n_neighbors=bp["n_neighbors"],
                   n_components=bp["n_components"],
                   min_dist=0.0, metric="cosine", random_state=42,
                   low_memory=True)
    reduced = reducer.fit_transform(embeddings)
    clusterer = HDBSCAN(min_cluster_size=bp["min_cluster_size"],
                        min_samples=bp["min_samples"], metric="euclidean",
                        cluster_selection_method=bp["csm"],
                        cluster_selection_epsilon=bp["cse"],
                        allow_single_cluster=False,
                        gen_min_span_tree=True)
    clusterer.fit(reduced)
    return reduced, clusterer


def run_topic_modeling(filepath: str, n_trials: int = 50,
                       progress_callback=None) -> dict:
    """End-to-end pipeline: CSV → SPECTER-2 → tuned UMAP + HDBSCAN → report.

    Steps:

    1. Load the CSV and embed title+abstract documents.
    2. Search for UMAP/HDBSCAN parameters.
    3. Refit the best configuration.
    4. Compute per-cluster persistence on the refit.
    5. Reassign noise to the nearest non-full cluster.
    6. Derive membership strength, a 2-D projection, keyphrases,
       representative documents and the final discipline check.

    If the refit's labels differ from the selected trial's labels, the
    refit labels are used (with a warning) so that they stay aligned with
    the clusterer's probabilities and persistence. The reported ``metrics``
    always come from the selected trial.
    """
    df = load_csv(filepath)
    docs = prepare_documents(df)
    n_docs = len(docs)

    embeddings = embed_documents(docs)

    opt = run_bayesian_optimisation(embeddings, n_trials, progress_callback)
    bp = opt["best_params"]
    labels = opt["best_labels"]

    red, h = _refit_best(embeddings, bp)
    if not np.array_equal(h.labels_, labels):
        logger.warning("Refit labels differ from the selected trial's labels; "
                       "using the refit labels so they match the clusterer.")
        labels = np.asarray(h.labels_).copy()

    # Persistence is per original HDBSCAN cluster, so read it before
    # noise points are reassigned.
    cluster_pers = per_cluster_persistence(h, labels)

    labels = outlier_reduction(labels, red, n_docs)

    # Reassigned former-noise points carry HDBSCAN's probability for them.
    sw = strong_weak_members(labels, h.probabilities_)

    umap_2d = compute_2d_umap(embeddings)
    keyphrases = extract_keyphrases(docs, labels)
    rep_docs = get_representative_docs(labels, embeddings, docs)
    disc = check_discipline(labels, n_docs)

    return dict(
        documents=docs, labels=labels.tolist(),
        keyphrases=keyphrases, representative_docs=rep_docs,
        membership=sw, umap_2d=umap_2d.tolist(),
        discipline=disc, best_params=bp,
        cluster_persistence=cluster_pers,
        metrics=dict(persistence=opt["persistence"],
                     dbcv=opt["dbcv"],
                     stability=opt["stability"]),
        trial_log=opt["trial_log"],
        n_trials_run=opt["n_trials_run"],
        best_trial=opt["best_trial"],
        n_docs=n_docs,
        embeddings=embeddings,
    )
|
|