Spaces:
Running
Running
| """ | |
| tools.py — Core pipeline for topic modelling on Scopus CSV data. | |
| FIXES IN THIS VERSION: | |
| 1. consolidate_themes() — Complete rewrite. It was producing a single theme | |
| because threshold=0.35 was too tight. Now uses AgglomerativeClustering with | |
| a direct n_clusters target, guaranteeing 4–8 themes. | |
| 2. Theme balancing — Filters noise clusters (< MIN_THEME_PAPERS) before | |
| theming. Orphaned small clusters are collected into a "Niche Topics" group. | |
| This prevents one giant catch-all theme plus five singletons. | |
| 3. Noise cluster detection — A post-clustering filter removes clusters whose | |
| top keywords are metadata garbage (conference years, editorial tokens, etc.). | |
| 4. build_topic_table() — Updated column names to match labeler.py | |
| (hf_mistral_label, hf_zephyr_label) with backward-compatible aliases. | |
| 5. name_themes_with_llm() — Better prompting in a proper academic style. | |
| 6. create_taxonomy_map() — Weighted semantic scoring (exact + partial match). | |
| 7. upsert_json_bucket() — Added an explicit flush and resolve logging so | |
| writes never fail silently. | |
| 8. run_braun_clarke_pipeline() — Passes embeddings and labels into | |
| consolidate_themes() so centroids are computed on real document vectors. | |
| 9. OUTPUT_DIR handling — All file writes use the resolved absolute path | |
| passed in from app.py, never a relative ".". | |
| 10. run_full_pipeline() — Now includes the Combined (Title + Abstract) analysis | |
| as a third pipeline run and returns dict["combined"] so the UI can display it. | |
| """ | |
| import json | |
| import os | |
| import re | |
| import logging | |
| import hashlib | |
| from collections import Counter | |
| from dataclasses import dataclass | |
| from functools import lru_cache | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import numpy as np | |
| import pandas as pd | |
| import plotly.express as px | |
| from dotenv import load_dotenv | |
| from sklearn.cluster import AgglomerativeClustering | |
| from sklearn.feature_extraction.text import TfidfVectorizer, ENGLISH_STOP_WORDS | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from sklearn.preprocessing import normalize | |
| import cleaner | |
load_dotenv()

# Mirror HF_TOKEN into HUGGING_FACE_HUB_TOKEN (the variable name older
# huggingface_hub releases read), but never override a non-empty value that
# is already configured.
hf_token = (os.getenv("HF_TOKEN") or "").strip()
if hf_token and not os.environ.get("HUGGING_FACE_HUB_TOKEN"):
    os.environ["HUGGING_FACE_HUB_TOKEN"] = hf_token

logger = logging.getLogger(__name__)
# Adapter repository handed to ``load_adapter`` when the SPECTER2 encoder is built.
SPECTER2_MODEL_REPO = "vvinayakkkkk/specter2-cached"

# Mutable copy of scikit-learn's built-in English stop-word list.
STOPWORDS = set(ENGLISH_STOP_WORDS)

# Minimum papers a cluster must have to participate in theme grouping.
MIN_THEME_PAPERS = 10

# Tokens that indicate a cluster is metadata noise, not a research topic.
METADATA_NOISE_TOKENS = {
    # venue / publication boilerplate
    "conference", "international", "editorial", "guest", "proceedings",
    "workshop", "journal", "vol", "issue", "abstract", "available",
    # other tokens treated as non-topical
    "know", "tell", "king", "live", "flow", "long", "path", "features",
    "domains", "experiments", "dsp", "sixth",
    # publication years 2001-2010 inclusive
    *(str(year) for year in range(2001, 2011)),
}
# Research-area taxonomy: category name -> set of indicative lowercase keywords.
# Vocabularies are written as space-separated strings and split into sets;
# a keyword may legitimately appear under more than one category.
PAJAIS_CATEGORIES = {
    category: set(vocabulary.split())
    for category, vocabulary in (
        (
            "artificial intelligence and machine learning",
            "ai artificial intelligence neural deep learning machine algorithm "
            "model prediction classification nlp language generative llm transformer",
        ),
        (
            "data analytics and information systems",
            "analytics data mining insight dashboard system information platform "
            "digital infrastructure database big cloud iot real time",
        ),
        (
            "sustainability and environment",
            "sustainability green environment climate carbon energy renewable "
            "emission ecological circular",
        ),
        (
            "healthcare and medicine",
            "health clinical patient medicine medical hospital drug disease "
            "treatment diagnosis telemedicine ehealth",
        ),
        (
            "education and learning technology",
            "education learning student teaching curriculum online elearning "
            "academic university pedagogy",
        ),
        (
            "business strategy and management",
            "business strategy management firm organization leadership "
            "performance competitive governance agile",
        ),
        (
            "finance and economics",
            "finance financial economic market investment banking cryptocurrency "
            "fintech stock portfolio",
        ),
        (
            "supply chain and operations",
            "supply logistics operations inventory manufacturing procurement lean "
            "warehouse distribution production",
        ),
        (
            "social media and digital marketing",
            "social media marketing digital brand consumer influencer advertising "
            "engagement content platform",
        ),
        (
            "cybersecurity and risk",
            "risk security privacy cyber resilience threat fraud vulnerability "
            "authentication blockchain",
        ),
        (
            "human behavior and psychology",
            "behavior psychology attitude intention motivation trust adoption "
            "perception cognitive satisfaction",
        ),
        (
            "policy and governance",
            "policy governance regulation public government compliance law equity "
            "ethics stakeholder",
        ),
    )
}
@dataclass
class AnalysisResult:
    """Bundle of outputs from one topic-modelling run over a single text column.

    The first ten fields are required. The trailing optional fields default to
    ``None`` when a run does not produce them. The ``@dataclass`` decorator
    supplies the ``__init__`` that accepts these fields; without it the class
    could not be constructed with arguments.
    """

    source_column: str  # name of the analysed DataFrame column
    paper_count: int  # presumably the number of non-empty source rows — confirm
    unit_count: int  # presumably the number of documents kept after cleaning — confirm
    topic_table: pd.DataFrame  # per-topic summary table
    taxonomy_table: pd.DataFrame  # topic-to-taxonomy mapping table
    distribution_fig: Any  # figure object (likely plotly) for topic sizes
    keyword_fig: Any  # figure object (likely plotly) for topic keywords
    labels_payload: Dict[str, Any]  # JSON-serialisable topic label data
    themes_payload: Dict[str, Any]  # JSON-serialisable theme data
    taxonomy_payload: List[Dict[str, str]]  # JSON-serialisable taxonomy rows
    suggested_params: Optional[Dict[str, Any]] = None  # clustering params actually proposed
    trial_log: Optional[List[Dict[str, Any]]] = None  # per-trial optimisation records
    paper_assignments: Optional[List[int]] = None  # presumably topic id per document
    doc_row_indices: Optional[List[int]] = None  # source DataFrame row index per document
    llm_suggested_params: Optional[Dict[str, Any]] = None  # params proposed by the LLM
    bayesian_best_params: Optional[Dict[str, Any]] = None  # best params found by Optuna
| # --------------------------------------------------------------------------- | |
| # Embedding model | |
| # --------------------------------------------------------------------------- | |
@lru_cache(maxsize=1)
def _load_specter2_encoder():
    """Build the SPECTER2 encoder once; every later call returns the same instance.

    Loading the tokenizer, the base model and the adapter is the expensive part
    of embedding. Caching it avoids repeating that work for each column run.
    A failed load raises and is not cached, so the next call retries.
    """
    from adapters import AutoAdapterModel, Stack
    from transformers import AutoTokenizer

    class Specter2Encoder:
        """SPECTER2 base model with the adapter from SPECTER2_MODEL_REPO active."""

        def __init__(self):
            import torch

            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            logger.info("π€ Loading SPECTER2 tokenizer...")
            self.tokenizer = AutoTokenizer.from_pretrained("allenai/specter2_base")
            logger.info("π€ Loading SPECTER2 base model...")
            self.model = AutoAdapterModel.from_pretrained("allenai/specter2_base")
            logger.info("π€ Loading SPECTER2 adapter from cached repo...")
            self.model.load_adapter(
                SPECTER2_MODEL_REPO, source="hf", load_as="specter2", set_active=False
            )
            self.model.set_active_adapters(Stack("specter2"))
            self.model.to(self.device)
            self.model.eval()
            logger.info("β SPECTER2 ready")

        def encode(
            self,
            documents: List[str],
            batch_size: int = 64,
            show_progress_bar: bool = False,
            normalize_embeddings: bool = True,
        ) -> np.ndarray:
            """Embed documents in batches using the first-token hidden state.

            Args:
                documents: Texts to embed. Each is truncated to 512 tokens.
                batch_size: Number of documents per forward pass.
                show_progress_bar: Accepted for call-site compatibility; unused.
                normalize_embeddings: Accepted for call-site compatibility;
                    NOT applied here, so the returned rows are raw
                    (unnormalised) vectors.

            Returns:
                Array with one row per document. For empty input this is an
                empty ``(0, 768)`` float array.
            """
            import torch

            logger.info(f"π€ Encoding {len(documents)} documents (batch_size={batch_size})")
            embeddings: List[np.ndarray] = []
            num_batches = (len(documents) + batch_size - 1) // batch_size
            for batch_idx, start in enumerate(range(0, len(documents), batch_size)):
                batch = documents[start : start + batch_size]
                logger.info(f" Batch [{batch_idx + 1}/{num_batches}] β {len(batch)} docs")
                inputs = self.tokenizer(
                    batch,
                    padding=True,
                    truncation=True,
                    return_tensors="pt",
                    return_token_type_ids=False,
                    max_length=512,
                )
                inputs = {k: v.to(self.device) for k, v in inputs.items()}
                with torch.no_grad():
                    outputs = self.model(**inputs)
                # Index 0 of the sequence axis = the first ([CLS]) token.
                batch_embeddings = outputs.last_hidden_state[:, 0, :].cpu().numpy()
                embeddings.append(batch_embeddings)
            return np.vstack(embeddings) if embeddings else np.empty((0, 768), dtype=float)

    return Specter2Encoder()


def get_embedding_model(model_name: str = "allenai/specter2"):
    """Return the SPECTER2 document encoder.

    The encoder is constructed on first use and reused for the rest of the
    process.

    Args:
        model_name: Must be ``"allenai/specter2"``; kept as a parameter for
            interface compatibility.

    Returns:
        An object exposing ``encode(documents, batch_size=...) -> np.ndarray``.

    Raises:
        ValueError: If ``model_name`` is any other model.
    """
    logger.info(f"π₯ Loading embedding model: {model_name}")
    if model_name != "allenai/specter2":
        raise ValueError("This pipeline only supports allenai/specter2 embeddings.")
    return _load_specter2_encoder()
def warm_specter2_cache() -> None:
    """Load SPECTER2 once ahead of real work, logging the start and end of the warm-up."""
    logger.info("π₯ SPECTER2 cache warmup START")
    encoder = get_embedding_model("allenai/specter2")
    encoder_type = type(encoder).__name__
    logger.info(f" warmup loaded model: {encoder_type}")
    logger.info("β SPECTER2 cache warmup COMPLETE")
