"""
tools.py — Core pipeline for topic modelling on Scopus CSV data.

FIXES IN THIS VERSION:
 1. consolidate_themes() — Complete rewrite. It was producing a single theme
    because threshold=0.35 was too tight. It now uses AgglomerativeClustering
    with a direct n_clusters target, aiming for 4-8 themes (fewer when there
    are not enough substantial clusters to split).
 2. Theme balancing — Clusters with fewer than MIN_THEME_PAPERS papers are
    kept out of theme grouping. Orphaned small clusters are collected into a
    "Niche Topics" group, which prevents one giant catch-all theme plus a
    handful of singletons.
 3. Noise cluster detection — A post-clustering filter removes clusters whose
    top keywords are metadata noise (conference years, editorial tokens, etc.).
 4. build_topic_table() — Column names updated to match labeler.py
    (hf_mistral_label, hf_zephyr_label), with backward-compatible aliases.
 5. name_themes_with_llm() — Better prompting in a proper academic style.
 6. create_taxonomy_map() — Weighted semantic scoring (exact + partial match).
 7. upsert_json_bucket() — Resolves the target path and logs the write
    (number of keys and file size in bytes) so writes are visible in the log.
 8. run_braun_clarke_pipeline() — Passes embeddings + labels into
    consolidate_themes() so centroids are computed on real document vectors.
 9. OUTPUT_DIR handling — File writes use the resolved absolute path passed in
    from app.py; relative defaults such as "." are resolved before use.
10. run_full_pipeline() — Now includes a Combined (Title + Abstract) analysis
    as a third pipeline run and returns dict["combined"] so the UI can
    display it.
"""
# Standard library
import json
import os
import re
import logging
import hashlib
from collections import Counter
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Third-party
import numpy as np
import pandas as pd
import plotly.express as px
from dotenv import load_dotenv
from sklearn.cluster import AgglomerativeClustering
from sklearn.feature_extraction.text import TfidfVectorizer, ENGLISH_STOP_WORDS
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import normalize

# Project-local text cleaning helpers (prepare_for_embedding / prepare_for_tfidf).
import cleaner

# Import-time side effect: load variables from a .env file (if present) before
# any credentials are read below.
load_dotenv()

# Copy HF_TOKEN into HUGGING_FACE_HUB_TOKEN when HF_TOKEN is non-empty and the
# latter is unset or empty (presumably so Hugging Face Hub downloads pick it up).
hf_token = os.getenv("HF_TOKEN", "").strip()
if hf_token and not os.getenv("HUGGING_FACE_HUB_TOKEN"):
    os.environ["HUGGING_FACE_HUB_TOKEN"] = hf_token

logger = logging.getLogger(__name__)

# Hugging Face repo the SPECTER2 adapter is loaded from (used by get_embedding_model).
SPECTER2_MODEL_REPO = "vvinayakkkkk/specter2-cached"
# scikit-learn's built-in English stop-word list, copied into a plain set.
STOPWORDS = set(ENGLISH_STOP_WORDS)

# Minimum papers a cluster must have to participate in theme grouping.
MIN_THEME_PAPERS = 10

# Tokens that indicate a cluster is metadata noise, not a research topic.
METADATA_NOISE_TOKENS = {
    # Compared (lower-cased, stripped) against a cluster's top keywords by
    # is_noise_cluster().
    "conference", "international", "editorial", "guest", "proceedings",
    "workshop", "journal", "vol", "issue", "abstract", "available",
    "know", "tell", "king", "live", "flow", "long", "path",
    "features", "domains", "experiments", "dsp", "sixth",
    "2001", "2002", "2003", "2004", "2005", "2006", "2007", "2008", "2009", "2010",
}

# Keyword vocabulary per research category. The consumer of this mapping is not
# in this part of the file (presumably create_taxonomy_map) — confirm there.
PAJAIS_CATEGORIES = {
    "artificial intelligence and machine learning": {
        "ai", "artificial", "intelligence", "neural", "deep", "learning",
        "machine", "algorithm", "model", "prediction", "classification",
        "nlp", "language", "generative", "llm", "transformer",
    },
    "data analytics and information systems": {
        "analytics", "data", "mining", "insight", "dashboard", "system",
        "information", "platform", "digital", "infrastructure", "database",
        "big", "cloud", "iot", "real", "time",
    },
    "sustainability and environment": {
        "sustainability", "green", "environment", "climate", "carbon",
        "energy", "renewable", "emission", "ecological", "circular",
    },
    "healthcare and medicine": {
        "health", "clinical", "patient", "medicine", "medical", "hospital",
        "drug", "disease", "treatment", "diagnosis", "telemedicine", "ehealth",
    },
    "education and learning technology": {
        "education", "learning", "student", "teaching", "curriculum",
        "online", "elearning", "academic", "university", "pedagogy",
    },
    "business strategy and management": {
        "business", "strategy", "management", "firm", "organization",
        "leadership", "performance", "competitive", "governance", "agile",
    },
    "finance and economics": {
        "finance", "financial", "economic", "market", "investment",
        "banking", "cryptocurrency", "fintech", "stock", "portfolio",
    },
    "supply chain and operations": {
        "supply", "logistics", "operations", "inventory", "manufacturing",
        "procurement", "lean", "warehouse", "distribution", "production",
    },
    "social media and digital marketing": {
        "social", "media", "marketing", "digital", "brand", "consumer",
        "influencer", "advertising", "engagement", "content", "platform",
    },
    "cybersecurity and risk": {
        "risk", "security", "privacy", "cyber", "resilience", "threat",
        "fraud", "vulnerability", "authentication", "blockchain",
    },
    "human behavior and psychology": {
        "behavior", "psychology", "attitude", "intention", "motivation",
        "trust", "adoption", "perception", "cognitive", "satisfaction",
    },
    "policy and governance": {
        "policy", "governance", "regulation", "public", "government",
        "compliance", "law", "equity", "ethics", "stakeholder",
    },
}


@dataclass
class AnalysisResult:
    """Container for the outputs of one analysis run over a single source column."""

    source_column: str
    paper_count: int
    unit_count: int
    topic_table: pd.DataFrame
    taxonomy_table: pd.DataFrame
    distribution_fig: Any
    keyword_fig: Any
    labels_payload: Dict[str, Any]
    themes_payload: Dict[str, Any]
    taxonomy_payload: List[Dict[str, str]]
    # Optional diagnostics; None when the producing step did not run.
    suggested_params: Optional[Dict[str, Any]] = None
    trial_log: Optional[List[Dict[str, Any]]] = None
    paper_assignments: Optional[List[int]] = None
    doc_row_indices: Optional[List[int]] = None
    llm_suggested_params: Optional[Dict[str, Any]] = None
    bayesian_best_params: Optional[Dict[str, Any]] = None


# ---------------------------------------------------------------------------
# Embedding model
# ---------------------------------------------------------------------------
@lru_cache(maxsize=1)
def get_embedding_model(model_name: str = "allenai/specter2"):
    """Load the SPECTER2 encoder and memoise it with ``lru_cache(maxsize=1)``.

    Only ``"allenai/specter2"`` is supported. The returned object exposes a
    sentence-transformers-style ``encode(documents, batch_size=...)`` method.

    Note: ``lru_cache`` keys on the call form, so ``get_embedding_model()`` and
    ``get_embedding_model("allenai/specter2")`` are separate entries. The
    callers visible in this file pass the name explicitly.

    Raises:
        ValueError: for any other ``model_name``.
    """
    logger.info(f"📥 Loading embedding model: {model_name}")
    if model_name != "allenai/specter2":
        raise ValueError("This pipeline only supports allenai/specter2 embeddings.")

    from adapters import AutoAdapterModel, Stack
    from transformers import AutoTokenizer

    class Specter2Encoder:
        """SPECTER2 base model with the ``specter2`` adapter stacked and active."""

        def __init__(self):
            import torch

            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            logger.info("🤖 Loading SPECTER2 tokenizer...")
            self.tokenizer = AutoTokenizer.from_pretrained("allenai/specter2_base")
            logger.info("🤖 Loading SPECTER2 base model...")
            self.model = AutoAdapterModel.from_pretrained("allenai/specter2_base")
            logger.info("🤖 Loading SPECTER2 adapter from cached repo...")
            self.model.load_adapter(
                SPECTER2_MODEL_REPO, source="hf", load_as="specter2", set_active=False
            )
            self.model.set_active_adapters(Stack("specter2"))
            self.model.to(self.device)
            self.model.eval()
            logger.info("✅ SPECTER2 ready")

        def encode(
            self,
            documents: List[str],
            batch_size: int = 64,
            show_progress_bar: bool = False,
            normalize_embeddings: bool = True,
        ) -> np.ndarray:
            """Embed *documents* as the first-token hidden state of each input.

            Args:
                documents: texts to embed (truncated to 512 tokens).
                batch_size: documents per forward pass.
                show_progress_bar: accepted for API compatibility; ignored.
                normalize_embeddings: when True (default), rows are scaled
                    to unit L2 norm. All-zero rows are left as zeros.

            Returns:
                Array of shape ``(len(documents), hidden_size)``. It is
                ``(0, 768)`` when *documents* is empty.
            """
            import torch

            logger.info(f"🔤 Encoding {len(documents)} documents (batch_size={batch_size})")
            embeddings: List[np.ndarray] = []
            num_batches = (len(documents) + batch_size - 1) // batch_size
            for batch_idx, start in enumerate(range(0, len(documents), batch_size)):
                batch = documents[start : start + batch_size]
                logger.info(f" Batch [{batch_idx + 1}/{num_batches}] — {len(batch)} docs")
                inputs = self.tokenizer(
                    batch,
                    padding=True,
                    truncation=True,
                    return_tensors="pt",
                    return_token_type_ids=False,
                    max_length=512,
                )
                inputs = {k: v.to(self.device) for k, v in inputs.items()}
                with torch.no_grad():
                    outputs = self.model(**inputs)
                embeddings.append(outputs.last_hidden_state[:, 0, :].cpu().numpy())

            result = np.vstack(embeddings) if embeddings else np.empty((0, 768), dtype=float)
            if normalize_embeddings and result.size:
                # The flag used to be silently ignored; honour it now.
                norms = np.linalg.norm(result, axis=1, keepdims=True)
                result = result / np.where(norms == 0, 1.0, norms)
            return result

    return Specter2Encoder()


def warm_specter2_cache() -> None:
    """Force the SPECTER2 model load so the first real request does not pay for it."""
    logger.info("🔥 SPECTER2 cache warmup START")
    model = get_embedding_model("allenai/specter2")
    logger.info(f" warmup loaded model: {type(model).__name__}")
    logger.info("✅ SPECTER2 cache warmup COMPLETE")


def _embedding_cache_key(documents: List[str], model_name: str, batch_size: int) -> str:
    """Return a 16-hex-char SHA-256 digest of model name, batch size and all document texts."""
    hasher = hashlib.sha256()
    hasher.update(model_name.encode("utf-8"))
    hasher.update(str(batch_size).encode("utf-8"))
    for document in documents:
        # NUL separator so ["ab", "c"] and ["a", "bc"] hash differently.
        hasher.update(b"\0")
        hasher.update(document.encode("utf-8", errors="ignore"))
    return hasher.hexdigest()[:16]


