BERTopic_AG_final / tools.py
anujjuna's picture
Update tools.py
7cbf97d verified
"""
tools.py
--------
Topic-modelling pipeline: SPECTER-2 → UMAP → HDBSCAN
with multi-objective Bayesian optimisation over UMAP + HDBSCAN
parameters (§3.1–§3.6 of the methodology guide).
No BERTopic wrapper — bare UMAP + HDBSCAN on SPECTER-2 embeddings.
"""
import re
import logging
import pandas as pd
import numpy as np
from typing import Optional
from collections import Counter, defaultdict
from sentence_transformers import SentenceTransformer
from umap import UMAP
from hdbscan import HDBSCAN
from sklearn.metrics import adjusted_rand_score
from sklearn.metrics.pairwise import cosine_similarity
import optuna
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
# Configure the root logger when this module is imported: INFO and above,
# rendered as "LEVEL | message".
logging.basicConfig(level=logging.INFO, format="%(levelname)s | %(message)s")
# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)
# Raise Optuna's own log threshold to WARNING so its INFO-level messages
# are not emitted.
optuna.logging.set_verbosity(optuna.logging.WARNING)
# ---------------------------------------------------------------------------
# Data Loading (unchanged)
# ---------------------------------------------------------------------------
def load_csv(filepath: str) -> pd.DataFrame:
    """Read the paper CSV and check that it has the columns the pipeline needs.

    Column names are lower-cased before the check, so ``Title`` and ``TITLE``
    are both accepted.

    Args:
        filepath: Location of the CSV, passed straight to ``pandas.read_csv``.

    Returns:
        The loaded frame, with lower-cased column names.

    Raises:
        ValueError: If the ``title`` or ``abstract`` column is absent.
    """
    frame = pd.read_csv(filepath)
    frame.columns = frame.columns.str.lower()

    expected = {"title", "abstract"}
    absent = expected - set(frame.columns)
    if absent:
        raise ValueError(f"CSV missing column(s): {absent}")

    logger.info("Loaded %d rows from '%s'.", len(frame), filepath)
    return frame
# ---------------------------------------------------------------------------
# §3.1 — Input unit: title + abstract concatenation
# ---------------------------------------------------------------------------
def prepare_documents(df: pd.DataFrame) -> list[str]:
    """Build one input string per paper: ``"<title>. <abstract>"`` (§3.1).

    Missing titles or abstracts are replaced by empty strings before joining.

    Args:
        df: Frame with ``title`` and ``abstract`` columns.

    Returns:
        One concatenated string per row, in row order.
    """
    titles = df["title"].fillna("")
    abstracts = df["abstract"].fillna("")
    combined = titles + ". " + abstracts
    documents = combined.tolist()
    logger.info("Prepared %d title+abstract documents.", len(documents))
    return documents
# ---------------------------------------------------------------------------
# §3.1 — Embed with SPECTER-2 (cached model for speed)
# ---------------------------------------------------------------------------
# Loaded SentenceTransformer instances keyed by model name, so repeated calls
# within one process reuse the model instead of loading it again.
_MODEL_CACHE = {}


def embed_documents(
    docs: list[str],
    model_name: str = "allenai/specter2_base",
) -> np.ndarray:
    """Embed documents with SPECTER-2. Deterministic — no tuning (§3.3).

    Args:
        docs: Texts to embed (one per paper).
        model_name: Model identifier handed to ``SentenceTransformer``.

    Returns:
        Whatever ``model.encode`` returns for *docs* (its ``.shape`` is
        logged), encoded in batches of 64 with a progress bar.
    """
    encoder = _MODEL_CACHE.get(model_name)
    if encoder is None:
        logger.info("Loading %s (first time, will be cached)…", model_name)
        encoder = SentenceTransformer(model_name)
        _MODEL_CACHE[model_name] = encoder

    vectors = encoder.encode(docs, show_progress_bar=True, batch_size=64)
    logger.info("Embedded %d docs → %s", len(docs), vectors.shape)
    return vectors