| def _embedding_cache_key(documents: List[str], model_name: str, batch_size: int) -> str: | |
| hasher = hashlib.sha256() | |
| hasher.update(model_name.encode("utf-8")) | |
| hasher.update(str(batch_size).encode("utf-8")) | |
| for document in documents: | |
| hasher.update(b"\0") | |
| hasher.update(document.encode("utf-8", errors="ignore")) | |
| return hasher.hexdigest()[:16] | |
def get_cached_embeddings(
    documents: List[str],
    model: Any,
    cache_dir: str = ".",
    model_name: str = "allenai/specter2",
    batch_size: int = 64,
) -> np.ndarray:
    """Return embeddings for ``documents``, reusing an on-disk cache when valid.

    Cache files live in ``<cache_dir>/.embedding_cache/`` and are named by
    ``_embedding_cache_key(documents, model_name, batch_size)``. A ``.npy``
    file holds the array and a ``.json`` sidecar holds metadata.

    A cached array is used only if it loads cleanly and has one row per
    document. Otherwise the embeddings are recomputed and the cache rewritten.
    New arrays are written to a temporary file and atomically renamed, so an
    interrupted write can never leave a partial file under the final name.

    Args:
        documents: Texts to embed.
        model: Object with ``encode(documents, batch_size=...)``.
        cache_dir: Base directory for the cache folder (created if missing).
        model_name: Recorded in the cache key and metadata.
        batch_size: Passed to ``model.encode`` and included in the cache key.

    Returns:
        The embedding array (one row per document).
    """
    cache_root = Path(cache_dir).resolve() / ".embedding_cache"
    cache_root.mkdir(parents=True, exist_ok=True)
    key = _embedding_cache_key(documents, model_name, batch_size)
    cache_file = cache_root / f"embeddings_{key}.npy"
    meta_file = cache_root / f"embeddings_{key}.json"

    if cache_file.exists():
        logger.info(f"β Loading cached embeddings from {cache_file}")
        try:
            if meta_file.exists():
                meta = json.loads(meta_file.read_text(encoding="utf-8"))
                logger.info(f" cache metadata: {meta}")
        except Exception as exc:
            # Metadata is informational only; a bad sidecar must not block the cache.
            logger.warning(f" cache metadata read failed: {exc}")
        try:
            cached = np.load(cache_file, allow_pickle=False)
        except (OSError, ValueError, EOFError) as exc:
            # A truncated/corrupt file (e.g. from an interrupted earlier write)
            # would otherwise fail on every run; recompute and overwrite it.
            logger.warning(f" cached embeddings unreadable, recomputing: {exc}")
        else:
            if cached.ndim >= 1 and cached.shape[0] == len(documents):
                return cached
            logger.warning(
                f" cached embeddings have shape {cached.shape}, expected "
                f"{len(documents)} rows; recomputing"
            )

    logger.info(f"π’ Computing embeddings for {len(documents)} documents")
    embeddings = np.asarray(model.encode(documents, batch_size=batch_size))

    # Write-then-rename: Path.replace is an atomic os.replace on the same filesystem.
    tmp_file = cache_file.with_name(cache_file.name + ".tmp")
    with open(tmp_file, "wb") as handle:
        np.save(handle, embeddings)
    tmp_file.replace(cache_file)

    meta_file.write_text(
        json.dumps(
            {
                "model_name": model_name,
                "batch_size": batch_size,
                "document_count": len(documents),
                "embedding_shape": list(embeddings.shape),
                "cache_file": cache_file.name,
            },
            indent=2,
            ensure_ascii=False,
        ),
        encoding="utf-8",
    )
    logger.info(f"πΎ Saved cached embeddings to {cache_file}")
    return embeddings
| # --------------------------------------------------------------------------- | |
| # Artifact helpers | |
| # --------------------------------------------------------------------------- | |
def ensure_output_artifacts(output_dir: str = ".") -> Dict[str, str]:
    """Create the output directory and seed any artifact files that are missing.

    Existing files are never overwritten. Absent ones are created as:
    a header-only comparison CSV, ``[]`` for the taxonomy map, a placeholder
    narrative, and ``{}`` for labels and themes.

    Args:
        output_dir: Target directory. It is resolved to an absolute path and
            created with parents if needed.

    Returns:
        Mapping ``{"comparison", "taxonomy", "narrative", "labels", "themes"}``
        -> absolute file path as a string.
    """
    root = Path(output_dir).resolve()
    root.mkdir(parents=True, exist_ok=True)

    filenames = {
        "comparison": "comparison.csv",
        "taxonomy": "taxonomy_map.json",
        "narrative": "narrative.txt",
        "labels": "labels.json",
        "themes": "themes.json",
    }
    artifact_paths = {kind: root / name for kind, name in filenames.items()}

    comparison_path = artifact_paths["comparison"]
    if not comparison_path.exists():
        header_only = pd.DataFrame(
            columns=["Abstract Topic", "Title Topic", "Similarity", "Notes"]
        )
        header_only.to_csv(comparison_path, index=False)

    text_seeds = (
        ("taxonomy", "[]"),
        ("narrative", "Narrative placeholder."),
        ("labels", "{}"),
        ("themes", "{}"),
    )
    for kind, seed in text_seeds:
        target = artifact_paths[kind]
        if not target.exists():
            target.write_text(seed, encoding="utf-8")

    return {kind: str(path) for kind, path in artifact_paths.items()}
def detect_csv_file(directory: str = ".") -> Optional[str]:
    """Pick the input CSV in ``directory``.

    The pipeline's own output (``comparison.csv``) is never considered.
    Among the remaining ``*.csv`` files (sorted by path), the first whose
    header contains an ``abstract``, ``title`` or ``matched_csv_title`` column
    (case-insensitive) wins. Otherwise the first remaining CSV is returned.

    Args:
        directory: Directory to scan (non-recursive).

    Returns:
        Path of the chosen file as a string, or ``None`` when the directory
        does not exist or contains no eligible CSV.
    """
    excluded = {"comparison.csv"}
    root = Path(directory)
    if not root.is_dir():
        return None
    csv_files = sorted(
        p for p in root.iterdir() if p.is_file() and p.suffix.lower() == ".csv"
    )
    candidates = [p for p in csv_files if p.name.lower() not in excluded]
    wanted = {"abstract", "title", "matched_csv_title"}
    for path in candidates:
        try:
            sample = pd.read_csv(path, nrows=1, low_memory=False)
        except Exception:
            # Best-effort sniffing: unreadable or non-CSV content is skipped.
            continue
        cols = {str(c).strip().lower() for c in sample.columns}
        if cols & wanted:
            return str(path)
    # Never fall back to an excluded file: returning comparison.csv would feed
    # the pipeline its own output as input data.
    return str(candidates[0]) if candidates else None
def load_scopus_csv(
    csv_path: Optional[str] = None, directory: str = "."
) -> Tuple[pd.DataFrame, str]:
    """Load a Scopus-style CSV and guarantee ``Title`` and ``Abstract`` columns.

    Column names are stripped of surrounding whitespace and a leading UTF-8
    BOM. If there is no exact ``Title`` column, the first known title variant
    present is renamed to ``Title``. An existing ``Title`` column is never
    shadowed, so no duplicate ``Title`` columns can appear. Missing
    ``Title``/``Abstract`` columns are added as empty strings.

    Args:
        csv_path: Explicit file to load. When falsy, ``detect_csv_file(directory)``
            chooses one.
        directory: Directory to search when ``csv_path`` is not given.

    Returns:
        ``(dataframe, path_used)``.

    Raises:
        FileNotFoundError: If no CSV path is given and none can be detected.
    """
    path = csv_path or detect_csv_file(directory)
    if not path:
        raise FileNotFoundError("No CSV file found in the working directory.")
    df = pd.read_csv(path, low_memory=False)
    # Normalise column names: strip whitespace and a leading BOM in either order.
    df.columns = [str(c).strip().lstrip("\ufeff").strip() for c in df.columns]

    title_variants = [
        "Title", "title", "Article Title", "Document Title",
        "Paper Title", "matched_csv_title", "document_title",
    ]
    if "Title" not in df.columns:
        # Only rename when it cannot collide: renaming "title" -> "Title" while
        # "Title" exists would create two "Title" columns and make df["Title"]
        # return a DataFrame instead of a Series.
        for variant in title_variants:
            if variant in df.columns:
                df = df.rename(columns={variant: "Title"})
                break

    if "Abstract" not in df.columns:
        df["Abstract"] = ""
    if "Title" not in df.columns:
        df["Title"] = ""
    return df, str(path)
| # --------------------------------------------------------------------------- | |
| # Document preparation | |
| # --------------------------------------------------------------------------- | |
def _prepare_documents(
    texts: pd.Series, tag: str
) -> Tuple[List[str], List[str], List[int]]:
    """Clean each text for embedding and TF-IDF, keeping rows where both survive.

    Args:
        texts: Non-empty source texts indexed by their DataFrame row label.
        tag: Prefix used in log messages (the column name or "Combined").

    Returns:
        ``(embedding_docs, tfidf_docs, row_indices)``, aligned element-wise.
    """
    logger.info(f"[{tag}] build_documents start: {len(texts)} non-empty source rows")
    embedding_docs: List[str] = []
    tfidf_docs: List[str] = []
    row_indices: List[int] = []
    for idx, text in texts.items():
        emb = cleaner.prepare_for_embedding(text)
        if not emb:
            continue
        tfi = cleaner.prepare_for_tfidf(text)
        if not tfi:
            continue
        embedding_docs.append(emb)
        tfidf_docs.append(tfi)
        row_indices.append(int(idx))
    logger.info(
        f"[{tag}] build_documents complete: "
        f"{len(embedding_docs)} embedding docs / {len(tfidf_docs)} TF-IDF docs"
    )
    return embedding_docs, tfidf_docs, row_indices


def build_documents(
    df: pd.DataFrame,
    column: str,
) -> Tuple[List[str], List[str], int, List[int]]:
    """Prepare one text column for embedding and TF-IDF.

    Args:
        df: Source DataFrame.
        column: Column to read. NaN and whitespace-only cells are skipped.

    Returns:
        ``(embedding_docs, tfidf_docs, paper_count, row_indices)``.
        ``paper_count`` counts the non-empty source rows before cleaning.
        ``row_indices`` maps each kept document back to its DataFrame row.
    """
    series = df[column].dropna().astype(str)
    series = series[series.str.strip() != ""]
    embedding_docs, tfidf_docs, row_indices = _prepare_documents(series, column)
    return embedding_docs, tfidf_docs, len(series), row_indices


def build_documents_legacy(
    df: pd.DataFrame, column: str
) -> Tuple[List[str], int]:
    """Backward-compatible wrapper returning only ``(embedding_docs, paper_count)``."""
    emb, _, paper_count, _ = build_documents(df, column)
    return emb, paper_count


def build_documents_combined(df: pd.DataFrame) -> Tuple[List[str], List[str], int, List[int]]:
    """Prepare "Title \\n Abstract" documents for the combined analysis.

    A missing ``Title`` or ``Abstract`` column is treated as empty strings on
    ``df``'s own index. Using an index-less empty Series would misalign on
    addition and turn every row into NaN.

    Returns:
        Same shape as :func:`build_documents`.
    """
    blank = pd.Series("", index=df.index, dtype=object)
    title_series = df["Title"].fillna("").astype(str) if "Title" in df.columns else blank
    abstract_series = (
        df["Abstract"].fillna("").astype(str) if "Abstract" in df.columns else blank
    )
    combined = (title_series.str.strip() + " \n " + abstract_series.str.strip()).str.strip()
    combined = combined[combined != ""]
    embedding_docs, tfidf_docs, row_indices = _prepare_documents(combined, "Combined")
    return embedding_docs, tfidf_docs, len(combined), row_indices
| # --------------------------------------------------------------------------- | |
| # Embedding | |
| # --------------------------------------------------------------------------- | |
def embed_documents(
    documents: List[str],
    batch_size: int = 64,
    cache_dir: str = ".",
) -> np.ndarray:
    """Embed documents with SPECTER2 and return L2-normalised rows.

    Uses the on-disk embedding cache under ``cache_dir``. An empty input
    short-circuits to an empty ``(0, 768)`` float array without loading the
    model.
    """
    if not documents:
        return np.empty((0, 768), dtype=float)

    logger.info(f"embed_documents start: {len(documents)} documents")
    encoder = get_embedding_model("allenai/specter2")
    raw = np.array(
        get_cached_embeddings(
            documents,
            encoder,
            cache_dir=cache_dir,
            model_name="allenai/specter2",
            batch_size=batch_size,
        )
    )
    logger.info(f"embed_documents complete: output shape={raw.shape}")
    return normalize(raw, norm="l2")
| # --------------------------------------------------------------------------- | |
| # Noise cluster detection | |
| # --------------------------------------------------------------------------- | |
def is_noise_cluster(
    keywords: List[str],
    threshold: float = 0.55,
    noise_tokens: Optional[set] = None,
    top_k: int = 6,
) -> bool:
    """Decide whether a cluster's top keywords look like metadata noise.

    A keyword counts as noise when every whitespace-separated token in it
    (lower-cased) is a noise token. This matches unigrams such as
    ``"conference"`` and also TF-IDF bigrams such as
    ``"international conference"``.

    Args:
        keywords: Cluster keywords, most important first.
        threshold: Minimum fraction of the inspected keywords that must be noise.
        noise_tokens: Noise vocabulary; defaults to ``METADATA_NOISE_TOKENS``.
        top_k: How many leading keywords to inspect (at least 1).

    Returns:
        ``True`` for an empty keyword list, or when the noise fraction among
        the first ``top_k`` keywords is ``>= threshold``.
    """
    if not keywords:
        return True
    vocabulary = METADATA_NOISE_TOKENS if noise_tokens is None else noise_tokens
    inspected = keywords[: max(1, top_k)]

    def _is_noise(keyword) -> bool:
        tokens = str(keyword).lower().split()
        return bool(tokens) and all(token in vocabulary for token in tokens)

    noise_count = sum(1 for kw in inspected if _is_noise(kw))
    return (noise_count / len(inspected)) >= threshold
