topic-modelling / tools.py
vvinayakkk's picture
added
79cd547
"""
tools.py β€” Core pipeline for topic modelling on Scopus CSV data.
FIXES IN THIS VERSION:
1. consolidate_themes() β€” Complete rewrite. Was producing 1 theme due to
threshold=0.35 being too tight. Now uses AgglomerativeClustering with
direct n_clusters target. Guaranteed 4-8 themes.
2. Theme balancing β€” Filters noise clusters (< MIN_THEME_PAPERS) before
theming. Orphaned small clusters collected into "Niche Topics" group.
Prevents 1 giant catch-all theme + 5 singletons.
3. Noise cluster detection β€” Post-clustering filter removes clusters whose
top keywords are metadata garbage (conference years, editorial tokens etc.)
4. build_topic_table() β€” Updated column names to match labeler.py
(hf_mistral_label, hf_zephyr_label) with backward-compat aliases.
5. name_themes_with_llm() β€” Better prompting, proper academic style.
6. create_taxonomy_map() β€” Weighted semantic scoring (exact + partial match).
7. upsert_json_bucket() β€” Added explicit flush + resolve logging so writes
never silently fail.
8. run_braun_clarke_pipeline() β€” Passes embeddings+labels into
consolidate_themes() so centroids are computed on real document vectors.
9. OUTPUT_DIR handling β€” All file writes use the resolved absolute path
passed in from app.py, never a relative ".".
10. run_full_pipeline() β€” NOW includes Combined (Title+Abstract) analysis
as third pipeline run; returns dict["combined"] so the UI can display it.
"""
import json
import os
import re
import logging
import hashlib
from collections import Counter
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
import plotly.express as px
from dotenv import load_dotenv
from sklearn.cluster import AgglomerativeClustering
from sklearn.feature_extraction.text import TfidfVectorizer, ENGLISH_STOP_WORDS
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import normalize
import cleaner
# Load variables from a .env file (if one is found) into the environment.
load_dotenv()
# Mirror HF_TOKEN into HUGGING_FACE_HUB_TOKEN, but never overwrite a value
# that is already set for the latter.
hf_token = os.getenv("HF_TOKEN", "").strip()
if hf_token and not os.getenv("HUGGING_FACE_HUB_TOKEN"):
    os.environ["HUGGING_FACE_HUB_TOKEN"] = hf_token
logger = logging.getLogger(__name__)
# Hub repository handed to load_adapter() when the SPECTER2 adapter is loaded.
SPECTER2_MODEL_REPO = "vvinayakkkkk/specter2-cached"
# Mutable set copy of scikit-learn's built-in English stop-word list.
STOPWORDS = set(ENGLISH_STOP_WORDS)
# Minimum papers a cluster must have to participate in theme grouping.
MIN_THEME_PAPERS = 10
# Tokens that indicate a cluster is metadata noise, not a research topic.
# is_noise_cluster() compares lower-cased, stripped keywords against this set.
METADATA_NOISE_TOKENS = {
    "conference", "international", "editorial", "guest", "proceedings",
    "workshop", "journal", "vol", "issue", "abstract", "available",
    "know", "tell", "king", "live", "flow", "long", "path", "features",
    "domains", "experiments", "dsp", "sixth", "2001", "2002", "2003",
    "2004", "2005", "2006", "2007", "2008", "2009", "2010",
}
# Category name -> set of lower-case keyword tokens associated with it.
# Some tokens (e.g. "learning", "digital", "platform", "governance") appear
# under more than one category.
# NOTE(review): the consumer is outside this chunk — presumably the taxonomy
# mapping step; confirm before changing keys or tokens.
PAJAIS_CATEGORIES = {
    "artificial intelligence and machine learning": {
        "ai", "artificial", "intelligence", "neural", "deep", "learning",
        "machine", "algorithm", "model", "prediction", "classification",
        "nlp", "language", "generative", "llm", "transformer",
    },
    "data analytics and information systems": {
        "analytics", "data", "mining", "insight", "dashboard", "system",
        "information", "platform", "digital", "infrastructure", "database",
        "big", "cloud", "iot", "real", "time",
    },
    "sustainability and environment": {
        "sustainability", "green", "environment", "climate", "carbon",
        "energy", "renewable", "emission", "ecological", "circular",
    },
    "healthcare and medicine": {
        "health", "clinical", "patient", "medicine", "medical", "hospital",
        "drug", "disease", "treatment", "diagnosis", "telemedicine", "ehealth",
    },
    "education and learning technology": {
        "education", "learning", "student", "teaching", "curriculum",
        "online", "elearning", "academic", "university", "pedagogy",
    },
    "business strategy and management": {
        "business", "strategy", "management", "firm", "organization",
        "leadership", "performance", "competitive", "governance", "agile",
    },
    "finance and economics": {
        "finance", "financial", "economic", "market", "investment",
        "banking", "cryptocurrency", "fintech", "stock", "portfolio",
    },
    "supply chain and operations": {
        "supply", "logistics", "operations", "inventory", "manufacturing",
        "procurement", "lean", "warehouse", "distribution", "production",
    },
    "social media and digital marketing": {
        "social", "media", "marketing", "digital", "brand", "consumer",
        "influencer", "advertising", "engagement", "content", "platform",
    },
    "cybersecurity and risk": {
        "risk", "security", "privacy", "cyber", "resilience", "threat",
        "fraud", "vulnerability", "authentication", "blockchain",
    },
    "human behavior and psychology": {
        "behavior", "psychology", "attitude", "intention", "motivation",
        "trust", "adoption", "perception", "cognitive", "satisfaction",
    },
    "policy and governance": {
        "policy", "governance", "regulation", "public", "government",
        "compliance", "law", "equity", "ethics", "stakeholder",
    },
}
@dataclass
class AnalysisResult:
    """Container for the outputs of one analysis run.

    The first ten fields are required; the remaining fields default to
    ``None`` and are therefore annotated as ``Optional``.
    """

    # Name of the text source that was analysed.
    source_column: str
    paper_count: int
    # NOTE(review): presumably the number of documents that survived
    # cleaning — confirm against the code that builds this object.
    unit_count: int
    topic_table: pd.DataFrame
    taxonomy_table: pd.DataFrame
    # Figure objects; typed Any, concrete type not fixed in this chunk.
    distribution_fig: Any
    keyword_fig: Any
    labels_payload: Dict[str, Any]
    themes_payload: Dict[str, Any]
    taxonomy_payload: List[Dict[str, str]]
    # Optional diagnostics — None when the producing step did not run.
    suggested_params: Optional[Dict[str, Any]] = None
    trial_log: Optional[List[Dict[str, Any]]] = None
    # NOTE(review): presumably one cluster label per kept document, parallel
    # to doc_row_indices — confirm with the caller.
    paper_assignments: Optional[List[int]] = None
    doc_row_indices: Optional[List[int]] = None
    llm_suggested_params: Optional[Dict[str, Any]] = None
    bayesian_best_params: Optional[Dict[str, Any]] = None
# ---------------------------------------------------------------------------
# Embedding model
# ---------------------------------------------------------------------------
@lru_cache(maxsize=1)
def get_embedding_model(model_name: str = "allenai/specter2"):
    """Build the SPECTER2 encoder and memoise it.

    ``lru_cache(maxsize=1)`` means repeated calls with the same argument
    reuse the already-constructed encoder instead of reloading weights.

    Args:
        model_name: Must be ``"allenai/specter2"``.

    Returns:
        A ``Specter2Encoder`` instance exposing ``encode(documents, ...)``.

    Raises:
        ValueError: If any other model name is requested.
    """
    logger.info(f"πŸ“₯ Loading embedding model: {model_name}")
    if model_name != "allenai/specter2":
        raise ValueError("This pipeline only supports allenai/specter2 embeddings.")
    # Heavy third-party imports are deferred until the model is first needed.
    from adapters import AutoAdapterModel, Stack
    from transformers import AutoTokenizer

    class Specter2Encoder:
        """Wraps tokenizer + base model + adapter behind an ``encode`` method."""

        def __init__(self):
            import torch
            # Use the GPU when torch reports one, otherwise the CPU.
            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            logger.info("πŸ€– Loading SPECTER2 tokenizer...")
            self.tokenizer = AutoTokenizer.from_pretrained("allenai/specter2_base")
            logger.info("πŸ€– Loading SPECTER2 base model...")
            self.model = AutoAdapterModel.from_pretrained("allenai/specter2_base")
            logger.info("πŸ€– Loading SPECTER2 adapter from cached repo...")
            # The adapter is loaded inactive and then activated explicitly below.
            self.model.load_adapter(
                SPECTER2_MODEL_REPO, source="hf", load_as="specter2", set_active=False
            )
            self.model.set_active_adapters(Stack("specter2"))
            self.model.to(self.device)
            self.model.eval()
            logger.info("βœ… SPECTER2 ready")

        def encode(
            self,
            documents: List[str],
            batch_size: int = 64,
            show_progress_bar: bool = False,
            normalize_embeddings: bool = True,
        ) -> np.ndarray:
            """Embed ``documents`` in batches and return one row per document.

            NOTE(review): ``show_progress_bar`` and ``normalize_embeddings``
            are accepted but never referenced in this body, so the returned
            vectors are not normalised here.
            """
            import torch
            logger.info(f"πŸ”€ Encoding {len(documents)} documents (batch_size={batch_size})")
            embeddings: List[np.ndarray] = []
            # Ceiling division; used only for the progress log line.
            num_batches = (len(documents) + batch_size - 1) // batch_size
            for batch_idx, start in enumerate(range(0, len(documents), batch_size)):
                batch = documents[start : start + batch_size]
                logger.info(f" Batch [{batch_idx + 1}/{num_batches}] β€” {len(batch)} docs")
                inputs = self.tokenizer(
                    batch,
                    padding=True,
                    truncation=True,
                    return_tensors="pt",
                    return_token_type_ids=False,
                    max_length=512,
                )
                # Move every tokenizer output tensor to the model's device.
                inputs = {k: v.to(self.device) for k, v in inputs.items()}
                with torch.no_grad():
                    outputs = self.model(**inputs)
                # Hidden state at sequence position 0 is used as the document vector.
                batch_embeddings = outputs.last_hidden_state[:, 0, :].cpu().numpy()
                embeddings.append(batch_embeddings)
            # With no documents, return an empty (0, 768) array.
            return np.vstack(embeddings) if embeddings else np.empty((0, 768), dtype=float)

    return Specter2Encoder()
def warm_specter2_cache() -> None:
    """Force the SPECTER2 encoder to load so later calls reuse the cached one."""
    logger.info("πŸ”₯ SPECTER2 cache warmup START")
    encoder = get_embedding_model("allenai/specter2")
    encoder_kind = type(encoder).__name__
    logger.info(f" warmup loaded model: {encoder_kind}")
    logger.info("βœ… SPECTER2 cache warmup COMPLETE")