# ---------------------------------------------------------------------------
# §3.2 — Cluster discipline checks
# ---------------------------------------------------------------------------
def check_discipline(labels: np.ndarray, n_docs: int) -> dict:
    """Evaluate the two hard cluster-discipline constraints (§3.2).

    The constraints are: the largest cluster holds at most 25 % of all
    documents, and the smallest cluster has at least 5 members. The noise
    label ``-1`` is not treated as a cluster.

    Args:
        labels: Cluster label per document; ``-1`` marks noise.
        n_docs: Total number of documents, used as the mass denominator.

    Returns:
        A dict with ``max_mass_pct`` (fraction, rounded to 4 places),
        ``max_mass_ok``, ``min_size``, ``min_size_ok``, ``n_clusters``,
        ``n_noise`` and ``cluster_sizes`` (cluster id -> size, ascending id).
        When no cluster exists both checks fail and ``cluster_sizes`` is
        empty, so the result always has the same keys.
    """
    counts = Counter(int(lab) for lab in labels)
    n_noise = counts.get(-1, 0)
    cluster_ids = sorted(cid for cid in counts if cid != -1)

    if not cluster_ids:
        # Everything is noise (or there are no labels): report a failing
        # result with the same schema as the normal branch.
        return dict(
            max_mass_pct=0, max_mass_ok=False,
            min_size=0, min_size_ok=False,
            n_clusters=0, n_noise=n_noise,
            cluster_sizes={},
        )

    cluster_sizes = {cid: counts[cid] for cid in cluster_ids}
    max_mass_pct = max(cluster_sizes.values()) / n_docs
    min_size = min(cluster_sizes.values())
    return dict(
        max_mass_pct=round(max_mass_pct, 4),
        max_mass_ok=max_mass_pct <= 0.25,
        min_size=int(min_size),
        min_size_ok=min_size >= 5,
        n_clusters=len(cluster_ids),
        n_noise=n_noise,
        cluster_sizes=cluster_sizes,
    )
# ---------------------------------------------------------------------------
# §3.4 — Quality metrics
# ---------------------------------------------------------------------------
def compute_persistence(clusterer: HDBSCAN) -> float:
    """Return the mean of the clusterer's ``cluster_persistence_`` scores.

    Args:
        clusterer: A clusterer object; its ``cluster_persistence_`` attribute
            is read if present.

    Returns:
        The mean score as a float, or ``0.0`` when the attribute is missing,
        empty, or cannot be averaged.
    """
    try:
        scores = getattr(clusterer, "cluster_persistence_", None)
        has_scores = scores is not None and len(scores) > 0
        mean_score = float(np.mean(scores)) if has_scores else 0.0
    except Exception:
        # Deliberately best-effort: any failure degrades to the 0.0 fallback.
        mean_score = 0.0
    return mean_score
def per_cluster_persistence(clusterer: HDBSCAN, labels: np.ndarray) -> dict:
    """Map each cluster ID in *labels* to its persistence score (§8).

    Scores are looked up by cluster id (``cluster_persistence_[cid]``) rather
    than by the id's ordinal position among the labels present, so a missing
    id cannot shift the scores of the clusters after it. For contiguous ids
    ``0..k-1`` the result is the same as a positional lookup.

    Args:
        clusterer: Fitted clusterer exposing ``cluster_persistence_``.
        labels: Cluster label per document; ``-1`` (noise) is skipped.

    Returns:
        ``{cluster_id: persistence}``. Ids outside the score array map to
        ``0.0``. An empty dict is returned when no scores are available or
        the lookup fails.
    """
    try:
        scores = getattr(clusterer, "cluster_persistence_", None)
        if scores is None or len(scores) == 0:
            return {}
        cluster_ids = sorted({int(lab) for lab in labels if lab != -1})
        return {
            cid: float(scores[cid]) if 0 <= cid < len(scores) else 0.0
            for cid in cluster_ids
        }
    except Exception as exc:
        # Still best-effort, but leave a trace instead of failing silently.
        logger.warning("Per-cluster persistence failed: %s", exc)
        return {}