| # --------------------------------------------------------------------------- | |
| # Clustering — UMAP + HDBSCAN | |
| # --------------------------------------------------------------------------- | |
def cluster_embeddings_umap_hdbscan(
    embeddings: np.ndarray,
    min_cluster_size: int = 5,
    min_samples: int = 1,
    n_neighbors: int = 15,
    min_dist: float = 0.0,
    cluster_selection_method: str = "eom",
    cluster_selection_epsilon: float = 0.0,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Reduce embeddings with UMAP (cosine), then cluster them with HDBSCAN.

    UMAP reduces to ``min(8, n - 1)`` components. ``n_neighbors`` is capped at
    ``n - 1`` and ``min_cluster_size`` at ``max(2, n // 5)``.

    Returns:
        ``(labels, probabilities, persistence_scores)``. ``labels`` uses -1
        for points HDBSCAN leaves unassigned. Either score array is empty
        when unavailable.

    If ``umap`` or ``hdbscan`` cannot be imported, agglomerative clustering
    (``cluster_embeddings``) is used instead. It still returns a 3-tuple, with
    empty probability and persistence arrays, so callers can always unpack
    three values.
    """
    n_points = len(embeddings)
    if n_points == 0:
        return np.array([], dtype=int), np.array([]), np.array([])
    if n_points == 1:
        return np.array([0], dtype=int), np.array([1.0]), np.array([])

    try:
        from umap import UMAP
        import hdbscan
    except ImportError as e:
        logger.error(f"UMAP or HDBSCAN not installed: {e}")
        fallback_labels = cluster_embeddings(embeddings, threshold=0.7)
        return fallback_labels, np.array([]), np.array([])

    logger.info(f"UMAP reduction (n_neighbors={n_neighbors}, min_dist={min_dist})...")
    umap_model = UMAP(
        n_neighbors=min(n_neighbors, n_points - 1),
        min_dist=min_dist,
        metric="cosine",
        n_components=min(8, n_points - 1),
        random_state=42,
    )
    reduced = umap_model.fit_transform(embeddings)

    logger.info(f"HDBSCAN clustering (min_cluster_size={min_cluster_size})...")
    clusterer = hdbscan.HDBSCAN(
        min_cluster_size=min(min_cluster_size, max(2, n_points // 5)),
        min_samples=min_samples,
        metric="euclidean",
        prediction_data=True,
        cluster_selection_method=cluster_selection_method,
        cluster_selection_epsilon=cluster_selection_epsilon,
    )
    labels = clusterer.fit_predict(reduced)

    # These attributes may be missing or raise on some hdbscan versions/outcomes.
    try:
        probabilities = np.asarray(clusterer.probabilities_)
    except Exception:
        probabilities = np.array([])
    try:
        persistence_scores = np.asarray(clusterer.cluster_persistence_)
    except Exception:
        persistence_scores = np.array([])
    return labels, probabilities, persistence_scores
def control_cluster_count(
    embeddings: np.ndarray,
    labels: np.ndarray,
    min_clusters: int = 15,
    max_clusters: int = 30,
) -> np.ndarray:
    """Nudge the number of non-noise clusters toward ``[min_clusters, max_clusters]``.

    Labels < 0 are treated as noise and do not count as clusters.

    - In range: ``labels`` is returned unchanged.
    - Too many: cluster centroids are L2-normalised and merged with
      average-linkage cosine agglomerative clustering down to
      ``max_clusters``. Noise points stay at -1.
    - Too few: UMAP+HDBSCAN is re-run with a smaller minimum cluster size,
      and its labels are returned without re-checking the count.

    Returns:
        Integer label array aligned with ``embeddings``.
    """
    unique_labels = np.unique(labels)
    valid = unique_labels[unique_labels >= 0]
    current_count = len(valid)
    logger.info(f"Cluster count: {current_count} (target: {min_clusters}β{max_clusters})")
    if min_clusters <= current_count <= max_clusters:
        return labels

    if current_count > max_clusters:
        centroids = np.vstack([embeddings[labels == lbl].mean(axis=0) for lbl in valid])
        centroids = normalize(centroids, norm="l2")
        n_target = min(max_clusters, len(centroids))
        try:
            merger = AgglomerativeClustering(n_clusters=n_target, metric="cosine", linkage="average")
        except TypeError:
            # Older scikit-learn spells the distance parameter ``affinity``.
            merger = AgglomerativeClustering(n_clusters=n_target, affinity="cosine", linkage="average")
        merged = merger.fit_predict(centroids)
        label_map = {int(old): int(new) for old, new in zip(valid, merged)}
        # Only noise labels are absent from label_map. Keep them as noise (-1)
        # rather than silently folding them into merged cluster 0.
        return np.array([label_map.get(int(lbl), -1) for lbl in labels], dtype=int)

    new_min_size = max(2, min_clusters // 5)
    relabelled, _probabilities, _persistence = cluster_embeddings_umap_hdbscan(
        embeddings,
        min_cluster_size=new_min_size,
        n_neighbors=min(15, len(embeddings) - 1),
    )
    return relabelled
def cluster_embeddings(embeddings: np.ndarray, threshold: float = 0.7) -> np.ndarray:
    """Cluster embeddings with average-linkage agglomerative clustering on cosine distance.

    Merging stops at cosine distance ``threshold``, so the number of clusters
    is data-driven. Fewer than two points need no clustering: the result is
    an empty array or ``[0]``.
    """
    n_points = len(embeddings)
    if n_points < 2:
        return np.zeros(n_points, dtype=int)

    shared = {"n_clusters": None, "linkage": "average", "distance_threshold": threshold}
    try:
        model = AgglomerativeClustering(metric="cosine", **shared)
    except TypeError:
        # Older scikit-learn releases name the distance parameter ``affinity``.
        model = AgglomerativeClustering(affinity="cosine", **shared)
    return model.fit_predict(embeddings)
def reduce_topic_count(
    embeddings: np.ndarray, labels: np.ndarray,
    min_topics: int = 50, max_topics: int = 120,
) -> np.ndarray:
    """Merge topics when there are more than ``max_topics`` distinct labels.

    Every distinct label (including any negative one) is treated as a topic.
    Topic centroids are L2-normalised and grouped with average-linkage cosine
    agglomerative clustering. The target count is derived from
    ``min_topics``/``max_topics`` and capped by the number of topics.

    Returns ``labels`` itself (same object) when it is empty or already has
    at most ``max_topics`` distinct values; otherwise a new int array.
    """
    if len(labels) == 0:
        return labels
    topic_ids = np.unique(labels)
    if len(topic_ids) <= max_topics:
        return labels

    centroid_rows = [embeddings[labels == tid].mean(axis=0) for tid in topic_ids]
    centroids = normalize(np.vstack(centroid_rows), norm="l2")
    n_target = min(max(min_topics, min(len(topic_ids), max_topics)), len(centroids))
    try:
        merger = AgglomerativeClustering(n_clusters=n_target, metric="cosine", linkage="average")
    except TypeError:
        merger = AgglomerativeClustering(n_clusters=n_target, affinity="cosine", linkage="average")
    merged_ids = merger.fit_predict(centroids)

    remap = {int(tid): int(new_id) for tid, new_id in zip(topic_ids, merged_ids)}
    return np.array([remap[int(lbl)] for lbl in labels], dtype=int)
| # --------------------------------------------------------------------------- | |
| # LLM-guided parameter suggestion | |
| # --------------------------------------------------------------------------- | |
def _parse_llm_json_object(raw: Any) -> Dict[str, Any]:
    """Extract the JSON object from an LLM reply, tolerating code fences and chatter.

    Raises:
        ValueError: If the reply is empty, contains no parseable JSON, or the
            JSON is not an object. ``json.JSONDecodeError`` is a ValueError.
    """
    text = str(raw).strip()
    if not text:
        raise ValueError("Empty LLM response")
    text = re.sub(r"^```(?:json)?\s*", "", text, flags=re.IGNORECASE)
    text = re.sub(r"\s*```$", "", text)
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Models often surround the object with prose; retry on the outermost {...}.
        match = re.search(r"\{.*\}", text, flags=re.DOTALL)
        if match is None:
            raise
        parsed = json.loads(match.group(0))
    if not isinstance(parsed, dict):
        raise ValueError(f"Expected a JSON object, got {type(parsed).__name__}")
    return parsed


def suggest_clustering_params_with_llm(
    n_docs: int, sample_keywords: List[str], llm: Any = None,
) -> Dict[str, Any]:
    """Ask an LLM for UMAP/HDBSCAN parameters, then clamp them to safe ranges.

    Args:
        n_docs: Corpus size, used in the prompt and to bound ``min_cluster_size``.
        sample_keywords: Keywords shown to the LLM (first 20 used).
        llm: Object with ``invoke``. When ``None``, ``create_groq_llm`` is tried.

    Returns:
        Dict with ``n_neighbors`` [5, 50], ``n_components`` [5, 10],
        ``min_cluster_size`` [max(5, n/100), max(20, n/20)], ``min_samples``
        [1, min_cluster_size], ``cluster_selection_method`` ("eom"/"leaf") and
        ``cluster_selection_epsilon`` [0, 0.3], plus ``reasoning``.
        Any failure (no LLM, bad reply) yields the clamped defaults with
        ``reasoning == "default"``.
    """
    defaults = {
        "n_neighbors": 15,
        "n_components": 8,
        "min_cluster_size": max(5, n_docs // 100),
        "min_samples": 1,
        "cluster_selection_method": "eom",
        "cluster_selection_epsilon": 0.0,
        "reasoning": "default",
    }
    prompt = (
        f"Corpus size: {n_docs}\n"
        f"Sample keywords: {', '.join(sample_keywords[:20])}\n\n"
        "Recommend UMAP and HDBSCAN parameters for clustering academic papers using SPECTER2 embeddings. "
        "Return a valid JSON object with only the keys: n_neighbors, n_components, min_cluster_size, "
        "min_samples, cluster_selection_method, cluster_selection_epsilon."
    )
    if llm is None:
        try:
            llm = create_groq_llm(temperature=0.0)
        except Exception:
            llm = None
    try:
        if llm is None:
            raise RuntimeError("No LLM available")
        try:
            from langchain_core.messages import HumanMessage, SystemMessage
            resp = llm.invoke([
                SystemMessage(content="Respond with ONLY a raw JSON object and nothing else."),
                HumanMessage(content=prompt),
            ])
        except Exception:
            # langchain_core missing or message-list input rejected: plain prompt.
            resp = llm.invoke(prompt)
        content = str(getattr(resp, "content", resp))
        parsed = _parse_llm_json_object(content)
        params = {
            "n_neighbors": int(parsed.get("n_neighbors", defaults["n_neighbors"])),
            "n_components": int(parsed.get("n_components", defaults["n_components"])),
            "min_cluster_size": int(parsed.get("min_cluster_size", defaults["min_cluster_size"])),
            "min_samples": int(parsed.get("min_samples", defaults["min_samples"])),
            "cluster_selection_method": str(parsed.get("cluster_selection_method", defaults["cluster_selection_method"])),
            "cluster_selection_epsilon": float(parsed.get("cluster_selection_epsilon", defaults["cluster_selection_epsilon"])),
            "reasoning": content[:1000],
        }
    except Exception as e:
        logger.warning(f"suggest_clustering_params_with_llm failed: {e}")
        params = dict(defaults)

    # Clamp everything, whatever its source, into ranges the pipeline can use.
    params["n_neighbors"] = max(5, min(50, int(params.get("n_neighbors", 15))))
    params["n_components"] = max(5, min(10, int(params.get("n_components", 8))))
    min_size_lower = max(5, n_docs // 100)
    min_size_upper = max(20, n_docs // 20)
    params["min_cluster_size"] = max(
        min_size_lower,
        min(min_size_upper, int(params.get("min_cluster_size", min_size_lower))),
    )
    params["min_samples"] = max(1, min(int(params.get("min_samples", 1)), int(params["min_cluster_size"])))
    method = str(params.get("cluster_selection_method", "eom")).strip().lower()
    params["cluster_selection_method"] = method if method in ("eom", "leaf") else "eom"
    params["cluster_selection_epsilon"] = max(0.0, min(0.3, float(params.get("cluster_selection_epsilon", 0.0))))
    return params
| # --------------------------------------------------------------------------- | |
| # Bayesian optimisation (Optuna) over UMAP + HDBSCAN | |
| # --------------------------------------------------------------------------- | |
def run_bayesian_hdbscan_optimization(
    embeddings: np.ndarray,
    n_trials: int = 50,
    warm_start_params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Search UMAP + HDBSCAN parameters with Optuna, maximising a cluster-quality score.

    Each trial reduces ``embeddings`` with UMAP (cosine, min_dist=0.0) and
    clusters the result with HDBSCAN (euclidean). A trial must pass three
    constraints to be scored:

    - the largest non-noise cluster holds at most 25% of documents;
    - every non-noise cluster has at least 5 documents;
    - the number of non-noise clusters lies in
      ``[max(10, n // 120), max(35, n // 35)]``.

    Passing trials score ``0.6 * mean(cluster_persistence_) + 0.4 * DBCV``.
    DBCV (``hdbscan.validity.validity_index``) is computed on the original
    ``embeddings``, not the UMAP output. Failing or erroring trials score -1.0.

    Args:
        embeddings: Document vectors, one row per document.
        n_trials: Number of Optuna trials.
        warm_start_params: Optional parameter dict enqueued as the first trial
            (only the six tunable keys are used; enqueue errors are ignored).

    Returns:
        ``{"best_params", "best_score", "n_valid_trials", "trial_log"}``.
        If Optuna cannot be imported: empty params, score -1.0, no trials.

    NOTE(review):
        - If no trial passes the constraints, ``best_params`` still comes from
          Optuna's best (score -1.0) trial; callers should check
          ``n_valid_trials``.
        - ``best_params`` holds the *suggested* ``min_samples``, not the value
          clamped to ``min_cluster_size`` inside the objective.
    """
    try:
        import optuna
    except Exception as e:
        logger.error(f"Optuna not available: {e}")
        return {"best_params": {}, "best_score": -1.0, "n_valid_trials": 0, "trial_log": []}
    import hdbscan
    from hdbscan.validity import validity_index
    from umap import UMAP
    n_docs = len(embeddings)
    trial_log: List[Dict[str, Any]] = []
    # UMAP output depends only on (n_neighbors, n_components) here, so reuse it across trials.
    umap_cache: Dict[Tuple[int, int], np.ndarray] = {}
    def objective(trial: optuna.Trial):
        params = {
            "n_neighbors": trial.suggest_categorical("n_neighbors", [5, 10, 15, 25, 35, 50]),
            "n_components": trial.suggest_int("n_components", 5, 10),
            "min_cluster_size": trial.suggest_int("min_cluster_size", max(5, n_docs // 100), max(20, n_docs // 20)),
            "min_samples": trial.suggest_int("min_samples", 1, 20),
            "cluster_selection_method": trial.suggest_categorical("cluster_selection_method", ["eom", "leaf"]),
            "cluster_selection_epsilon": trial.suggest_float("cluster_selection_epsilon", 0.0, 0.3),
        }
        # Keep min_samples <= min_cluster_size (only the local copy is clamped).
        if params["min_samples"] > params["min_cluster_size"]:
            params["min_samples"] = params["min_cluster_size"]
        cache_key = (int(params["n_neighbors"]), int(params["n_components"]))
        try:
            if cache_key in umap_cache:
                reduced = umap_cache[cache_key]
            else:
                umap_model = UMAP(
                    n_neighbors=min(params["n_neighbors"], len(embeddings) - 1),
                    n_components=params["n_components"],
                    min_dist=0.0, metric="cosine", random_state=42,
                )
                reduced = umap_model.fit_transform(embeddings)
                umap_cache[cache_key] = reduced
            clusterer = hdbscan.HDBSCAN(
                min_cluster_size=min(params["min_cluster_size"], max(2, len(embeddings) // 5)),
                min_samples=params["min_samples"],
                metric="euclidean", prediction_data=True,
                cluster_selection_method=params["cluster_selection_method"],
                cluster_selection_epsilon=params["cluster_selection_epsilon"],
            )
            labels = clusterer.fit_predict(reduced)
            persistence = np.asarray(getattr(clusterer, "cluster_persistence_", np.array([])))
        except Exception as e:
            # Errored trials are scored -1.0 and NOT appended to trial_log, so
            # the "trial" counter below can drift from Optuna's trial numbers.
            logger.warning(f"Trial failed to cluster: {e}")
            return -1.0
        unique, counts = np.unique(labels, return_counts=True)
        # Largest fraction of all documents held by a single non-noise cluster.
        max_frac = 0.0
        n_valid = len([lbl for lbl in unique if lbl != -1])
        for c, cnt in zip(unique, counts):
            if c == -1:
                continue
            frac = float(cnt) / max(1, n_docs)
            if frac > max_frac:
                max_frac = frac
        noise_count = int((labels == -1).sum())
        # Acceptable non-noise cluster-count window scales with corpus size.
        target_min_clusters = max(10, n_docs // 120)
        target_max_clusters = max(35, n_docs // 35)
        passed = True
        score = -1.0
        persistence_mean = float(np.mean(persistence)) if len(persistence) > 0 else 0.0
        dbcv = 0.0
        if max_frac > 0.25:
            passed = False
        elif any((labels == lbl).sum() < 5 for lbl in unique if lbl != -1):
            passed = False
        elif n_valid < target_min_clusters or n_valid > target_max_clusters:
            passed = False
        else:
            try:
                dbcv = validity_index(embeddings, labels)
            except Exception:
                dbcv = 0.0
            score = 0.6 * persistence_mean + 0.4 * float(dbcv)
        # "n_clusters" counts distinct labels including the noise label -1, if present.
        trial_log.append({
            "trial": len(trial_log) + 1,
            **params,
            "n_clusters": int(len(unique)),
            "n_valid": int(n_valid),
            "max_frac": float(max_frac),
            "noise_count": int(noise_count),
            "persistence": float(persistence_mean),
            "dbcv": float(dbcv),
            "constraints_passed": bool(passed),
            "score": float(score),
        })
        return float(score)
    study = optuna.create_study(direction="maximize")
    if warm_start_params:
        try:
            study.enqueue_trial({
                k: warm_start_params[k]
                for k in warm_start_params
                if k in ["n_neighbors","n_components","min_cluster_size","min_samples",
                         "cluster_selection_method","cluster_selection_epsilon"]
            })
        except Exception:
            pass
    study.optimize(objective, n_trials=n_trials)
    try:
        best = study.best_trial
        best_params = best.params
        best_score = float(best.value)
    except Exception:
        # e.g. no completed trials
        best_params = {}
        best_score = -1.0
    n_valid_count = sum(1 for t in trial_log if t.get("constraints_passed"))
    return {"best_params": best_params, "best_score": best_score, "n_valid_trials": n_valid_count, "trial_log": trial_log}
| # --------------------------------------------------------------------------- | |
| # Keyword extraction — TF-IDF | |
| # --------------------------------------------------------------------------- | |
def extract_topic_keywords(
    tfidf_docs: List[str],
    labels: np.ndarray,
    top_n: int = 10,
) -> Dict[int, List[str]]:
    """Return the top TF-IDF terms (unigrams and bigrams) for each cluster label.

    All documents sharing a label are concatenated into one pseudo-document.
    TF-IDF is then fit across the pseudo-documents, so terms are ranked by
    how distinctive they are for a cluster. Only terms with a positive score
    are kept, at most ``top_n`` per cluster. If the vectorizer cannot build a
    vocabulary, each cluster maps to an empty list.
    """
    if not tfidf_docs or len(labels) == 0:
        return {}

    members: Dict[int, List[str]] = {}
    for text, cid in zip(tfidf_docs, labels):
        members.setdefault(int(cid), []).append(text)
    ordered_ids = sorted(members)
    corpus = [" ".join(members[cid]) for cid in ordered_ids]

    vectorizer = TfidfVectorizer(
        stop_words="english",
        ngram_range=(1, 2),
        max_features=10_000,
        min_df=1,
        sublinear_tf=True,
    )
    try:
        weights = vectorizer.fit_transform(corpus)
    except ValueError:
        # e.g. every token was a stop word -> empty vocabulary
        return {cid: [] for cid in ordered_ids}

    terms = np.array(vectorizer.get_feature_names_out())
    result: Dict[int, List[str]] = {}
    for row, cid in enumerate(ordered_ids):
        row_scores = weights[row].toarray().ravel()
        ranked = row_scores.argsort()[::-1][:top_n]
        result[cid] = [str(terms[j]) for j in ranked if row_scores[j] > 0]
    return result
| # --------------------------------------------------------------------------- | |
| # Topic table builder | |
| # --------------------------------------------------------------------------- | |
def build_topic_table(
    tfidf_docs: List[str],
    labels: np.ndarray,
    keywords: Dict[int, List[str]],
    multi_llm_data: Optional[Dict[int, Dict[str, str]]] = None,
    probabilities: Optional[np.ndarray] = None,
    persistence_scores: Optional[np.ndarray] = None,
) -> pd.DataFrame:
    """Summarise each cluster as one row of a topic table.

    Rows are ordered by cluster size, largest first.

    Args:
        tfidf_docs: Documents used for clustering; only checked for emptiness.
        labels: Cluster id per document (``-1`` is treated as "no persistence
            score", matching HDBSCAN's noise convention).
        keywords: ``cluster_id -> ranked terms``; the first four terms form the
            default ``Label``.
        multi_llm_data: Optional ``cluster_id -> {provider_label_key: label}``.
            Legacy keys (``hf_mistral_label``/``flan_label`` and
            ``gemini_label``) are accepted as fallbacks for the Mistral and
            Zephyr columns respectively.
        probabilities: Optional per-document membership strength aligned with
            ``labels``; documents with probability >= 0.5 count as "strong".
        persistence_scores: Optional per-cluster persistence indexed by
            non-negative cluster id.

    Returns:
        A DataFrame with one row per cluster (or an empty frame with the full
        column schema when there is no input).
    """
    if not tfidf_docs or len(labels) == 0:
        return pd.DataFrame(columns=[
            "Topic ID", "Keywords",
            "groq_label", "mistral_label", "openrouter_label", "cohere_label",
            "hf_mistral_label", "hf_zephyr_label", "flan_label", "gemini_label",
            "Label", "cluster_size", "Count", "strong_count", "weak_count", "persistence",
        ])

    label_arr = np.asarray(labels)
    prob_arr = None if probabilities is None or len(probabilities) == 0 else np.asarray(probabilities)
    llm_lookup = multi_llm_data or {}
    counts = pd.Series(labels).value_counts().sort_values(ascending=False)

    rows = []
    for topic_id, count in counts.items():
        tid = int(topic_id)
        count = int(count)
        terms = keywords.get(tid, [])
        default_label = " ".join(terms[:4]).strip().title() if terms else f"Topic {tid}"

        llm = llm_lookup.get(tid, {})
        mistral_lbl = llm.get("mistral_label", "") or llm.get("hf_mistral_label", "") or llm.get("flan_label", "")
        zephyr_lbl = llm.get("hf_zephyr_label", "") or llm.get("gemini_label", "")

        if prob_arr is None:
            strong_count = count
        else:
            mask = label_arr == tid
            strong_count = int((prob_arr[mask] >= 0.5).sum()) if mask.any() else 0

        # Negative ids (HDBSCAN noise, -1) must not index from the end of the
        # persistence array; they have no persistence of their own.
        try:
            in_range = persistence_scores is not None and 0 <= tid < len(persistence_scores)
            persistence = float(persistence_scores[tid]) if in_range else 0.0
        except (TypeError, ValueError, IndexError):
            persistence = 0.0

        rows.append({
            "Topic ID": tid,
            "Keywords": ", ".join(terms),
            "groq_label": llm.get("groq_label", ""),
            "mistral_label": mistral_lbl,
            "openrouter_label": llm.get("openrouter_label", ""),
            "cohere_label": llm.get("cohere_label", ""),
            "hf_mistral_label": mistral_lbl,
            "hf_zephyr_label": zephyr_lbl,
            # Backward-compatible aliases for older consumers.
            "flan_label": mistral_lbl,
            "gemini_label": zephyr_lbl,
            "Label": default_label,
            "cluster_size": count,
            "Count": count,
            "strong_count": int(strong_count),
            "weak_count": count - int(strong_count),
            "persistence": float(persistence),
        })
    return pd.DataFrame(rows)
| # --------------------------------------------------------------------------- | |
| # LLM helpers | |
| # --------------------------------------------------------------------------- | |
def create_groq_llm(temperature: float = 0.0):
    """Build a LangChain ``ChatGroq`` client for llama-3.3-70b-versatile.

    Returns ``None`` (after logging a warning) when ``GROQ_API_KEY`` is unset
    or blank, or when ``langchain_groq`` cannot be imported or the client
    cannot be constructed. The key is only checked here, not passed in —
    presumably ChatGroq reads it from the environment itself (verify).
    """
    if not os.getenv("GROQ_API_KEY", "").strip():
        logger.warning("β οΈ No GROQ_API_KEY found")
        return None
    try:
        from langchain_groq import ChatGroq
        client = ChatGroq(model="llama-3.3-70b-versatile", temperature=temperature, max_tokens=200)
        logger.info("β Groq LLM created (llama-3.3-70b-versatile)")
        return client
    except Exception as exc:
        logger.warning(f"Failed to create Groq LLM: {exc}")
        return None
| def _clean_llm_label(text: str, fallback: str) -> str: | |
| text = re.sub(r"[\n\r]+", " ", str(text)).strip() | |
| text = re.sub(r"^\s*\d+[\.\:\)]\s*", "", text) | |
| text = re.sub(r"^\s*[-\*β’]\s*", "", text) | |
| text = re.sub(r'^["\'](.+)["\']$', r"\1", text.strip()) | |
| text = re.sub(r"^(?:the\s+)?(?:topic\s+)?(?:label|name|title)\s*[:β-]\s*", "", text, flags=re.IGNORECASE) | |
| text = re.sub(r"[^a-zA-Z0-9\-\s]", "", text) | |
| compact = " ".join(text.split()) | |
| words = compact.split() | |
| if len(words) > 8: | |
| compact = " ".join(words[:6]) | |
| return compact[:80] if compact else fallback | |
def label_topics_with_llm(
    topic_table: pd.DataFrame,
    topic_keywords: Dict[int, List[str]],
    llm: Any,
) -> Dict[int, str]:
    """Ask ``llm`` for a short academic label for every topic row.

    Each row's fallback is its existing ``Label`` (or ``"Topic <id>"`` when
    that is blank). The fallback is used when ``llm`` is ``None``. After the
    first failed ``invoke`` call, the fallback is also used for that topic and
    every remaining topic, so the loop does not keep hitting a broken endpoint.

    Returns:
        Mapping ``topic_id -> label``; empty for an empty table.
    """
    if topic_table.empty:
        return {}

    result: Dict[int, str] = {}
    llm_available = llm is not None
    for _, row in topic_table.iterrows():
        tid = int(row["Topic ID"])
        terms = topic_keywords.get(tid, [])
        default = str(row["Label"]).strip() or f"Topic {tid}"
        if not llm_available:
            result[tid] = default
            continue

        prompt = (
            f"Research cluster keywords: {', '.join(terms[:8])}\n\n"
            "Generate a concise academic topic label (3-6 words). "
            "Output ONLY the label β no explanation, no punctuation, no quotes."
        )
        try:
            reply = llm.invoke(prompt)
            result[tid] = _clean_llm_label(getattr(reply, "content", str(reply)), default)
        except Exception as err:
            logger.warning(f"LLM labeling failed for topic {tid}: {err}")
            result[tid] = default
            llm_available = False
    return result
| # --------------------------------------------------------------------------- | |
| # JSON helpers | |
| # --------------------------------------------------------------------------- | |
def upsert_json_bucket(file_path: str, key: str, payload: Any) -> None:
    """Set ``payload`` under ``key`` in a JSON object file, keeping other keys.

    The file is read (if present), updated in memory and rewritten
    atomically: the new content is written to a temporary file in the same
    directory, flushed and fsynced, then moved over the target with
    ``os.replace``. A crash mid-write therefore leaves the previous file
    intact instead of a truncated one. If the existing file is unreadable or
    not a JSON object, a warning is logged and it is replaced by a fresh
    object.

    Args:
        file_path: Target JSON file (resolved to an absolute path; parent
            directories are created as needed).
        key: Top-level key to set.
        payload: JSON-serialisable value stored under ``key``.
    """
    import tempfile

    path = Path(file_path).resolve()
    logger.info(f"πΎ upsert_json_bucket β {path} [key={key}]")

    existing: Dict[str, Any] = {}
    if path.exists():
        try:
            loaded = json.loads(path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError) as exc:
            logger.warning(f"Existing {path.name} is unreadable ({exc}); starting a fresh object")
            loaded = {}
        if isinstance(loaded, dict):
            existing = loaded
        else:
            logger.warning(f"Existing {path.name} is not a JSON object; starting a fresh object")

    existing[key] = payload
    path.parent.mkdir(parents=True, exist_ok=True)
    # Serialise before touching the filesystem so a non-serialisable payload
    # cannot damage the existing file.
    serialized = json.dumps(existing, indent=2, ensure_ascii=False)

    fd, tmp_name = tempfile.mkstemp(dir=str(path.parent), prefix=f".{path.name}.", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(serialized)
            handle.flush()
            os.fsync(handle.fileno())
        os.replace(tmp_name, path)
    except BaseException:
        try:
            os.unlink(tmp_name)
        except OSError:
            pass
        raise

    logger.info(f"β Saved {len(existing)} keys β {path.name} ({path.stat().st_size} bytes)")
| # --------------------------------------------------------------------------- | |
| # Theme consolidation | |
| # --------------------------------------------------------------------------- | |
def _top_theme_keywords(keyword_texts: List[str], limit: int = 8) -> List[str]:
    """Return the ``limit`` most frequent 3+-letter tokens in ``keyword_texts``, skipping ``STOPWORDS``."""
    combined = " ".join(keyword_texts).lower()
    token_counts = Counter(
        tok for tok in re.findall(r"[a-zA-Z]{3,}", combined) if tok not in STOPWORDS
    )
    return [term for term, _ in token_counts.most_common(limit)]


def _theme_cluster_vectors(
    substantial: pd.DataFrame,
    embeddings: Optional[np.ndarray],
    labels: Optional[np.ndarray],
) -> Optional[np.ndarray]:
    """Return one vector per row of ``substantial`` for theme clustering, or None if embedding fails.

    With document embeddings and labels, each topic is represented by the
    L2-normalised mean of its documents' vectors. Otherwise the topic's
    label + keywords text is embedded via ``embed_documents``.
    """
    if embeddings is not None and labels is not None and len(embeddings) > 0:
        logger.info(" Computing centroids from document embeddings...")
        doc_vectors = np.asarray(embeddings)
        doc_labels = np.asarray(labels)  # also accept plain lists
        centroids = []
        for tid in substantial["Topic ID"].tolist():
            mask = doc_labels == tid
            if mask.any():
                centroids.append(doc_vectors[mask].mean(axis=0))
            else:
                # Topic without documents -> zero vector. Cosine clustering may
                # reject zero vectors; the caller falls back to round-robin.
                centroids.append(np.zeros(doc_vectors.shape[1]))
        return normalize(np.vstack(centroids), norm="l2")

    # fillna before astype(str): the reverse order turns NaN into "nan" text.
    texts = (
        substantial["Label"].fillna("").astype(str) + " " +
        substantial["Keywords"].fillna("").astype(str)
    ).tolist()
    try:
        return embed_documents(texts)
    except Exception as e:
        logger.warning(f" Embedding failed: {e}")
        return None


def consolidate_themes(
    topic_table: pd.DataFrame,
    embeddings: Optional[np.ndarray] = None,
    labels: Optional[np.ndarray] = None,
    min_themes: int = 4,
    max_themes: int = 8,
) -> List[Dict[str, Any]]:
    """Group topics into higher-level themes.

    Topics with at least ``MIN_THEME_PAPERS`` papers are clustered into themes.
    This uses agglomerative clustering with cosine distance and average
    linkage on topic vectors.

    The target theme count is ``max(large_count // 3, 3)`` clamped to
    ``[min_themes, max_themes]``, where ``large_count`` is the number of topics
    with >= 20 papers. That value is then capped at the number of substantial
    topics. If no vectors are available, or clustering fails, topics are
    assigned to themes round-robin.

    Smaller topics are pooled into one "Niche and Emerging Topics" theme with
    ``theme_id`` 99.

    Returns:
        Theme dicts sorted by ``paper_count`` (descending); ``[]`` for an empty table.
    """
    if topic_table.empty:
        return []
    n_topics = len(topic_table)
    logger.info(f"πΏ consolidate_themes: {n_topics} clusters β {min_themes}β{max_themes} themes")
    substantial = topic_table[topic_table["Count"] >= MIN_THEME_PAPERS].copy()
    niche = topic_table[topic_table["Count"] < MIN_THEME_PAPERS].copy()
    logger.info(
        f" Substantial (β₯{MIN_THEME_PAPERS} papers): {len(substantial)} | "
        f"Niche: {len(niche)}"
    )

    themes: List[Dict[str, Any]] = []
    if not substantial.empty:
        n_sub = len(substantial)
        large_count = int((substantial["Count"] >= 20).sum())
        target = max(min_themes, min(max_themes, max(large_count // 3, 3)))
        target = max(min(target, n_sub), 1)
        logger.info(f" Large clusters (β₯20 papers): {large_count} β target themes: {target}")

        cluster_emb_matrix = _theme_cluster_vectors(substantial, embeddings, labels)
        theme_labels_arr: Optional[np.ndarray] = None
        if cluster_emb_matrix is not None and len(cluster_emb_matrix) >= target:
            if target == 1:
                theme_labels_arr = np.zeros(n_sub, dtype=int)
            else:
                try:
                    themer = AgglomerativeClustering(n_clusters=target, metric="cosine", linkage="average")
                except TypeError:
                    # Older scikit-learn releases name this parameter ``affinity``.
                    themer = AgglomerativeClustering(n_clusters=target, affinity="cosine", linkage="average")
                try:
                    theme_labels_arr = themer.fit_predict(cluster_emb_matrix)
                except Exception as e:
                    logger.warning(f" Theme clustering failed: {e}")
            if theme_labels_arr is not None:
                logger.info(f" Theme clustering β {len(np.unique(theme_labels_arr))} groups")
        if theme_labels_arr is None:
            logger.warning(" Fallback: round-robin theme assignment")
            theme_labels_arr = np.array([i % target for i in range(n_sub)], dtype=int)

        substantial["_theme_id"] = theme_labels_arr
        for theme_id, frame in substantial.groupby("_theme_id"):
            sorted_frame = frame.sort_values("Count", ascending=False)
            themes.append({
                "theme_id": int(theme_id),
                # Provisional name: the label of the theme's largest topic.
                "theme_name": str(sorted_frame.iloc[0]["Label"]),
                "topic_ids": sorted_frame["Topic ID"].astype(int).tolist(),
                "topic_labels": sorted_frame["Label"].astype(str).tolist(),
                "keywords": _top_theme_keywords(sorted_frame["Keywords"].astype(str).tolist()),
                "topic_count": int(len(sorted_frame)),
                "paper_count": int(sorted_frame["Count"].sum()),
            })

    if not niche.empty:
        themes.append({
            "theme_id": 99,
            "theme_name": "Niche and Emerging Topics",
            "topic_ids": niche["Topic ID"].astype(int).tolist(),
            "topic_labels": niche["Label"].astype(str).tolist(),
            "keywords": _top_theme_keywords(niche["Keywords"].astype(str).tolist()),
            "topic_count": int(len(niche)),
            "paper_count": int(niche["Count"].sum()),
        })

    themes = sorted(themes, key=lambda x: x["paper_count"], reverse=True)
    logger.info(f"β Generated {len(themes)} themes")
    return themes
def review_themes(themes: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Tag every theme with ``meaningful`` = "has at least three keywords".

    Returns shallow copies; the input dicts are not modified.
    """
    reviewed: List[Dict[str, Any]] = []
    for theme in themes:
        enriched = dict(theme)
        enriched["meaningful"] = len(theme.get("keywords", [])) >= 3
        reviewed.append(enriched)
    return reviewed
def name_themes_with_llm(themes: List[Dict[str, Any]], llm: Any) -> List[Dict[str, Any]]:
    """Replace each theme's provisional ``theme_name`` with an LLM-generated one.

    The niche bucket (``theme_id == 99``) keeps its fixed name. Themes are
    passed through unchanged when ``llm`` is ``None``, and after the first
    failed ``invoke`` call (for that theme and all remaining ones). Renamed
    themes are new dicts; untouched themes are returned as the same objects.
    """
    if not themes:
        return themes

    results: List[Dict[str, Any]] = []
    can_call = llm is not None
    for theme in themes:
        fallback = theme["theme_name"]
        if theme.get("theme_id") == 99 or not can_call:
            results.append(theme)
            continue

        labels_text = "; ".join(theme.get("topic_labels", [])[:6])
        keywords_text = ", ".join(theme.get("keywords", [])[:8])
        prompt = (
            "You are naming a high-level research theme that groups related academic topics.\n\n"
            f"Topics in this theme: {labels_text}\n"
            f"Common keywords: {keywords_text}\n\n"
            "Generate a concise academic theme name (3-7 words). "
            "Output ONLY the theme name β no explanation, no quotes, no punctuation, no numbering."
        )
        try:
            reply = llm.invoke(prompt)
            new_name = _clean_llm_label(getattr(reply, "content", str(reply)), fallback)
            results.append({**theme, "theme_name": new_name})
        except Exception as err:
            logger.warning(f"Theme naming failed: {err}")
            results.append(theme)
            can_call = False
    return results
| # --------------------------------------------------------------------------- | |
| # Taxonomy mapping | |
| # --------------------------------------------------------------------------- | |
def create_taxonomy_map(topic_table: pd.DataFrame, source_column: str) -> List[Dict[str, str]]:
    """Classify each topic as MAPPED to a PAJAIS category or NOVEL.

    Tokens (3+ lowercase letters) are taken from the topic's label and
    keywords, then each category is scored as follows:

    - 2 points for every category keyword present exactly as a token.
    - 1 point for every category keyword that is a substring of, or contains,
      some token longer than 3 characters.

    The highest-scoring category wins if it scores at least 2; otherwise the
    topic is reported as NOVEL.

    Returns:
        One mapping dict per topic row; ``[]`` for an empty table.
    """
    if topic_table.empty:
        return []

    results: List[Dict[str, str]] = []
    for _, row in topic_table.iterrows():
        label = str(row["Label"])
        vocab = set(re.findall(r"[a-z]{3,}", f"{row['Label']} {row['Keywords']}".lower()))
        long_tokens = [tok for tok in vocab if len(tok) > 3]

        winner = None
        top = 0
        for category, kws in PAJAIS_CATEGORIES.items():
            exact_hits = len(vocab.intersection(kws))
            fuzzy_hits = sum(
                1 for kw in kws if any(kw in tok or tok in kw for tok in long_tokens)
            )
            candidate = 2 * exact_hits + fuzzy_hits
            if candidate > top:
                winner, top = category, candidate

        if not (winner and top >= 2):
            results.append({
                "source": source_column,
                "topic": label,
                "classification": "NOVEL",
                "domain": "Emerging / Interdisciplinary",
                "reason": "No strong overlap with predefined PAJAIS taxonomy categories.",
            })
            continue
        results.append({
            "source": source_column,
            "topic": label,
            "classification": "MAPPED",
            "domain": winner,
            "reason": f"Matches '{winner}' with semantic score={top}.",
        })
    return results
| # --------------------------------------------------------------------------- | |
| # Charts | |
| # --------------------------------------------------------------------------- | |
def create_topic_distribution_chart(topic_table: pd.DataFrame, source_column: str):
    """Plotly bar chart of the 30 largest topics (label vs. document count).

    An empty table yields an empty figure whose title says so.
    """
    if not topic_table.empty:
        top_topics = topic_table.sort_values("Count", ascending=False).head(30)
        return px.bar(top_topics, x="Label", y="Count", title=f"Topic Distribution β {source_column}")
    return px.bar(title=f"No topics available for {source_column}")
def create_keyword_chart(topic_table: pd.DataFrame, source_column: str):
    """Plotly bar chart of the 30 most salient keywords across all topics.

    Each topic contributes its first 8 comma-separated keywords, each
    weighted by the topic's ``Count``; weights are summed across topics.
    """
    empty_title = f"No keywords available for {source_column}"
    if topic_table.empty:
        return px.bar(title=empty_title)

    weights: Counter = Counter()
    for _, row in topic_table.iterrows():
        terms = [t.strip() for t in str(row["Keywords"]).split(",") if t.strip()]
        row_weight = int(row["Count"])
        weights.update({term: row_weight for term in terms[:8]})

    if not weights:
        return px.bar(title=empty_title)

    ranked = (
        pd.DataFrame(weights.items(), columns=["Keyword", "Weight"])
        .sort_values("Weight", ascending=False)
        .head(30)
    )
    return px.bar(ranked, x="Keyword", y="Weight", title=f"Keyword Salience β {source_column}")
| # --------------------------------------------------------------------------- | |
| # Main pipeline | |
| # --------------------------------------------------------------------------- | |
def _first_nonempty_label(provider_labels: Dict[str, str]) -> str:
    """Return the first non-blank label from a provider->label mapping, or "" if none."""
    for value in provider_labels.values():
        if value and str(value).strip():
            return str(value).strip()
    return ""


def _apply_single_llm_labels(
    topic_table: pd.DataFrame,
    topic_keywords: Dict[int, List[str]],
    llm: Any,
) -> pd.DataFrame:
    """Relabel ``topic_table`` in place with ``label_topics_with_llm``; existing labels fill any gaps."""
    llm_labels = label_topics_with_llm(topic_table, topic_keywords, llm)
    if not topic_table.empty:
        topic_table["Label"] = topic_table["Topic ID"].map(llm_labels).fillna(topic_table["Label"])
    return topic_table


def run_braun_clarke_pipeline(
    df: pd.DataFrame,
    source_column: str,
    llm: Any,
    output_dir: str = ".",
    use_multi_llm: bool = True,
    progress_callback=None,
    suggested_params: Optional[Dict[str, Any]] = None,
) -> "AnalysisResult":
    """Run the full topic-modelling pipeline for one text source.

    Steps:
      1. Build documents (``"combined"`` uses Title + Abstract).
      2. Embed them (``embed_documents``, cached under ``output_dir``).
      3. Cluster with UMAP + HDBSCAN. Parameters are tuned by Bayesian
         optimisation, warm-started from LLM-suggested parameters unless
         ``suggested_params`` is given.
      4. Extract TF-IDF keywords and merge "noise" clusters into the largest
         valid cluster.
      5. Label topics, via the multi-LLM council when ``use_multi_llm``,
         otherwise (or on council failure) via ``llm``.
      6. Save labels to the labels JSON.
      7. Consolidate, review and name themes, then save them.

    Finally, write the taxonomy mapping and build the charts.

    Args:
        df: Input records.
        source_column: Column to analyse, or ``"combined"``.
        llm: Single LLM client used for fallback labelling and theme naming
            (may be ``None``).
        output_dir: Directory for artefacts; resolved to an absolute path.
        use_multi_llm: Try the ``llm_council`` batch labeller first.
        progress_callback: Optional callable receiving progress messages;
            its errors are logged at debug level and otherwise ignored.
        suggested_params: Optional clustering parameters; when omitted they
            are requested from the LLM.

    Returns:
        An ``AnalysisResult`` with tables, figures and JSON payloads (an
        empty result when no valid documents are found).
    """
    output_dir = str(Path(output_dir).resolve())

    def _progress(msg: str):
        logger.info(msg)
        if progress_callback is not None:
            try:
                progress_callback(msg)
            except Exception:
                # Progress reporting is best-effort: a broken UI callback must not abort the run.
                logger.debug("progress_callback raised", exc_info=True)

    files = ensure_output_artifacts(output_dir)

    # Step 1: Build documents
    _progress(f"[{source_column}] π Step 1/7 β Building documents...")
    if source_column.lower() == "combined":
        embedding_docs, tfidf_docs, paper_count, row_indices = build_documents_combined(df)
    else:
        embedding_docs, tfidf_docs, paper_count, row_indices = build_documents(df, source_column)
    _progress(f"[{source_column}] β {paper_count} papers β {len(embedding_docs)} valid documents")
    if not embedding_docs:
        # Same column schema as a populated topic table, so consumers can rely on it.
        empty_table = build_topic_table([], np.array([], dtype=int), {})
        return AnalysisResult(
            source_column=source_column, paper_count=paper_count, unit_count=0,
            topic_table=empty_table,
            taxonomy_table=pd.DataFrame(columns=["source", "topic", "classification", "domain", "reason"]),
            distribution_fig=create_topic_distribution_chart(empty_table, source_column),
            keyword_fig=create_keyword_chart(empty_table, source_column),
            labels_payload={}, themes_payload={}, taxonomy_payload=[],
            suggested_params=suggested_params or {}, trial_log=[],
            paper_assignments=[], doc_row_indices=[],
            llm_suggested_params={}, bayesian_best_params={},
        )

    # Step 2: Embed
    _progress(f"[{source_column}] π’ Step 2/7 β Embedding {len(embedding_docs)} documents with SPECTER2...")
    embeddings = embed_documents(embedding_docs, cache_dir=output_dir)
    _progress(f"[{source_column}] β Embeddings shape: {embeddings.shape}")

    # Step 3: Cluster
    _progress(f"[{source_column}] π Step 3/7 β Clustering with UMAP + HDBSCAN...")
    if suggested_params is None:
        # Give the LLM the corpus' top TF-IDF terms as context for its parameter suggestion.
        try:
            tfidf_vectorizer = TfidfVectorizer(stop_words="english", ngram_range=(1, 2), max_features=2000)
            corpus_texts = (
                (df.get("Title", pd.Series(dtype=str)).fillna("").astype(str).str.strip()
                 + " \n " +
                 df.get("Abstract", pd.Series(dtype=str)).fillna("").astype(str).str.strip()).tolist()
                if source_column.lower() == "combined"
                else df[source_column].dropna().astype(str).tolist()
            )
            tfidf_matrix = tfidf_vectorizer.fit_transform(corpus_texts)
            vocab = np.array(tfidf_vectorizer.get_feature_names_out())
            summed = np.asarray(tfidf_matrix.sum(axis=0)).ravel()
            top_idx = summed.argsort()[::-1][:40]
            sample_keywords = [str(vocab[i]) for i in top_idx if summed[i] > 0][:40]
        except Exception:
            sample_keywords = []
        suggested_params = suggest_clustering_params_with_llm(len(embedding_docs), sample_keywords, llm=llm)
        _progress(f"[{source_column}] LLM-suggested params: {suggested_params}")
    llm_suggested_params = {k: v for k, v in suggested_params.items() if k != "reasoning"}
    opt_result = run_bayesian_hdbscan_optimization(embeddings, n_trials=80, warm_start_params=suggested_params)
    _progress(f"[{source_column}] Bayesian best_score={opt_result.get('best_score')} n_valid={opt_result.get('n_valid_trials')}")
    best = opt_result.get("best_params") or {}
    # A best_score of -1.0 is the optimiser's "no valid trial" sentinel.
    params_to_use = best if (best and opt_result.get("best_score", -1.0) > -1.0) else {
        "n_neighbors": 15, "min_cluster_size": 5, "min_samples": 1,
        "min_dist": 0.0, "n_components": 8, "cluster_selection_method": "eom",
        "cluster_selection_epsilon": 0.0,
    }
    bayesian_best_params = dict(params_to_use)
    try:
        labels, probabilities, persistence_scores = cluster_embeddings_umap_hdbscan(
            embeddings,
            min_cluster_size=int(params_to_use.get("min_cluster_size", 5)),
            min_samples=int(params_to_use.get("min_samples", 1)),
            n_neighbors=int(params_to_use.get("n_neighbors", min(15, len(embeddings) - 1))),
            min_dist=float(params_to_use.get("min_dist", 0.0)),
            cluster_selection_method=str(params_to_use.get("cluster_selection_method", "eom")),
            cluster_selection_epsilon=float(params_to_use.get("cluster_selection_epsilon", 0.0)),
        )
    except Exception as e:
        _progress(f"[{source_column}] Clustering failed: {e} β falling back to defaults")
        labels, probabilities, persistence_scores = cluster_embeddings_umap_hdbscan(
            embeddings, min_cluster_size=5, n_neighbors=min(15, len(embeddings) - 1), min_dist=0.0,
        )
    n_clusters = len(np.unique(labels))
    if opt_result.get("best_score", -1.0) <= -1.0:
        labels = control_cluster_count(embeddings, labels, min_clusters=15, max_clusters=30)
        n_clusters = len(np.unique(labels))
    _progress(f"[{source_column}] β {n_clusters} clusters found")

    # Step 4: Keywords (noise clusters are merged into the largest valid cluster)
    _progress(f"[{source_column}] π Step 4/7 β Extracting TF-IDF keywords...")
    topic_keywords = extract_topic_keywords(tfidf_docs, labels, top_n=10)
    noise_cluster_ids = {cid for cid, kws in topic_keywords.items() if is_noise_cluster(kws)}
    if noise_cluster_ids:
        valid_ids = [cid for cid in topic_keywords if cid not in noise_cluster_ids]
        if valid_ids:
            largest_valid = max(valid_ids, key=lambda cid: int((labels == cid).sum()))
            labels = np.array(
                [largest_valid if int(l) in noise_cluster_ids else int(l) for l in labels], dtype=int
            )
            topic_keywords = extract_topic_keywords(tfidf_docs, labels, top_n=10)
            n_clusters = len(np.unique(labels))
            _progress(f"[{source_column}] β After noise removal: {n_clusters} clusters")
    _progress(f"[{source_column}] β Keywords extracted for {len(topic_keywords)} topics")

    # Step 5: Multi-LLM labeling
    multi_llm_data: Dict[int, Dict[str, str]] = {}
    topic_table = build_topic_table(tfidf_docs, labels, topic_keywords, multi_llm_data,
                                    probabilities=probabilities, persistence_scores=persistence_scores)
    if use_multi_llm:
        _progress(f"[{source_column}] ποΈ Step 5/7 β Batch Multi-LLM Council labeling...")
        try:
            import llm_council
            topics_batch = [(int(row["Topic ID"]), topic_keywords.get(int(row["Topic ID"]), []))
                            for _, row in topic_table.iterrows()]
            n_topics = len(topics_batch)
            token_budget = min(60 * n_topics, 8000)
            if n_topics > 60:
                # Large topic sets are sent in chunks of 50.
                chunked: Dict[int, Dict[str, str]] = {}
                for chunk_start in range(0, n_topics, 50):
                    chunk = topics_batch[chunk_start:chunk_start + 50]
                    result = llm_council.run_council_batch(
                        topics=chunk, system_prompt="You are an expert academic research analyst.",
                        max_tokens=token_budget,
                    )
                    for tid, plabels in result.items():
                        if plabels:
                            chunked.setdefault(tid, {}).update(plabels)
                batch_result = chunked
            else:
                batch_result = llm_council.run_council_batch(
                    topics=topics_batch, system_prompt="You are an expert academic research analyst.",
                    max_tokens=token_budget,
                )
            llm_label_map: Dict[int, str] = {}
            for topic_id, provider_labels in batch_result.items():
                if not provider_labels:
                    continue
                multi_llm_data[topic_id] = {
                    "groq_label": provider_labels.get("Groq", ""),
                    "mistral_label": provider_labels.get("Mistral", ""),
                    "openrouter_label": provider_labels.get("OpenRouter", ""),
                    "cohere_label": provider_labels.get("Cohere", ""),
                    "hf_mistral_label": provider_labels.get("Mistral", ""),
                    "hf_zephyr_label": "",
                    "flan_label": provider_labels.get("Mistral", ""),
                    "gemini_label": "",
                }
                # An empty string would survive fillna() and blank the label,
                # so only record a provider label that actually has content.
                preferred = _first_nonempty_label(provider_labels)
                if preferred:
                    llm_label_map[topic_id] = preferred
            if llm_label_map:
                topic_table = build_topic_table(tfidf_docs, labels, topic_keywords, multi_llm_data,
                                                probabilities=probabilities, persistence_scores=persistence_scores)
                topic_table["Label"] = topic_table["Topic ID"].map(llm_label_map).fillna(topic_table["Label"])
                _progress(f"[{source_column}] β Batch labeling done β {len(llm_label_map)}/{n_topics} labels assigned")
            else:
                _progress(f"[{source_column}] β οΈ Batch labeling returned 0 labels, falling back to single-LLM")
                topic_table = _apply_single_llm_labels(topic_table, topic_keywords, llm)
        except ImportError:
            _progress(f"[{source_column}] β οΈ llm_council not found, using single-LLM")
            topic_table = _apply_single_llm_labels(topic_table, topic_keywords, llm)
        except Exception as e:
            _progress(f"[{source_column}] β οΈ Batch council error ({e}), using single-LLM")
            topic_table = _apply_single_llm_labels(topic_table, topic_keywords, llm)
    else:
        topic_table = _apply_single_llm_labels(topic_table, topic_keywords, llm)

    # Step 6: Save labels
    _progress(f"[{source_column}] πΎ Step 6/7 β Saving labels...")
    labels_payload = {
        "phase": "Phase 2 - Generate Initial Topics (SPECTER2 + UMAP + HDBSCAN)",
        "source": source_column,
        "topic_labels": [
            {
                "topic_id": int(row["Topic ID"]),
                "label": str(row["Label"]),
                "keywords": [k.strip() for k in str(row["Keywords"]).split(",") if k.strip()],
                "groq_label": row.get("groq_label", ""),
                "hf_mistral_label": row.get("hf_mistral_label", ""),
                "mistral_label": row.get("mistral_label", ""),
                "openrouter_label": row.get("openrouter_label", ""),
                "cohere_label": row.get("cohere_label", ""),
                "hf_zephyr_label": row.get("hf_zephyr_label", ""),
                "flan_label": row.get("flan_label", ""),
                "gemini_label": row.get("gemini_label", ""),
                "cluster_size": int(row["cluster_size"]),
            }
            for _, row in topic_table.iterrows()
        ],
    }
    upsert_json_bucket(files["labels"], source_column.lower(), labels_payload)

    # Step 7: Themes
    _progress(f"[{source_column}] πΏ Step 7/7 β Consolidating themes...")
    phase3_themes = consolidate_themes(topic_table, embeddings=embeddings, labels=labels, min_themes=4, max_themes=8)
    phase4_reviewed = review_themes(phase3_themes)
    phase5_named = name_themes_with_llm(phase4_reviewed, llm)
    themes_payload = {"phase3": phase3_themes, "phase4": phase4_reviewed, "phase5": phase5_named}
    upsert_json_bucket(files["themes"], source_column.lower(), themes_payload)

    # Taxonomy: replace this source's previous entries in the shared list file.
    taxonomy_payload = create_taxonomy_map(topic_table, source_column)
    taxonomy_table = pd.DataFrame(taxonomy_payload, columns=["source", "topic", "classification", "domain", "reason"])
    taxonomy_path = Path(files["taxonomy"])
    existing_taxonomy: List[Dict[str, str]] = []
    if taxonomy_path.exists():
        try:
            loaded = json.loads(taxonomy_path.read_text(encoding="utf-8"))
            if isinstance(loaded, list):
                existing_taxonomy = loaded
        except (json.JSONDecodeError, UnicodeDecodeError):
            existing_taxonomy = []
    # Skip malformed (non-dict) entries instead of crashing on .get().
    remaining = [r for r in existing_taxonomy if isinstance(r, dict) and r.get("source") != source_column]
    taxonomy_path.write_text(
        json.dumps(remaining + taxonomy_payload, indent=2, ensure_ascii=False), encoding="utf-8"
    )

    distribution_fig = create_topic_distribution_chart(topic_table, source_column)
    keyword_fig = create_keyword_chart(topic_table, source_column)
    _progress(
        f"[{source_column}] β Done β "
        f"{len(embedding_docs)} docs β {n_clusters} clusters β {len(phase5_named)} themes"
    )
    return AnalysisResult(
        source_column=source_column,
        paper_count=paper_count,
        unit_count=len(embedding_docs),
        topic_table=topic_table,
        taxonomy_table=taxonomy_table,
        distribution_fig=distribution_fig,
        keyword_fig=keyword_fig,
        labels_payload=labels_payload,
        themes_payload=themes_payload,
        taxonomy_payload=taxonomy_payload,
        suggested_params=suggested_params or {},
        trial_log=opt_result.get("trial_log", []),
        paper_assignments=[int(l) for l in labels],
        doc_row_indices=row_indices,
        llm_suggested_params=llm_suggested_params,
        bayesian_best_params=bayesian_best_params,
    )
| # --------------------------------------------------------------------------- | |
| # Topic comparison | |
| # --------------------------------------------------------------------------- | |
def _write_comparison_csv(frame: pd.DataFrame, output_path: str) -> None:
    """Write ``frame`` to ``output_path`` as CSV (no index), creating parent directories first."""
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    frame.to_csv(output_path, index=False)


def compare_topic_frames(
    abstract_topics: pd.DataFrame,
    title_topics: pd.DataFrame,
    output_path: str = "comparison.csv",
) -> pd.DataFrame:
    """Match every abstract topic to its most similar title topic.

    Topics are described as ``"<Label>. <Keywords>"`` and compared by cosine
    similarity of their ``embed_documents`` vectors. If embedding fails, a
    TF-IDF representation is used instead. Similarity > 0.70 is reported as
    strong, > 0.45 as moderate, otherwise weak. When one or both inputs are
    empty, a placeholder table is produced (at most 25 rows).

    The result is also written to ``output_path`` (resolved to an absolute
    path; parent directories are created if missing).

    Returns:
        DataFrame with columns ``Abstract Topic``, ``Title Topic``,
        ``Similarity`` and ``Notes``.
    """
    output_path = str(Path(output_path).resolve())
    logger.info("compare_topic_frames start: abstract=%s topics, title=%s topics", len(abstract_topics), len(title_topics))

    if abstract_topics.empty and title_topics.empty:
        frame = pd.DataFrame([{"Abstract Topic": "N/A", "Title Topic": "N/A", "Similarity": 0.0, "Notes": "No topics available."}])
        _write_comparison_csv(frame, output_path)
        return frame
    if abstract_topics.empty:
        frame = pd.DataFrame([
            {"Abstract Topic": "N/A", "Title Topic": str(row["Label"]), "Similarity": 0.0, "Notes": "Only title analysis available."}
            for _, row in title_topics.head(25).iterrows()
        ])
        _write_comparison_csv(frame, output_path)
        return frame
    if title_topics.empty:
        frame = pd.DataFrame([
            {"Abstract Topic": str(row["Label"]), "Title Topic": "N/A", "Similarity": 0.0, "Notes": "Only abstract analysis available."}
            for _, row in abstract_topics.head(25).iterrows()
        ])
        _write_comparison_csv(frame, output_path)
        return frame

    abs_desc = (abstract_topics["Label"].astype(str) + ". " + abstract_topics["Keywords"].astype(str)).tolist()
    title_desc = (title_topics["Label"].astype(str) + ". " + title_topics["Keywords"].astype(str)).tolist()
    try:
        # Embed both sides in one call so they share the same model state.
        all_emb = embed_documents(abs_desc + title_desc)
        abs_emb = all_emb[:len(abs_desc)]
        title_emb = all_emb[len(abs_desc):]
        sim_matrix = cosine_similarity(abs_emb, title_emb)
    except Exception as e:
        logger.warning(f"Embedding comparison failed ({e}), using TF-IDF fallback")
        vec = TfidfVectorizer(stop_words="english", ngram_range=(1, 2))
        mat = vec.fit_transform(abs_desc + title_desc)
        sim_matrix = cosine_similarity(mat[:len(abs_desc)], mat[len(abs_desc):])

    rows = []
    for i, abs_label in enumerate(abstract_topics["Label"].tolist()):
        best_idx = int(np.argmax(sim_matrix[i]))
        score = float(sim_matrix[i, best_idx])
        title_label = str(title_topics.iloc[best_idx]["Label"])
        alignment = ("Strong alignment" if score > 0.70 else "Moderate alignment" if score > 0.45 else "Weak alignment")
        rows.append({"Abstract Topic": abs_label, "Title Topic": title_label, "Similarity": round(score, 4), "Notes": f"{alignment} (cosine={score:.3f})"})
    comparison_df = pd.DataFrame(rows)
    _write_comparison_csv(comparison_df, output_path)
    logger.info(f"β comparison.csv written ({len(comparison_df)} rows)")
    return comparison_df
| # --------------------------------------------------------------------------- | |
| # Narrative | |
| # --------------------------------------------------------------------------- | |
def _fallback_narrative(abstract_result, title_result, comparison_df, *, strong_threshold: float = 0.70) -> str:
    """Build a deterministic, template-based narrative (no LLM required).

    Used by ``generate_narrative`` when no LLM is configured, the LLM call
    fails, or its answer is too short.

    Args:
        abstract_result: Abstract-stream pipeline result, or ``None``. The
            attributes read are ``topic_table`` (``Label`` / ``Count``
            columns), ``taxonomy_table`` (optional ``classification`` column)
            and ``themes_payload`` (dict with a ``"phase5"`` list of theme
            dicts carrying ``theme_name``).
        title_result: Same shape as *abstract_result* for the title stream,
            or ``None``.
        comparison_df: Output of ``compare_topic_frames`` (``Abstract Topic``,
            ``Title Topic``, ``Similarity`` columns), or ``None``.
        strong_threshold: Minimum cosine similarity for a topic pair to be
            reported as "strongly aligned". Defaults to 0.70, the cut-off
            ``compare_topic_frames`` uses for its "Strong alignment" note, so
            the narrative and comparison.csv agree.

    Returns:
        The narrative text, one paragraph per block, separated by blank lines.
    """

    def _topic_count(result) -> int:
        # Number of rows in the stream's topic table (0 when the stream is absent).
        return 0 if result is None else len(result.topic_table)

    def _top_labels(result, k: int = 5) -> List[str]:
        # Labels of the k largest topics by document count.
        if result is None or result.topic_table.empty:
            return []
        ranked = result.topic_table.sort_values("Count", ascending=False)
        return [str(label) for label in ranked["Label"].head(k).tolist()]

    def _novel(result) -> int:
        # Count of taxonomy rows classified as NOVEL (0 if the column is missing).
        if result is None or result.taxonomy_table.empty:
            return 0
        if "classification" not in result.taxonomy_table.columns:
            return 0
        return int((result.taxonomy_table["classification"] == "NOVEL").sum())

    abs_n = _topic_count(abstract_result)
    title_n = _topic_count(title_result)
    top_abs = _top_labels(abstract_result)
    top_title = _top_labels(title_result)

    # Only named themes are reported. All of them are counted, but at most
    # eight are listed to keep the paragraph readable.
    abs_themes: List[str] = []
    if abstract_result is not None:
        abs_themes = [
            t.get("theme_name", "")
            for t in abstract_result.themes_payload.get("phase5", [])
            if t.get("theme_name")
        ]
    listed_themes = abs_themes[:8]

    # The five most similar abstract<->title pairs above the threshold.
    strong_pairs: List[str] = []
    if comparison_df is not None and not comparison_df.empty and "Similarity" in comparison_df.columns:
        strong = comparison_df[comparison_df["Similarity"] > strong_threshold]
        strong = strong.sort_values("Similarity", ascending=False).head(5)
        strong_pairs = [
            f"'{row['Abstract Topic']}' ↔ '{row['Title Topic']}' ({row['Similarity']:.2f})"
            for _, row in strong.iterrows()
        ]

    top_abs_text = ", ".join(top_abs) if top_abs else "insufficient abstract topics"
    top_title_text = ", ".join(top_title) if top_title else "insufficient title topics"
    themes_text = "; ".join(listed_themes) if listed_themes else "see themes.json for full list"
    pairs_text = (
        "; ".join(strong_pairs)
        if strong_pairs
        else f"no strongly aligned pairs found (similarity > {strong_threshold:.2f})"
    )

    paragraphs = [
        "This report synthesizes thematic topic modelling outcomes for Scopus metadata using a research-grade "
        "pipeline based on SPECTER2 document embeddings, UMAP dimensionality reduction, and HDBSCAN clustering. "
        "The analysis was run separately for abstracts and titles to reveal differences between detailed narrative "
        "content and concise paper framing language. A combined Title+Abstract stream was also analysed to capture "
        "the full bibliographic signal.",
        f"In the abstract stream, {abs_n} distinct topics were identified, while the title stream generated "
        f"{title_n} topics. These clusters were further consolidated into higher-level themes using agglomerative "
        f"clustering on cluster centroids, yielding {len(abs_themes)} themes from the abstract stream.",
        f"The abstract-derived topic space is dominated by: {top_abs_text}. Abstracts capture motivation, methods, "
        "and outcomes in full sentences, so the resulting clusters separate into fine-grained research niches.",
        f"Title-driven topics include: {top_title_text}. Titles present compressed intent, often merging "
        "conceptually adjacent ideas that abstracts keep separate.",
        f"The identified themes from the abstract stream are: {themes_text}. These themes represent the major "
        "intellectual territories within this body of literature.",
        f"Taxonomy mapping against PAJAIS reference categories identified {_novel(abstract_result)} novel topic "
        f"instances in the abstract stream and {_novel(title_result)} in the title stream. These represent potential "
        "interdisciplinary frontiers not captured by canonical categories.",
        f"Cross-stream alignment (embedding cosine similarity) reveals strongly aligned topic pairs: {pairs_text}. "
        "High-similarity pairs indicate stable thematic framing across metadata fields.",
        "The pipeline implements Braun and Clarke thematic phases end-to-end: familiarisation, coding, "
        "candidate-theme development, review, naming, and PAJAIS taxonomy positioning. Future iterations should "
        "include domain-expert validation of labels and longitudinal tracking of emergent themes.",
    ]
    return "\n\n".join(paragraphs).strip()
def generate_narrative(abstract_result, title_result, comparison_df, llm, output_path="narrative.txt", *, min_words: int = 350) -> str:
    """Produce the report narrative and write it to *output_path*.

    If *llm* is given it is prompted for a ~500-word academic narrative. The
    template from ``_fallback_narrative`` is used instead when *llm* is
    ``None``, when the LLM call raises, or when the answer has fewer than
    *min_words* words.

    Args:
        abstract_result: Abstract-stream pipeline result, or ``None``.
        title_result: Title-stream pipeline result, or ``None``.
        comparison_df: Output of ``compare_topic_frames`` (passed through to
            the template fallback).
        llm: Object exposing ``invoke(prompt)``, or ``None``. The response's
            ``content`` attribute is used when present, otherwise ``str()``
            of the response.
        output_path: Destination file. Resolved to an absolute path; missing
            parent directories are created.
        min_words: Minimum word count for an LLM answer to be accepted.

    Returns:
        The narrative text that was written to disk.
    """
    out_file = Path(output_path).resolve()

    narrative: Optional[str] = None
    if llm is not None:
        # Only named themes go into the prompt, so it never contains ", , ".
        abs_themes: List[str] = []
        if abstract_result is not None:
            abs_themes = [
                t.get("theme_name")
                for t in abstract_result.themes_payload.get("phase5", [])
                if t.get("theme_name")
            ][:6]
        prompt = (
            "Write an academic narrative of approximately 500 words for a research methods section. "
            "Context: Topic modeling on Scopus bibliometric data using SPECTER2 embeddings, "
            "UMAP + HDBSCAN clustering, and multi-LLM labeling (Groq + HuggingFace models). "
            f"Abstract topic count: {0 if abstract_result is None else len(abstract_result.topic_table)}. "
            f"Title topic count: {0 if title_result is None else len(title_result.topic_table)}. "
            f"Identified themes: {', '.join(abs_themes) if abs_themes else 'see themes.json'}. "
            "Discuss: methodology, key findings, thematic structure, novel topics, and research implications."
        )
        try:
            response = llm.invoke(prompt)
        except Exception as exc:  # network/API failure must not abort the pipeline
            logger.warning(f"LLM narrative generation failed ({exc}); using template narrative")
        else:
            candidate = str(getattr(response, "content", response)).strip()
            if len(candidate.split()) >= min_words:
                narrative = candidate
            else:
                logger.warning(
                    f"LLM narrative too short ({len(candidate.split())} words < {min_words}); "
                    "using template narrative"
                )

    if narrative is None:
        narrative = _fallback_narrative(abstract_result, title_result, comparison_df)

    out_file.parent.mkdir(parents=True, exist_ok=True)
    out_file.write_text(narrative, encoding="utf-8")
    logger.info(f"β narrative.txt written ({len(narrative.split())} words)")
    return narrative
| # --------------------------------------------------------------------------- | |
| # Entry points | |
| # --------------------------------------------------------------------------- | |
def run_single_analysis(
    mode: str,
    csv_path: Optional[str] = None,
    output_dir: str = ".",
    progress_callback=None,
    suggested_params: Optional[Dict[str, Any]] = None,
) -> "AnalysisResult":
    """Run the Braun & Clarke pipeline on a single text stream.

    Args:
        mode: Stream to model: ``"abstract"``, ``"title"`` or ``"combined"``.
            Case-insensitive; surrounding whitespace is ignored.
        csv_path: Scopus CSV to load. When ``None``, ``load_scopus_csv``
            is asked to locate one in *output_dir*.
        output_dir: Directory for all artifacts (resolved to an absolute path).
        progress_callback: Forwarded unchanged to ``run_braun_clarke_pipeline``.
        suggested_params: Forwarded unchanged to ``run_braun_clarke_pipeline``.

    Returns:
        The result object produced by ``run_braun_clarke_pipeline``.

    Raises:
        ValueError: If *mode* is not a supported stream. This is checked
            before any file is loaded or written and before the LLM client
            is created.
    """
    # Validate first so a typo fails fast, without touching disk or the LLM.
    column = {"abstract": "Abstract", "title": "Title", "combined": "Combined"}.get(str(mode).strip().lower())
    if column is None:
        raise ValueError("mode must be 'abstract', 'title', or 'combined'.")

    output_dir = str(Path(output_dir).resolve())
    files = ensure_output_artifacts(output_dir)
    df, _ = load_scopus_csv(csv_path=csv_path, directory=output_dir)
    llm = create_groq_llm(temperature=0.0)

    result = run_braun_clarke_pipeline(
        df, column, llm=llm, output_dir=output_dir,
        progress_callback=progress_callback, suggested_params=suggested_params,
    )

    # A single-stream run produces no cross-stream comparison or narrative.
    # Write placeholders only when those files are missing, so outputs of an
    # earlier full run are never overwritten.
    if not Path(files["comparison"]).exists():
        pd.DataFrame(columns=["Abstract Topic", "Title Topic", "Similarity", "Notes"]).to_csv(
            files["comparison"], index=False
        )
    narrative_file = Path(files["narrative"])
    if not narrative_file.exists():
        narrative_file.write_text("Narrative placeholder.", encoding="utf-8")
    return result
def run_full_pipeline(
    csv_path: Optional[str] = None,
    output_dir: str = ".",
    progress_callback=None,
) -> Dict[str, Any]:
    """Run every text stream plus the cross-stream reporting steps.

    The streams are processed in the fixed order Abstract → Title → Combined
    (Title+Abstract). All three share one client from ``create_groq_llm`` and
    the same resolved output directory. Only the abstract and title streams
    feed the topic comparison and the narrative.

    Returns:
        dict with keys ``csv_path``, ``abstract``, ``title``, ``combined``,
        ``comparison``, ``narrative`` and ``files``.
    """
    resolved_dir = str(Path(output_dir).resolve())
    artifacts = ensure_output_artifacts(resolved_dir)
    frame, source_csv = load_scopus_csv(csv_path=csv_path, directory=resolved_dir)
    client = create_groq_llm(temperature=0.0)

    # One pipeline pass per text column, keyed by its lower-case stream name.
    stream_results: Dict[str, Any] = {}
    for column in ("Abstract", "Title", "Combined"):
        stream_results[column.lower()] = run_braun_clarke_pipeline(
            frame, column, llm=client, output_dir=resolved_dir, progress_callback=progress_callback,
        )
    abstract_res = stream_results["abstract"]
    title_res = stream_results["title"]

    comparison = compare_topic_frames(
        abstract_res.topic_table, title_res.topic_table, output_path=artifacts["comparison"],
    )
    narrative_text = generate_narrative(
        abstract_res, title_res, comparison, llm=client, output_path=artifacts["narrative"],
    )

    return {
        "csv_path": source_csv,
        "abstract": abstract_res,
        "title": title_res,
        "combined": stream_results["combined"],
        "comparison": comparison,
        "narrative": narrative_text,
        "files": artifacts,
    }
# Backward-compatibility alias: keeps the previous public entry-point name
# importable. It binds the very same function object as ``run_full_pipeline``,
# so both names accept the same arguments and return the same dict.
run_specter2_multi_llm_pipeline = run_full_pipeline