def _embedding_cache_key(documents: List[str], model_name: str, batch_size: int) -> str:
hasher = hashlib.sha256()
hasher.update(model_name.encode("utf-8"))
hasher.update(str(batch_size).encode("utf-8"))
for document in documents:
hasher.update(b"\0")
hasher.update(document.encode("utf-8", errors="ignore"))
return hasher.hexdigest()[:16]
def get_cached_embeddings(
    documents: List[str],
    model: Any,
    cache_dir: str = ".",
    model_name: str = "allenai/specter2",
    batch_size: int = 64,
) -> np.ndarray:
    """Return embeddings for ``documents``, using an on-disk cache.

    Cache files live in ``<cache_dir>/.embedding_cache`` and are keyed by
    ``_embedding_cache_key(documents, model_name, batch_size)``. A ``.npy``
    file holds the vectors and a sibling ``.json`` file holds metadata.

    Args:
        documents: Texts to embed.
        model: Object exposing ``encode(documents, batch_size=...)``.
        cache_dir: Directory under which the cache folder is created.
        model_name: Included in the cache key and the metadata file.
        batch_size: Passed to ``model.encode`` and included in the cache key.

    Returns:
        The embedding matrix, loaded from cache when possible.
    """
    cache_root = Path(cache_dir).resolve() / ".embedding_cache"
    cache_root.mkdir(parents=True, exist_ok=True)
    key = _embedding_cache_key(documents, model_name, batch_size)
    cache_file = cache_root / f"embeddings_{key}.npy"
    meta_file = cache_root / f"embeddings_{key}.json"

    if cache_file.exists():
        logger.info(f"βœ… Loading cached embeddings from {cache_file}")
        # Metadata is informational only; a bad sidecar must not block the load.
        try:
            if meta_file.exists():
                meta = json.loads(meta_file.read_text(encoding="utf-8"))
                logger.info(f" cache metadata: {meta}")
        except Exception as exc:
            logger.warning(f" cache metadata read failed: {exc}")
        try:
            return np.load(cache_file, allow_pickle=False)
        except (OSError, ValueError, EOFError) as exc:
            # A truncated/corrupt cache file (e.g. from an interrupted run)
            # would otherwise fail every subsequent run; recompute instead.
            logger.warning(f" cached embeddings unreadable, recomputing: {exc}")

    logger.info(f"πŸ”’ Computing embeddings for {len(documents)} documents")
    embeddings = model.encode(documents, batch_size=batch_size)
    np.save(cache_file, embeddings)
    meta_file.write_text(
        json.dumps(
            {
                "model_name": model_name,
                "batch_size": batch_size,
                "document_count": len(documents),
                "embedding_shape": list(np.asarray(embeddings).shape),
                "cache_file": cache_file.name,
            },
            indent=2,
            ensure_ascii=False,
        ),
        encoding="utf-8",
    )
    logger.info(f"πŸ’Ύ Saved cached embeddings to {cache_file}")
    return np.asarray(embeddings)
# ---------------------------------------------------------------------------
# Artifact helpers
# ---------------------------------------------------------------------------
def ensure_output_artifacts(output_dir: str = ".") -> Dict[str, str]:
    """Create the output directory and any missing placeholder artifacts.

    Existing files are never overwritten.

    Returns:
        Mapping of artifact name (``comparison``, ``taxonomy``,
        ``narrative``, ``labels``, ``themes``) to its absolute path string.
    """
    root = Path(output_dir).resolve()
    root.mkdir(parents=True, exist_ok=True)

    # (artifact name, file name, placeholder text) for the plain-text files.
    text_defaults = (
        ("taxonomy", "taxonomy_map.json", "[]"),
        ("narrative", "narrative.txt", "Narrative placeholder."),
        ("labels", "labels.json", "{}"),
        ("themes", "themes.json", "{}"),
    )

    comparison_path = root / "comparison.csv"
    artifacts: Dict[str, str] = {"comparison": str(comparison_path)}
    if not comparison_path.exists():
        # Header-only CSV with the comparison columns.
        header_only = pd.DataFrame(
            columns=["Abstract Topic", "Title Topic", "Similarity", "Notes"]
        )
        header_only.to_csv(comparison_path, index=False)

    for name, filename, placeholder in text_defaults:
        target = root / filename
        if not target.exists():
            target.write_text(placeholder, encoding="utf-8")
        artifacts[name] = str(target)
    return artifacts
def detect_csv_file(directory: str = ".") -> Optional[str]:
    """Pick the most plausible input CSV in ``directory``.

    Preference order: the first CSV (sorted by path, ``comparison.csv``
    excluded) whose header has an abstract/title column; otherwise the first
    non-excluded CSV; otherwise the first CSV of any name; otherwise ``None``.
    """
    wanted_columns = {"abstract", "title", "matched_csv_title"}
    skip_names = {"comparison.csv"}

    all_csvs = sorted(
        entry
        for entry in Path(directory).iterdir()
        if entry.is_file() and entry.suffix.lower() == ".csv"
    )
    usable = [entry for entry in all_csvs if entry.name.lower() not in skip_names]

    for entry in usable:
        try:
            header = pd.read_csv(entry, nrows=1, low_memory=False)
            names = {str(col).strip().lower() for col in header.columns}
        except Exception:
            # Unreadable or malformed files are skipped, not fatal.
            continue
        if names & wanted_columns:
            return str(entry)

    if usable:
        return str(usable[0])
    if all_csvs:
        return str(all_csvs[0])
    return None
def load_scopus_csv(
    csv_path: Optional[str] = None, directory: str = "."
) -> Tuple[pd.DataFrame, str]:
    """Load the input CSV and guarantee ``Title`` and ``Abstract`` columns.

    Args:
        csv_path: Explicit CSV path. When falsy, ``detect_csv_file(directory)``
            chooses one.
        directory: Directory searched when ``csv_path`` is not given.

    Returns:
        ``(dataframe, path_used)``. Header names are stripped of whitespace
        and a leading BOM. If there is no ``Title`` column, the first known
        title variant is renamed to ``Title``. Missing ``Title`` or
        ``Abstract`` columns are added as empty strings.

    Raises:
        FileNotFoundError: If no CSV path could be determined.
    """
    path = csv_path or detect_csv_file(directory)
    if not path:
        raise FileNotFoundError("No CSV file found in the working directory.")
    df = pd.read_csv(path, low_memory=False)

    # Normalise column names: strip whitespace and a BOM in either order;
    # str() guards against non-string headers.
    df.columns = [str(c).strip().lstrip("\ufeff").strip() for c in df.columns]

    # Only rename a variant when there is no real "Title" column already.
    # Renaming alongside an existing "Title" would create two columns with
    # the same name, and df["Title"] would then be a DataFrame.
    if "Title" not in df.columns:
        title_variants = (
            "title", "Article Title", "Document Title",
            "Paper Title", "matched_csv_title", "document_title",
        )
        for variant in title_variants:
            if variant in df.columns:
                df = df.rename(columns={variant: "Title"})
                break

    if "Abstract" not in df.columns:
        df["Abstract"] = ""
    if "Title" not in df.columns:
        df["Title"] = ""
    return df, str(path)
# ---------------------------------------------------------------------------
# Document preparation
# ---------------------------------------------------------------------------
def build_documents(
    df: pd.DataFrame,
    column: str,
) -> Tuple[List[str], List[str], int, List[int]]:
    """Prepare cleaned texts from one column of ``df``.

    A row is kept only if both ``cleaner.prepare_for_embedding`` and
    ``cleaner.prepare_for_tfidf`` return a truthy value.

    Returns:
        ``(embedding_docs, tfidf_docs, non_empty_row_count, row_indices)``.
        The three lists are parallel; ``row_indices`` holds the index labels
        of the kept rows.
    """
    as_text = df[column].dropna().astype(str)
    series = as_text[as_text.str.strip() != ""]
    logger.info(f"[{column}] build_documents start: {len(series)} non-empty source rows")

    kept: List[Tuple[str, str, int]] = []
    for row_label, raw in series.items():
        emb_text = cleaner.prepare_for_embedding(raw)
        if not emb_text:
            continue
        tfidf_text = cleaner.prepare_for_tfidf(raw)
        if not tfidf_text:
            continue
        kept.append((emb_text, tfidf_text, int(row_label)))

    embedding_docs = [item[0] for item in kept]
    tfidf_docs = [item[1] for item in kept]
    row_indices = [item[2] for item in kept]
    logger.info(
        f"[{column}] build_documents complete: "
        f"{len(embedding_docs)} embedding docs / {len(tfidf_docs)} TF-IDF docs"
    )
    return embedding_docs, tfidf_docs, len(series), row_indices
def build_documents_legacy(
    df: pd.DataFrame, column: str
) -> Tuple[List[str], int]:
    """Older two-value interface: return only the embedding docs and row count."""
    prepared = build_documents(df, column)
    return prepared[0], prepared[2]
def build_documents_combined(df: pd.DataFrame) -> Tuple[List[str], List[str], int, List[int]]:
    """Prepare cleaned texts from ``Title`` and ``Abstract`` joined per row.

    A missing column is treated as empty strings for every row. A row is
    kept only if both ``cleaner.prepare_for_embedding`` and
    ``cleaner.prepare_for_tfidf`` return a truthy value.

    Returns:
        ``(embedding_docs, tfidf_docs, non_empty_row_count, row_indices)``.
    """
    # The fallback must share df's index: an empty Series would make the
    # concatenation below align on the index union and yield NaN everywhere.
    blank = pd.Series("", index=df.index, dtype=object)
    title_series = df.get("Title", blank).fillna("").astype(str)
    abstract_series = df.get("Abstract", blank).fillna("").astype(str)
    combined = (title_series.str.strip() + " \n " + abstract_series.str.strip()).str.strip()
    combined = combined[combined != ""]
    logger.info(f"[Combined] build_documents start: {len(combined)} non-empty source rows")

    embedding_docs: List[str] = []
    tfidf_docs: List[str] = []
    row_indices: List[int] = []
    for idx, text in combined.items():
        emb = cleaner.prepare_for_embedding(text)
        if not emb:
            continue
        tfi = cleaner.prepare_for_tfidf(text)
        if not tfi:
            continue
        embedding_docs.append(emb)
        tfidf_docs.append(tfi)
        row_indices.append(int(idx))
    logger.info(
        f"[Combined] build_documents complete: "
        f"{len(embedding_docs)} embedding docs / {len(tfidf_docs)} TF-IDF docs"
    )
    return embedding_docs, tfidf_docs, len(combined), row_indices
# ---------------------------------------------------------------------------
# Embedding
# ---------------------------------------------------------------------------
def embed_documents(
    documents: List[str],
    batch_size: int = 64,
    cache_dir: str = ".",
) -> np.ndarray:
    """Embed ``documents`` with SPECTER2 and L2-normalise each row.

    Embeddings are fetched through ``get_cached_embeddings`` so repeated
    runs on the same input reuse the on-disk cache. An empty input yields
    an empty ``(0, 768)`` array without loading the model.
    """
    if not documents:
        return np.empty((0, 768), dtype=float)

    logger.info(f"embed_documents start: {len(documents)} documents")
    encoder = get_embedding_model("allenai/specter2")
    raw = get_cached_embeddings(
        documents,
        encoder,
        cache_dir=cache_dir,
        model_name="allenai/specter2",
        batch_size=batch_size,
    )
    matrix = np.array(raw)
    logger.info(f"embed_documents complete: output shape={matrix.shape}")
    return normalize(matrix, norm="l2")
# ---------------------------------------------------------------------------
# Noise cluster detection
# ---------------------------------------------------------------------------
def is_noise_cluster(keywords: List[str], threshold: float = 0.55) -> bool:
    """Tell whether a cluster's leading keywords are mostly metadata noise.

    Only the first six keywords are inspected. The cluster is noise when the
    share found in ``METADATA_NOISE_TOKENS`` reaches ``threshold``. A cluster
    with no keywords counts as noise.
    """
    if not keywords:
        return True
    head = keywords[:6]
    hits = [kw for kw in head if kw.lower().strip() in METADATA_NOISE_TOKENS]
    return len(hits) / len(head) >= threshold