def compute_dbcv(reduced: np.ndarray, labels: np.ndarray) -> float:
    """Density-Based Cluster Validity index of a clustering.

    Args:
        reduced: Points the clustering was computed on (cast to float64
            before scoring).
        labels: Cluster label per point; ``-1`` marks noise.

    Returns:
        The DBCV score, or ``-1.0`` when fewer than two clusters exist, the
        computation raises, or the score is not a finite number.
    """
    cluster_ids = set(labels)
    cluster_ids.discard(-1)
    if len(cluster_ids) < 2:
        # Nothing to score; decided before importing the validity module.
        return -1.0

    try:
        from hdbscan.validity import validity_index
        score = float(validity_index(reduced.astype(np.float64), labels))
    except Exception as e:
        logger.warning("DBCV failed: %s", e)
        return -1.0

    # A NaN/inf score must not reach the optimiser as an objective value;
    # treat it like any other DBCV failure.
    if not np.isfinite(score):
        logger.warning("DBCV failed: non-finite score %s", score)
        return -1.0
    return score
def compute_stability(embeddings: np.ndarray, params: dict,
                      n_seeds: int = 3) -> float:
    """Cluster-recurrence stability via pairwise ARI across seeds (§3.4).

    The UMAP + HDBSCAN pipeline is refitted ``n_seeds`` times, each with a
    different UMAP ``random_state`` (``seed_index * 7 + 1``), and the adjusted
    Rand index is averaged over every pair of resulting labelings.
    Uses 3 seeds by default for speed (spec allows 3–5).

    Args:
        embeddings: Document embeddings to reduce and cluster.
        params: Dict with keys ``n_neighbors``, ``n_components``,
            ``min_cluster_size``, ``min_samples``, ``csm`` and ``cse``.
        n_seeds: Number of refits.

    Returns:
        Mean pairwise ARI, or ``0.0`` when there are fewer than two runs.
    """
    runs = []
    for seed_index in range(n_seeds):
        reducer = UMAP(
            n_neighbors=params["n_neighbors"],
            n_components=params["n_components"],
            min_dist=0.0,
            metric="cosine",
            random_state=seed_index * 7 + 1,
            low_memory=True,
        )
        projected = reducer.fit_transform(embeddings)
        clusterer = HDBSCAN(
            min_cluster_size=params["min_cluster_size"],
            min_samples=params["min_samples"],
            metric="euclidean",
            cluster_selection_method=params["csm"],
            cluster_selection_epsilon=params["cse"],
        )
        runs.append(clusterer.fit_predict(projected))

    # Every unordered pair of runs, in (0,1), (0,2), ..., (1,2), ... order.
    pair_scores = [
        adjusted_rand_score(first, second)
        for pos, first in enumerate(runs)
        for second in runs[pos + 1:]
    ]
    if not pair_scores:
        return 0.0
    return float(np.mean(pair_scores))