def get_cached_embeddings(
    documents: List[str],
    model: Any,
    cache_dir: str = ".",
    model_name: str = "allenai/specter2",
    batch_size: int = 64,
) -> np.ndarray:
    """Return embeddings for *documents*, reusing an on-disk ``.npy`` cache.

    The cache lives under ``<cache_dir>/.embedding_cache``. It is keyed by
    model name, batch size and the exact document texts. A cache file that
    cannot be loaded, or whose row count differs from ``len(documents)``, is
    ignored and recomputed rather than returned.

    Args:
        documents: texts to embed.
        model: object exposing ``encode(documents, batch_size=...)``.
        cache_dir: directory under which the cache folder is created.
        model_name: included in the cache key and in the metadata file.
        batch_size: forwarded to ``model.encode``; also part of the key.

    Returns:
        The embeddings as a NumPy array, exactly as produced by ``model.encode``.
    """
    cache_root = Path(cache_dir).resolve() / ".embedding_cache"
    cache_root.mkdir(parents=True, exist_ok=True)
    key = _embedding_cache_key(documents, model_name, batch_size)
    cache_file = cache_root / f"embeddings_{key}.npy"
    meta_file = cache_root / f"embeddings_{key}.json"

    if cache_file.exists():
        logger.info(f"✅ Loading cached embeddings from {cache_file}")
        try:
            if meta_file.exists():
                meta = json.loads(meta_file.read_text(encoding="utf-8"))
                logger.info(f" cache metadata: {meta}")
        except Exception as exc:
            logger.warning(f" cache metadata read failed: {exc}")
        try:
            cached = np.load(cache_file, allow_pickle=False)
        except (OSError, ValueError, EOFError) as exc:
            # e.g. a truncated file left behind by an interrupted run.
            logger.warning(f" cached embeddings unreadable, recomputing: {exc}")
        else:
            if cached.ndim == 2 and cached.shape[0] == len(documents):
                return cached
            logger.warning(
                f" cached embeddings shape {cached.shape} does not match "
                f"{len(documents)} documents, recomputing"
            )

    logger.info(f"🔢 Computing embeddings for {len(documents)} documents")
    embeddings = np.asarray(model.encode(documents, batch_size=batch_size))
    np.save(cache_file, embeddings)
    meta_file.write_text(
        json.dumps(
            {
                "model_name": model_name,
                "batch_size": batch_size,
                "document_count": len(documents),
                "embedding_shape": list(embeddings.shape),
                "cache_file": cache_file.name,
            },
            indent=2,
            ensure_ascii=False,
        ),
        encoding="utf-8",
    )
    logger.info(f"💾 Saved cached embeddings to {cache_file}")
    return embeddings


# ---------------------------------------------------------------------------
# Artifact helpers
# ---------------------------------------------------------------------------
def ensure_output_artifacts(output_dir: str = ".") -> Dict[str, str]:
    """Create *output_dir* and any missing placeholder output files in it.

    Existing files are never overwritten.

    Returns:
        Mapping of artifact name to absolute path string for ``comparison``,
        ``taxonomy``, ``narrative``, ``labels`` and ``themes``.
    """
    base = Path(output_dir).resolve()
    base.mkdir(parents=True, exist_ok=True)
    paths = {
        "comparison": base / "comparison.csv",
        "taxonomy": base / "taxonomy_map.json",
        "narrative": base / "narrative.txt",
        "labels": base / "labels.json",
        "themes": base / "themes.json",
    }
    if not paths["comparison"].exists():
        pd.DataFrame(
            columns=["Abstract Topic", "Title Topic", "Similarity", "Notes"]
        ).to_csv(paths["comparison"], index=False)

    text_placeholders = {
        "taxonomy": "[]",
        "narrative": "Narrative placeholder.",
        "labels": "{}",
        "themes": "{}",
    }
    for name, placeholder in text_placeholders.items():
        if not paths[name].exists():
            paths[name].write_text(placeholder, encoding="utf-8")

    return {k: str(v) for k, v in paths.items()}


def detect_csv_file(directory: str = ".") -> Optional[str]:
    """Pick the input CSV in *directory*, or return None if there is none.

    The pipeline's own ``comparison.csv`` output is never returned. Among the
    remaining CSVs (sorted by path), the first one whose header contains
    ``abstract``, ``title`` or ``matched_csv_title`` (case-insensitive,
    BOM/whitespace stripped) wins. Otherwise the first remaining CSV is used.
    A missing directory yields None.
    """
    excluded = {"comparison.csv"}
    root = Path(directory)
    if not root.is_dir():
        return None
    csv_files = sorted(
        p for p in root.iterdir() if p.is_file() and p.suffix.lower() == ".csv"
    )
    candidates = [p for p in csv_files if p.name.lower() not in excluded]
    wanted = {"abstract", "title", "matched_csv_title"}
    for path in candidates:
        try:
            sample = pd.read_csv(path, nrows=1, low_memory=False)
        except Exception:
            continue
        cols = {str(c).lstrip("\ufeff").strip().lower() for c in sample.columns}
        if wanted & cols:
            return str(path)
    # Never fall back to an excluded file: comparison.csv is an output artifact.
    return str(candidates[0]) if candidates else None


_TITLE_VARIANTS = (
    "title",
    "Article Title",
    "Document Title",
    "Paper Title",
    "matched_csv_title",
    "document_title",
)


def load_scopus_csv(
    csv_path: Optional[str] = None, directory: str = "."
) -> Tuple[pd.DataFrame, str]:
    """Load a Scopus-style CSV and normalise its ``Title``/``Abstract`` columns.

    Args:
        csv_path: explicit file to load. When falsy, :func:`detect_csv_file`
            is used on *directory*.
        directory: directory searched when *csv_path* is not given.

    Returns:
        ``(df, path)``. Column names have any BOM and surrounding whitespace
        removed. The first known title variant is renamed to ``Title`` only
        when no ``Title`` column exists, so duplicate ``Title`` columns are
        never created. Missing ``Abstract``/``Title`` columns are added as
        empty strings.

    Raises:
        FileNotFoundError: if no CSV could be located.
    """
    path = csv_path or detect_csv_file(directory)
    if not path:
        raise FileNotFoundError("No CSV file found in the working directory.")
    df = pd.read_csv(path, low_memory=False)
    df.columns = [str(c).lstrip("\ufeff").strip() for c in df.columns]

    if "Title" not in df.columns:
        for variant in _TITLE_VARIANTS:
            if variant in df.columns:
                df = df.rename(columns={variant: "Title"})
                break

    for required in ("Abstract", "Title"):
        if required not in df.columns:
            df[required] = ""
    return df, str(path)


# ---------------------------------------------------------------------------
# Document preparation
# ---------------------------------------------------------------------------
def _prepare_documents(
    series: pd.Series, tag: str
) -> Tuple[List[str], List[str], List[int]]:
    """Run the cleaner over *series*, keeping rows for which both cleaned forms are non-empty."""
    embedding_docs: List[str] = []
    tfidf_docs: List[str] = []
    row_indices: List[int] = []
    for idx, text in series.items():
        emb = cleaner.prepare_for_embedding(text)
        if not emb:
            continue
        tfi = cleaner.prepare_for_tfidf(text)
        if not tfi:
            continue
        embedding_docs.append(emb)
        tfidf_docs.append(tfi)
        row_indices.append(int(idx))
    logger.info(
        f"[{tag}] build_documents complete: "
        f"{len(embedding_docs)} embedding docs / {len(tfidf_docs)} TF-IDF docs"
    )
    return embedding_docs, tfidf_docs, row_indices


def build_documents(
    df: pd.DataFrame,
    column: str,
) -> Tuple[List[str], List[str], int, List[int]]:
    """Build parallel embedding / TF-IDF documents from one DataFrame column.

    Returns:
        ``(embedding_docs, tfidf_docs, source_count, row_indices)``. The
        *source_count* is the number of non-empty rows before cleaning, and
        *row_indices* holds the DataFrame index of each kept document.
    """
    series = df[column].dropna().astype(str)
    series = series[series.str.strip() != ""]
    logger.info(f"[{column}] build_documents start: {len(series)} non-empty source rows")
    embedding_docs, tfidf_docs, row_indices = _prepare_documents(series, column)
    return embedding_docs, tfidf_docs, len(series), row_indices


def build_documents_legacy(
    df: pd.DataFrame, column: str
) -> Tuple[List[str], int]:
    """Backward-compatible wrapper returning only ``(embedding_docs, source_count)``."""
    emb, _, paper_count, _ = build_documents(df, column)
    return emb, paper_count


def build_documents_combined(df: pd.DataFrame) -> Tuple[List[str], List[str], int, List[int]]:
    """Like :func:`build_documents`, but over ``Title + " \\n " + Abstract`` per row.

    A missing ``Title`` or ``Abstract`` column is treated as empty strings
    aligned to ``df.index``.
    """
    empty = pd.Series("", index=df.index, dtype=object)
    title_series = df.get("Title", empty).fillna("").astype(str)
    abstract_series = df.get("Abstract", empty).fillna("").astype(str)
    combined = (title_series.str.strip() + " \n " + abstract_series.str.strip()).str.strip()
    combined = combined[combined != ""]
    logger.info(f"[Combined] build_documents start: {len(combined)} non-empty source rows")
    embedding_docs, tfidf_docs, row_indices = _prepare_documents(combined, "Combined")
    return embedding_docs, tfidf_docs, len(combined), row_indices


# ---------------------------------------------------------------------------
# Embedding
# ---------------------------------------------------------------------------
def embed_documents(
    documents: List[str],
    batch_size: int = 64,
    cache_dir: str = ".",
) -> np.ndarray:
    """Embed *documents* with SPECTER2 (disk-cached) and L2-normalise each row.

    Returns an empty ``(0, 768)`` array for an empty input.
    """
    if not documents:
        return np.empty((0, 768), dtype=float)
    logger.info(f"embed_documents start: {len(documents)} documents")
    model = get_embedding_model("allenai/specter2")
    embeddings = get_cached_embeddings(
        documents,
        model,
        cache_dir=cache_dir,
        model_name="allenai/specter2",
        batch_size=batch_size,
    )
    logger.info(f"embed_documents complete: output shape={np.array(embeddings).shape}")
    return normalize(np.array(embeddings), norm="l2")


# ---------------------------------------------------------------------------
# Noise cluster detection
# ---------------------------------------------------------------------------
def is_noise_cluster(keywords: List[str], threshold: float = 0.55) -> bool:
    """Return True if at least *threshold* of the top-6 keywords are metadata noise.

    An empty keyword list counts as noise.
    """
    if not keywords:
        return True
    check = keywords[:6]
    noise_count = sum(1 for kw in check if kw.lower().strip() in METADATA_NOISE_TOKENS)
    return (noise_count / len(check)) >= threshold


# ---------------------------------------------------------------------------
# Clustering — UMAP + HDBSCAN
# ---------------------------------------------------------------------------
def _cosine_agglomerative(**kwargs) -> Any:
    """Build a cosine / average-linkage AgglomerativeClustering.

    Works with both scikit-learn APIs: newer releases take ``metric=``; older
    ones only accept ``affinity=`` and raise TypeError for ``metric``.
    """
    try:
        return AgglomerativeClustering(metric="cosine", linkage="average", **kwargs)
    except TypeError:
        return AgglomerativeClustering(affinity="cosine", linkage="average", **kwargs)