# ---------------------------------------------------------------------------
# Clustering — UMAP + HDBSCAN
# ---------------------------------------------------------------------------
def cluster_embeddings_umap_hdbscan(
    embeddings: np.ndarray,
    min_cluster_size: int = 5,
    min_samples: int = 1,
    n_neighbors: int = 15,
    min_dist: float = 0.0,
    cluster_selection_method: str = "eom",
    cluster_selection_epsilon: float = 0.0,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Reduce ``embeddings`` with UMAP, then cluster them with HDBSCAN.

    Args:
        embeddings: One row per document.
        min_cluster_size: HDBSCAN minimum cluster size, capped at
            ``max(2, n_docs // 5)``.
        min_samples: HDBSCAN ``min_samples``.
        n_neighbors: UMAP neighbourhood size, capped at ``n_docs - 1``.
        min_dist: UMAP ``min_dist``.
        cluster_selection_method: HDBSCAN cluster selection method.
        cluster_selection_epsilon: HDBSCAN cluster selection epsilon.

    Returns:
        ``(labels, probabilities, persistence_scores)``. Always a 3-tuple,
        including on the fallback path used when UMAP or HDBSCAN is missing.
    """
    if len(embeddings) == 0:
        return np.array([], dtype=int), np.array([]), np.array([])
    if len(embeddings) == 1:
        return np.array([0], dtype=int), np.array([1.0]), np.array([])
    try:
        from umap import UMAP
        import hdbscan
        logger.info(f"UMAP reduction (n_neighbors={n_neighbors}, min_dist={min_dist})...")
        umap_model = UMAP(
            n_neighbors=min(n_neighbors, len(embeddings) - 1),
            min_dist=min_dist,
            metric="cosine",
            n_components=min(8, len(embeddings) - 1),
            random_state=42,
        )
        reduced = umap_model.fit_transform(embeddings)
        logger.info(f"HDBSCAN clustering (min_cluster_size={min_cluster_size})...")
        clusterer = hdbscan.HDBSCAN(
            min_cluster_size=min(min_cluster_size, max(2, len(embeddings) // 5)),
            min_samples=min_samples,
            metric="euclidean",
            prediction_data=True,
            cluster_selection_method=cluster_selection_method,
            cluster_selection_epsilon=cluster_selection_epsilon,
        )
        labels = clusterer.fit_predict(reduced)
        # Both diagnostics are optional; degrade to empty arrays if absent.
        try:
            probabilities = np.asarray(clusterer.probabilities_)
        except Exception:
            probabilities = np.array([])
        try:
            persistence_scores = np.asarray(clusterer.cluster_persistence_)
        except Exception:
            persistence_scores = np.array([])
        return labels, probabilities, persistence_scores
    except ImportError as e:
        logger.error(f"UMAP or HDBSCAN not installed: {e}")
        # The agglomerative fallback yields labels only. Wrap them in the
        # declared 3-tuple so callers that unpack three values keep working.
        fallback_labels = cluster_embeddings(embeddings, threshold=0.7)
        return (
            fallback_labels,
            np.ones(len(fallback_labels), dtype=float),
            np.array([]),
        )
def control_cluster_count(
    embeddings: np.ndarray,
    labels: np.ndarray,
    min_clusters: int = 15,
    max_clusters: int = 30,
) -> np.ndarray:
    """Nudge the number of clusters into ``[min_clusters, max_clusters]``.

    * In range: ``labels`` is returned unchanged.
    * Too many: cluster centroids are merged agglomeratively (cosine,
      average linkage) down to ``max_clusters``.
    * Too few: the embeddings are re-clustered with a smaller
      ``min_cluster_size``; the result is returned even if still below
      ``min_clusters``.

    Negative labels (HDBSCAN noise) are not counted as clusters and are
    preserved as ``-1`` on every path.
    """
    unique_labels = np.unique(labels)
    valid = unique_labels[unique_labels >= 0]
    current_count = len(valid)
    logger.info(f"Cluster count: {current_count} (target: {min_clusters}–{max_clusters})")

    if min_clusters <= current_count <= max_clusters:
        return labels

    if current_count > max_clusters:
        centroids = np.vstack([embeddings[labels == lbl].mean(axis=0) for lbl in valid])
        centroids = normalize(centroids, norm="l2")
        n_target = min(max_clusters, len(centroids))
        # Older scikit-learn spells the distance argument "affinity".
        try:
            merger = AgglomerativeClustering(n_clusters=n_target, metric="cosine", linkage="average")
        except TypeError:
            merger = AgglomerativeClustering(n_clusters=n_target, affinity="cosine", linkage="average")
        merged = merger.fit_predict(centroids)
        label_map = {int(old): int(new) for old, new in zip(valid, merged)}
        # Unmapped labels are noise. Keep them as -1 rather than folding them
        # into merged cluster 0, which would inflate that cluster.
        return np.array([label_map.get(int(lbl), -1) for lbl in labels], dtype=int)

    # current_count < min_clusters: retry with a smaller minimum cluster size.
    new_min_size = max(2, min_clusters // 5)
    reclustered = cluster_embeddings_umap_hdbscan(
        embeddings,
        min_cluster_size=new_min_size,
        n_neighbors=min(15, len(embeddings) - 1),
    )
    # Accept either a (labels, probabilities, persistence) tuple or a bare
    # label array.
    return reclustered[0] if isinstance(reclustered, tuple) else reclustered
def cluster_embeddings(embeddings: np.ndarray, threshold: float = 0.7) -> np.ndarray:
    """Cluster rows agglomeratively with a cosine distance threshold.

    Uses average linkage and lets ``threshold`` decide the number of
    clusters. Zero or one input row short-circuits to an empty / single
    zero label array.
    """
    row_count = len(embeddings)
    if row_count < 2:
        return np.zeros(row_count, dtype=int)

    shared = dict(n_clusters=None, linkage="average", distance_threshold=threshold)
    try:
        model = AgglomerativeClustering(metric="cosine", **shared)
    except TypeError:
        # Older scikit-learn versions name the parameter "affinity".
        model = AgglomerativeClustering(affinity="cosine", **shared)
    return model.fit_predict(embeddings)
def reduce_topic_count(
    embeddings: np.ndarray, labels: np.ndarray,
    min_topics: int = 50, max_topics: int = 120,
) -> np.ndarray:
    """Merge topics when there are more than ``max_topics`` distinct labels.

    When the count is already within the limit (or ``labels`` is empty) the
    input ``labels`` object is returned as-is. Otherwise per-topic centroids
    are clustered agglomeratively (cosine, average linkage) and every
    document is relabelled with its merged topic id.
    """
    if len(labels) == 0:
        return labels
    topic_ids = np.unique(labels)
    if len(topic_ids) <= max_topics:
        return labels

    centroid_rows = [embeddings[labels == tid].mean(axis=0) for tid in topic_ids]
    centroids = normalize(np.vstack(centroid_rows), norm="l2")

    capped = min(len(topic_ids), max_topics)
    target = min(max(min_topics, capped), len(centroids))
    try:
        reducer = AgglomerativeClustering(n_clusters=target, metric="cosine", linkage="average")
    except TypeError:
        # Older scikit-learn versions name the parameter "affinity".
        reducer = AgglomerativeClustering(n_clusters=target, affinity="cosine", linkage="average")
    merged_ids = reducer.fit_predict(centroids)

    remap = {int(tid): int(new_id) for tid, new_id in zip(topic_ids, merged_ids)}
    return np.array([remap[int(lbl)] for lbl in labels], dtype=int)
# ---------------------------------------------------------------------------
# LLM-guided parameter suggestion
# ---------------------------------------------------------------------------
def suggest_clustering_params_with_llm(
    n_docs: int, sample_keywords: List[str], llm: Any = None,
) -> Dict[str, Any]:
    """Ask an LLM for UMAP/HDBSCAN parameters and clamp them to safe ranges.

    Args:
        n_docs: Corpus size; drives the ``min_cluster_size`` bounds.
        sample_keywords: Keywords shown to the LLM (first 20 only).
        llm: Object with ``invoke(...)``. When ``None``, ``create_groq_llm``
            is tried. If no LLM is available, defaults are used.

    Returns:
        Dict with ``n_neighbors``, ``n_components``, ``min_cluster_size``,
        ``min_samples``, ``cluster_selection_method``,
        ``cluster_selection_epsilon`` and ``reasoning``. Any failure
        (no LLM, call error, unparsable reply) falls back to defaults. A
        single missing or invalid field falls back to its own default.
    """
    defaults = {
        "n_neighbors": 15,
        "n_components": 8,
        "min_cluster_size": max(5, n_docs // 100),
        "min_samples": 1,
        "cluster_selection_method": "eom",
        "cluster_selection_epsilon": 0.0,
        "reasoning": "default",
    }
    prompt = (
        f"Corpus size: {n_docs}\n"
        f"Sample keywords: {', '.join(sample_keywords[:20])}\n\n"
        "Recommend UMAP and HDBSCAN parameters for clustering academic papers using SPECTER2 embeddings. "
        "Return a valid JSON object with only the keys: n_neighbors, n_components, min_cluster_size, "
        "min_samples, cluster_selection_method, cluster_selection_epsilon."
    )
    if llm is None:
        try:
            llm = create_groq_llm(temperature=0.0)
        except Exception:
            llm = None

    params: Dict[str, Any] = dict(defaults)
    if llm is None:
        logger.warning("suggest_clustering_params_with_llm failed: No LLM available")
    else:
        try:
            # Prefer a chat-style call; fall back to a plain prompt if
            # langchain_core is unavailable or the chat call fails.
            try:
                from langchain_core.messages import HumanMessage, SystemMessage
                resp = llm.invoke([
                    SystemMessage(content="Respond with ONLY a raw JSON object and nothing else."),
                    HumanMessage(content=prompt),
                ])
            except Exception:
                resp = llm.invoke(prompt)
            content = str(getattr(resp, "content", resp))

            jtext = content.strip()
            if not jtext:
                raise ValueError("Empty LLM response")
            # Strip markdown fences, then isolate the outermost {...} so any
            # prose around the JSON object does not break parsing.
            jtext = re.sub(r"^```(?:json)?\s*", "", jtext, flags=re.IGNORECASE)
            jtext = re.sub(r"\s*```$", "", jtext)
            braces = re.search(r"\{.*\}", jtext, flags=re.DOTALL)
            if braces:
                jtext = braces.group(0)
            parsed = json.loads(jtext)
            if not isinstance(parsed, dict):
                raise ValueError("LLM response is not a JSON object")

            def _field(name: str, cast: Any) -> Any:
                # One missing/null/badly typed field must not discard the rest.
                try:
                    return cast(parsed[name])
                except (KeyError, TypeError, ValueError):
                    return defaults[name]

            params = {
                "n_neighbors": _field("n_neighbors", int),
                "n_components": _field("n_components", int),
                "min_cluster_size": _field("min_cluster_size", int),
                "min_samples": _field("min_samples", int),
                "cluster_selection_method": _field("cluster_selection_method", str),
                "cluster_selection_epsilon": _field("cluster_selection_epsilon", float),
                "reasoning": content[:1000],
            }
        except Exception as e:
            logger.warning(f"suggest_clustering_params_with_llm failed: {e}")
            params = dict(defaults)

    # Clamp everything to the ranges the optimiser searches over.
    params["n_neighbors"] = max(5, min(50, int(params["n_neighbors"])))
    params["n_components"] = max(5, min(10, int(params["n_components"])))
    min_size_lower = max(5, n_docs // 100)
    min_size_upper = max(20, n_docs // 20)
    params["min_cluster_size"] = max(
        min_size_lower, min(min_size_upper, int(params["min_cluster_size"]))
    )
    params["min_samples"] = max(
        1, min(int(params["min_samples"]), int(params["min_cluster_size"]))
    )
    if params["cluster_selection_method"] not in ("eom", "leaf"):
        params["cluster_selection_method"] = "eom"
    params["cluster_selection_epsilon"] = max(
        0.0, min(0.3, float(params["cluster_selection_epsilon"]))
    )
    return params
# ---------------------------------------------------------------------------
# Bayesian optimisation (Optuna) over UMAP + HDBSCAN
# ---------------------------------------------------------------------------
def run_bayesian_hdbscan_optimization(
    embeddings: np.ndarray,
    n_trials: int = 50,
    warm_start_params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Search UMAP + HDBSCAN hyper-parameters with Optuna.

    Each trial reduces ``embeddings`` with UMAP (cached per
    ``(n_neighbors, n_components)``), clusters with HDBSCAN and scores the
    result as ``0.6 * mean persistence + 0.4 * DBCV``. A trial scores
    ``-1.0`` when it fails or violates a constraint: one cluster holding
    more than 25% of the documents, a cluster smaller than 5, or a cluster
    count outside the target range.

    Args:
        embeddings: One row per document.
        n_trials: Number of Optuna trials.
        warm_start_params: Optional parameter dict enqueued as the first trial.

    Returns:
        Dict with ``best_params``, ``best_score``, ``n_valid_trials`` and
        ``trial_log``. An empty result (``best_params == {}``, score -1.0)
        is returned when a required library cannot be imported.
    """
    empty_result: Dict[str, Any] = {
        "best_params": {}, "best_score": -1.0, "n_valid_trials": 0, "trial_log": [],
    }
    try:
        import optuna
    except Exception as e:
        logger.error(f"Optuna not available: {e}")
        return empty_result
    # Handle a missing hdbscan/umap the same graceful way as missing optuna.
    try:
        import hdbscan
        from hdbscan.validity import validity_index
        from umap import UMAP
    except ImportError as e:
        logger.error(f"UMAP or HDBSCAN not installed: {e}")
        return empty_result

    n_docs = len(embeddings)
    trial_log: List[Dict[str, Any]] = []
    umap_cache: Dict[Tuple[int, int], np.ndarray] = {}
    # NOTE(review): hdbscan's validity_index is understood to need float64
    # input; with float32 it raises and DBCV would silently stay 0.0.
    # Converted once here — confirm against the installed hdbscan version.
    embeddings64 = np.asarray(embeddings, dtype=np.float64)

    def objective(trial: optuna.Trial):
        params = {
            "n_neighbors": trial.suggest_categorical("n_neighbors", [5, 10, 15, 25, 35, 50]),
            "n_components": trial.suggest_int("n_components", 5, 10),
            "min_cluster_size": trial.suggest_int("min_cluster_size", max(5, n_docs // 100), max(20, n_docs // 20)),
            "min_samples": trial.suggest_int("min_samples", 1, 20),
            "cluster_selection_method": trial.suggest_categorical("cluster_selection_method", ["eom", "leaf"]),
            "cluster_selection_epsilon": trial.suggest_float("cluster_selection_epsilon", 0.0, 0.3),
        }
        # min_samples may not exceed min_cluster_size.
        if params["min_samples"] > params["min_cluster_size"]:
            params["min_samples"] = params["min_cluster_size"]
        cache_key = (int(params["n_neighbors"]), int(params["n_components"]))
        try:
            if cache_key in umap_cache:
                reduced = umap_cache[cache_key]
            else:
                umap_model = UMAP(
                    n_neighbors=min(params["n_neighbors"], len(embeddings) - 1),
                    n_components=params["n_components"],
                    min_dist=0.0, metric="cosine", random_state=42,
                )
                reduced = umap_model.fit_transform(embeddings)
                umap_cache[cache_key] = reduced
            clusterer = hdbscan.HDBSCAN(
                min_cluster_size=min(params["min_cluster_size"], max(2, len(embeddings) // 5)),
                min_samples=params["min_samples"],
                metric="euclidean", prediction_data=True,
                cluster_selection_method=params["cluster_selection_method"],
                cluster_selection_epsilon=params["cluster_selection_epsilon"],
            )
            labels = clusterer.fit_predict(reduced)
            persistence = np.asarray(getattr(clusterer, "cluster_persistence_", np.array([])))
        except Exception as e:
            logger.warning(f"Trial failed to cluster: {e}")
            return -1.0

        unique, counts = np.unique(labels, return_counts=True)
        # Cluster sizes excluding the noise label (-1).
        real_sizes = [int(cnt) for lbl, cnt in zip(unique, counts) if lbl != -1]
        n_valid = len(real_sizes)
        max_frac = max((size / max(1, n_docs) for size in real_sizes), default=0.0)
        noise_count = int((labels == -1).sum())
        target_min_clusters = max(10, n_docs // 120)
        target_max_clusters = max(35, n_docs // 35)

        passed = True
        score = -1.0
        persistence_mean = float(np.mean(persistence)) if len(persistence) > 0 else 0.0
        dbcv = 0.0
        if max_frac > 0.25:
            passed = False
        elif any(size < 5 for size in real_sizes):
            passed = False
        elif n_valid < target_min_clusters or n_valid > target_max_clusters:
            passed = False
        else:
            try:
                dbcv = validity_index(embeddings64, labels)
            except Exception as e:
                logger.debug(f"validity_index failed, using dbcv=0.0: {e}")
                dbcv = 0.0
            score = 0.6 * persistence_mean + 0.4 * float(dbcv)

        trial_log.append({
            "trial": len(trial_log) + 1,
            **params,
            "n_clusters": int(len(unique)),
            "n_valid": int(n_valid),
            "max_frac": float(max_frac),
            "noise_count": int(noise_count),
            "persistence": float(persistence_mean),
            "dbcv": float(dbcv),
            "constraints_passed": bool(passed),
            "score": float(score),
        })
        return float(score)

    study = optuna.create_study(direction="maximize")
    if warm_start_params:
        tunable = (
            "n_neighbors", "n_components", "min_cluster_size", "min_samples",
            "cluster_selection_method", "cluster_selection_epsilon",
        )
        try:
            study.enqueue_trial({k: v for k, v in warm_start_params.items() if k in tunable})
        except Exception as e:
            # Warm start is best-effort, but a failure should be visible.
            logger.warning(f"Could not enqueue warm-start trial: {e}")
    study.optimize(objective, n_trials=n_trials)

    try:
        best = study.best_trial
        best_params = dict(best.params)
        best_score = float(best.value)
    except Exception:
        best_params = {}
        best_score = -1.0

    # Optuna records the sampled min_samples, not the clamped value that was
    # evaluated; report the clamped one so re-running gives the same result.
    if "min_samples" in best_params and "min_cluster_size" in best_params:
        best_params["min_samples"] = min(
            best_params["min_samples"], best_params["min_cluster_size"]
        )

    n_valid_count = sum(1 for t in trial_log if t.get("constraints_passed"))
    return {"best_params": best_params, "best_score": best_score, "n_valid_trials": n_valid_count, "trial_log": trial_log}
# ---------------------------------------------------------------------------
# Keyword extraction — TF-IDF
# ---------------------------------------------------------------------------
def extract_topic_keywords(
    tfidf_docs: List[str],
    labels: np.ndarray,
    top_n: int = 10,
) -> Dict[int, List[str]]:
    """Return the ``top_n`` TF-IDF terms for each cluster.

    All documents of a cluster are concatenated into one pseudo-document,
    then TF-IDF (unigrams + bigrams, English stop words, sublinear tf) is
    fitted over those pseudo-documents. Terms with zero score are omitted.
    If the vectorizer cannot fit, every cluster maps to an empty list.
    """
    if not tfidf_docs or len(labels) == 0:
        return {}

    by_cluster: Dict[int, List[str]] = {}
    for text, lbl in zip(tfidf_docs, labels):
        by_cluster.setdefault(int(lbl), []).append(text)
    ordered_ids = sorted(by_cluster)
    merged_texts = [" ".join(by_cluster[cid]) for cid in ordered_ids]

    vectorizer = TfidfVectorizer(
        stop_words="english",
        ngram_range=(1, 2),
        max_features=10_000,
        min_df=1,
        sublinear_tf=True,
    )
    try:
        tfidf = vectorizer.fit_transform(merged_texts)
    except ValueError:
        return {cid: [] for cid in ordered_ids}

    terms = np.array(vectorizer.get_feature_names_out())
    result: Dict[int, List[str]] = {}
    for pos, cid in enumerate(ordered_ids):
        weights = np.asarray(tfidf[pos].todense()).ravel()
        ranked = weights.argsort()[::-1][:top_n]
        result[cid] = [str(terms[j]) for j in ranked if weights[j] > 0]
    return result
# ---------------------------------------------------------------------------
# Topic table builder
# ---------------------------------------------------------------------------
def build_topic_table(
    tfidf_docs: List[str],
    labels: np.ndarray,
    keywords: Dict[int, List[str]],
    multi_llm_data: Optional[Dict[int, Dict[str, str]]] = None,
    probabilities: Optional[np.ndarray] = None,
    persistence_scores: Optional[np.ndarray] = None,
) -> pd.DataFrame:
    """Build one summary row per topic, largest topic first.

    Args:
        tfidf_docs: Documents; only checked for emptiness here.
        labels: Cluster label per document (``-1`` may denote noise).
        keywords: Topic id -> ranked keyword list.
        multi_llm_data: Optional topic id -> per-provider label dict.
        probabilities: Optional membership probability per document; values
            ``>= 0.5`` count as "strong". When absent, every document counts
            as strong.
        persistence_scores: Optional per-cluster persistence, indexed by
            non-negative topic id.

    Returns:
        DataFrame with the label columns (including backward-compatible
        aliases), ``cluster_size``/``Count``, ``strong_count``,
        ``weak_count`` and ``persistence``.
    """
    if not tfidf_docs or len(labels) == 0:
        return pd.DataFrame(columns=[
            "Topic ID", "Keywords",
            "groq_label", "mistral_label", "openrouter_label", "cohere_label",
            "hf_mistral_label", "hf_zephyr_label", "flan_label", "gemini_label",
            "Label", "cluster_size", "Count", "strong_count", "weak_count", "persistence",
        ])

    labels_arr = np.array(labels)
    has_probs = probabilities is not None and len(probabilities) > 0
    probs_arr = np.asarray(probabilities) if has_probs else None
    llm_by_topic = multi_llm_data or {}

    counts = pd.Series(labels).value_counts().sort_values(ascending=False)
    rows = []
    for topic_id, count in counts.items():
        tid = int(topic_id)
        size = int(count)
        terms = keywords.get(tid, [])
        default_label = " ".join(terms[:4]).strip().title() if terms else f"Topic {tid}"

        llm = llm_by_topic.get(tid, {})
        groq_lbl = llm.get("groq_label", "")
        # Older payload keys are accepted as fallbacks.
        mistral_lbl = llm.get("mistral_label", "") or llm.get("hf_mistral_label", "") or llm.get("flan_label", "")
        openrouter_lbl = llm.get("openrouter_label", "")
        cohere_lbl = llm.get("cohere_label", "")
        hf_z_lbl = llm.get("hf_zephyr_label", "") or llm.get("gemini_label", "")

        if probs_arr is None:
            strong_count = size
        else:
            mask = labels_arr == tid
            strong_count = int((probs_arr[mask] >= 0.5).sum()) if mask.any() else 0
        weak_count = size - strong_count

        # Only non-negative ids index persistence_scores. A noise id of -1
        # would otherwise read the LAST cluster's persistence via negative
        # indexing.
        persistence = 0.0
        try:
            if persistence_scores is not None and 0 <= tid < len(persistence_scores):
                persistence = float(persistence_scores[tid])
        except (TypeError, ValueError, IndexError):
            persistence = 0.0

        rows.append({
            "Topic ID": tid,
            "Keywords": ", ".join(terms),
            "groq_label": groq_lbl,
            "mistral_label": mistral_lbl,
            "openrouter_label": openrouter_lbl,
            "cohere_label": cohere_lbl,
            "hf_mistral_label": mistral_lbl,
            "hf_zephyr_label": hf_z_lbl,
            "flan_label": mistral_lbl,
            "gemini_label": hf_z_lbl,
            "Label": default_label,
            "cluster_size": size,
            "Count": size,
            "strong_count": int(strong_count),
            "weak_count": int(weak_count),
            "persistence": float(persistence),
        })
    return pd.DataFrame(rows)
# ---------------------------------------------------------------------------
# LLM helpers
# ---------------------------------------------------------------------------
def create_groq_llm(temperature: float = 0.0):
    """Create a Groq chat client, or return ``None`` when that is not possible.

    ``None`` is returned when ``GROQ_API_KEY`` is unset/blank, or when
    importing or constructing ``ChatGroq`` fails (the failure is logged).
    """
    if not os.getenv("GROQ_API_KEY", "").strip():
        logger.warning("⚠️ No GROQ_API_KEY found")
        return None
    try:
        # Imported lazily so the module loads without langchain_groq installed.
        from langchain_groq import ChatGroq
        client = ChatGroq(
            model="llama-3.3-70b-versatile",
            temperature=temperature,
            max_tokens=200,
        )
        logger.info("βœ“ Groq LLM created (llama-3.3-70b-versatile)")
        return client
    except Exception as e:
        logger.warning(f"Failed to create Groq LLM: {e}")
        return None
def _clean_llm_label(text: str, fallback: str) -> str:
text = re.sub(r"[\n\r]+", " ", str(text)).strip()
text = re.sub(r"^\s*\d+[\.\:\)]\s*", "", text)
text = re.sub(r"^\s*[-\*β€’]\s*", "", text)
text = re.sub(r'^["\'](.+)["\']$', r"\1", text.strip())
text = re.sub(r"^(?:the\s+)?(?:topic\s+)?(?:label|name|title)\s*[:–-]\s*", "", text, flags=re.IGNORECASE)
text = re.sub(r"[^a-zA-Z0-9\-\s]", "", text)
compact = " ".join(text.split())
words = compact.split()
if len(words) > 8:
compact = " ".join(words[:6])
return compact[:80] if compact else fallback
def label_topics_with_llm(
    topic_table: pd.DataFrame,
    topic_keywords: Dict[int, List[str]],
    llm: Any,
) -> Dict[int, str]:
    """Produce a label for every topic, using ``llm`` when it works.

    The fallback for a topic is its existing ``Label`` cell, or
    ``"Topic <id>"`` if that is blank. After the first LLM failure no
    further LLM calls are made; all remaining topics get their fallback.
    """
    if topic_table.empty:
        return {}

    result: Dict[int, str] = {}
    use_llm = llm is not None
    for raw_id, raw_label in zip(topic_table["Topic ID"], topic_table["Label"]):
        topic_id = int(raw_id)
        fallback = str(raw_label).strip() or f"Topic {topic_id}"
        if not use_llm:
            result[topic_id] = fallback
            continue

        kw_str = ", ".join(topic_keywords.get(topic_id, [])[:8])
        prompt = (
            f"Research cluster keywords: {kw_str}\n\n"
            "Generate a concise academic topic label (3-6 words). "
            "Output ONLY the label β€” no explanation, no punctuation, no quotes."
        )
        try:
            response = llm.invoke(prompt)
            content = getattr(response, "content", str(response))
            result[topic_id] = _clean_llm_label(content, fallback)
        except Exception as e:
            logger.warning(f"LLM labeling failed for topic {topic_id}: {e}")
            result[topic_id] = fallback
            # Stop calling the LLM for the remaining topics.
            use_llm = False
    return result
# ---------------------------------------------------------------------------
# JSON helpers
# ---------------------------------------------------------------------------
def upsert_json_bucket(file_path: str, key: str, payload: Any) -> None:
    """Insert or replace one top-level key in a JSON object file.

    The file is read if it exists; anything that is missing, undecodable, not
    valid JSON, or not a JSON object is treated as an empty object. The
    updated object is written to a sibling temporary file and then moved over
    the target with ``os.replace`` so that an interrupted write cannot leave a
    truncated file behind.

    Args:
        file_path: Target JSON file; resolved to an absolute path.
        key: Top-level key to set.
        payload: JSON-serialisable value stored under ``key``.

    Raises:
        TypeError: If ``payload`` is not JSON-serialisable (raised before the
            target file is touched).
        OSError: If the directory or file cannot be written.
    """
    path = Path(file_path).resolve()
    logger.info(f"πŸ’Ύ upsert_json_bucket β†’ {path} [key={key}]")

    existing: Dict[str, Any] = {}
    try:
        loaded = json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        loaded = None
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; either way the old content cannot be merged.
        logger.warning(f"Existing content of {path} is unreadable ({e}); starting from an empty object")
        loaded = None
    if isinstance(loaded, dict):
        existing = loaded

    existing[key] = payload
    # Serialise first so a non-serialisable payload never clobbers the file.
    serialised = json.dumps(existing, indent=2, ensure_ascii=False)

    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_name(path.name + ".tmp")
    try:
        tmp_path.write_text(serialised, encoding="utf-8")
        os.replace(tmp_path, path)
    finally:
        # Only present here if the write or the replace failed.
        if tmp_path.exists():
            tmp_path.unlink()
    logger.info(f"βœ… Saved {len(existing)} keys β†’ {path.name} ({path.stat().st_size} bytes)")
# ---------------------------------------------------------------------------
# Theme consolidation
# ---------------------------------------------------------------------------
def _top_theme_keywords(keyword_cells: pd.Series, limit: int = 8) -> List[str]:
    """Return the ``limit`` most frequent non-stopword tokens found in ``keyword_cells``."""
    blob = " ".join(keyword_cells.astype(str).tolist()).lower()
    counts = Counter(
        tok for tok in re.findall(r"[a-zA-Z]{3,}", blob) if tok not in STOPWORDS
    )
    # The regex only yields tokens of three or more letters, so no extra
    # length filter is required.
    return [term for term, _ in counts.most_common(limit)]


def consolidate_themes(
    topic_table: pd.DataFrame,
    embeddings: Optional[np.ndarray] = None,
    labels: Optional[np.ndarray] = None,
    min_themes: int = 4,
    max_themes: int = 8,
) -> List[Dict[str, Any]]:
    """Group topic clusters into a handful of higher-level themes.

    Topics with at least ``MIN_THEME_PAPERS`` papers are clustered with
    average-linkage agglomerative clustering on cosine distance; all smaller
    topics are collected into a single theme with id 99. If no usable
    embedding matrix is available, or the clustering itself fails, topics are
    assigned to themes round-robin instead of aborting.

    Args:
        topic_table: Frame with "Topic ID", "Label", "Keywords" and "Count".
        embeddings: Per-document vectors. Together with ``labels`` they are
            averaged per topic to form centroids.
        labels: Per-document topic ids aligned with ``embeddings``.
        min_themes: Lower bound for the number of themes (before capping at
            the number of substantial topics).
        max_themes: Upper bound for the number of themes.

    Returns:
        Theme dicts (theme_id, theme_name, topic_ids, topic_labels, keywords,
        topic_count, paper_count) sorted by descending ``paper_count``; an
        empty list when ``topic_table`` is empty.
    """
    if topic_table.empty:
        return []

    n_topics = len(topic_table)
    logger.info(f"🌿 consolidate_themes: {n_topics} clusters β†’ {min_themes}–{max_themes} themes")

    substantial = topic_table[topic_table["Count"] >= MIN_THEME_PAPERS].copy()
    niche = topic_table[topic_table["Count"] < MIN_THEME_PAPERS].copy()
    logger.info(
        f" Substantial (β‰₯{MIN_THEME_PAPERS} papers): {len(substantial)} | "
        f"Niche: {len(niche)}"
    )

    themes: List[Dict[str, Any]] = []

    if not substantial.empty:
        n_sub = len(substantial)
        large_count = int((substantial["Count"] >= 20).sum())
        target = max(min_themes, min(max_themes, max(large_count // 3, 3)))
        # Never ask for more themes than there are topics, and never for zero.
        target = max(min(target, n_sub), 1)
        logger.info(f" Large clusters (β‰₯20 papers): {large_count} β†’ target themes: {target}")

        cluster_emb_matrix: Optional[np.ndarray] = None
        if embeddings is not None and labels is not None and len(embeddings) > 0:
            logger.info(" Computing centroids from document embeddings...")
            label_arr = np.asarray(labels)
            centroids = []
            for tid in substantial["Topic ID"].tolist():
                mask = label_arr == tid
                if mask.any():
                    centroids.append(embeddings[mask].mean(axis=0))
                else:
                    # A topic without documents gets a zero centroid; if the
                    # clustering rejects it, the fallback below takes over.
                    centroids.append(np.zeros(embeddings.shape[1]))
            cluster_emb_matrix = normalize(np.vstack(centroids), norm="l2")
        else:
            # Fill missing values BEFORE casting so NaN is not embedded as "nan".
            texts = (
                substantial["Label"].fillna("").astype(str) + " " +
                substantial["Keywords"].fillna("").astype(str)
            ).tolist()
            try:
                cluster_emb_matrix = embed_documents(texts)
            except Exception as e:
                logger.warning(f" Embedding failed: {e}")

        theme_labels_arr: Optional[np.ndarray] = None
        # One row per substantial topic is required to assign theme ids.
        if cluster_emb_matrix is not None and len(cluster_emb_matrix) == n_sub:
            if target == 1:
                theme_labels_arr = np.zeros(n_sub, dtype=int)
            else:
                try:
                    try:
                        themer = AgglomerativeClustering(n_clusters=target, metric="cosine", linkage="average")
                    except TypeError:
                        # Older scikit-learn releases name this parameter "affinity".
                        themer = AgglomerativeClustering(n_clusters=target, affinity="cosine", linkage="average")
                    theme_labels_arr = themer.fit_predict(cluster_emb_matrix)
                except Exception as e:
                    logger.warning(f" Theme clustering failed: {e}")
                    theme_labels_arr = None
            if theme_labels_arr is not None:
                logger.info(f" Theme clustering β†’ {len(np.unique(theme_labels_arr))} groups")

        if theme_labels_arr is None:
            logger.warning(" Fallback: round-robin theme assignment")
            theme_labels_arr = np.array([i % target for i in range(n_sub)], dtype=int)

        substantial = substantial.copy()
        substantial["_theme_id"] = theme_labels_arr

        for theme_id, frame in substantial.groupby("_theme_id"):
            sorted_frame = frame.sort_values("Count", ascending=False)
            # The largest topic lends its label as the provisional theme name.
            dominant_label = sorted_frame.iloc[0]["Label"]
            themes.append({
                "theme_id": int(theme_id),
                "theme_name": str(dominant_label),
                "topic_ids": sorted_frame["Topic ID"].astype(int).tolist(),
                "topic_labels": sorted_frame["Label"].astype(str).tolist(),
                "keywords": _top_theme_keywords(sorted_frame["Keywords"]),
                "topic_count": int(len(sorted_frame)),
                "paper_count": int(sorted_frame["Count"].sum()),
            })

    if not niche.empty:
        themes.append({
            "theme_id": 99,
            "theme_name": "Niche and Emerging Topics",
            "topic_ids": niche["Topic ID"].astype(int).tolist(),
            "topic_labels": niche["Label"].astype(str).tolist(),
            "keywords": _top_theme_keywords(niche["Keywords"]),
            "topic_count": int(len(niche)),
            "paper_count": int(niche["Count"].sum()),
        })

    themes = sorted(themes, key=lambda x: x["paper_count"], reverse=True)
    logger.info(f"βœ… Generated {len(themes)} themes")
    return themes
def review_themes(themes: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return copies of ``themes`` with a boolean "meaningful" flag added.

    A theme counts as meaningful when it carries at least three keywords.
    The input dicts are not modified.
    """
    reviewed: List[Dict[str, Any]] = []
    for theme in themes:
        annotated = dict(theme)
        annotated["meaningful"] = len(theme.get("keywords", [])) >= 3
        reviewed.append(annotated)
    return reviewed
def name_themes_with_llm(themes: List[Dict[str, Any]], llm: Any) -> List[Dict[str, Any]]:
    """Replace provisional theme names with LLM-generated ones where possible.

    The theme with id 99 is passed through unchanged. When ``llm`` is ``None``
    or a previous call has failed, themes are passed through unchanged as
    well. Renamed themes are returned as new dicts; untouched themes are the
    original objects.

    Args:
        themes: Theme dicts; each must contain "theme_name".
        llm: Object exposing ``invoke(prompt)``, or ``None``.

    Returns:
        A list in the same order as ``themes`` (``themes`` itself when empty).
    """
    if not themes:
        return themes

    renamed: List[Dict[str, Any]] = []
    llm_available = llm is not None
    for entry in themes:
        current_name = entry["theme_name"]
        if entry.get("theme_id") == 99 or not llm_available:
            renamed.append(entry)
            continue

        member_labels = "; ".join(entry.get("topic_labels", [])[:6])
        shared_terms = ", ".join(entry.get("keywords", [])[:8])
        request = (
            "You are naming a high-level research theme that groups related academic topics.\n\n"
            f"Topics in this theme: {member_labels}\n"
            f"Common keywords: {shared_terms}\n\n"
            "Generate a concise academic theme name (3-7 words). "
            "Output ONLY the theme name β€” no explanation, no quotes, no punctuation, no numbering."
        )
        try:
            reply = llm.invoke(request)
            raw_name = getattr(reply, "content", str(reply))
            cleaned_name = _clean_llm_label(raw_name, current_name)
            renamed.append({**entry, "theme_name": cleaned_name})
        except Exception as e:
            logger.warning(f"Theme naming failed: {e}")
            renamed.append(entry)
            # Stop calling the LLM for the remaining themes.
            llm_available = False
    return renamed
# ---------------------------------------------------------------------------
# Taxonomy mapping
# ---------------------------------------------------------------------------
def create_taxonomy_map(topic_table: pd.DataFrame, source_column: str) -> List[Dict[str, str]]:
    """Classify each topic as MAPPED to a PAJAIS category or as NOVEL.

    For every topic the label and keywords are tokenised into lower-case
    words of three or more letters. Each category in ``PAJAIS_CATEGORIES`` is
    scored as ``2 * exact + partial``: ``exact`` counts category keywords that
    appear as tokens, ``partial`` counts category keywords that contain, or
    are contained in, any token longer than three letters. The first category
    with the highest score wins; a score below 2 yields NOVEL.

    Args:
        topic_table: Frame with "Label" and "Keywords" columns.
        source_column: Stream name copied into each record's "source" field.

    Returns:
        One record per topic with keys source, topic, classification, domain
        and reason; an empty list for an empty table.
    """
    mappings: List[Dict[str, str]] = []
    if topic_table.empty:
        return mappings

    for _, row in topic_table.iterrows():
        topic_name = str(row["Label"])
        searchable = f"{row['Label']} {row['Keywords']}".lower()
        vocab = set(re.findall(r"[a-z]{3,}", searchable))
        long_terms = [term for term in vocab if len(term) > 3]

        winner = None
        winner_score = 0
        for category, kws in PAJAIS_CATEGORIES.items():
            exact_hits = len(vocab.intersection(kws))
            partial_hits = 0
            for kw in kws:
                for term in long_terms:
                    if kw in term or term in kw:
                        partial_hits += 1
                        break
            candidate_score = exact_hits * 2 + partial_hits
            # Strict comparison: on ties the earlier category is kept.
            if candidate_score > winner_score:
                winner, winner_score = category, candidate_score

        record: Dict[str, str] = {"source": source_column, "topic": topic_name}
        if winner and winner_score >= 2:
            record["classification"] = "MAPPED"
            record["domain"] = winner
            record["reason"] = f"Matches '{winner}' with semantic score={winner_score}."
        else:
            record["classification"] = "NOVEL"
            record["domain"] = "Emerging / Interdisciplinary"
            record["reason"] = "No strong overlap with predefined PAJAIS taxonomy categories."
        mappings.append(record)
    return mappings
# ---------------------------------------------------------------------------
# Charts
# ---------------------------------------------------------------------------
def create_topic_distribution_chart(topic_table: pd.DataFrame, source_column: str):
    """Build a bar chart of paper counts for the 30 largest topics.

    Returns an empty, titled bar figure when ``topic_table`` has no rows.
    """
    if not topic_table.empty:
        largest = topic_table.sort_values("Count", ascending=False).head(30)
        return px.bar(largest, x="Label", y="Count", title=f"Topic Distribution β€” {source_column}")
    return px.bar(title=f"No topics available for {source_column}")
def create_keyword_chart(topic_table: pd.DataFrame, source_column: str):
    """Build a bar chart of the 30 heaviest keywords across all topics.

    Each topic contributes its paper count ("Count") once to every distinct
    keyword among the first eight comma-separated entries of its "Keywords"
    cell. Returns an empty, titled bar figure when there is nothing to plot.
    """
    if topic_table.empty:
        return px.bar(title=f"No keywords available for {source_column}")

    salience: Counter = Counter()
    for _, row in topic_table.iterrows():
        pieces = (piece.strip() for piece in str(row["Keywords"]).split(","))
        leading_terms = [piece for piece in pieces if piece][:8]
        weight = int(row["Count"])
        # dict.fromkeys removes repeats so a term is credited once per topic.
        for term in dict.fromkeys(leading_terms):
            salience[term] += weight

    if not salience:
        return px.bar(title=f"No keywords available for {source_column}")

    ranked = pd.DataFrame(salience.items(), columns=["Keyword", "Weight"])
    ranked = ranked.sort_values("Weight", ascending=False).head(30)
    return px.bar(ranked, x="Keyword", y="Weight", title=f"Keyword Salience β€” {source_column}")
# ---------------------------------------------------------------------------
# Main pipeline
# ---------------------------------------------------------------------------
def run_braun_clarke_pipeline(
    df: pd.DataFrame,
    source_column: str,
    llm: Any,
    output_dir: str = ".",
    use_multi_llm: bool = True,
    progress_callback=None,
    suggested_params: Optional[Dict[str, Any]] = None,
) -> "AnalysisResult":
    """Run the seven-step topic pipeline for one text stream and persist its artifacts.

    Steps, as announced through the progress messages: build documents, embed,
    cluster, extract TF-IDF keywords, label topics, save labels, consolidate
    themes. Taxonomy mapping and the two charts are produced afterwards.

    Args:
        df: Input frame. When ``source_column`` equals "combined"
            (case-insensitive) documents come from
            ``build_documents_combined(df)``, otherwise from
            ``build_documents(df, source_column)``.
        source_column: Stream name; its lower-cased form is the key written to
            the labels and themes JSON files.
        llm: Client forwarded to the parameter suggester, the single-LLM
            labeler and the theme namer.
        output_dir: Artifact directory; resolved to an absolute path.
        use_multi_llm: When true, ``llm_council.run_council_batch`` is tried
            first and ``label_topics_with_llm`` is the fallback.
        progress_callback: Optional callable that receives every progress
            message; exceptions it raises are ignored.
        suggested_params: Warm-start clustering parameters. When ``None`` they
            are obtained from ``suggest_clustering_params_with_llm``.

    Returns:
        An ``AnalysisResult``. When no documents could be built it carries an
        empty topic table and empty payloads, and nothing is written by this
        function.
    """
    output_dir = str(Path(output_dir).resolve())
    def _progress(msg: str):
        """Log ``msg`` and forward it to ``progress_callback`` if one was given."""
        logger.info(msg)
        if progress_callback is not None:
            try:
                progress_callback(msg)
            except Exception:
                # Best effort: a failing callback must not abort the pipeline.
                pass
    files = ensure_output_artifacts(output_dir)
    # Step 1: Build documents
    _progress(f"[{source_column}] πŸ“ Step 1/7 β€” Building documents...")
    if source_column.lower() == "combined":
        embedding_docs, tfidf_docs, paper_count, row_indices = build_documents_combined(df)
    else:
        embedding_docs, tfidf_docs, paper_count, row_indices = build_documents(df, source_column)
    _progress(f"[{source_column}] βœ“ {paper_count} papers β†’ {len(embedding_docs)} valid documents")
    if not embedding_docs:
        # Nothing to analyse: return an empty result with the expected columns.
        empty_table = pd.DataFrame(columns=[
            "Topic ID", "Keywords", "groq_label", "hf_mistral_label",
            "hf_zephyr_label", "flan_label", "gemini_label",
            "Label", "cluster_size", "Count",
        ])
        return AnalysisResult(
            source_column=source_column, paper_count=paper_count, unit_count=0,
            topic_table=empty_table,
            taxonomy_table=pd.DataFrame(columns=["source","topic","classification","domain","reason"]),
            distribution_fig=create_topic_distribution_chart(empty_table, source_column),
            keyword_fig=create_keyword_chart(empty_table, source_column),
            labels_payload={}, themes_payload={}, taxonomy_payload=[],
            suggested_params=suggested_params or {}, trial_log=[],
            paper_assignments=[], doc_row_indices=[],
            llm_suggested_params={}, bayesian_best_params={},
        )
    # Step 2: Embed
    _progress(f"[{source_column}] πŸ”’ Step 2/7 β€” Embedding {len(embedding_docs)} documents with SPECTER2...")
    embeddings = embed_documents(embedding_docs, cache_dir=output_dir)
    _progress(f"[{source_column}] βœ“ Embeddings shape: {embeddings.shape}")
    # Step 3: Cluster
    _progress(f"[{source_column}] πŸ“Š Step 3/7 β€” Clustering with UMAP + HDBSCAN...")
    if suggested_params is None:
        # Collect up to 40 top TF-IDF terms of the raw corpus as context for
        # the parameter suggester; any failure simply yields no sample terms.
        try:
            tfidf_vectorizer = TfidfVectorizer(stop_words="english", ngram_range=(1,2), max_features=2000)
            corpus_texts = (
                (df.get("Title", pd.Series(dtype=str)).fillna("").astype(str).str.strip()
                 + " \n " +
                 df.get("Abstract", pd.Series(dtype=str)).fillna("").astype(str).str.strip()).tolist()
                if source_column.lower() == "combined"
                else df[source_column].dropna().astype(str).tolist()
            )
            tfidf_matrix = tfidf_vectorizer.fit_transform(corpus_texts)
            vocab = np.array(tfidf_vectorizer.get_feature_names_out())
            summed = np.asarray(tfidf_matrix.sum(axis=0)).ravel()
            top_idx = summed.argsort()[::-1][:40]
            sample_keywords = [str(vocab[i]) for i in top_idx if summed[i] > 0][:40]
        except Exception:
            sample_keywords = []
        suggested_params = suggest_clustering_params_with_llm(len(embedding_docs), sample_keywords, llm=llm)
        _progress(f"[{source_column}] LLM-suggested params: {suggested_params}")
    # Keep the suggestion without its free-text "reasoning" entry for reporting.
    llm_suggested_params = {k: v for k, v in suggested_params.items() if k != "reasoning"}
    opt_result = run_bayesian_hdbscan_optimization(embeddings, n_trials=80, warm_start_params=suggested_params)
    _progress(f"[{source_column}] Bayesian best_score={opt_result.get('best_score')} n_valid={opt_result.get('n_valid_trials')}")
    best = opt_result.get("best_params") or {}
    # Use the optimiser's parameters only when it reported a score above -1.0;
    # otherwise fall back to the fixed defaults below.
    params_to_use = best if (best and opt_result.get("best_score", -1.0) > -1.0) else {
        "n_neighbors": 15, "min_cluster_size": 5, "min_samples": 1,
        "min_dist": 0.0, "n_components": 8, "cluster_selection_method": "eom",
        "cluster_selection_epsilon": 0.0,
    }
    bayesian_best_params = dict(params_to_use)
    # NOTE(review): "n_components" is present in the defaults above but is not
    # forwarded in the call below — confirm whether that is intentional.
    try:
        labels, probabilities, persistence_scores = cluster_embeddings_umap_hdbscan(
            embeddings,
            min_cluster_size=int(params_to_use.get("min_cluster_size", 5)),
            min_samples=int(params_to_use.get("min_samples", 1)),
            n_neighbors=int(params_to_use.get("n_neighbors", min(15, len(embeddings)-1))),
            min_dist=float(params_to_use.get("min_dist", 0.0)),
            cluster_selection_method=str(params_to_use.get("cluster_selection_method", "eom")),
            cluster_selection_epsilon=float(params_to_use.get("cluster_selection_epsilon", 0.0)),
        )
    except Exception as e:
        _progress(f"[{source_column}] Clustering failed: {e} β€” falling back to defaults")
        labels, probabilities, persistence_scores = cluster_embeddings_umap_hdbscan(
            embeddings, min_cluster_size=5, n_neighbors=min(15, len(embeddings) - 1), min_dist=0.0,
        )
    n_clusters = len(np.unique(labels))
    if opt_result.get("best_score", -1.0) <= -1.0:
        # The optimiser reported no usable score: adjust the cluster count
        # through control_cluster_count with bounds 15 and 30.
        labels = control_cluster_count(embeddings, labels, min_clusters=15, max_clusters=30)
        n_clusters = len(np.unique(labels))
    _progress(f"[{source_column}] βœ“ {n_clusters} clusters found")
    # Step 4: Keywords
    _progress(f"[{source_column}] πŸ”‘ Step 4/7 β€” Extracting TF-IDF keywords...")
    topic_keywords = extract_topic_keywords(tfidf_docs, labels, top_n=10)
    noise_cluster_ids = {cid for cid, kws in topic_keywords.items() if is_noise_cluster(kws)}
    if noise_cluster_ids:
        valid_ids = [cid for cid in topic_keywords if cid not in noise_cluster_ids]
        if valid_ids:
            # Documents of clusters flagged by is_noise_cluster are reassigned
            # to the largest remaining cluster, then keywords are recomputed.
            largest_valid = max(valid_ids, key=lambda cid: int((labels == cid).sum()))
            labels = np.array(
                [largest_valid if int(l) in noise_cluster_ids else int(l) for l in labels], dtype=int
            )
            topic_keywords = extract_topic_keywords(tfidf_docs, labels, top_n=10)
            n_clusters = len(np.unique(labels))
            _progress(f"[{source_column}] βœ“ After noise removal: {n_clusters} clusters")
    _progress(f"[{source_column}] βœ“ Keywords extracted for {len(topic_keywords)} topics")
    # Step 5: Multi-LLM labeling
    multi_llm_data: Dict[int, Dict[str, str]] = {}
    # First pass builds the table with no provider labels yet.
    topic_table = build_topic_table(tfidf_docs, labels, topic_keywords, multi_llm_data,
                                    probabilities=probabilities, persistence_scores=persistence_scores)
    if use_multi_llm:
        _progress(f"[{source_column}] πŸ›οΈ Step 5/7 β€” Batch Multi-LLM Council labeling...")
        try:
            # Imported lazily so a missing module only disables council labeling.
            import llm_council
            topics_batch = [(int(row["Topic ID"]), topic_keywords.get(int(row["Topic ID"]), []))
                            for _, row in topic_table.iterrows()]
            n_topics = len(topics_batch)
            # 60 tokens per topic, capped at 8000; computed from the total
            # topic count and reused unchanged for every chunk below.
            token_budget = min(60 * n_topics, 8000)
            if n_topics > 60:
                # More than 60 topics: send them in chunks of 50 and merge.
                chunked: Dict[int, Dict[str, str]] = {}
                for chunk_start in range(0, n_topics, 50):
                    chunk = topics_batch[chunk_start:chunk_start + 50]
                    result = llm_council.run_council_batch(
                        topics=chunk, system_prompt="You are an expert academic research analyst.",
                        max_tokens=token_budget,
                    )
                    for tid, plabels in result.items():
                        if plabels:
                            chunked.setdefault(tid, {}).update(plabels)
                batch_result = chunked
            else:
                batch_result = llm_council.run_council_batch(
                    topics=topics_batch, system_prompt="You are an expert academic research analyst.",
                    max_tokens=token_budget,
                )
            llm_label_map: Dict[int, str] = {}
            for topic_id, provider_labels in batch_result.items():
                if not provider_labels:
                    continue
                # Several legacy column names are filled from the "Mistral"
                # entry; the zephyr and gemini columns are always empty.
                multi_llm_data[topic_id] = {
                    "groq_label": provider_labels.get("Groq", ""),
                    "mistral_label": provider_labels.get("Mistral", ""),
                    "openrouter_label": provider_labels.get("OpenRouter", ""),
                    "cohere_label": provider_labels.get("Cohere", ""),
                    "hf_mistral_label": provider_labels.get("Mistral", ""),
                    "hf_zephyr_label": "",
                    "flan_label": provider_labels.get("Mistral", ""),
                    "gemini_label": "",
                }
                # The first value in provider_labels becomes the topic label.
                # NOTE(review): if that value can be an empty string the label
                # would be blank — confirm against run_council_batch.
                llm_label_map[topic_id] = next(iter(provider_labels.values()))
            if llm_label_map:
                # Rebuild the table so it includes the per-provider columns.
                topic_table = build_topic_table(tfidf_docs, labels, topic_keywords, multi_llm_data,
                                                probabilities=probabilities, persistence_scores=persistence_scores)
                topic_table["Label"] = topic_table["Topic ID"].map(llm_label_map).fillna(topic_table["Label"])
                _progress(f"[{source_column}] βœ“ Batch labeling done β€” {len(llm_label_map)}/{n_topics} labels assigned")
            else:
                _progress(f"[{source_column}] ⚠️ Batch labeling returned 0 labels, falling back to single-LLM")
                llm_labels = label_topics_with_llm(topic_table, topic_keywords, llm)
                if not topic_table.empty:
                    topic_table["Label"] = topic_table["Topic ID"].map(llm_labels).fillna(topic_table["Label"])
        except ImportError:
            _progress(f"[{source_column}] ⚠️ llm_council not found, using single-LLM")
            llm_labels = label_topics_with_llm(topic_table, topic_keywords, llm)
            if not topic_table.empty:
                topic_table["Label"] = topic_table["Topic ID"].map(llm_labels).fillna(topic_table["Label"])
        except Exception as e:
            _progress(f"[{source_column}] ⚠️ Batch council error ({e}), using single-LLM")
            llm_labels = label_topics_with_llm(topic_table, topic_keywords, llm)
            if not topic_table.empty:
                topic_table["Label"] = topic_table["Topic ID"].map(llm_labels).fillna(topic_table["Label"])
    else:
        llm_labels = label_topics_with_llm(topic_table, topic_keywords, llm)
        if not topic_table.empty:
            topic_table["Label"] = topic_table["Topic ID"].map(llm_labels).fillna(topic_table["Label"])
    # Step 6: Save labels
    _progress(f"[{source_column}] πŸ’Ύ Step 6/7 β€” Saving labels...")
    labels_payload = {
        "phase": "Phase 2 - Generate Initial Topics (SPECTER2 + UMAP + HDBSCAN)",
        "source": source_column,
        "topic_labels": [
            {
                "topic_id": int(row["Topic ID"]),
                "label": str(row["Label"]),
                "keywords": [k.strip() for k in str(row["Keywords"]).split(",") if k.strip()],
                "groq_label": row.get("groq_label", ""),
                "hf_mistral_label": row.get("hf_mistral_label", ""),
                "mistral_label": row.get("mistral_label", ""),
                "openrouter_label": row.get("openrouter_label", ""),
                "cohere_label": row.get("cohere_label", ""),
                "hf_zephyr_label": row.get("hf_zephyr_label", ""),
                "flan_label": row.get("flan_label", ""),
                "gemini_label": row.get("gemini_label", ""),
                "cluster_size": int(row["cluster_size"]),
            }
            for _, row in topic_table.iterrows()
        ],
    }
    upsert_json_bucket(files["labels"], source_column.lower(), labels_payload)
    # Step 7: Themes
    _progress(f"[{source_column}] 🌿 Step 7/7 β€” Consolidating themes...")
    phase3_themes = consolidate_themes(topic_table, embeddings=embeddings, labels=labels, min_themes=4, max_themes=8)
    phase4_reviewed = review_themes(phase3_themes)
    phase5_named = name_themes_with_llm(phase4_reviewed, llm)
    themes_payload = {"phase3": phase3_themes, "phase4": phase4_reviewed, "phase5": phase5_named}
    upsert_json_bucket(files["themes"], source_column.lower(), themes_payload)
    # Taxonomy
    taxonomy_payload = create_taxonomy_map(topic_table, source_column)
    taxonomy_table = pd.DataFrame(taxonomy_payload, columns=["source","topic","classification","domain","reason"])
    taxonomy_path = Path(files["taxonomy"])
    existing_taxonomy: List[Dict[str, str]] = []
    if taxonomy_path.exists():
        try:
            loaded = json.loads(taxonomy_path.read_text(encoding="utf-8"))
            if isinstance(loaded, list):
                existing_taxonomy = loaded
        except json.JSONDecodeError:
            existing_taxonomy = []
    # Replace this stream's earlier records and keep those of other streams.
    remaining = [r for r in existing_taxonomy if r.get("source") != source_column]
    taxonomy_path.write_text(
        json.dumps(remaining + taxonomy_payload, indent=2, ensure_ascii=False), encoding="utf-8"
    )
    distribution_fig = create_topic_distribution_chart(topic_table, source_column)
    keyword_fig = create_keyword_chart(topic_table, source_column)
    _progress(
        f"[{source_column}] βœ… Done β€” "
        f"{len(embedding_docs)} docs β†’ {n_clusters} clusters β†’ {len(phase5_named)} themes"
    )
    return AnalysisResult(
        source_column=source_column,
        paper_count=paper_count,
        unit_count=len(embedding_docs),
        topic_table=topic_table,
        taxonomy_table=taxonomy_table,
        distribution_fig=distribution_fig,
        keyword_fig=keyword_fig,
        labels_payload=labels_payload,
        themes_payload=themes_payload,
        taxonomy_payload=taxonomy_payload,
        suggested_params=suggested_params or {},
        trial_log=opt_result.get("trial_log", []),
        paper_assignments=[int(l) for l in labels],
        doc_row_indices=row_indices,
        llm_suggested_params=llm_suggested_params,
        bayesian_best_params=bayesian_best_params,
    )
# ---------------------------------------------------------------------------
# Topic comparison
# ---------------------------------------------------------------------------
def compare_topic_frames(
    abstract_topics: pd.DataFrame,
    title_topics: pd.DataFrame,
    output_path: str = "comparison.csv",
) -> pd.DataFrame:
    """Match every abstract topic to its most similar title topic and write a CSV.

    Topics are described as "<Label>. <Keywords>" and compared by cosine
    similarity of their embeddings; if embedding fails, TF-IDF vectors are
    used instead. When one or both inputs are empty a placeholder frame is
    written (at most 25 rows taken from the non-empty side).

    Args:
        abstract_topics: Topic table of the abstract stream.
        title_topics: Topic table of the title stream.
        output_path: CSV destination; resolved to an absolute path.

    Returns:
        The frame that was written, with columns "Abstract Topic",
        "Title Topic", "Similarity" and "Notes".
    """
    output_path = str(Path(output_path).resolve())
    logger.info("compare_topic_frames start: abstract=%s topics, title=%s topics", len(abstract_topics), len(title_topics))

    def _write_and_return(frame: pd.DataFrame) -> pd.DataFrame:
        # Every exit path persists its frame to the same destination.
        frame.to_csv(output_path, index=False)
        return frame

    no_abstract = abstract_topics.empty
    no_title = title_topics.empty

    if no_abstract and no_title:
        placeholder = {"Abstract Topic": "N/A", "Title Topic": "N/A", "Similarity": 0.0, "Notes": "No topics available."}
        return _write_and_return(pd.DataFrame([placeholder]))

    if no_abstract:
        records = []
        for _, row in title_topics.head(25).iterrows():
            records.append({"Abstract Topic": "N/A", "Title Topic": str(row["Label"]), "Similarity": 0.0, "Notes": "Only title analysis available."})
        return _write_and_return(pd.DataFrame(records))

    if no_title:
        records = []
        for _, row in abstract_topics.head(25).iterrows():
            records.append({"Abstract Topic": str(row["Label"]), "Title Topic": "N/A", "Similarity": 0.0, "Notes": "Only abstract analysis available."})
        return _write_and_return(pd.DataFrame(records))

    abs_desc = (abstract_topics["Label"].astype(str) + ". " + abstract_topics["Keywords"].astype(str)).tolist()
    title_desc = (title_topics["Label"].astype(str) + ". " + title_topics["Keywords"].astype(str)).tolist()
    split_at = len(abs_desc)

    try:
        # Embed both sides in one call, then split the rows back apart.
        joint_emb = embed_documents(abs_desc + title_desc)
        sim_matrix = cosine_similarity(joint_emb[:split_at], joint_emb[split_at:])
    except Exception as e:
        logger.warning(f"Embedding comparison failed ({e}), using TF-IDF fallback")
        vec = TfidfVectorizer(stop_words="english", ngram_range=(1, 2))
        joint_tfidf = vec.fit_transform(abs_desc + title_desc)
        sim_matrix = cosine_similarity(joint_tfidf[:split_at], joint_tfidf[split_at:])

    rows = []
    for row_no, abs_label in enumerate(abstract_topics["Label"].tolist()):
        best_idx = int(np.argmax(sim_matrix[row_no]))
        score = float(sim_matrix[row_no, best_idx])
        title_label = str(title_topics.iloc[best_idx]["Label"])
        if score > 0.70:
            alignment = "Strong alignment"
        elif score > 0.45:
            alignment = "Moderate alignment"
        else:
            alignment = "Weak alignment"
        rows.append({
            "Abstract Topic": abs_label,
            "Title Topic": title_label,
            "Similarity": round(score, 4),
            "Notes": f"{alignment} (cosine={score:.3f})",
        })

    comparison_df = pd.DataFrame(rows)
    comparison_df.to_csv(output_path, index=False)
    logger.info(f"βœ… comparison.csv written ({len(comparison_df)} rows)")
    return comparison_df
# ---------------------------------------------------------------------------
# Narrative
# ---------------------------------------------------------------------------
def _fallback_narrative(abstract_result, title_result, comparison_df) -> str:
    """Compose the template-based narrative used when no LLM text is available.

    Args:
        abstract_result: Result of the abstract stream, or ``None``. Read
            attributes: ``topic_table``, ``taxonomy_table``, ``themes_payload``.
        title_result: Result of the title stream, or ``None``.
        comparison_df: Frame from the topic comparison; rows with
            "Similarity" above 0.60 are quoted as strongly aligned pairs
            (at most five).

    Returns:
        The narrative text with surrounding whitespace stripped.
    """
    def _topic_total(result):
        return len(result.topic_table) if result is not None else 0

    def _leading_labels(result):
        # Labels of the five topics with the highest paper count.
        if not result or result.topic_table.empty:
            return []
        ranked = result.topic_table.sort_values("Count", ascending=False)
        return ranked["Label"].head(5).tolist()

    def _novel(result):
        if result is None:
            return 0
        taxonomy = result.taxonomy_table
        if taxonomy.empty or "classification" not in taxonomy.columns:
            return 0
        return int((taxonomy["classification"] == "NOVEL").sum())

    abs_n = _topic_total(abstract_result)
    title_n = _topic_total(title_result)
    top_abs = _leading_labels(abstract_result)
    top_title = _leading_labels(title_result)

    abs_themes = []
    if abstract_result:
        for theme in abstract_result.themes_payload.get("phase5", [])[:8]:
            if theme.get("theme_name"):
                abs_themes.append(theme.get("theme_name", ""))

    strong_pairs = []
    if not comparison_df.empty and "Similarity" in comparison_df.columns:
        strong = comparison_df[comparison_df["Similarity"] > 0.60]
        for _, r in strong.head(5).iterrows():
            strong_pairs.append(f"'{r['Abstract Topic']}' ↔ '{r['Title Topic']}' ({r['Similarity']:.2f})")

    abs_topics_text = ', '.join(top_abs) if top_abs else 'insufficient abstract topics'
    title_topics_text = ', '.join(top_title) if top_title else 'insufficient title topics'
    themes_text = '; '.join(abs_themes) if abs_themes else 'see themes.json for full list'
    pairs_text = '; '.join(strong_pairs) if strong_pairs else 'no strongly aligned pairs found (similarity > 0.60)'
    theme_total = len(abs_themes)
    novel_abs = _novel(abstract_result)
    novel_title = _novel(title_result)

    return f"""
This report synthesizes thematic topic modelling outcomes for Scopus metadata using a research-grade pipeline based on SPECTER2 document embeddings, UMAP dimensionality reduction, and HDBSCAN clustering. The analysis was run separately for abstracts and titles to reveal differences between detailed narrative content and concise paper framing language. A combined Title+Abstract stream was also analysed to capture the full bibliographic signal.
In the abstract stream, {abs_n} distinct topics were identified, while the title stream generated {title_n} topics. These clusters were further consolidated into higher-level themes using agglomerative clustering on cluster centroids, yielding {theme_total} themes from the abstract stream.
The abstract-derived topic space is dominated by: {abs_topics_text}. Abstracts capture motivation, methods, and outcomes in full sentences, so the resulting clusters separate into fine-grained research niches.
Title-driven topics include: {title_topics_text}. Titles present compressed intent, often merging conceptually adjacent ideas that abstracts keep separate.
The identified themes from the abstract stream are: {themes_text}. These themes represent the major intellectual territories within this body of literature.
Taxonomy mapping against PAJAIS reference categories identified {novel_abs} novel topic instances in the abstract stream and {novel_title} in the title stream. These represent potential interdisciplinary frontiers not captured by canonical categories.
Cross-stream alignment (embedding cosine similarity) reveals strongly aligned topic pairs: {pairs_text}. High-similarity pairs indicate stable thematic framing across metadata fields.
The pipeline implements Braun and Clarke thematic phases end-to-end: familiarisation, coding, candidate-theme development, review, naming, and PAJAIS taxonomy positioning. Future iterations should include domain-expert validation of labels and longitudinal tracking of emergent themes.
""".strip()
def generate_narrative(abstract_result, title_result, comparison_df, llm, output_path="narrative.txt") -> str:
    """Write the analysis narrative to ``output_path`` and return it.

    With an ``llm`` the narrative is requested from the model; the template
    from ``_fallback_narrative`` is used when ``llm`` is ``None``, when the
    call raises, or when the reply has fewer than 350 words. The reason for
    falling back is logged instead of being swallowed silently.

    Args:
        abstract_result: Result of the abstract stream, or ``None``.
        title_result: Result of the title stream, or ``None``.
        comparison_df: Topic comparison frame passed to the fallback template.
        llm: Object exposing ``invoke(prompt)``, or ``None``.
        output_path: Text file destination; resolved to an absolute path.

    Returns:
        The narrative text that was written.
    """
    output_path = str(Path(output_path).resolve())
    narrative = ""

    if llm is not None:
        abs_themes = []
        if abstract_result:
            # Blank names are skipped so the prompt never lists empty themes.
            abs_themes = [
                name
                for name in (t.get("theme_name", "") for t in abstract_result.themes_payload.get("phase5", [])[:6])
                if name
            ]
        prompt = (
            "Write an academic narrative of approximately 500 words for a research methods section. "
            "Context: Topic modeling on Scopus bibliometric data using SPECTER2 embeddings, "
            "UMAP + HDBSCAN clustering, and multi-LLM labeling (Groq + HuggingFace models). "
            f"Abstract topic count: {0 if abstract_result is None else len(abstract_result.topic_table)}. "
            f"Title topic count: {0 if title_result is None else len(title_result.topic_table)}. "
            f"Identified themes: {', '.join(abs_themes[:6]) if abs_themes else 'see themes.json'}. "
            "Discuss: methodology, key findings, thematic structure, novel topics, and research implications."
        )
        try:
            response = llm.invoke(prompt)
            narrative = str(getattr(response, "content", response)).strip()
        except Exception as e:
            logger.warning(f"Narrative generation failed ({e}), using fallback narrative")
            narrative = ""
        else:
            word_count = len(narrative.split())
            if word_count < 350:
                logger.warning(f"LLM narrative too short ({word_count} words), using fallback narrative")
                narrative = ""

    if not narrative:
        narrative = _fallback_narrative(abstract_result, title_result, comparison_df)

    Path(output_path).write_text(narrative, encoding="utf-8")
    logger.info(f"βœ… narrative.txt written ({len(narrative.split())} words)")
    return narrative
# ---------------------------------------------------------------------------
# Entry points
# ---------------------------------------------------------------------------
def run_single_analysis(
    mode: str,
    csv_path: Optional[str] = None,
    output_dir: str = ".",
    progress_callback=None,
    suggested_params: Optional[Dict[str, Any]] = None,
) -> "AnalysisResult":
    """Run the pipeline for a single stream and make sure all artifact files exist.

    Args:
        mode: "abstract", "title" or "combined" (case-insensitive, surrounding
            whitespace ignored).
        csv_path: Optional CSV path forwarded to ``load_scopus_csv``.
        output_dir: Artifact directory; resolved to an absolute path.
        progress_callback: Optional callable receiving progress messages.
        suggested_params: Optional warm-start clustering parameters.

    Returns:
        The ``AnalysisResult`` of the selected stream.

    Raises:
        ValueError: If ``mode`` is not one of the supported values. This is
            checked before any file is read or any client is created.
    """
    # Validate first so a bad mode fails fast, before the CSV load and the
    # LLM client construction.
    columns_by_mode = {"abstract": "Abstract", "title": "Title", "combined": "Combined"}
    col = columns_by_mode.get(str(mode).strip().lower())
    if col is None:
        raise ValueError("mode must be 'abstract', 'title', or 'combined'.")

    output_dir = str(Path(output_dir).resolve())
    files = ensure_output_artifacts(output_dir)
    df, _ = load_scopus_csv(csv_path=csv_path, directory=output_dir)
    llm = create_groq_llm(temperature=0.0)

    result = run_braun_clarke_pipeline(
        df, col, llm=llm, output_dir=output_dir,
        progress_callback=progress_callback, suggested_params=suggested_params,
    )

    # A single-stream run produces no comparison or narrative; create
    # placeholders only when the files are missing.
    if not Path(files["comparison"]).exists():
        pd.DataFrame(columns=["Abstract Topic","Title Topic","Similarity","Notes"]).to_csv(files["comparison"], index=False)
    if not Path(files["narrative"]).exists():
        Path(files["narrative"]).write_text("Narrative placeholder.", encoding="utf-8")
    return result
def run_full_pipeline(
    csv_path: Optional[str] = None,
    output_dir: str = ".",
    progress_callback=None,
) -> Dict[str, Any]:
    """Run the Abstract, Title and Combined streams, then compare and narrate.

    The three streams run in that order with one shared LLM client. The
    comparison and the narrative are built from the abstract and title
    results only.

    Args:
        csv_path: Optional CSV path forwarded to ``load_scopus_csv``.
        output_dir: Artifact directory; resolved to an absolute path.
        progress_callback: Optional callable receiving progress messages.

    Returns:
        Dict with keys csv_path, abstract, title, combined, comparison,
        narrative and files.
    """
    output_dir = str(Path(output_dir).resolve())
    files = ensure_output_artifacts(output_dir)
    df, resolved_csv = load_scopus_csv(csv_path=csv_path, directory=output_dir)
    llm = create_groq_llm(temperature=0.0)

    def _run_stream(column: str):
        # All streams share the frame, the LLM client and the output directory.
        return run_braun_clarke_pipeline(
            df, column, llm=llm, output_dir=output_dir, progress_callback=progress_callback,
        )

    abstract_result = _run_stream("Abstract")
    title_result = _run_stream("Title")
    combined_result = _run_stream("Combined")

    comparison_df = compare_topic_frames(
        abstract_result.topic_table, title_result.topic_table, output_path=files["comparison"],
    )
    narrative = generate_narrative(
        abstract_result, title_result, comparison_df, llm=llm, output_path=files["narrative"],
    )

    outcome: Dict[str, Any] = {"csv_path": resolved_csv}
    outcome["abstract"] = abstract_result
    outcome["title"] = title_result
    outcome["combined"] = combined_result
    outcome["comparison"] = comparison_df
    outcome["narrative"] = narrative
    outcome["files"] = files
    return outcome
# Backward-compatibility alias: this name is bound to the same function
# object as run_full_pipeline, so both names behave identically.
run_specter2_multi_llm_pipeline = run_full_pipeline