# ---------------------------------------------------------------------------
# §3.4 — Bayesian optimisation objective
# ---------------------------------------------------------------------------
def _objective(trial, embeddings, n_docs):
    """Run one Optuna trial: sample parameters, fit UMAP + HDBSCAN, score it.

    The sampled parameters, the discipline report, the labels and the
    pass/fail flag are stored on the trial as user attributes; persistence
    and DBCV are stored only when the trial passes.

    Args:
        trial: Optuna trial used for sampling and for user attributes.
        embeddings: Document embeddings to reduce and cluster.
        n_docs: Number of documents; scales the ``min_cluster_size`` range.

    Returns:
        ``(persistence, dbcv, stability_placeholder)``. A trial that violates
        a hard discipline constraint returns ``(0.0, -1.0, 0.0)``; otherwise
        the third value is the constant ``0.5``.
    """
    n_neighbors = trial.suggest_categorical("n_neighbors", [5, 10, 15, 30, 50])
    n_components = trial.suggest_int("n_components", 5, 10)
    # min_cluster_size ranges over 1 %–5 % of the corpus, floored at 5 and 20.
    mcs_low = max(5, int(0.01 * n_docs))
    mcs_high = max(20, int(0.05 * n_docs))
    min_cluster_size = trial.suggest_int("min_cluster_size", mcs_low, mcs_high)
    # min_samples is bounded by the min_cluster_size sampled just above.
    min_samples = trial.suggest_int("min_samples", 1, min_cluster_size)
    selection_method = trial.suggest_categorical("csm", ["eom", "leaf"])
    selection_epsilon = trial.suggest_float("cse", 0.0, 0.3, step=0.05)

    params = {
        "n_neighbors": n_neighbors,
        "n_components": n_components,
        "min_cluster_size": min_cluster_size,
        "min_samples": min_samples,
        "csm": selection_method,
        "cse": selection_epsilon,
    }

    reducer = UMAP(
        n_neighbors=n_neighbors,
        n_components=n_components,
        min_dist=0.0,
        metric="cosine",
        random_state=42,
        low_memory=True,
    )
    projected = reducer.fit_transform(embeddings)

    clusterer = HDBSCAN(
        min_cluster_size=min_cluster_size,
        min_samples=min_samples,
        metric="euclidean",
        cluster_selection_method=selection_method,
        cluster_selection_epsilon=selection_epsilon,
        allow_single_cluster=False,
        gen_min_span_tree=True,
    )
    labels = clusterer.fit_predict(projected)
    report = check_discipline(labels, n_docs)

    trial.set_user_attr("params", params)
    trial.set_user_attr("discipline", report)
    trial.set_user_attr("labels", labels.tolist())

    passed = bool(report["max_mass_ok"] and report["min_size_ok"])
    trial.set_user_attr("pass", passed)
    if not passed:
        # Hard-constraint violation → worst scores
        return 0.0, -1.0, 0.0

    persistence = compute_persistence(clusterer)
    dbcv = compute_dbcv(projected, labels)
    trial.set_user_attr("persistence", persistence)
    trial.set_user_attr("dbcv", dbcv)
    # stability computed only for winner
    return persistence, dbcv, 0.5