def cluster_embeddings_umap_hdbscan(
    embeddings: np.ndarray,
    min_cluster_size: int = 5,
    min_samples: int = 1,
    n_neighbors: int = 15,
    min_dist: float = 0.0,
    cluster_selection_method: str = "eom",
    cluster_selection_epsilon: float = 0.0,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Reduce with UMAP (cosine, <= 8 dims) and cluster with HDBSCAN.

    Returns:
        ``(labels, probabilities, persistence_scores)``. Labels use -1 for
        HDBSCAN noise. If umap/hdbscan are not installed, the fallback is
        agglomerative clustering, returned with empty probability and
        persistence arrays so the 3-tuple contract always holds.
    """
    if len(embeddings) == 0:
        return np.array([], dtype=int), np.array([]), np.array([])
    if len(embeddings) == 1:
        return np.array([0], dtype=int), np.array([1.0]), np.array([])

    try:
        from umap import UMAP
        import hdbscan
    except ImportError as e:
        logger.error(f"UMAP or HDBSCAN not installed: {e}")
        fallback_labels = cluster_embeddings(embeddings, threshold=0.7)
        return np.asarray(fallback_labels, dtype=int), np.array([]), np.array([])

    logger.info(f"UMAP reduction (n_neighbors={n_neighbors}, min_dist={min_dist})...")
    umap_model = UMAP(
        n_neighbors=min(n_neighbors, len(embeddings) - 1),
        min_dist=min_dist,
        metric="cosine",
        n_components=min(8, len(embeddings) - 1),
        random_state=42,
    )
    reduced = umap_model.fit_transform(embeddings)

    logger.info(f"HDBSCAN clustering (min_cluster_size={min_cluster_size})...")
    clusterer = hdbscan.HDBSCAN(
        min_cluster_size=min(min_cluster_size, max(2, len(embeddings) // 5)),
        min_samples=min_samples,
        metric="euclidean",
        prediction_data=True,
        cluster_selection_method=cluster_selection_method,
        cluster_selection_epsilon=cluster_selection_epsilon,
    )
    labels = clusterer.fit_predict(reduced)

    try:
        probabilities = np.asarray(clusterer.probabilities_)
    except Exception:
        probabilities = np.array([])
    try:
        persistence_scores = np.asarray(clusterer.cluster_persistence_)
    except Exception:
        persistence_scores = np.array([])
    return labels, probabilities, persistence_scores


def control_cluster_count(
    embeddings: np.ndarray,
    labels: np.ndarray,
    min_clusters: int = 15,
    max_clusters: int = 30,
) -> np.ndarray:
    """Bring the number of non-noise clusters into ``[min_clusters, max_clusters]``.

    * Too many clusters: merge cluster centroids with cosine agglomerative
      clustering down to *max_clusters*. Noise points (-1) stay -1 instead of
      being folded into a real cluster.
    * Too few clusters: re-run UMAP+HDBSCAN with a smaller ``min_cluster_size``.
    * Otherwise *labels* is returned unchanged.
    """
    labels = np.asarray(labels)
    unique_labels = np.unique(labels)
    valid = unique_labels[unique_labels >= 0]
    current_count = len(valid)
    logger.info(f"Cluster count: {current_count} (target: {min_clusters}–{max_clusters})")

    if min_clusters <= current_count <= max_clusters:
        return labels

    if current_count > max_clusters:
        centroids = np.vstack([embeddings[labels == lbl].mean(axis=0) for lbl in valid])
        centroids = normalize(centroids, norm="l2")
        n_target = min(max_clusters, len(centroids))
        merged = _cosine_agglomerative(n_clusters=n_target).fit_predict(centroids)
        label_map = {int(old): int(new) for old, new in zip(valid, merged)}
        return np.array([label_map.get(int(lbl), -1) for lbl in labels], dtype=int)

    # current_count < min_clusters: recluster with a smaller minimum size.
    new_min_size = max(2, min_clusters // 5)
    new_labels, _, _ = cluster_embeddings_umap_hdbscan(
        embeddings,
        min_cluster_size=new_min_size,
        n_neighbors=min(15, len(embeddings) - 1),
    )
    return new_labels


def cluster_embeddings(embeddings: np.ndarray, threshold: float = 0.7) -> np.ndarray:
    """Cluster with cosine / average-linkage agglomerative clustering cut at *threshold*."""
    if len(embeddings) == 0:
        return np.array([], dtype=int)
    if len(embeddings) == 1:
        return np.array([0], dtype=int)
    clustering = _cosine_agglomerative(n_clusters=None, distance_threshold=threshold)
    return clustering.fit_predict(embeddings)


def reduce_topic_count(
    embeddings: np.ndarray,
    labels: np.ndarray,
    min_topics: int = 50,
    max_topics: int = 120,
) -> np.ndarray:
    """Merge topics by centroid similarity when there are more than *max_topics*.

    Only non-negative labels count as topics. Noise (-1) is neither counted
    nor merged and stays -1 in the output. When no reduction is needed,
    *labels* is returned unchanged.
    """
    if len(labels) == 0:
        return labels
    labels_arr = np.asarray(labels)
    topic_ids = np.unique(labels_arr)
    topic_ids = topic_ids[topic_ids >= 0]
    if len(topic_ids) <= max_topics:
        return labels

    centroids = np.vstack([embeddings[labels_arr == tid].mean(axis=0) for tid in topic_ids])
    centroids = normalize(centroids, norm="l2")
    target = min(max(min_topics, min(len(topic_ids), max_topics)), len(centroids))
    reduced_ids = _cosine_agglomerative(n_clusters=target).fit_predict(centroids)
    remap = {int(tid): int(new) for tid, new in zip(topic_ids, reduced_ids)}
    return np.array([remap.get(int(lbl), -1) for lbl in labels_arr], dtype=int)


# ---------------------------------------------------------------------------
# LLM-guided parameter suggestion
# ---------------------------------------------------------------------------
def suggest_clustering_params_with_llm(
    n_docs: int,
    sample_keywords: List[str],
    llm: Any = None,
) -> Dict[str, Any]:
    """Ask an LLM for UMAP/HDBSCAN parameters, then clamp them to safe ranges.

    If *llm* is None, a Groq LLM is created via ``create_groq_llm``. Any
    failure (no LLM, empty or unparsable reply) falls back to built-in
    defaults. The reply may wrap the JSON object in code fences or prose; the
    outermost ``{...}`` is extracted before parsing.

    Returns:
        Dict with ``n_neighbors`` (5-50), ``n_components`` (5-10),
        ``min_cluster_size`` (between ``max(5, n_docs // 100)`` and
        ``max(20, n_docs // 20)``), ``min_samples`` (1..min_cluster_size),
        ``cluster_selection_method`` ("eom"/"leaf"),
        ``cluster_selection_epsilon`` (0.0-0.3) and ``reasoning``.
    """
    defaults = {
        "n_neighbors": 15,
        "n_components": 8,
        "min_cluster_size": max(5, n_docs // 100),
        "min_samples": 1,
        "cluster_selection_method": "eom",
        "cluster_selection_epsilon": 0.0,
        "reasoning": "default",
    }
    prompt = (
        f"Corpus size: {n_docs}\n"
        f"Sample keywords: {', '.join(sample_keywords[:20])}\n\n"
        "Recommend UMAP and HDBSCAN parameters for clustering academic papers using SPECTER2 embeddings. "
        "Return a valid JSON object with only the keys: n_neighbors, n_components, min_cluster_size, "
        "min_samples, cluster_selection_method, cluster_selection_epsilon."
    )

    if llm is None:
        try:
            llm = create_groq_llm(temperature=0.0)
        except Exception:
            llm = None

    try:
        if llm is None:
            raise RuntimeError("No LLM available")
        try:
            from langchain_core.messages import HumanMessage, SystemMessage

            resp = llm.invoke([
                SystemMessage(content="Respond with ONLY a raw JSON object and nothing else."),
                HumanMessage(content=prompt),
            ])
        except Exception:
            # langchain_core missing or the model rejects message lists: plain prompt.
            resp = llm.invoke(prompt)
        content = str(getattr(resp, "content", str(resp)))

        jtext = content.strip()
        if not jtext:
            raise ValueError("Empty LLM response")
        jtext = re.sub(r"^```(?:json)?\s*", "", jtext, flags=re.IGNORECASE)
        jtext = re.sub(r"\s*```$", "", jtext)
        json_match = re.search(r"\{.*\}", jtext, flags=re.DOTALL)
        if json_match:
            jtext = json_match.group(0)
        parsed = json.loads(jtext)
        if not isinstance(parsed, dict):
            raise ValueError("LLM response is not a JSON object")

        params = {
            "n_neighbors": int(parsed.get("n_neighbors", defaults["n_neighbors"])),
            "n_components": int(parsed.get("n_components", defaults["n_components"])),
            "min_cluster_size": int(parsed.get("min_cluster_size", defaults["min_cluster_size"])),
            "min_samples": int(parsed.get("min_samples", defaults["min_samples"])),
            "cluster_selection_method": str(parsed.get("cluster_selection_method", defaults["cluster_selection_method"])),
            "cluster_selection_epsilon": float(parsed.get("cluster_selection_epsilon", defaults["cluster_selection_epsilon"])),
            "reasoning": content[:1000],
        }
    except Exception as e:
        logger.warning(f"suggest_clustering_params_with_llm failed: {e}")
        params = dict(defaults)

    # Clamp into the same search space the Bayesian optimiser explores.
    params["n_neighbors"] = max(5, min(50, int(params.get("n_neighbors", 15))))
    params["n_components"] = max(5, min(10, int(params.get("n_components", 8))))
    min_size_lower = max(5, n_docs // 100)
    min_size_upper = max(20, n_docs // 20)
    params["min_cluster_size"] = max(
        min_size_lower,
        min(min_size_upper, int(params.get("min_cluster_size", min_size_lower))),
    )
    params["min_samples"] = max(
        1, min(int(params.get("min_samples", 1)), int(params["min_cluster_size"]))
    )
    if params.get("cluster_selection_method") not in ("eom", "leaf"):
        params["cluster_selection_method"] = "eom"
    params["cluster_selection_epsilon"] = max(
        0.0, min(0.3, float(params.get("cluster_selection_epsilon", 0.0)))
    )
    return params


# ---------------------------------------------------------------------------
# Bayesian optimisation (Optuna) over UMAP + HDBSCAN
# ---------------------------------------------------------------------------
_SEARCH_PARAM_KEYS = (
    "n_neighbors",
    "n_components",
    "min_cluster_size",
    "min_samples",
    "cluster_selection_method",
    "cluster_selection_epsilon",
)


def run_bayesian_hdbscan_optimization(
    embeddings: np.ndarray,
    n_trials: int = 50,
    warm_start_params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Search UMAP + HDBSCAN parameters with Optuna.

    A trial scores ``0.6 * mean cluster persistence + 0.4 * DBCV`` only when
    all of these constraints hold; otherwise it scores -1.0:

    * no cluster holds more than 25% of documents;
    * every cluster has at least 5 members;
    * the non-noise cluster count lies in
      ``[max(10, n // 120), max(35, n // 35)]``.

    DBCV is evaluated on the UMAP-reduced vectors that HDBSCAN actually
    clustered. Non-finite DBCV values are treated as 0.

    Returns:
        Dict with ``best_params`` (``min_samples`` clamped to
        ``min_cluster_size``, as it was evaluated), ``best_score``,
        ``n_valid_trials`` and ``trial_log``. If no trial passed the
        constraints, ``best_params`` comes from a constraint-failing trial and
        ``n_valid_trials`` is 0.
    """
    try:
        import optuna
    except Exception as e:
        logger.error(f"Optuna not available: {e}")
        return {"best_params": {}, "best_score": -1.0, "n_valid_trials": 0, "trial_log": []}

    import hdbscan
    from hdbscan.validity import validity_index
    from umap import UMAP

    n_docs = len(embeddings)
    trial_log: List[Dict[str, Any]] = []
    # UMAP depends only on (n_neighbors, n_components); reuse reductions across trials.
    umap_cache: Dict[Tuple[int, int], np.ndarray] = {}
    target_min_clusters = max(10, n_docs // 120)
    target_max_clusters = max(35, n_docs // 35)

    def objective(trial: "optuna.Trial") -> float:
        params = {
            "n_neighbors": trial.suggest_categorical("n_neighbors", [5, 10, 15, 25, 35, 50]),
            "n_components": trial.suggest_int("n_components", 5, 10),
            "min_cluster_size": trial.suggest_int(
                "min_cluster_size", max(5, n_docs // 100), max(20, n_docs // 20)
            ),
            "min_samples": trial.suggest_int("min_samples", 1, 20),
            "cluster_selection_method": trial.suggest_categorical(
                "cluster_selection_method", ["eom", "leaf"]
            ),
            "cluster_selection_epsilon": trial.suggest_float("cluster_selection_epsilon", 0.0, 0.3),
        }
        if params["min_samples"] > params["min_cluster_size"]:
            params["min_samples"] = params["min_cluster_size"]

        cache_key = (int(params["n_neighbors"]), int(params["n_components"]))
        try:
            reduced = umap_cache.get(cache_key)
            if reduced is None:
                umap_model = UMAP(
                    n_neighbors=min(params["n_neighbors"], len(embeddings) - 1),
                    n_components=params["n_components"],
                    min_dist=0.0,
                    metric="cosine",
                    random_state=42,
                )
                reduced = umap_model.fit_transform(embeddings)
                umap_cache[cache_key] = reduced
            clusterer = hdbscan.HDBSCAN(
                min_cluster_size=min(params["min_cluster_size"], max(2, len(embeddings) // 5)),
                min_samples=params["min_samples"],
                metric="euclidean",
                prediction_data=True,
                cluster_selection_method=params["cluster_selection_method"],
                cluster_selection_epsilon=params["cluster_selection_epsilon"],
            )
            labels = clusterer.fit_predict(reduced)
            persistence = np.asarray(getattr(clusterer, "cluster_persistence_", np.array([])))
        except Exception as e:
            logger.warning(f"Trial failed to cluster: {e}")
            return -1.0

        unique, counts = np.unique(labels, return_counts=True)
        is_real = unique != -1
        real_counts = counts[is_real]
        n_valid = int(is_real.sum())
        max_frac = float(real_counts.max()) / max(1, n_docs) if n_valid else 0.0
        noise_count = int((labels == -1).sum())
        persistence_mean = float(np.mean(persistence)) if len(persistence) > 0 else 0.0

        passed = True
        score = -1.0
        dbcv = 0.0
        if max_frac > 0.25:
            passed = False
        elif n_valid and int(real_counts.min()) < 5:
            passed = False
        elif n_valid < target_min_clusters or n_valid > target_max_clusters:
            passed = False
        else:
            try:
                # Evaluate DBCV in the space HDBSCAN clustered. On raw 768-d
                # embeddings its distance ** (-d) terms overflow.
                dbcv = float(validity_index(np.asarray(reduced, dtype=np.float64), labels))
            except Exception:
                dbcv = 0.0
            if not np.isfinite(dbcv):
                dbcv = 0.0
            score = 0.6 * persistence_mean + 0.4 * dbcv

        trial_log.append({
            "trial": len(trial_log) + 1,
            **params,
            "n_clusters": int(len(unique)),
            "n_valid": int(n_valid),
            "max_frac": float(max_frac),
            "noise_count": int(noise_count),
            "persistence": float(persistence_mean),
            "dbcv": float(dbcv),
            "constraints_passed": bool(passed),
            "score": float(score),
        })
        return float(score)

    study = optuna.create_study(direction="maximize")
    if warm_start_params:
        try:
            study.enqueue_trial(
                {k: v for k, v in warm_start_params.items() if k in _SEARCH_PARAM_KEYS}
            )
        except Exception as exc:
            logger.warning(f"Could not enqueue warm-start trial: {exc}")
    study.optimize(objective, n_trials=n_trials)

    try:
        best = study.best_trial
        best_params = dict(best.params)
        best_score = float(best.value)
    except Exception:
        best_params = {}
        best_score = -1.0
    if "min_samples" in best_params and "min_cluster_size" in best_params:
        # Report the value the objective actually evaluated.
        best_params["min_samples"] = min(
            int(best_params["min_samples"]), int(best_params["min_cluster_size"])
        )

    n_valid_count = sum(1 for t in trial_log if t.get("constraints_passed"))
    return {
        "best_params": best_params,
        "best_score": best_score,
        "n_valid_trials": n_valid_count,
        "trial_log": trial_log,
    }


# ---------------------------------------------------------------------------
# Keyword extraction — TF-IDF
# ---------------------------------------------------------------------------
def extract_topic_keywords(
    tfidf_docs: List[str],
    labels: np.ndarray,
    top_n: int = 10,
) -> Dict[int, List[str]]:
    """Return up to *top_n* class-level TF-IDF terms per cluster label.

    All documents of a cluster are concatenated into one pseudo-document.
    TF-IDF (English stop words, uni+bigrams, sublinear tf) is fitted across
    these pseudo-documents, and only terms with a positive score are kept.
    Every label, including -1, gets an entry. If the vectorizer finds no
    vocabulary, every cluster maps to an empty list.
    """
    if not tfidf_docs or len(labels) == 0:
        return {}

    grouped: Dict[int, List[str]] = {}
    for doc, label in zip(tfidf_docs, labels):
        grouped.setdefault(int(label), []).append(doc)
    cluster_ids = sorted(grouped)
    pseudo_docs = [" ".join(grouped[cid]) for cid in cluster_ids]

    vectorizer = TfidfVectorizer(
        stop_words="english",
        ngram_range=(1, 2),
        max_features=10_000,
        min_df=1,
        sublinear_tf=True,
    )
    try:
        matrix = vectorizer.fit_transform(pseudo_docs)
    except ValueError:
        return {cid: [] for cid in cluster_ids}

    vocab = np.array(vectorizer.get_feature_names_out())
    keywords: Dict[int, List[str]] = {}
    for row_idx, cid in enumerate(cluster_ids):
        scores = matrix[row_idx].toarray().ravel()
        top_idx = scores.argsort()[::-1][:top_n]
        keywords[cid] = [str(vocab[i]) for i in top_idx if scores[i] > 0]
    return keywords


# ---------------------------------------------------------------------------
# Topic table builder
# ---------------------------------------------------------------------------
_TOPIC_TABLE_COLUMNS = [
    "Topic ID", "Keywords",
    "groq_label", "mistral_label", "openrouter_label", "cohere_label",
    "hf_mistral_label", "hf_zephyr_label", "flan_label", "gemini_label",
    "Label", "cluster_size", "Count", "strong_count", "weak_count", "persistence",
]


def build_topic_table(
    tfidf_docs: List[str],
    labels: np.ndarray,
    keywords: Dict[int, List[str]],
    multi_llm_data: Optional[Dict[int, Dict[str, str]]] = None,
    probabilities: Optional[np.ndarray] = None,
    persistence_scores: Optional[np.ndarray] = None,
) -> pd.DataFrame:
    """Build one row per cluster label, ordered by descending size.

    Args:
        tfidf_docs: only checked for emptiness.
        labels: cluster label per document (-1 = noise).
        keywords: label -> keyword list; the first 4 form the default ``Label``.
        multi_llm_data: label -> per-provider labels. ``hf_mistral_label`` /
            ``flan_label`` back-fill ``mistral_label``, and ``gemini_label``
            back-fills ``hf_zephyr_label``. Alias columns mirror both.
        probabilities: per-document membership strength. A document counts as
            "strong" when it is >= 0.5. Ignored (all members strong) unless
            its length equals ``len(labels)``.
        persistence_scores: per-cluster persistence indexed by label. Labels
            outside ``[0, len)`` — including noise -1 — get 0.0.

    Returns:
        DataFrame with the columns in ``_TOPIC_TABLE_COLUMNS``. It is empty
        (with those columns) when there are no documents or labels.
    """
    if not tfidf_docs or len(labels) == 0:
        return pd.DataFrame(columns=_TOPIC_TABLE_COLUMNS)

    labels_arr = np.asarray(labels)
    probs: Optional[np.ndarray] = None
    if probabilities is not None and len(probabilities) == len(labels_arr):
        probs = np.asarray(probabilities)
    persist = np.asarray(persistence_scores) if persistence_scores is not None else np.array([])

    counts = pd.Series(labels_arr).value_counts().sort_values(ascending=False)
    rows = []
    for topic_id, count in counts.items():
        tid = int(topic_id)
        count = int(count)
        terms = keywords.get(tid, [])
        default_label = " ".join(terms[:4]).strip().title() if terms else f"Topic {tid}"

        llm = (multi_llm_data or {}).get(tid, {})
        groq_lbl = llm.get("groq_label", "")
        mistral_lbl = (
            llm.get("mistral_label", "")
            or llm.get("hf_mistral_label", "")
            or llm.get("flan_label", "")
        )
        openrouter_lbl = llm.get("openrouter_label", "")
        cohere_lbl = llm.get("cohere_label", "")
        hf_z_lbl = llm.get("hf_zephyr_label", "") or llm.get("gemini_label", "")

        if probs is None:
            strong_count = count
        else:
            strong_count = int((probs[labels_arr == tid] >= 0.5).sum())
        weak_count = count - strong_count

        persistence = 0.0
        if 0 <= tid < len(persist):
            # Negative labels (noise) must not wrap around to the last cluster.
            try:
                persistence = float(persist[tid])
            except (TypeError, ValueError):
                persistence = 0.0

        rows.append({
            "Topic ID": tid,
            "Keywords": ", ".join(terms),
            "groq_label": groq_lbl,
            "mistral_label": mistral_lbl,
            "openrouter_label": openrouter_lbl,
            "cohere_label": cohere_lbl,
            "hf_mistral_label": mistral_lbl,
            "hf_zephyr_label": hf_z_lbl,
            "flan_label": mistral_lbl,
            "gemini_label": hf_z_lbl,
            "Label": default_label,
            "cluster_size": count,
            "Count": count,
            "strong_count": int(strong_count),
            "weak_count": int(weak_count),
            "persistence": float(persistence),
        })
    return pd.DataFrame(rows)


# ---------------------------------------------------------------------------
# LLM helpers
# ---------------------------------------------------------------------------
def create_groq_llm(temperature: float = 0.0):
    """Build the Groq chat model used for labelling, or return ``None``.

    Returns ``None`` (after logging a warning) when ``GROQ_API_KEY`` is unset
    or when ``langchain_groq`` cannot be imported / the client cannot be
    constructed, so callers can fall back to keyword-derived labels.

    Args:
        temperature: Sampling temperature forwarded to ``ChatGroq``.
    """
    api_key = os.getenv("GROQ_API_KEY", "").strip()
    if not api_key:
        logger.warning("⚠️ No GROQ_API_KEY found")
        return None
    try:
        # Lazy import keeps this module importable without langchain_groq.
        from langchain_groq import ChatGroq

        llm = ChatGroq(model="llama-3.3-70b-versatile", temperature=temperature, max_tokens=200)
    except Exception as e:
        logger.warning(f"Failed to create Groq LLM: {e}")
        return None
    logger.info("✓ Groq LLM created (llama-3.3-70b-versatile)")
    return llm


def _clean_llm_label(text: str, fallback: str) -> str:
    """Normalise a raw LLM reply into a short, single-line label.

    Removes line breaks, a leading list number ("1.") or bullet, wrapping
    quotes, a leading "Label:" / "Topic name:" style prefix, and every
    punctuation character except hyphens. Replies longer than 8 words are
    treated as rambling and cut to their first 6 words. The result is capped
    at 80 characters. Returns ``fallback`` when nothing usable remains.
    """
    text = re.sub(r"[\n\r]+", " ", str(text)).strip()
    text = re.sub(r"^\s*\d+[\.\:\)]\s*", "", text)
    text = re.sub(r"^\s*[-\*•]\s*", "", text)
    text = re.sub(r'^["\'](.+)["\']$', r"\1", text.strip())
    text = re.sub(
        r"^(?:the\s+)?(?:topic\s+)?(?:label|name|title)\s*[:–-]\s*",
        "",
        text,
        flags=re.IGNORECASE,
    )
    # Strip punctuation but keep Unicode letters and digits ("Lévy",
    # "Schrödinger"). The previous ASCII-only class silently deleted accented
    # characters. Underscores are still removed, as before, even though \w
    # would otherwise keep them.
    text = re.sub(r"[^\w\s-]|_", "", text)
    words = text.split()
    if len(words) > 8:
        words = words[:6]
    compact = " ".join(words)
    return compact[:80] if compact else fallback


def label_topics_with_llm(
    topic_table: pd.DataFrame,
    topic_keywords: Dict[int, List[str]],
    llm: Any,
) -> Dict[int, str]:
    """Ask ``llm`` for a concise academic label for every topic row.

    Each row's current ``Label`` (or "Topic <id>" if blank) is the fallback.
    After the first ``invoke`` failure, the remaining topics skip the LLM and
    use their fallback, so a dead endpoint costs one failed request rather
    than one per topic.

    Args:
        topic_table: Frame with at least ``Topic ID`` and ``Label`` columns.
        topic_keywords: Topic id -> ranked keywords. The first 8 go into the
            prompt.
        llm: Object exposing ``invoke(prompt)``, or ``None`` to skip the LLM.

    Returns:
        Topic id -> cleaned label (or fallback) for every row.
    """
    if topic_table.empty:
        return {}

    labels_out: Dict[int, str] = {}
    llm_failed = False
    for _, row in topic_table.iterrows():
        topic_id = int(row["Topic ID"])
        fallback = str(row["Label"]).strip() or f"Topic {topic_id}"
        if llm is None or llm_failed:
            labels_out[topic_id] = fallback
            continue

        kw_str = ", ".join(topic_keywords.get(topic_id, [])[:8])
        prompt = (
            f"Research cluster keywords: {kw_str}\n\n"
            "Generate a concise academic topic label (3-6 words). "
            "Output ONLY the label — no explanation, no punctuation, no quotes."
        )
        try:
            response = llm.invoke(prompt)
            content = getattr(response, "content", str(response))
            labels_out[topic_id] = _clean_llm_label(content, fallback)
        except Exception as e:
            logger.warning(f"LLM labeling failed for topic {topic_id}: {e}")
            labels_out[topic_id] = fallback
            llm_failed = True
    return labels_out


# ---------------------------------------------------------------------------
# JSON helpers
# ---------------------------------------------------------------------------
def _atomic_write_text(path: Path, text: str) -> None:
    """Write ``text`` (UTF-8) to ``path`` atomically.

    The data goes to a sibling temp file, is flushed and fsynced, and then
    ``os.replace`` swaps it into place. An interrupted run leaves the previous
    file intact instead of a truncated one. Parent directories are created if
    missing.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_name(f".{path.name}.{os.getpid()}.tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as fh:
            fh.write(text)
            fh.flush()
            os.fsync(fh.fileno())
        os.replace(tmp_path, path)
    except BaseException:
        try:
            tmp_path.unlink()
        except OSError:
            pass  # best-effort cleanup; the original error is re-raised below
        raise


def upsert_json_bucket(file_path: str, key: str, payload: Any) -> None:
    """Set ``key`` to ``payload`` inside the JSON object stored at ``file_path``.

    Other top-level keys already present in the file are preserved. A file
    that is not valid UTF-8 JSON, or whose top level is not an object, is
    logged and replaced by a fresh object. The write is atomic, so a crash
    cannot truncate the existing file (which would otherwise drop every other
    bucket on the next run).
    """
    path = Path(file_path).resolve()
    logger.info(f"💾 upsert_json_bucket → {path} [key={key}]")

    existing: Dict[str, Any] = {}
    if path.exists():
        try:
            loaded = json.loads(path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            logger.warning(f"⚠️ {path.name} is not valid JSON ({e}); rewriting it from scratch")
        else:
            if isinstance(loaded, dict):
                existing = loaded
            else:
                logger.warning(
                    f"⚠️ {path.name} top level is {type(loaded).__name__}, not an object; rewriting it"
                )

    existing[key] = payload
    _atomic_write_text(path, json.dumps(existing, indent=2, ensure_ascii=False))
    logger.info(f"✅ Saved {len(existing)} keys → {path.name} ({path.stat().st_size} bytes)")


# ---------------------------------------------------------------------------
# Theme consolidation
# ---------------------------------------------------------------------------
def _top_keyword_tokens(keyword_strings: List[str], limit: int = 8) -> List[str]:
    """Return the ``limit`` most frequent non-stopword tokens of 3+ ASCII letters.

    Ties keep first-seen order, as ``Counter.most_common`` guarantees.
    """
    combined = " ".join(keyword_strings).lower()
    token_counts = Counter(
        w for w in re.findall(r"[a-zA-Z]{3,}", combined) if w not in STOPWORDS
    )
    return [term for term, _ in token_counts.most_common(limit)]


def _topic_vectors_for_theming(
    substantial: pd.DataFrame,
    embeddings: Optional[np.ndarray],
    labels: Optional[np.ndarray],
) -> Optional[np.ndarray]:
    """Return one vector per row of ``substantial``, or ``None`` on failure.

    Preferred source: the L2-normalised mean of the document embeddings whose
    label equals the topic id. Topics with no documents get a zero vector.
    If embeddings/labels are missing or their lengths disagree, the topic's
    "Label Keywords" text is embedded with ``embed_documents`` instead.
    """
    if embeddings is not None and labels is not None and len(embeddings) > 0:
        doc_vectors = np.asarray(embeddings)
        # np.asarray makes ``doc_labels == tid`` element-wise even when the
        # caller passes a plain list (a list would compare to a scalar False).
        doc_labels = np.asarray(labels)
        if len(doc_labels) == len(doc_vectors):
            logger.info(" Computing centroids from document embeddings...")
            centroids = []
            for tid in substantial["Topic ID"].tolist():
                mask = doc_labels == tid
                if mask.any():
                    centroids.append(doc_vectors[mask].mean(axis=0))
                else:
                    centroids.append(np.zeros(doc_vectors.shape[1]))
            return normalize(np.vstack(centroids), norm="l2")
        logger.warning(
            f" {len(doc_labels)} labels vs {len(doc_vectors)} embeddings — embedding topic text instead"
        )

    texts = (
        substantial["Label"].astype(str).fillna("")
        + " "
        + substantial["Keywords"].astype(str).fillna("")
    ).tolist()
    try:
        return embed_documents(texts)
    except Exception as e:
        logger.warning(f" Embedding failed: {e}")
        return None


def _assign_theme_ids(topic_vectors: Optional[np.ndarray], target: int, n_topics: int) -> np.ndarray:
    """Group ``n_topics`` topic vectors into ``target`` themes.

    Uses average-linkage agglomerative clustering with the cosine metric.
    Falls back to round-robin ids when no usable vectors are available.
    """
    if topic_vectors is None or len(topic_vectors) < target:
        logger.warning(" Fallback: round-robin theme assignment")
        return np.arange(n_topics) % target
    if target == 1:
        return np.zeros(n_topics, dtype=int)
    try:
        themer = AgglomerativeClustering(n_clusters=target, metric="cosine", linkage="average")
    except TypeError:
        # scikit-learn < 1.2 only knows the older ``affinity`` keyword.
        themer = AgglomerativeClustering(n_clusters=target, affinity="cosine", linkage="average")
    theme_ids = themer.fit_predict(topic_vectors)
    logger.info(f" Theme clustering → {len(np.unique(theme_ids))} groups")
    return theme_ids


def consolidate_themes(
    topic_table: pd.DataFrame,
    embeddings: Optional[np.ndarray] = None,
    labels: Optional[np.ndarray] = None,
    min_themes: int = 4,
    max_themes: int = 8,
) -> List[Dict[str, Any]]:
    """Group topics into higher-level themes (Braun & Clarke phase 3).

    Topics with at least ``MIN_THEME_PAPERS`` papers ("Count") are clustered
    into a target number of themes. The target is one theme per three topics
    of 20+ papers (at least 3), clamped to ``[min_themes, max_themes]`` and
    never more than the number of topics being grouped. Smaller topics are
    collected into one "Niche and Emerging Topics" theme with ``theme_id``
    99. If no topic reaches the threshold, all topics are grouped normally
    rather than collapsing into that single catch-all.

    Args:
        topic_table: Frame with ``Topic ID``, ``Label``, ``Keywords`` and
            ``Count`` columns.
        embeddings: Optional per-document vectors, used for topic centroids.
        labels: Optional per-document topic ids aligned with ``embeddings``.
        min_themes: Lower bound on the clustering target.
        max_themes: Upper bound on the clustering target.

    Returns:
        Theme dicts (``theme_id``, ``theme_name`` = label of the largest
        member topic, ``topic_ids``, ``topic_labels``, ``keywords``,
        ``topic_count``, ``paper_count``), sorted by ``paper_count``
        descending.
    """
    if topic_table.empty:
        return []

    logger.info(f"🌿 consolidate_themes: {len(topic_table)} clusters → {min_themes}–{max_themes} themes")

    substantial = topic_table[topic_table["Count"] >= MIN_THEME_PAPERS].copy()
    niche = topic_table[topic_table["Count"] < MIN_THEME_PAPERS].copy()
    if substantial.empty and not niche.empty:
        # Small corpora: without this, every topic lands in one niche bucket.
        logger.info(
            f" No cluster has ≥{MIN_THEME_PAPERS} papers — grouping all {len(niche)} clusters into themes"
        )
        substantial, niche = niche, niche.iloc[0:0]
    logger.info(f" Substantial (≥{MIN_THEME_PAPERS} papers): {len(substantial)} | Niche: {len(niche)}")

    themes: List[Dict[str, Any]] = []
    if not substantial.empty:
        n_sub = len(substantial)
        large_count = int((substantial["Count"] >= 20).sum())
        target = max(min_themes, min(max_themes, max(large_count // 3, 3)))
        target = max(min(target, n_sub), 1)
        logger.info(f" Large clusters (≥20 papers): {large_count} → target themes: {target}")

        topic_vectors = _topic_vectors_for_theming(substantial, embeddings, labels)
        substantial["_theme_id"] = _assign_theme_ids(topic_vectors, target, n_sub)

        for theme_id, frame in substantial.groupby("_theme_id"):
            ranked = frame.sort_values("Count", ascending=False)
            themes.append({
                "theme_id": int(theme_id),
                "theme_name": str(ranked.iloc[0]["Label"]),
                "topic_ids": ranked["Topic ID"].astype(int).tolist(),
                "topic_labels": ranked["Label"].astype(str).tolist(),
                "keywords": _top_keyword_tokens(ranked["Keywords"].astype(str).tolist()),
                "topic_count": int(len(ranked)),
                "paper_count": int(ranked["Count"].sum()),
            })

    if not niche.empty:
        themes.append({
            "theme_id": 99,
            "theme_name": "Niche and Emerging Topics",
            "topic_ids": niche["Topic ID"].astype(int).tolist(),
            "topic_labels": niche["Label"].astype(str).tolist(),
            "keywords": _top_keyword_tokens(niche["Keywords"].astype(str).tolist()),
            "topic_count": int(len(niche)),
            "paper_count": int(niche["Count"].sum()),
        })

    themes.sort(key=lambda t: t["paper_count"], reverse=True)
    logger.info(f"✅ Generated {len(themes)} themes")
    return themes


def review_themes(themes: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return copies of ``themes`` with ``meaningful`` = (3+ keywords)."""
    return [{**t, "meaningful": len(t.get("keywords", [])) >= 3} for t in themes]


def name_themes_with_llm(themes: List[Dict[str, Any]], llm: Any) -> List[Dict[str, Any]]:
    """Replace each theme's ``theme_name`` with an LLM-generated academic name.

    The niche bucket (``theme_id`` 99) keeps its fixed name. With no LLM, or
    after the first failed ``invoke``, themes are passed through unchanged.
    Input dicts are not mutated; renamed themes are shallow copies.
    """
    if not themes:
        return themes

    named = []
    llm_failed = False
    for theme in themes:
        if theme.get("theme_id") == 99 or llm is None or llm_failed:
            named.append(theme)
            continue

        fallback = theme["theme_name"]
        topic_labels_str = "; ".join(theme.get("topic_labels", [])[:6])
        kw_str = ", ".join(theme.get("keywords", [])[:8])
        prompt = (
            "You are naming a high-level research theme that groups related academic topics.\n\n"
            f"Topics in this theme: {topic_labels_str}\n"
            f"Common keywords: {kw_str}\n\n"
            "Generate a concise academic theme name (3-7 words). "
            "Output ONLY the theme name — no explanation, no quotes, no punctuation, no numbering."
        )
        try:
            response = llm.invoke(prompt)
            content = getattr(response, "content", str(response))
            named.append({**theme, "theme_name": _clean_llm_label(content, fallback)})
        except Exception as e:
            logger.warning(f"Theme naming failed: {e}")
            named.append(theme)
            llm_failed = True
    return named


# ---------------------------------------------------------------------------
# Taxonomy mapping
# ---------------------------------------------------------------------------
def create_taxonomy_map(topic_table: pd.DataFrame, source_column: str) -> List[Dict[str, str]]:
    """Classify each topic as MAPPED to a PAJAIS category, or NOVEL.

    Scoring, per category:
      - 2 points for each category keyword equal to a topic token (3+ lowercase
        letters from Label + Keywords);
      - 1 point for each category keyword that contains, or is contained in,
        a topic token longer than 3 characters.

    The highest-scoring category wins (the first one on ties); a score of at
    least 2 is required to map.
    """
    if topic_table.empty:
        return []

    mappings: List[Dict[str, str]] = []
    for _, row in topic_table.iterrows():
        topic_name = str(row["Label"])
        tokens = set(re.findall(r"[a-z]{3,}", f"{row['Label']} {row['Keywords']}".lower()))
        long_tokens = [tok for tok in tokens if len(tok) > 3]

        best_category = None
        best_score = 0
        for category, kws in PAJAIS_CATEGORIES.items():
            exact = len(tokens.intersection(kws))
            partial = sum(1 for kw in kws if any(kw in tok or tok in kw for tok in long_tokens))
            score = exact * 2 + partial
            if score > best_score:
                best_score = score
                best_category = category

        if best_category and best_score >= 2:
            mappings.append({
                "source": source_column,
                "topic": topic_name,
                "classification": "MAPPED",
                "domain": best_category,
                "reason": f"Matches '{best_category}' with semantic score={best_score}.",
            })
        else:
            mappings.append({
                "source": source_column,
                "topic": topic_name,
                "classification": "NOVEL",
                "domain": "Emerging / Interdisciplinary",
                "reason": "No strong overlap with predefined PAJAIS taxonomy categories.",
            })
    return mappings


# ---------------------------------------------------------------------------
# Charts
# ---------------------------------------------------------------------------
def create_topic_distribution_chart(topic_table: pd.DataFrame, source_column: str):
    """Bar chart of paper counts for the 30 largest topics."""
    if topic_table.empty:
        return px.bar(title=f"No topics available for {source_column}")
    frame = topic_table.sort_values("Count", ascending=False).head(30)
    return px.bar(frame, x="Label", y="Count", title=f"Topic Distribution — {source_column}")


def create_keyword_chart(topic_table: pd.DataFrame, source_column: str):
    """Bar chart of the 30 keywords with the highest paper-count weight.

    Each of a topic's first 8 keywords receives that topic's ``Count``;
    weights are summed across topics.
    """
    if topic_table.empty:
        return px.bar(title=f"No keywords available for {source_column}")

    keyword_counter: Counter = Counter()
    for _, row in topic_table.iterrows():
        terms = [t.strip() for t in str(row["Keywords"]).split(",") if t.strip()]
        weight = int(row["Count"])
        keyword_counter.update({term: weight for term in terms[:8]})

    if not keyword_counter:
        return px.bar(title=f"No keywords available for {source_column}")

    key_df = pd.DataFrame(keyword_counter.items(), columns=["Keyword", "Weight"])
    key_df = key_df.sort_values("Weight", ascending=False).head(30)
    return px.bar(key_df, x="Keyword", y="Weight", title=f"Keyword Salience — {source_column}")


# ---------------------------------------------------------------------------
# Main pipeline
# ---------------------------------------------------------------------------
def _empty_analysis_result(
    source_column: str, paper_count: int, suggested_params: Optional[Dict[str, Any]]
) -> "AnalysisResult":
    """Build the AnalysisResult returned when a column yields no valid documents."""
    empty_table = pd.DataFrame(columns=[
        "Topic ID", "Keywords", "groq_label", "hf_mistral_label", "hf_zephyr_label",
        "flan_label", "gemini_label", "Label", "cluster_size", "Count",
    ])
    return AnalysisResult(
        source_column=source_column,
        paper_count=paper_count,
        unit_count=0,
        topic_table=empty_table,
        taxonomy_table=pd.DataFrame(columns=["source", "topic", "classification", "domain", "reason"]),
        distribution_fig=create_topic_distribution_chart(empty_table, source_column),
        keyword_fig=create_keyword_chart(empty_table, source_column),
        labels_payload={},
        themes_payload={},
        taxonomy_payload=[],
        suggested_params=suggested_params or {},
        trial_log=[],
        paper_assignments=[],
        doc_row_indices=[],
        llm_suggested_params={},
        bayesian_best_params={},
    )


def _sample_corpus_keywords(df: pd.DataFrame, source_column: str, top_n: int = 40) -> List[str]:
    """Return the top TF-IDF uni/bi-grams of the raw column, or [] on any failure.

    These keywords prime the LLM clustering-parameter suggestion. For
    "Combined", Title and Abstract are joined per row.
    """
    try:
        vectorizer = TfidfVectorizer(stop_words="english", ngram_range=(1, 2), max_features=2000)
        if source_column.lower() == "combined":
            title = df.get("Title", pd.Series(dtype=str)).fillna("").astype(str).str.strip()
            abstract = df.get("Abstract", pd.Series(dtype=str)).fillna("").astype(str).str.strip()
            corpus = (title + " \n " + abstract).tolist()
        else:
            corpus = df[source_column].dropna().astype(str).tolist()
        matrix = vectorizer.fit_transform(corpus)
        vocab = np.array(vectorizer.get_feature_names_out())
        summed = np.asarray(matrix.sum(axis=0)).ravel()
        top_idx = summed.argsort()[::-1][:top_n]
        return [str(vocab[i]) for i in top_idx if summed[i] > 0][:top_n]
    except Exception:
        # Best-effort hint only; parameter suggestion still works without it.
        logger.debug("Sample keyword extraction failed", exc_info=True)
        return []


def _apply_single_llm_labels(
    topic_table: pd.DataFrame, topic_keywords: Dict[int, List[str]], llm: Any
) -> pd.DataFrame:
    """Overwrite ``Label`` with single-LLM labels, keeping the old label on gaps."""
    llm_labels = label_topics_with_llm(topic_table, topic_keywords, llm)
    if not topic_table.empty:
        topic_table["Label"] = topic_table["Topic ID"].map(llm_labels).fillna(topic_table["Label"])
    return topic_table


def _run_council_labeling(
    topic_table: pd.DataFrame, topic_keywords: Dict[int, List[str]]
) -> Dict[Any, Dict[str, str]]:
    """Label all topics via ``llm_council.run_council_batch``.

    Batches of more than 60 topics are sent in chunks of 50. Returns
    topic id -> {provider name: label}; empty provider dicts are dropped.
    Raises ImportError if ``llm_council`` is not installed.
    """
    import llm_council

    topics_batch = [
        (int(tid), topic_keywords.get(int(tid), [])) for tid in topic_table["Topic ID"].tolist()
    ]
    n_topics = len(topics_batch)
    token_budget = min(60 * n_topics, 8000)
    if n_topics > 60:
        chunks = [topics_batch[start:start + 50] for start in range(0, n_topics, 50)]
    else:
        chunks = [topics_batch]

    merged: Dict[Any, Dict[str, str]] = {}
    for chunk in chunks:
        result = llm_council.run_council_batch(
            topics=chunk,
            system_prompt="You are an expert academic research analyst.",
            max_tokens=token_budget,
        )
        for tid, provider_labels in result.items():
            if provider_labels:
                merged.setdefault(tid, {}).update(provider_labels)
    return merged


def _collect_council_labels(
    batch_result: Dict[Any, Dict[str, str]],
) -> Tuple[Dict[int, Dict[str, str]], Dict[int, str]]:
    """Split council output into per-provider label columns and one chosen label.

    Returns ``(multi_llm_data, label_map)``. The chosen label is the first
    non-blank provider label. An empty first label used to blank the topic
    label, because ``.fillna`` does not replace "".
    """
    multi_llm_data: Dict[int, Dict[str, str]] = {}
    label_map: Dict[int, str] = {}
    for raw_tid, provider_labels in batch_result.items():
        if not provider_labels:
            continue
        try:
            # build_topic_table looks rows up by int id.
            tid = int(raw_tid)
        except (TypeError, ValueError):
            logger.warning(f"Ignoring council result with non-integer topic id {raw_tid!r}")
            continue
        multi_llm_data[tid] = {
            "groq_label": provider_labels.get("Groq", ""),
            "mistral_label": provider_labels.get("Mistral", ""),
            "openrouter_label": provider_labels.get("OpenRouter", ""),
            "cohere_label": provider_labels.get("Cohere", ""),
            "hf_mistral_label": provider_labels.get("Mistral", ""),
            "hf_zephyr_label": "",
            "flan_label": provider_labels.get("Mistral", ""),
            "gemini_label": "",
        }
        chosen = next(
            (lbl for lbl in provider_labels.values() if lbl is not None and str(lbl).strip()),
            None,
        )
        if chosen is not None:
            label_map[tid] = chosen
    return multi_llm_data, label_map


def _merge_taxonomy_file(
    taxonomy_path: Path, source_column: str, taxonomy_payload: List[Dict[str, str]]
) -> None:
    """Replace this source's rows in the taxonomy JSON list, keeping other sources' rows."""
    existing_taxonomy: List[Dict[str, str]] = []
    if taxonomy_path.exists():
        try:
            loaded = json.loads(taxonomy_path.read_text(encoding="utf-8"))
            if isinstance(loaded, list):
                existing_taxonomy = loaded
        except (json.JSONDecodeError, UnicodeDecodeError):
            existing_taxonomy = []
    remaining = [
        r for r in existing_taxonomy if isinstance(r, dict) and r.get("source") != source_column
    ]
    _atomic_write_text(
        taxonomy_path, json.dumps(remaining + taxonomy_payload, indent=2, ensure_ascii=False)
    )


def run_braun_clarke_pipeline(
    df: pd.DataFrame,
    source_column: str,
    llm: Any,
    output_dir: str = ".",
    use_multi_llm: bool = True,
    progress_callback=None,
    suggested_params: Optional[Dict[str, Any]] = None,
) -> "AnalysisResult":
    """Run the 7-step topic/theme pipeline for one column ("Abstract", "Title" or "Combined").

    Steps:
      1. Build documents.
      2. Embed them (``embed_documents``).
      3. Cluster: Bayesian HDBSCAN search warm-started from LLM-suggested
         params, falling back to fixed defaults.
      4. Extract TF-IDF keywords and fold metadata-noise clusters into the
         largest genuine cluster.
      5. Label topics via the multi-LLM council, falling back to a single
         LLM.
      6. Save labels JSON.
      7. Consolidate, review and name themes, then map topics to the
         taxonomy.

    Artifacts are written under the resolved ``output_dir``.

    Args:
        df: Scopus records.
        source_column: Column to analyse; "combined" (any case) joins Title
            and Abstract.
        llm: Single LLM (``invoke(prompt)``) or ``None``.
        output_dir: Directory for artifacts and the embedding cache.
        use_multi_llm: Try ``llm_council`` before the single LLM.
        progress_callback: Optional ``callable(str)`` for UI progress;
            exceptions it raises are ignored.
        suggested_params: Pre-computed clustering params; when ``None`` they
            are requested from the LLM.
    """
    output_dir = str(Path(output_dir).resolve())

    def _progress(msg: str) -> None:
        logger.info(msg)
        if progress_callback is not None:
            try:
                progress_callback(msg)
            except Exception:
                # UI progress reporting is best-effort and must never abort a run.
                logger.debug("progress_callback raised", exc_info=True)

    files = ensure_output_artifacts(output_dir)

    # Step 1: Build documents
    _progress(f"[{source_column}] 📝 Step 1/7 — Building documents...")
    if source_column.lower() == "combined":
        embedding_docs, tfidf_docs, paper_count, row_indices = build_documents_combined(df)
    else:
        embedding_docs, tfidf_docs, paper_count, row_indices = build_documents(df, source_column)
    _progress(f"[{source_column}] ✓ {paper_count} papers → {len(embedding_docs)} valid documents")
    if not embedding_docs:
        return _empty_analysis_result(source_column, paper_count, suggested_params)

    # Step 2: Embed
    _progress(f"[{source_column}] 🔢 Step 2/7 — Embedding {len(embedding_docs)} documents with SPECTER2...")
    embeddings = embed_documents(embedding_docs, cache_dir=output_dir)
    _progress(f"[{source_column}] ✓ Embeddings shape: {embeddings.shape}")

    # Step 3: Cluster
    _progress(f"[{source_column}] 📊 Step 3/7 — Clustering with UMAP + HDBSCAN...")
    if suggested_params is None:
        sample_keywords = _sample_corpus_keywords(df, source_column)
        suggested_params = suggest_clustering_params_with_llm(len(embedding_docs), sample_keywords, llm=llm)
        _progress(f"[{source_column}] LLM-suggested params: {suggested_params}")
    llm_suggested_params = {k: v for k, v in suggested_params.items() if k != "reasoning"}

    opt_result = run_bayesian_hdbscan_optimization(embeddings, n_trials=80, warm_start_params=suggested_params)
    _progress(
        f"[{source_column}] Bayesian best_score={opt_result.get('best_score')} "
        f"n_valid={opt_result.get('n_valid_trials')}"
    )
    # A present-but-None score used to raise TypeError on the comparisons below.
    raw_best_score = opt_result.get("best_score")
    best_score = -1.0 if raw_best_score is None else float(raw_best_score)
    optimisation_ok = best_score > -1.0

    best = opt_result.get("best_params") or {}
    params_to_use = best if (best and optimisation_ok) else {
        "n_neighbors": 15, "min_cluster_size": 5, "min_samples": 1, "min_dist": 0.0,
        "n_components": 8, "cluster_selection_method": "eom", "cluster_selection_epsilon": 0.0,
    }
    bayesian_best_params = dict(params_to_use)

    try:
        labels, probabilities, persistence_scores = cluster_embeddings_umap_hdbscan(
            embeddings,
            min_cluster_size=int(params_to_use.get("min_cluster_size", 5)),
            min_samples=int(params_to_use.get("min_samples", 1)),
            n_neighbors=int(params_to_use.get("n_neighbors", min(15, len(embeddings) - 1))),
            min_dist=float(params_to_use.get("min_dist", 0.0)),
            cluster_selection_method=str(params_to_use.get("cluster_selection_method", "eom")),
            cluster_selection_epsilon=float(params_to_use.get("cluster_selection_epsilon", 0.0)),
        )
    except Exception as e:
        _progress(f"[{source_column}] Clustering failed: {e} — falling back to defaults")
        labels, probabilities, persistence_scores = cluster_embeddings_umap_hdbscan(
            embeddings, min_cluster_size=5, n_neighbors=min(15, len(embeddings) - 1), min_dist=0.0,
        )

    # Element-wise ``labels == cid`` below requires an array, not a list.
    labels = np.asarray(labels)
    if not optimisation_ok:
        labels = np.asarray(control_cluster_count(embeddings, labels, min_clusters=15, max_clusters=30))
    n_clusters = len(np.unique(labels))
    _progress(f"[{source_column}] ✓ {n_clusters} clusters found")

    # Step 4: Keywords
    _progress(f"[{source_column}] 🔑 Step 4/7 — Extracting TF-IDF keywords...")
    topic_keywords = extract_topic_keywords(tfidf_docs, labels, top_n=10)
    noise_cluster_ids = {cid for cid, kws in topic_keywords.items() if is_noise_cluster(kws)}
    valid_ids = [cid for cid in topic_keywords if cid not in noise_cluster_ids]
    if noise_cluster_ids and valid_ids:
        # Fold metadata-noise clusters into the largest genuine cluster.
        largest_valid = max(valid_ids, key=lambda cid: int((labels == cid).sum()))
        is_noise = np.isin(labels, list(noise_cluster_ids))
        labels = np.where(is_noise, largest_valid, labels).astype(int)
        topic_keywords = extract_topic_keywords(tfidf_docs, labels, top_n=10)
        n_clusters = len(np.unique(labels))
        _progress(f"[{source_column}] ✓ After noise removal: {n_clusters} clusters")
    _progress(f"[{source_column}] ✓ Keywords extracted for {len(topic_keywords)} topics")

    # Step 5: Multi-LLM labeling
    multi_llm_data: Dict[int, Dict[str, str]] = {}
    topic_table = build_topic_table(
        tfidf_docs, labels, topic_keywords, multi_llm_data,
        probabilities=probabilities, persistence_scores=persistence_scores,
    )
    if use_multi_llm:
        _progress(f"[{source_column}] 🏛️ Step 5/7 — Batch Multi-LLM Council labeling...")
        try:
            n_topics = len(topic_table)
            batch_result = _run_council_labeling(topic_table, topic_keywords)
            multi_llm_data, llm_label_map = _collect_council_labels(batch_result)
            if llm_label_map:
                topic_table = build_topic_table(
                    tfidf_docs, labels, topic_keywords, multi_llm_data,
                    probabilities=probabilities, persistence_scores=persistence_scores,
                )
                topic_table["Label"] = topic_table["Topic ID"].map(llm_label_map).fillna(topic_table["Label"])
                _progress(
                    f"[{source_column}] ✓ Batch labeling done — {len(llm_label_map)}/{n_topics} labels assigned"
                )
            else:
                _progress(f"[{source_column}] ⚠️ Batch labeling returned 0 labels, falling back to single-LLM")
                topic_table = _apply_single_llm_labels(topic_table, topic_keywords, llm)
        except ImportError:
            _progress(f"[{source_column}] ⚠️ llm_council not found, using single-LLM")
            topic_table = _apply_single_llm_labels(topic_table, topic_keywords, llm)
        except Exception as e:
            _progress(f"[{source_column}] ⚠️ Batch council error ({e}), using single-LLM")
            topic_table = _apply_single_llm_labels(topic_table, topic_keywords, llm)
    else:
        topic_table = _apply_single_llm_labels(topic_table, topic_keywords, llm)

    # Step 6: Save labels
    _progress(f"[{source_column}] 💾 Step 6/7 — Saving labels...")
    labels_payload = {
        "phase": "Phase 2 - Generate Initial Topics (SPECTER2 + UMAP + HDBSCAN)",
        "source": source_column,
        "topic_labels": [
            {
                "topic_id": int(row["Topic ID"]),
                "label": str(row["Label"]),
                "keywords": [k.strip() for k in str(row["Keywords"]).split(",") if k.strip()],
                "groq_label": row.get("groq_label", ""),
                "hf_mistral_label": row.get("hf_mistral_label", ""),
                "mistral_label": row.get("mistral_label", ""),
                "openrouter_label": row.get("openrouter_label", ""),
                "cohere_label": row.get("cohere_label", ""),
                "hf_zephyr_label": row.get("hf_zephyr_label", ""),
                "flan_label": row.get("flan_label", ""),
                "gemini_label": row.get("gemini_label", ""),
                "cluster_size": int(row["cluster_size"]),
            }
            for _, row in topic_table.iterrows()
        ],
    }
    upsert_json_bucket(files["labels"], source_column.lower(), labels_payload)

    # Step 7: Themes
    _progress(f"[{source_column}] 🌿 Step 7/7 — Consolidating themes...")
    phase3_themes = consolidate_themes(topic_table, embeddings=embeddings, labels=labels, min_themes=4, max_themes=8)
    phase4_reviewed = review_themes(phase3_themes)
    phase5_named = name_themes_with_llm(phase4_reviewed, llm)
    themes_payload = {"phase3": phase3_themes, "phase4": phase4_reviewed, "phase5": phase5_named}
    upsert_json_bucket(files["themes"], source_column.lower(), themes_payload)

    # Taxonomy
    taxonomy_payload = create_taxonomy_map(topic_table, source_column)
    taxonomy_table = pd.DataFrame(taxonomy_payload, columns=["source", "topic", "classification", "domain", "reason"])
    _merge_taxonomy_file(Path(files["taxonomy"]), source_column, taxonomy_payload)

    distribution_fig = create_topic_distribution_chart(topic_table, source_column)
    keyword_fig = create_keyword_chart(topic_table, source_column)
    _progress(
        f"[{source_column}] ✅ Done — "
        f"{len(embedding_docs)} docs → {n_clusters} clusters → {len(phase5_named)} themes"
    )

    return AnalysisResult(
        source_column=source_column,
        paper_count=paper_count,
        unit_count=len(embedding_docs),
        topic_table=topic_table,
        taxonomy_table=taxonomy_table,
        distribution_fig=distribution_fig,
        keyword_fig=keyword_fig,
        labels_payload=labels_payload,
        themes_payload=themes_payload,
        taxonomy_payload=taxonomy_payload,
        suggested_params=suggested_params or {},
        trial_log=opt_result.get("trial_log", []),
        paper_assignments=[int(l) for l in labels],
        doc_row_indices=row_indices,
        llm_suggested_params=llm_suggested_params,
        bayesian_best_params=bayesian_best_params,
    )


# ---------------------------------------------------------------------------
# Topic comparison
# ---------------------------------------------------------------------------
def compare_topic_frames(
    abstract_topics: pd.DataFrame,
    title_topics: pd.DataFrame,
    output_path: str = "comparison.csv",
) -> pd.DataFrame:
    """Match every abstract topic to its most similar title topic and save a CSV.

    Similarity is cosine over ``embed_documents`` vectors of "Label. Keywords".
    If embedding fails, uni+bi-gram TF-IDF vectors are used instead. Scores
    above 0.70 are "Strong", above 0.45 "Moderate", otherwise "Weak"
    alignment. If one or both frames are empty, a placeholder frame is
    written instead (up to 25 rows of the non-empty side, similarity 0.0).
    """
    output_path = str(Path(output_path).resolve())
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    logger.info("compare_topic_frames start: abstract=%s topics, title=%s topics",
                len(abstract_topics), len(title_topics))

    def _save(frame: pd.DataFrame) -> pd.DataFrame:
        frame.to_csv(output_path, index=False)
        return frame

    if abstract_topics.empty and title_topics.empty:
        return _save(pd.DataFrame([{"Abstract Topic": "N/A", "Title Topic": "N/A",
                                    "Similarity": 0.0, "Notes": "No topics available."}]))
    if abstract_topics.empty:
        return _save(pd.DataFrame([{"Abstract Topic": "N/A", "Title Topic": str(row["Label"]),
                                    "Similarity": 0.0, "Notes": "Only title analysis available."}
                                   for _, row in title_topics.head(25).iterrows()]))
    if title_topics.empty:
        return _save(pd.DataFrame([{"Abstract Topic": str(row["Label"]), "Title Topic": "N/A",
                                    "Similarity": 0.0, "Notes": "Only abstract analysis available."}
                                   for _, row in abstract_topics.head(25).iterrows()]))

    abs_desc = (abstract_topics["Label"].astype(str) + ". " + abstract_topics["Keywords"].astype(str)).tolist()
    title_desc = (title_topics["Label"].astype(str) + ". " + title_topics["Keywords"].astype(str)).tolist()
    try:
        all_emb = embed_documents(abs_desc + title_desc)
        sim_matrix = cosine_similarity(all_emb[:len(abs_desc)], all_emb[len(abs_desc):])
    except Exception as e:
        logger.warning(f"Embedding comparison failed ({e}), using TF-IDF fallback")
        vec = TfidfVectorizer(stop_words="english", ngram_range=(1, 2))
        mat = vec.fit_transform(abs_desc + title_desc)
        sim_matrix = cosine_similarity(mat[:len(abs_desc)], mat[len(abs_desc):])

    rows = []
    for i, abs_label in enumerate(abstract_topics["Label"].tolist()):
        best_idx = int(np.argmax(sim_matrix[i]))
        score = float(sim_matrix[i, best_idx])
        title_label = str(title_topics.iloc[best_idx]["Label"])
        if score > 0.70:
            alignment = "Strong alignment"
        elif score > 0.45:
            alignment = "Moderate alignment"
        else:
            alignment = "Weak alignment"
        rows.append({"Abstract Topic": abs_label, "Title Topic": title_label,
                     "Similarity": round(score, 4), "Notes": f"{alignment} (cosine={score:.3f})"})

    comparison_df = _save(pd.DataFrame(rows))
    logger.info(f"✅ comparison.csv written ({len(comparison_df)} rows)")
    return comparison_df


# ---------------------------------------------------------------------------
# Narrative
# ---------------------------------------------------------------------------
def _fallback_narrative(abstract_result, title_result, comparison_df) -> str:
    """Return a template narrative, used when no LLM is available or its reply is too short."""
    abs_n = 0 if abstract_result is None else len(abstract_result.topic_table)
    title_n = 0 if title_result is None else len(title_result.topic_table)

    def _novel(result) -> int:
        if result is None or result.taxonomy_table.empty:
            return 0
        if "classification" not in result.taxonomy_table.columns:
            return 0
        return int((result.taxonomy_table["classification"] == "NOVEL").sum())

    def _top_labels(result) -> List[str]:
        if not result or result.topic_table.empty:
            return []
        return result.topic_table.sort_values("Count", ascending=False)["Label"].head(5).tolist()

    top_abs = _top_labels(abstract_result)
    top_title = _top_labels(title_result)

    all_abs_themes: List[str] = []
    if abstract_result:
        all_abs_themes = [t.get("theme_name", "") for t in abstract_result.themes_payload.get("phase5", [])
                          if t.get("theme_name")]
    abs_themes = all_abs_themes[:8]  # listed in the text; the count reports all themes

    strong_pairs: List[str] = []
    if comparison_df is not None and not comparison_df.empty and "Similarity" in comparison_df.columns:
        strong = comparison_df[comparison_df["Similarity"] > 0.60]
        strong_pairs = [f"'{r['Abstract Topic']}' ↔ '{r['Title Topic']}' ({r['Similarity']:.2f})"
                        for _, r in strong.head(5).iterrows()]

    top_abs_text = ", ".join(top_abs) if top_abs else "insufficient abstract topics"
    top_title_text = ", ".join(top_title) if top_title else "insufficient title topics"
    themes_text = "; ".join(abs_themes) if abs_themes else "see themes.json for full list"
    pairs_text = "; ".join(strong_pairs) if strong_pairs else "no strongly aligned pairs found (similarity > 0.60)"

    return f"""
This report synthesizes thematic topic modelling outcomes for Scopus metadata using a research-grade pipeline based on SPECTER2 document embeddings, UMAP dimensionality reduction, and HDBSCAN clustering. The analysis was run separately for abstracts and titles to reveal differences between detailed narrative content and concise paper framing language. A combined Title+Abstract stream was also analysed to capture the full bibliographic signal.

In the abstract stream, {abs_n} distinct topics were identified, while the title stream generated {title_n} topics. These clusters were further consolidated into higher-level themes using agglomerative clustering on cluster centroids, yielding {len(all_abs_themes)} themes from the abstract stream.

The abstract-derived topic space is dominated by: {top_abs_text}. Abstracts capture motivation, methods, and outcomes in full sentences, so the resulting clusters separate into fine-grained research niches.

Title-driven topics include: {top_title_text}. Titles present compressed intent, often merging conceptually adjacent ideas that abstracts keep separate.

The identified themes from the abstract stream are: {themes_text}. These themes represent the major intellectual territories within this body of literature.

Taxonomy mapping against PAJAIS reference categories identified {_novel(abstract_result)} novel topic instances in the abstract stream and {_novel(title_result)} in the title stream. These represent potential interdisciplinary frontiers not captured by canonical categories.

Cross-stream alignment (embedding cosine similarity) reveals strongly aligned topic pairs: {pairs_text}. High-similarity pairs indicate stable thematic framing across metadata fields.

The pipeline implements Braun and Clarke thematic phases end-to-end: familiarisation, coding, candidate-theme development, review, naming, and PAJAIS taxonomy positioning. Future iterations should include domain-expert validation of labels and longitudinal tracking of emergent themes.
""".strip()


def generate_narrative(abstract_result, title_result, comparison_df, llm, output_path="narrative.txt") -> str:
    """Write an approximately 500-word methods narrative to ``output_path`` and return it.

    Uses ``llm`` when given. Falls back to the template narrative when there
    is no LLM, when the call fails, or when the reply is under 350 words.
    Parent directories of ``output_path`` are created if missing.
    """
    narrative_file = Path(output_path).resolve()
    narrative_file.parent.mkdir(parents=True, exist_ok=True)

    if llm is None:
        narrative = _fallback_narrative(abstract_result, title_result, comparison_df)
        narrative_file.write_text(narrative, encoding="utf-8")
        return narrative

    abs_themes: List[str] = []
    if abstract_result:
        # Skip unnamed themes so the prompt never contains ", , " gaps.
        abs_themes = [t.get("theme_name", "") for t in abstract_result.themes_payload.get("phase5", [])[:6]
                      if t.get("theme_name")]

    prompt = (
        "Write an academic narrative of approximately 500 words for a research methods section. "
        "Context: Topic modeling on Scopus bibliometric data using SPECTER2 embeddings, "
        "UMAP + HDBSCAN clustering, and multi-LLM labeling (Groq + HuggingFace models). "
        f"Abstract topic count: {0 if abstract_result is None else len(abstract_result.topic_table)}. "
        f"Title topic count: {0 if title_result is None else len(title_result.topic_table)}. "
        f"Identified themes: {', '.join(abs_themes[:6]) if abs_themes else 'see themes.json'}. "
        "Discuss: methodology, key findings, thematic structure, novel topics, and research implications."
    )
    try:
        response = llm.invoke(prompt)
        narrative = str(getattr(response, "content", response)).strip()
        if len(narrative.split()) < 350:
            narrative = _fallback_narrative(abstract_result, title_result, comparison_df)
    except Exception:
        logger.warning("Narrative LLM call failed; using template narrative", exc_info=True)
        narrative = _fallback_narrative(abstract_result, title_result, comparison_df)

    narrative_file.write_text(narrative, encoding="utf-8")
    logger.info(f"✅ narrative.txt written ({len(narrative.split())} words)")
    return narrative


# ---------------------------------------------------------------------------
# Entry points
# ---------------------------------------------------------------------------
def run_single_analysis(
    mode: str,
    csv_path: Optional[str] = None,
    output_dir: str = ".",
    progress_callback=None,
    suggested_params: Optional[Dict[str, Any]] = None,
) -> "AnalysisResult":
    """Run the pipeline for a single stream: "abstract", "title" or "combined".

    The mode is validated before any file is loaded or created. Placeholder
    comparison/narrative files are written if they do not exist yet, so
    downstream readers always find them.

    Raises:
        ValueError: If ``mode`` is not one of the three supported streams.
    """
    col = {"abstract": "Abstract", "title": "Title", "combined": "Combined"}.get(str(mode).lower())
    if col is None:
        raise ValueError("mode must be 'abstract', 'title', or 'combined'.")

    output_dir = str(Path(output_dir).resolve())
    files = ensure_output_artifacts(output_dir)
    df, _ = load_scopus_csv(csv_path=csv_path, directory=output_dir)
    llm = create_groq_llm(temperature=0.0)

    result = run_braun_clarke_pipeline(
        df, col, llm=llm, output_dir=output_dir,
        progress_callback=progress_callback, suggested_params=suggested_params,
    )

    if not Path(files["comparison"]).exists():
        pd.DataFrame(columns=["Abstract Topic", "Title Topic", "Similarity", "Notes"]).to_csv(
            files["comparison"], index=False
        )
    if not Path(files["narrative"]).exists():
        Path(files["narrative"]).write_text("Narrative placeholder.", encoding="utf-8")
    return result


def run_full_pipeline(
    csv_path: Optional[str] = None,
    output_dir: str = ".",
    progress_callback=None,
) -> Dict[str, Any]:
    """
    Run the full 3-stream pipeline: Abstract → Title → Combined (Title+Abstract).

    The cross-stream comparison and the narrative use only the Abstract and
    Title streams.

    Returns a dict with keys: csv_path, abstract, title, combined,
    comparison, narrative, files.
    """
    output_dir = str(Path(output_dir).resolve())
    files = ensure_output_artifacts(output_dir)
    df, resolved_csv = load_scopus_csv(csv_path=csv_path, directory=output_dir)
    llm = create_groq_llm(temperature=0.0)

    abstract_result = run_braun_clarke_pipeline(
        df, "Abstract", llm=llm, output_dir=output_dir, progress_callback=progress_callback,
    )
    title_result = run_braun_clarke_pipeline(
        df, "Title", llm=llm, output_dir=output_dir, progress_callback=progress_callback,
    )
    combined_result = run_braun_clarke_pipeline(
        df, "Combined", llm=llm, output_dir=output_dir, progress_callback=progress_callback,
    )

    comparison_df = compare_topic_frames(
        abstract_result.topic_table, title_result.topic_table, output_path=files["comparison"],
    )
    narrative = generate_narrative(
        abstract_result, title_result, comparison_df, llm=llm, output_path=files["narrative"],
    )

    return {
        "csv_path": resolved_csv,
        "abstract": abstract_result,
        "title": title_result,
        "combined": combined_result,
        "comparison": comparison_df,
        "narrative": narrative,
        "files": files,
    }


# Backward-compatibility alias
run_specter2_multi_llm_pipeline = run_full_pipeline