# ---------------------------------------------------------------------------
# §3.4 — Run the full Bayesian loop
# ---------------------------------------------------------------------------
def run_bayesian_optimisation(
    embeddings: np.ndarray,
    n_trials: int = 50,
    progress_callback=None,
) -> dict:
    """Search UMAP + HDBSCAN parameters with a multi-objective TPE study (§3.4).

    Trials are run one at a time so the §3.6 convergence rule can be checked
    after each. The winner is the discipline-passing trial with the highest
    persistence (DBCV breaks ties); if none passed, the last trial is used.
    Seed stability is computed for the winner only.

    Args:
        embeddings: Document embeddings; ``len(embeddings)`` is the corpus
            size used for the discipline checks.
        n_trials: Maximum number of trials to run (at least 1).
        progress_callback: Optional callable invoked after every trial as
            ``progress_callback(trials_done, n_trials, log_entry)``.

    Returns:
        Dict with ``best_params``, ``best_labels`` (array), ``best_trial``,
        ``persistence``, ``dbcv``, ``stability``, ``discipline``,
        ``trial_log`` and ``n_trials_run``.

    Raises:
        ValueError: If ``n_trials`` is smaller than 1 (previously this
            surfaced later as an ``IndexError`` on an empty trial list).
    """
    if n_trials < 1:
        raise ValueError(f"n_trials must be >= 1, got {n_trials}.")

    n_docs = len(embeddings)
    study = optuna.create_study(
        directions=["maximize", "maximize", "maximize"],
        sampler=optuna.samplers.TPESampler(seed=42, multivariate=True),
        study_name="specter2_umap_hdbscan",
    )
    trial_log = []

    def _cb(study, trial):
        """Append a flat summary of the finished trial and report progress."""
        d = trial.user_attrs.get("discipline", {})
        entry = dict(
            trial=trial.number,
            params=trial.user_attrs.get("params", {}),
            discipline_pass=trial.user_attrs.get("pass", False),
            persistence=trial.user_attrs.get("persistence", 0.0),
            dbcv=trial.user_attrs.get("dbcv", -1.0),
            n_clusters=d.get("n_clusters", 0),
            max_mass_pct=d.get("max_mass_pct", 0.0),
            min_size=d.get("min_size", 0),
            n_noise=d.get("n_noise", 0),
            # ``values`` is None when Optuna recorded the trial as failed.
            values=list(trial.values) if trial.values else [],
        )
        trial_log.append(entry)
        if progress_callback:
            progress_callback(trial.number + 1, n_trials, entry)

    for i in range(n_trials):
        study.optimize(
            lambda t: _objective(t, embeddings, n_docs),
            n_trials=1, callbacks=[_cb], show_progress_bar=False,
        )
        # §3.6 convergence: 3 consecutive passing within 5 % of best.
        # NOTE(review): the three most recent *passing* trials are compared,
        # even if failing trials ran between them — confirm this matches the
        # "consecutive" wording of §3.6.
        passing = [e for e in trial_log if e["discipline_pass"]]
        if len(passing) >= 3 and i >= 9:  # allow early stop after 10 trials
            best_p = max(e["persistence"] for e in passing)
            if best_p > 0 and all(
                abs(e["persistence"] - best_p) / best_p < 0.05
                for e in passing[-3:]
            ):
                logger.info("Converged at trial %d.", i + 1)
                break

    def _rank(t):
        """Sort key (persistence, DBCV) read from the trial's user attributes.

        For a completed passing trial these equal ``t.values[0]`` and
        ``t.values[1]``. Reading the attributes also works for a trial whose
        ``values`` is None because Optuna marked it failed after ``pass`` had
        been recorded.
        """
        dbcv = t.user_attrs.get("dbcv", -1.0)
        if np.isnan(dbcv):
            dbcv = -1.0  # NaN does not order; rank it as a DBCV failure
        return (t.user_attrs.get("persistence", 0.0), dbcv)

    # Select best passing trial (max persistence, then DBCV)
    passing_trials = [t for t in study.trials
                      if t.user_attrs.get("pass", False)]
    if passing_trials:
        best = max(passing_trials, key=_rank)
    else:
        logger.warning("No trial passed discipline — using last trial.")
        best = study.trials[-1]

    bp = best.user_attrs["params"]
    labels = np.array(best.user_attrs["labels"])
    stability = compute_stability(embeddings, bp, n_seeds=3)
    return dict(
        best_params=bp, best_labels=labels,
        best_trial=best.number,
        persistence=best.user_attrs.get("persistence", 0.0),
        dbcv=best.user_attrs.get("dbcv", -1.0),
        stability=stability,
        discipline=best.user_attrs.get("discipline", {}),
        trial_log=trial_log,
        n_trials_run=len(trial_log),
    )
# ---------------------------------------------------------------------------
# §3.1 — 2-D UMAP for visualisation
# ---------------------------------------------------------------------------
def compute_2d_umap(embeddings: np.ndarray, seed: int = 42) -> np.ndarray:
    """Project embeddings to two dimensions for visualisation (§3.1).

    Uses fixed UMAP settings (15 neighbours, ``min_dist=0.1``, cosine metric),
    independent of the tuned clustering parameters.

    Args:
        embeddings: Document embeddings.
        seed: UMAP ``random_state``.

    Returns:
        The result of ``UMAP.fit_transform`` with ``n_components=2``.
    """
    projector = UMAP(
        n_neighbors=15,
        n_components=2,
        min_dist=0.1,
        metric="cosine",
        random_state=seed,
        low_memory=True,
    )
    return projector.fit_transform(embeddings)
# ---------------------------------------------------------------------------
# §3.1 — TF-IDF keyphrase extraction per cluster (3–5 phrases)
# Fast alternative to KeyBERT — no extra model download needed.
# ---------------------------------------------------------------------------
def extract_keyphrases(docs: list[str], labels: np.ndarray,
                       top_n: int = 5) -> dict:
    """Pick the top TF-IDF terms (1- to 3-grams) for every cluster (§3.1).

    A separate vectoriser is fitted on each cluster's own documents; terms
    are ranked by their TF-IDF score summed over those documents.

    Args:
        docs: Document texts, aligned with *labels*.
        labels: Cluster label per document; ``-1`` (noise) is ignored.
        top_n: Number of terms to keep per cluster.

    Returns:
        ``{cluster_id: [(term, score), ...]}``. Clusters with fewer than two
        documents, or for which extraction raises, map to an empty list.
    """
    from sklearn.feature_extraction.text import TfidfVectorizer

    texts_by_cluster = defaultdict(list)
    for text, lab in zip(docs, labels):
        if lab == -1:
            continue
        texts_by_cluster[int(lab)].append(text)

    result = {}
    for cid, texts in texts_by_cluster.items():
        phrases = []
        if len(texts) >= 2:
            try:
                vectorizer = TfidfVectorizer(
                    stop_words="english", max_features=200,
                    ngram_range=(1, 3), max_df=0.9, min_df=1)
                matrix = vectorizer.fit_transform(texts)
                vocabulary = vectorizer.get_feature_names_out()
                totals = matrix.sum(axis=0).A1
                ranked = totals.argsort()[::-1][:top_n]
                phrases = [(vocabulary[j], float(totals[j])) for j in ranked]
            except Exception as e:
                logger.warning("Keyphrase extraction cluster %d: %s", cid, e)
                phrases = []
        result[cid] = phrases
    return result
# ---------------------------------------------------------------------------
# §3.1 — Strong / weak member counts via HDBSCAN probabilities
# ---------------------------------------------------------------------------
def strong_weak_members(labels: np.ndarray,
                        probabilities: np.ndarray,
                        threshold: float = 0.5) -> dict:
    """Count strong and weak members of every cluster (§3.1).

    A document is a strong member when its membership probability is at
    least *threshold*, otherwise a weak one. Noise documents (label ``-1``)
    are not counted.

    Args:
        labels: Cluster label per document.
        probabilities: Membership probability per document, aligned with
            *labels*.
        threshold: Minimum probability for a strong member. Defaults to
            ``0.5``, the cut-off that was previously hard-coded.

    Returns:
        ``{cluster_id: {"strong": int, "weak": int}}``.
    """
    membership = {}
    for lab, prob in zip(labels, probabilities):
        if lab == -1:
            continue
        bucket = membership.setdefault(int(lab), {"strong": 0, "weak": 0})
        bucket["strong" if prob >= threshold else "weak"] += 1
    return membership
# ---------------------------------------------------------------------------
# §3.2 — Outlier reduction: reassign noise to nearest cluster (≤ 25 %)
# ---------------------------------------------------------------------------
def outlier_reduction(labels: np.ndarray, reduced: np.ndarray,
                      n_docs: int) -> np.ndarray:
    """Reassign noise points to the nearest cluster that still has room (§3.2).

    Cluster centroids are computed once from the original members. Each noise
    point goes to the closest centroid (Euclidean distance in *reduced*)
    whose cluster currently has fewer than ``int(0.25 * n_docs)`` members; if
    every cluster is at that cap the point stays noise.

    Args:
        labels: Cluster label per document; ``-1`` marks noise. Not modified.
        reduced: Point coordinates, indexable by document position.
        n_docs: Total number of documents, used for the 25 % cap.

    Returns:
        A copy of *labels* with the reassignments applied.
    """
    updated = labels.copy()
    max_size = int(0.25 * n_docs)

    members = defaultdict(list)
    noise_positions = []
    for pos, lab in enumerate(updated):
        if lab == -1:
            noise_positions.append(pos)
        else:
            members[int(lab)].append(pos)
    if not members:
        return updated

    cluster_ids = list(members)
    centroids = np.vstack(
        [reduced[members[cid]].mean(axis=0) for cid in cluster_ids]
    )
    # Running sizes: they grow as noise points are absorbed.
    sizes = {cid: len(idxs) for cid, idxs in members.items()}

    reassigned = 0
    for pos in noise_positions:
        distances = np.linalg.norm(centroids - reduced[pos], axis=1)
        target = next(
            (cluster_ids[k] for k in np.argsort(distances)
             if sizes[cluster_ids[k]] < max_size),
            None,
        )
        if target is None:
            continue
        updated[pos] = target
        sizes[target] += 1
        reassigned += 1

    logger.info("Outlier reduction: %d / %d noise reassigned.",
                reassigned, len(noise_positions))
    return updated
# ---------------------------------------------------------------------------
# Representative docs (top-3 by centroid proximity)
# ---------------------------------------------------------------------------
def get_representative_docs(labels, embeddings, docs, top_n=3):
    """Return the documents closest to each cluster's centroid.

    For every cluster the centroid is the mean of its members' embeddings;
    members are ranked by cosine similarity to that centroid, most similar
    first.

    Args:
        labels: Cluster label per document; ``-1`` (noise) is ignored.
        embeddings: Embedding array, indexable by a list of document
            positions.
        docs: Document texts, aligned with *labels*.
        top_n: Maximum number of documents per cluster. Zero (or a negative
            value) yields an empty list.

    Returns:
        ``{cluster_id: [doc, ...]}`` with at most *top_n* documents each.
    """
    members = defaultdict(list)
    for pos, lab in enumerate(labels):
        if lab != -1:
            members[int(lab)].append(pos)

    keep = max(top_n, 0)
    result = {}
    for cid, idxs in members.items():
        centroid = embeddings[idxs].mean(axis=0).reshape(1, -1)
        sims = cosine_similarity(centroid, embeddings[idxs])[0]
        # Reverse first, then truncate: the old ``[-top_n:]`` slice selected
        # the whole array when top_n was 0.
        ranked = np.argsort(sims)[::-1][:keep]
        result[cid] = [docs[idxs[k]] for k in ranked]
    return result
# ---------------------------------------------------------------------------
# §9 — RQ2 / RQ3 mismatch table
# ---------------------------------------------------------------------------
def build_mismatch_table(keyphrases: dict, cluster_labels: dict) -> list:
    """Compare cluster keyphrases against assigned labels to flag mismatches.

    For every cluster the words of its label are compared with the words of
    its first five keyphrases (case-insensitive, whitespace-split, a small
    stop-word list removed). A cluster is a ``MATCH`` when at least 30 % of
    the label's non-stop-words occur among the keyphrase words.

    Args:
        keyphrases: ``{cluster_id: [phrase or (phrase, score), ...]}``.
        cluster_labels: ``{cluster_id: label}``; a missing or ``None`` label
            is treated as an empty string.

    Returns:
        Rows for a mismatch table (§9), one dict per cluster in ascending id
        order, with keys ``cluster``, ``label``, ``keyphrases``, ``overlap``,
        ``match_pct`` (integer percentage) and ``status``. ``overlap`` lists
        the shared words alphabetically so the output is reproducible.
    """
    # Hoisted out of the loop; these words never count towards a match.
    stop_words = frozenset(
        {"the", "and", "for", "with", "using", "based", "from",
         "in", "of", "a", "to"}
    )

    rows = []
    for cid in sorted(keyphrases):
        top_phrases = keyphrases[cid][:5]
        kp_terms = [k[0] if isinstance(k, tuple) else k for k in top_phrases]
        label = cluster_labels.get(cid) or ""

        # Check overlap between label words and keyphrase terms
        label_words = set(label.lower().split()) - stop_words
        kp_words = set(" ".join(kp_terms).lower().split())
        overlap = label_words & kp_words

        match_pct = len(overlap) / max(len(label_words), 1)
        rows.append(dict(
            cluster=cid, label=label,
            keyphrases=", ".join(kp_terms),
            # Sorted: joining a set directly gives a run-dependent order.
            overlap=", ".join(sorted(overlap)) if overlap else "—",
            match_pct=round(match_pct * 100),
            status="MATCH" if match_pct >= 0.3 else "MISMATCH",
        ))
    return rows
# ---------------------------------------------------------------------------
# High-level pipeline entry point
# ---------------------------------------------------------------------------
def run_topic_modeling(filepath: str, n_trials: int = 50,
                       progress_callback=None) -> dict:
    """Run the whole pipeline on a CSV of papers and collect the results.

    Steps: load the CSV, build title+abstract documents, embed them, tune
    UMAP + HDBSCAN by Bayesian optimisation, refit the winning configuration,
    reassign noise points, then derive membership counts, a 2-D projection,
    keyphrases, representative documents and the final discipline report.

    Args:
        filepath: CSV with ``title`` and ``abstract`` columns.
        n_trials: Maximum number of optimisation trials.
        progress_callback: Optional per-trial callback, forwarded to
            ``run_bayesian_optimisation``.

    Returns:
        Dict with ``documents``, ``labels``, ``keyphrases``,
        ``representative_docs``, ``membership``, ``umap_2d``, ``discipline``,
        ``best_params``, ``cluster_persistence``, ``metrics``, ``trial_log``,
        ``n_trials_run``, ``best_trial``, ``n_docs`` and ``embeddings``.
    """
    # 1. Load
    df = load_csv(filepath)
    docs = prepare_documents(df)
    n_docs = len(docs)

    # 2. Embed (deterministic)
    embeddings = embed_documents(docs)

    # 3. Bayesian optimisation (§3.4)
    opt = run_bayesian_optimisation(embeddings, n_trials, progress_callback)
    bp = opt["best_params"]
    labels = opt["best_labels"]

    # 4. Re-run winner for clusterer object (probabilities).
    # Same UMAP/HDBSCAN arguments as the optimisation trial, including
    # low_memory=True, so the refit uses an identical configuration.
    u = UMAP(n_neighbors=bp["n_neighbors"], n_components=bp["n_components"],
             min_dist=0.0, metric="cosine", random_state=42,
             low_memory=True)
    red = u.fit_transform(embeddings)
    h = HDBSCAN(min_cluster_size=bp["min_cluster_size"],
                min_samples=bp["min_samples"], metric="euclidean",
                cluster_selection_method=bp["csm"],
                cluster_selection_epsilon=bp["cse"],
                allow_single_cluster=False,
                gen_min_span_tree=True)
    h.fit(red)
    # The optimiser's labels are paired below with this refit's persistence
    # scores and probabilities, so flag any divergence between the two fits.
    if not np.array_equal(h.labels_, labels):
        logger.warning(
            "Refit labels differ from the winning trial's labels; "
            "persistence and membership figures may be misaligned.")

    # Per-cluster persistence (§8)
    cluster_pers = per_cluster_persistence(h, labels)

    # 5. Outlier reduction (§3.2 — noise points go to the nearest cluster
    #    that is still below the 25 % cap)
    labels = outlier_reduction(labels, red, n_docs)

    # 6. Strong / weak (§3.1)
    # NOTE(review): probabilities come from the refit, before step 5; points
    # reassigned from noise presumably carry probability 0 and so count as
    # weak members — confirm this is intended.
    sw = strong_weak_members(labels, h.probabilities_)

    # 7. 2-D UMAP (§3.1)
    umap_2d = compute_2d_umap(embeddings)

    # 8. TF-IDF keyphrases (§3.1)
    keyphrases = extract_keyphrases(docs, labels)

    # 9. Rep docs
    rep_docs = get_representative_docs(labels, embeddings, docs)

    # 10. Final discipline
    disc = check_discipline(labels, n_docs)

    return dict(
        documents=docs, labels=labels.tolist(),
        keyphrases=keyphrases, representative_docs=rep_docs,
        membership=sw, umap_2d=umap_2d.tolist(),
        discipline=disc, best_params=bp,
        cluster_persistence=cluster_pers,
        metrics=dict(persistence=opt["persistence"],
                     dbcv=opt["dbcv"],
                     stability=opt["stability"]),
        trial_log=opt["trial_log"],
        n_trials_run=opt["n_trials_run"],
        best_trial=opt["best_trial"],
        n_docs=n_docs,
        embeddings=embeddings,
    )