Spaces:
Sleeping
Sleeping
| """ | |
| tools_v2.py - SPECTER2 + HDBSCAN + UMAP thematic analysis tools. | |
| COMPLETELY INDEPENDENT from tools.py (v1). No shared state, no ordering dependency. | |
| V2 can be run before, after, or without ever running V1. | |
| OPTIMIZATION: | |
| - Initial clustering uses min_cluster_size=5 (unoptimized, shows params). | |
| SPECTER2 is allenai/specter2_base — a local HuggingFace model. | |
| NO API KEY required. Downloads once, cached automatically. | |
| Pipeline: | |
| 1. Combined Title+Abstract per paper → SPECTER2 embedding (768-dim) | |
| 2. UMAP (cosine, 5D) → tight document clusters | |
| 3. HDBSCAN (min_cluster_size=10 after optimization) → 15-30 clusters | |
| 4. Council-of-3-LLMs → 3 expert personas → semantic consensus voting | |
| 5. PAJAIS mapping + audit CSV + narrative | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import io | |
| import time | |
| from pathlib import Path | |
| import numpy as np | |
| import pandas as pd | |
| import plotly.express as px | |
| from langchain_core.tools import tool | |
| from langchain_core.messages import HumanMessage | |
| from langchain_mistralai import ChatMistralAI | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from langchain_groq import ChatGroq | |
| import os | |
# Mirror GEMINI_API_KEY into GOOGLE_API_KEY (the variable the Gemini client
# reads) unless GOOGLE_API_KEY is already set.
if os.getenv("GEMINI_API_KEY") and not os.getenv("GOOGLE_API_KEY"):
    os.environ["GOOGLE_API_KEY"] = os.environ["GEMINI_API_KEY"]

# Root folder for every persisted artifact; V2 writes under DATA_DIR / "v2".
DATA_DIR = Path("data")
DATA_DIR.mkdir(exist_ok=True)

# ─────────────────────────────────────────────────────────────────────────────
# OPTIMIZATION SETTING — change this value to adjust what "optimize" produces
OPTIMIZED_MIN_CLUSTER_SIZE = 10
# ─────────────────────────────────────────────────────────────────────────────

# The 25 PAJAIS IS research categories that clusters are mapped onto.
PAJAIS_CATEGORIES = [
    "Information Systems Theory",
    "IS Strategy & Governance",
    "Digital Innovation",
    "Enterprise Systems",
    "AI & Intelligent Systems",
    "Big Data & Analytics",
    "Cybersecurity & Privacy",
    "Cloud Computing",
    "IS in Healthcare",
    "IS in Education",
    "E-Commerce & Digital Markets",
    "Social Media & Platforms",
    "Human-Computer Interaction",
    "IS Project Management",
    "IT Outsourcing",
    "Knowledge Management",
    "IS Development Methodologies",
    "Digital Transformation",
    "IS Ethics & Society",
    "IS in Developing Countries",
    "Mobile Computing",
    "IT Infrastructure",
    "IS Adoption & Diffusion",
    "IS Evaluation",
    "Organizational IS & Change",
]

# ── Semantic consensus voting helpers ─────────────────────────────────────────
_MINILM = None       # SentenceTransformer instance, created on first use by _get_minilm()
_MINILM_LOCK = None  # threading.Lock guarding that creation, also created lazily
def _get_minilm():
    """Return the shared all-MiniLM-L6-v2 SentenceTransformer, loading it once.

    The model embeds candidate labels for semantic consensus voting. Loading
    happens inside a module-level lock, so callers that go through the same
    lock share one instance.

    NOTE(review): the lock itself is created lazily on the first call, and that
    check-then-create step is not atomic; two threads racing on the very first
    call could each build their own lock and both load the model.
    """
    global _MINILM, _MINILM_LOCK
    import threading

    if _MINILM_LOCK is None:
        _MINILM_LOCK = threading.Lock()
    with _MINILM_LOCK:
        if _MINILM is not None:
            return _MINILM
        from sentence_transformers import SentenceTransformer

        print("Loading all-MiniLM-L6-v2 for semantic voting...")
        _MINILM = SentenceTransformer("all-MiniLM-L6-v2")
        print("MiniLM loaded OK.")
    return _MINILM
def _normalize_label(label: str) -> str:
    """Return *label* lower-cased, trimmed of outer whitespace, minus ASCII punctuation.

    Punctuation is removed after trimming, so e.g. "hello !" keeps its space.
    """
    import string

    punctuation_table = str.maketrans("", "", string.punctuation)
    trimmed = label.lower().strip()
    return trimmed.translate(punctuation_table)
def _semantic_vote(votes: list[str], fallback_llm, cluster_id: int) -> tuple[str, str]:
    """Reduce the council's labels for one cluster to a single final label.

    Args:
        votes: One raw label per council member. Blank strings, "none"/"null",
            and the placeholder label written when a member fails
            ("Cluster <id> (<MODEL> error)") count as missing votes. Genuine
            labels that merely contain the word "error" or "fallback"
            (e.g. "Medical Error Reporting Systems") are kept.
        fallback_llm: Chat model asked to merge the labels when no two of them
            are semantically similar. Any exception it raises is caught.
        cluster_id: Used only in the placeholder returned when no vote is usable.

    Returns:
        (label, vote_type), where vote_type is:
        - "unanimous": all usable votes normalise to the same text, or all of
          them fall into one similarity group;
        - "semantic_majority": at least two votes are similar (MiniLM cosine
          >= 0.60) but not all; the shortest label of that group wins;
        - "semantic_split": no two votes are similar; the label comes from
          *fallback_llm*, or is the shortest vote if that call fails or
          returns nothing;
        - "error_fallback": fewer than two usable votes, or the embedding step
          failed (the most common vote is used).
    """
    import re
    from collections import Counter

    # Matches exactly the placeholder label_clusters_council_of_3 writes when a
    # member fails, instead of any label containing the word "error".
    failure_placeholder = re.compile(r"Cluster \d+ \(.+ error\)", re.IGNORECASE)
    real_votes = [
        v for v in votes
        if v
        and v.strip().lower() not in ("", "none", "null")
        and not failure_placeholder.fullmatch(v.strip())
    ]
    if not real_votes:
        return "Cluster {} (all models failed)".format(cluster_id), "error_fallback"
    if len(real_votes) == 1:
        return real_votes[0], "error_fallback"

    normalized = [_normalize_label(v) for v in real_votes]
    if len(set(normalized)) == 1:
        return min(real_votes, key=len), "unanimous"

    try:
        model = _get_minilm()
        embs = model.encode(normalized, normalize_embeddings=True)
        # Rows are unit-normalised, so the inner product is cosine similarity.
        sim = np.inner(embs, embs)
        n = len(embs)
        THRESHOLD = 0.60
        # Greedy grouping: each still-unassigned vote seeds a group and pulls in
        # every later unassigned vote that is THRESHOLD-similar to the seed
        # (similarity to the seed only — not transitive).
        assigned = [False] * n
        groups = []
        for i in range(n):
            if assigned[i]:
                continue
            group = [i] + [j for j in range(i + 1, n)
                           if not assigned[j] and sim[i][j] >= THRESHOLD]
            for idx in group:
                assigned[idx] = True
            groups.append(group)
        best_group = max(groups, key=len)
    except Exception as embed_err:
        print("  Semantic voting failed ({}), using mode fallback.".format(embed_err))
        return Counter(real_votes).most_common(1)[0][0], "error_fallback"

    if len(best_group) >= 2:
        winner = min((real_votes[i] for i in best_group), key=len)
        vote_type = "unanimous" if len(best_group) == len(real_votes) else "semantic_majority"
        return winner, vote_type

    # No two votes agree: ask the consolidation LLM for one unified label.
    numbered = "\n".join("{}. {}".format(k + 1, v) for k, v in enumerate(real_votes))
    header = (
        "You are an IS research expert. Given these {} different cluster labels "
        "produced by different LLMs, produce ONE concise unified label "
        "(4-7 words, noun-phrase, IS-specific). "
        "Return ONLY the label — no explanation, no markdown.\n\nLabels:\n"
    ).format(len(real_votes))
    try:
        response = fallback_llm.invoke([HumanMessage(content=header + numbered)])
        unified = response.content.strip().strip('"').strip("'")
    except Exception as llm_err:
        print("  LLM consolidation failed: {}".format(llm_err))
        unified = ""
    # An empty reply is not a usable label; fall back to the shortest vote.
    return (unified or min(real_votes, key=len)), "semantic_split"
# ── lazy-loaded SPECTER2 ──────────────────────────────────────────────────────
_SPECTER_TOKENIZER = None  # tokenizer for allenai/specter2_base, set by _load_specter_fresh()
_SPECTER_MODEL_OBJ = None  # matching model (eval mode), set by _load_specter_fresh()


def _get_specter():
    """Return the cached (tokenizer, model) pair, loading SPECTER2 if either slot is empty."""
    if _SPECTER_TOKENIZER is None or _SPECTER_MODEL_OBJ is None:
        return _load_specter_fresh()
    return _SPECTER_TOKENIZER, _SPECTER_MODEL_OBJ
def _load_specter_fresh():
    """Load allenai/specter2_base (tokenizer and model) into the module cache.

    The first call downloads the weights from the HuggingFace Hub; later loads
    are expected to come from the local HuggingFace cache. The model is switched
    to eval mode before it is stored.

    Returns:
        (tokenizer, model) — the same objects now held in _SPECTER_TOKENIZER
        and _SPECTER_MODEL_OBJ.
    """
    global _SPECTER_TOKENIZER, _SPECTER_MODEL_OBJ
    from transformers import AutoModel, AutoTokenizer

    model_id = "allenai/specter2_base"
    print("Loading SPECTER2 — one-time HuggingFace download, then cached...")
    _SPECTER_TOKENIZER = AutoTokenizer.from_pretrained(model_id)
    _SPECTER_MODEL_OBJ = AutoModel.from_pretrained(model_id)
    _SPECTER_MODEL_OBJ.eval()  # inference only (e.g. disables dropout)
    print("SPECTER2 loaded OK.")
    return _SPECTER_TOKENIZER, _SPECTER_MODEL_OBJ
def _embed_specter(texts: list) -> np.ndarray:
    """Embed *texts* with SPECTER2 and return L2-normalised [CLS] vectors.

    Args:
        texts: Strings to embed (one combined Title+Abstract per paper in this
            module). Each is truncated to 512 tokens.

    Returns:
        Array of shape (len(texts), hidden_size). Each row is the first-token
        ([CLS]) hidden state divided by its L2 norm (floored at 1e-9 so a zero
        vector does not divide by zero). An empty *texts* yields a
        (0, hidden_size) array instead of failing inside np.vstack([]).
    """
    import torch

    tokenizer, model = _get_specter()
    if not texts:
        return np.empty((0, int(model.config.hidden_size)), dtype=np.float32)

    batch_size = 8  # small batches bound peak memory for 512-token inputs
    chunks = []
    with torch.no_grad():
        for start in range(0, len(texts), batch_size):
            inputs = tokenizer(texts[start:start + batch_size], padding=True,
                               truncation=True, max_length=512, return_tensors="pt")
            cls = model(**inputs).last_hidden_state[:, 0, :].cpu().numpy()
            norms = np.linalg.norm(cls, axis=1, keepdims=True)
            chunks.append(cls / np.maximum(norms, 1e-9))
    return np.vstack(chunks)
def _p2() -> dict:
    """Return the V2 artifact paths, creating DATA_DIR/v2 if it does not exist.

    Every V2 tool reads and writes through these paths, so V2 output stays apart
    from V1. The one exception is "comparison", which lives directly in DATA_DIR.
    """
    v2_dir = DATA_DIR / "v2"
    v2_dir.mkdir(parents=True, exist_ok=True)
    in_v2 = [
        ("papers", "papers.json"),
        ("embeddings", "embeddings.npy"),
        ("umap_emb", "umap_emb.npy"),
        ("umap_2d_emb", "umap_2d_emb.npy"),
        ("clusters", "clusters.json"),
        ("clusters_original", "clusters_original.json"),
        ("summaries", "summaries.json"),
        ("taxonomy", "taxonomy.json"),
        ("charts", "charts.json"),
        ("audit_csv", "cluster_audit.csv"),
        ("narrative", "narrative_v2.txt"),
    ]
    paths = {"dir": v2_dir, **{key: v2_dir / fname for key, fname in in_v2}}
    paths["comparison"] = DATA_DIR / "comparison_v2.csv"
    paths["optimization_log"] = v2_dir / "optimization_log.json"
    return paths
def _read_csv_robust(path) -> pd.DataFrame:
    """Read a CSV whose text encoding is not known in advance.

    Encodings are tried strictly, in order:
    - "utf-8-sig": plain UTF-8, also removing a leading byte-order mark that
      would otherwise end up glued to the first column name;
    - "cp1252";
    - "latin-1", which maps every byte and therefore always succeeds.

    The previous version decoded with errors="replace", so the first encoding
    always "worked". That made the fallbacks unreachable and kept any BOM.

    Args:
        path: Path (str or Path) of the CSV file.

    Returns:
        The parsed DataFrame. pandas parsing errors propagate unchanged.
    """
    raw = Path(path).read_bytes()
    for enc in ("utf-8-sig", "cp1252", "latin-1"):
        try:
            text = raw.decode(enc)
        except UnicodeDecodeError:
            continue
        return pd.read_csv(io.StringIO(text))
    # Not reachable in practice (latin-1 decodes any byte string); kept as a
    # last resort that lets pandas pick its own default.
    return pd.read_csv(path)
def _call_llm_json(llm, prompt: str):
    """Send *prompt* to *llm* as one human message and parse the reply as JSON.

    Accepts a reply wrapped in a Markdown code fence (```json, ```JSON or a bare
    ```). If the text still does not parse, it retries on the span from the
    first "[" or "{" to the last "]" or "}". This covers replies with a short
    preamble or trailer around the payload.

    Returns:
        The decoded JSON value (list, dict, ...).

    Raises:
        json.JSONDecodeError: if no parseable JSON can be found.
    """
    import re

    response = llm.invoke([HumanMessage(content=prompt)])
    raw = response.content.strip()
    fenced = re.search(r"```(?:json)?\s*(.*?)```", raw, re.DOTALL | re.IGNORECASE)
    if fenced:
        raw = fenced.group(1).strip()
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        starts = [i for i in (raw.find("["), raw.find("{")) if i != -1]
        end = max(raw.rfind("]"), raw.rfind("}"))
        if not starts or end < min(starts):
            raise
        return json.loads(raw[min(starts):end + 1])
def _run_hdbscan(umap_embs: np.ndarray, mcs: int, min_samples: int = 3):
    """Cluster *umap_embs* with HDBSCAN (euclidean metric, "eom" selection).

    No random seed is involved; the same input array and parameters are
    expected to give the same labels.

    Args:
        umap_embs: Reduced embedding matrix, one row per paper.
        mcs: HDBSCAN min_cluster_size.
        min_samples: HDBSCAN min_samples (default 3).

    Returns:
        (labels, probs, unique, noise): per-row labels (-1 = noise), per-row
        membership probabilities, the sorted non-noise label ids, and the
        number of noise rows.
    """
    import hdbscan as hdbscan_mod

    model = hdbscan_mod.HDBSCAN(
        metric="euclidean",
        cluster_selection_method="eom",
        prediction_data=True,
        min_cluster_size=mcs,
        min_samples=min_samples,
    )
    point_labels = model.fit_predict(umap_embs)
    membership = model.probabilities_
    found = sorted(lab for lab in set(point_labels.tolist()) if lab != -1)
    noise_count = int(np.count_nonzero(point_labels == -1))
    return point_labels, membership, found, noise_count
def _build_clusters(labels, probs, embs, papers, min_papers: int = 5):
    """Turn HDBSCAN output into size-sorted cluster records.

    Args:
        labels: Per-paper HDBSCAN labels (array-like, -1 = noise), aligned with
            the rows of *embs* and the entries of *papers*.
        probs: Per-paper HDBSCAN membership probabilities (array-like).
        embs: Embedding matrix, one row per paper.
        papers: Paper dicts with at least "title" and "abstract" keys.
        min_papers: Clusters with fewer papers are dropped (default 5, the
            value that used to be hard-coded).

    Returns:
        List of cluster dicts sorted by descending paper_count (ties keep
        ascending HDBSCAN label order), with "cluster_id" renumbered 1..k in
        that order. "centroid_sims" is each member's cosine similarity to the
        cluster's mean embedding. "top3_*" describe the three most central
        members. "top3_paper_idx" indexes into the cluster's own "papers" list.
    """
    labels = np.asarray(labels)
    probs = np.asarray(probs)
    embs = np.asarray(embs)
    clusters = []
    for raw_cid in sorted(set(labels.tolist()) - {-1}):
        mask = labels == raw_cid
        cpaps = [papers[i] for i in np.flatnonzero(mask).tolist()]
        if len(cpaps) < min_papers:
            continue
        cembs = embs[mask]
        centroid = cembs.mean(axis=0)
        c_norm = centroid / max(float(np.linalg.norm(centroid)), 1e-9)
        row_norms = np.linalg.norm(cembs, axis=1, keepdims=True)
        sims = (cembs / np.maximum(row_norms, 1e-9) @ c_norm).tolist()
        # Stable sort: equal similarities keep member order.
        top3 = sorted(range(len(sims)), key=lambda k: -sims[k])[:3]
        clusters.append({
            "cluster_id": 0,  # placeholder; renumbered after sorting below
            "paper_count": len(cpaps),
            "papers": cpaps,
            "hdbscan_probs": probs[mask].tolist(),
            "centroid_sims": sims,
            "centroid": centroid.tolist(),
            "top3_paper_idx": top3,
            "top3_titles": [cpaps[k]["title"] for k in top3],
            "top3_abstracts": [str(cpaps[k]["abstract"])[:200] for k in top3],
        })
    clusters.sort(key=lambda c: -c["paper_count"])
    for rank, cluster in enumerate(clusters, start=1):
        cluster["cluster_id"] = rank
    return clusters
# =============================================================================
# V2 TOOL 1 — load_and_embed_specter2
# =============================================================================
def load_and_embed_specter2(csv_path: str = "data/uploaded.csv") -> str:
    """Load Scopus CSV, build one combined Title+Abstract text per paper, embed with SPECTER2.
    SPECTER2 (allenai/specter2_base) is a LOCAL HuggingFace model — NO API key needed.
    First call downloads ~440 MB and caches; subsequent calls are instant.
    Output saved to data/v2/ only — completely independent of Classic (v1) run.
    Papers whose combined Title+Abstract has 5 words or fewer are skipped. If no
    paper qualifies, nothing is written and a JSON object with an "error" key is
    returned instead of crashing in the embedding step.
    Args:
        csv_path: Path to uploaded Scopus CSV.
    """
    p = _p2()
    df = _read_csv_robust(csv_path)
    n = len(df)
    by_name = {str(c).strip().lower(): c for c in df.columns}

    def find_col(exact: str, fragment: str):
        # Prefer an exact header match (case/space-insensitive), else the first
        # header containing *fragment*, else None.
        if exact in by_name:
            return by_name[exact]
        return next((c for c in df.columns if fragment in str(c).lower()), None)

    def values(col) -> list:
        # .tolist() yields plain Python scalars; list(series) can yield numpy
        # integers, which json.dumps rejects.
        return df[col].fillna("").tolist() if col is not None else [""] * n

    def year_text(value) -> str:
        # A Year column containing blanks is parsed as float: report 2019, not 2019.0.
        if isinstance(value, float) and value.is_integer():
            return str(int(value))
        return str(value)

    titles = values(find_col("title", "title"))
    abstracts = values(find_col("abstract", "abstract"))
    dois = values(find_col("doi", "doi"))
    years = values(find_col("year", "year"))
    journals = values(find_col("source title", "source"))

    combined = ["{} {}".format(str(t).strip(), str(a).strip()).strip()
                for t, a in zip(titles, abstracts)]
    valid_idx = [i for i, text in enumerate(combined) if len(text.split()) > 5]
    papers = [{
        "paper_idx": i,
        "title": str(titles[i]),
        "abstract": str(abstracts[i]),
        "doi": str(dois[i]),
        "year": year_text(years[i]),
        "journal": str(journals[i]),
        "combined": combined[i],
    } for i in valid_idx]

    if not papers:
        return json.dumps({
            "error": "No paper has a combined Title+Abstract longer than 5 words; "
                     "check that the CSV has Title/Abstract columns.",
            "total_papers": n,
            "valid_papers": 0,
        })

    p["papers"].write_text(json.dumps(papers, indent=2, ensure_ascii=False))
    valid_texts = [paper["combined"] for paper in papers]
    print("Embedding {} papers with SPECTER2...".format(len(valid_texts)))
    embs = _embed_specter(valid_texts)
    np.save(p["embeddings"], embs)
    return json.dumps({
        "total_papers": n,
        "valid_papers": len(papers),
        "embedding_dim": int(embs.shape[1]),
        "note": "SPECTER2 embeddings saved to data/v2/. No API key needed.",
    })
# =============================================================================
# V2 TOOL 2 — cluster_with_umap_hdbscan (UNOPTIMIZED initial run)
# =============================================================================
def cluster_with_umap_hdbscan(
    umap_neighbors: int = 15,
    umap_min_dist: float = 0.05,
    hdbscan_min_cluster_size: int = 5,
    hdbscan_min_samples: int = 3,
) -> str:
    """Reduce SPECTER2 embeddings with UMAP (cosine) then cluster with HDBSCAN.
    INITIAL RUN (unoptimized): uses min_cluster_size=5 by default, which usually
    yields more clusters than the optimized run.
    Parameters are shown in output. User can then type "optimize".
    DETERMINISTIC: UMAP uses random_state=42 and its 5-D and 2-D outputs are
    saved, so the optimizer re-clusters the same arrays.
    Writes clusters_original.json (baseline), clusters.json (working copy) and
    charts.json to data/v2/. The scatter plot is coloured by the same "C<id>"
    numbering as the bar chart and clusters.json.
    Args:
        umap_neighbors: UMAP n_neighbors (default 15).
        umap_min_dist: UMAP min_dist (default 0.05).
        hdbscan_min_cluster_size: Min papers per cluster (default 5, unoptimized).
        hdbscan_min_samples: HDBSCAN min_samples (default 3).
    """
    import umap as umap_mod

    p = _p2()
    embs = np.load(p["embeddings"])
    papers = json.loads(p["papers"].read_text())

    # ── UMAP 5-D — computed once, saved, reused by the optimizer ─────────────
    print("UMAP 5-D (n_neighbors={}, min_dist={}, random_state=42)...".format(
        umap_neighbors, umap_min_dist))
    reducer = umap_mod.UMAP(
        n_components=5, n_neighbors=umap_neighbors, min_dist=umap_min_dist,
        metric="cosine", random_state=42, verbose=False,
    )
    umap_embs = reducer.fit_transform(embs)
    np.save(p["umap_emb"], umap_embs)

    # ── UMAP 2-D for the scatter plot only — same seed ──────────────────────
    r2d = umap_mod.UMAP(
        n_components=2, n_neighbors=umap_neighbors, min_dist=umap_min_dist,
        metric="cosine", random_state=42, verbose=False,
    )
    umap_2d = r2d.fit_transform(embs)
    np.save(p["umap_2d_emb"], umap_2d)

    # ── Initial HDBSCAN ──────────────────────────────────────────────────────
    labels, probs, unique, noise = _run_hdbscan(
        umap_embs, hdbscan_min_cluster_size, hdbscan_min_samples)
    print("Raw clusters: {}, noise: {}".format(len(unique), noise))
    valid = _build_clusters(labels, probs, embs, papers)
    clusters_json = json.dumps(valid, indent=2, ensure_ascii=False)
    p["clusters_original"].write_text(clusters_json)  # baseline for the optimizer's comparison
    p["clusters"].write_text(clusters_json)           # working copy read by later tools

    # ── Charts ───────────────────────────────────────────────────────────────
    # Colour points by the renumbered (size-ranked) cluster ids used in the bar
    # chart and clusters.json, not by HDBSCAN's raw label numbers.
    cid_by_paper = {pp["paper_idx"]: c["cluster_id"] for c in valid for pp in c["papers"]}
    cdf = pd.DataFrame({
        "x": umap_2d[:, 0].tolist(),
        "y": umap_2d[:, 1].tolist(),
        "cluster": ["C{}".format(cid_by_paper[pp["paper_idx"]])
                    if pp["paper_idx"] in cid_by_paper else "noise"
                    for pp in papers],
        "title": [str(pp["title"])[:50] for pp in papers],
        "prob": probs.tolist(),
    })
    fig_s = px.scatter(cdf, x="x", y="y", color="cluster",
                       hover_data=["title", "prob"],
                       title="UMAP+HDBSCAN — {} clusters (unoptimized), {} noise".format(
                           len(valid), noise))
    fig_b = px.bar(
        x=["C{}".format(c["cluster_id"]) for c in valid],
        y=[c["paper_count"] for c in valid],
        title="Papers per Cluster (UNOPTIMIZED — min_cluster_size={})".format(
            hdbscan_min_cluster_size),
    )
    p["charts"].write_text(json.dumps({
        "scatter": fig_s.to_html(full_html=False, include_plotlyjs="cdn"),
        "bar": fig_b.to_html(full_html=False, include_plotlyjs=False),
    }))

    return json.dumps({
        "status": "UNOPTIMIZED_CLUSTERING_COMPLETE",
        "parameters_used": {
            "umap_n_neighbors": umap_neighbors,
            "umap_min_dist": umap_min_dist,
            "umap_n_components": 5,
            "umap_metric": "cosine",
            "umap_random_state": 42,
            "hdbscan_min_cluster_size": hdbscan_min_cluster_size,
            "hdbscan_min_samples": hdbscan_min_samples,
            "hdbscan_metric": "euclidean",
            "hdbscan_cluster_selection": "eom",
        },
        "clusters_found": len(valid),
        "noise_papers": noise,
        "total_papers": len(papers),
        "cluster_sizes": [c["paper_count"] for c in valid],
        "within_15_30": 15 <= len(valid) <= 30,
        "note": (
            "Unoptimized run complete: {} clusters with min_cluster_size={}. "
            "Type 'optimize' to reduce to an optimal cluster count.".format(
                len(valid), hdbscan_min_cluster_size)
        ),
        "next_step": "Type 'optimize' to run cluster optimization.",
    })
# =============================================================================
# V2 TOOL 2B — optimize_clusters_hardcoded
# =============================================================================
def optimize_clusters_hardcoded() -> str:
    """Re-cluster the saved 5-D UMAP embedding with the optimized HDBSCAN setting.

    Reuses the UMAP arrays saved by cluster_with_umap_hdbscan() (UMAP is not
    re-fitted). Runs HDBSCAN with min_cluster_size = OPTIMIZED_MIN_CLUSTER_SIZE
    (the module-level setting, 10 by default) and min_samples = 3.

    Files under data/v2/:
    - overwrites clusters.json and charts.json;
    - writes optimization_log.json;
    - leaves clusters_original.json untouched as the baseline.

    Returns:
        JSON string with the parameters used, before/after cluster counts and
        a ready-to-show bot_message. If the files from the earlier steps are
        missing, returns {"error": ...} instead.
    """
    p = _p2()
    if not p["umap_emb"].exists():
        return json.dumps({
            "error": "UMAP embeddings not found. Run cluster_with_umap_hdbscan() first."
        })
    missing = [p[k].name for k in ("embeddings", "papers", "umap_2d_emb", "clusters_original")
               if not p[k].exists()]
    if missing:
        return json.dumps({
            "error": "Missing V2 files ({}). Run load_and_embed_specter2() and "
                     "cluster_with_umap_hdbscan() first.".format(", ".join(missing))
        })

    embs = np.load(p["embeddings"])
    papers = json.loads(p["papers"].read_text())
    umap_embs = np.load(p["umap_emb"])      # fixed, random_state=42
    umap_2d = np.load(p["umap_2d_emb"])     # fixed, random_state=42
    original_count = len(json.loads(p["clusters_original"].read_text()))

    # Honour the documented module setting instead of a second hard-coded 10.
    MCS = int(OPTIMIZED_MIN_CLUSTER_SIZE)
    MIN_SAMPLES = 3
    labels, probs, _unique, noise_count = _run_hdbscan(umap_embs, MCS, min_samples=MIN_SAMPLES)
    valid = _build_clusters(labels, probs, embs, papers)
    optimized_count = len(valid)
    reduction = original_count - optimized_count
    all_at_least_5 = all(c["paper_count"] >= 5 for c in valid)
    in_target_range = 15 <= optimized_count <= 30
    print("Optimized: {} clusters, {} noise".format(optimized_count, noise_count))
    p["clusters"].write_text(json.dumps(valid, indent=2, ensure_ascii=False))

    # ── Optimization log ─────────────────────────────────────────────────────
    p["optimization_log"].write_text(json.dumps({
        "original_clusters": original_count,
        "optimized_clusters": optimized_count,
        "chosen_min_cluster_size": MCS,
        "hdbscan_min_samples": MIN_SAMPLES,
        "hdbscan_metric": "euclidean",
        "hdbscan_cluster_selection": "eom",
        "umap_random_state": 42,
        "noise_papers": noise_count,
        "reduction": reduction,
        "timestamp": str(pd.Timestamp.now()),
    }, indent=2, ensure_ascii=False))

    # ── Charts ───────────────────────────────────────────────────────────────
    # Colour points by the renumbered (size-ranked) ids shown in the bar chart.
    cid_by_paper = {pp["paper_idx"]: c["cluster_id"] for c in valid for pp in c["papers"]}
    cdf = pd.DataFrame({
        "x": umap_2d[:, 0].tolist(),
        "y": umap_2d[:, 1].tolist(),
        "cluster": ["C{}".format(cid_by_paper[pp["paper_idx"]])
                    if pp["paper_idx"] in cid_by_paper else "noise"
                    for pp in papers],
        "title": [str(pp["title"])[:50] for pp in papers],
        "prob": probs.tolist(),
    })
    fig_s = px.scatter(cdf, x="x", y="y", color="cluster",
                       hover_data=["title", "prob"],
                       title="OPTIMIZED UMAP+HDBSCAN — {} clusters, {} noise".format(
                           optimized_count, noise_count))
    fig_b = px.bar(
        x=["C{}".format(c["cluster_id"]) for c in valid],
        y=[c["paper_count"] for c in valid],
        title="Papers per Cluster (OPTIMIZED: {} clusters, min_cluster_size={})".format(
            optimized_count, MCS),
    )
    p["charts"].write_text(json.dumps({
        "scatter": fig_s.to_html(full_html=False, include_plotlyjs="cdn"),
        "bar": fig_b.to_html(full_html=False, include_plotlyjs=False),
    }))

    return json.dumps({
        "status": "OPTIMIZATION_COMPLETE",
        "optimization_parameters": {
            "hdbscan_min_cluster_size": MCS,
            "hdbscan_min_samples": MIN_SAMPLES,
            "hdbscan_metric": "euclidean",
            "hdbscan_cluster_selection": "eom",
            "umap_n_components": 5,
            "umap_metric": "cosine",
            "umap_random_state": 42,
            "note": "UMAP reused from initial run (random_state=42, fully deterministic).",
        },
        "results": {
            "original_clusters": original_count,
            "optimized_clusters": optimized_count,
            "reduction": reduction,
            "noise_papers": noise_count,
            "cluster_sizes": [c["paper_count"] for c in valid],
            "within_15_30": in_target_range,
            "all_clusters_above_5_papers": all_at_least_5,
        },
        "determinism_note": (
            "Same dataset will always produce the same optimized output. "
            "UMAP is fixed (random_state=42). "
            "HDBSCAN on the same UMAP array with min_cluster_size={} is deterministic.".format(MCS)
        ),
        "bot_message": (
            "Optimization complete.\n"
            "Parameters: min_cluster_size={}, min_samples={}, metric=euclidean, "
            "cluster_selection=eom\n"
            "Original: {} clusters → Optimized: {} clusters\n"
            "Reduction: {} clusters removed\n"
            "All clusters have >= 5 papers: {}\n"
            "Within 15-30 target range: {}\n"
            "Ready for labeling.".format(
                MCS, MIN_SAMPLES,
                original_count, optimized_count,
                reduction,
                all_at_least_5,
                in_target_range,
            )
        ),
        "next_step": "Call label_clusters_council_of_3() to label the {} optimized clusters.".format(
            optimized_count),
    })
# =============================================================================
# V2 TOOL 3 — label_clusters_council_of_3  (parallel + cached multi-LLM)
# =============================================================================
def label_clusters_council_of_3(batch_size: int = 5) -> str:
    """Label clusters using a TRUE council of 3 LLMs running IN PARALLEL:
      1. Mistral (mistral-small-latest)
      2. Gemini  (gemini-2.5-flash)
      3. Groq    (llama-3.3-70b-versatile)
    SPEED: the council members run concurrently via ThreadPoolExecutor; each
           member works through its batches sequentially with a pause between.
    COST: SHA-256 disk cache keyed on (model name, prompt). A prompt whose
          validated answer is cached is not sent again. Replies are validated
          BEFORE caching, so a malformed reply can never poison later runs.
    LIMITS: per-batch retry with exponential backoff (15 s, 30 s, 60 s).
    A member whose client cannot be created (e.g. missing API key) is skipped;
    its votes count as missing in the consensus step.
    API keys auto-read from env: MISTRAL_API_KEY, GOOGLE_API_KEY, GROQ_API_KEY
    Cache lives at: data/v2/llm_cache/
    Writes data/v2/summaries.json and data/v2/cluster_audit.csv.
    Args:
        batch_size: Clusters per LLM call (default 5; values < 1 are treated as 1).
    """
    import hashlib
    import threading
    from collections import Counter
    from concurrent.futures import ThreadPoolExecutor, as_completed

    p = _p2()
    clusters = json.loads(p["clusters"].read_text())
    batch_size = max(1, int(batch_size))  # range() below rejects a step of 0
    CACHE_DIR = p["dir"] / "llm_cache"
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    cache_lock = threading.Lock()
    LLM_NAMES = ["MISTRAL", "GEMINI", "GROQ"]
    MAX_RETRIES = 4
    # Pause (seconds) between consecutive batches of one member, per provider.
    BATCH_DELAYS = {"MISTRAL": 12, "GEMINI": 8, "GROQ": 15}

    def _cache_path(model_name: str, prompt: str) -> Path:
        # Key format must stay as-is, or existing cache entries stop matching.
        digest = hashlib.sha256("{}::{}".format(model_name, prompt).encode()).hexdigest()
        return CACHE_DIR / "{}.json".format(digest)

    def _cache_get(model_name: str, prompt: str):
        path = _cache_path(model_name, prompt)
        with cache_lock:
            if not path.exists():
                return None
            try:
                return json.loads(path.read_text(encoding="utf-8"))
            except (OSError, ValueError):
                return None  # unreadable entry: treat as a miss and re-query

    def _cache_set(model_name: str, prompt: str, result) -> None:
        path = _cache_path(model_name, prompt)
        with cache_lock:
            path.write_text(json.dumps(result, ensure_ascii=False), encoding="utf-8")

    def _parse_batch(batch_result) -> dict:
        """Index one LLM reply by integer cluster_id.

        Raises TypeError/ValueError unless the reply is a JSON array of objects
        whose cluster_id is integer-like.
        """
        if not isinstance(batch_result, list):
            raise TypeError("expected a JSON array, got {}".format(type(batch_result).__name__))
        parsed = {}
        for item in batch_result:
            if not isinstance(item, dict):
                raise TypeError("expected JSON objects in the array, got {}".format(
                    type(item).__name__))
            parsed[int(item.get("cluster_id", 0))] = item
        return parsed

    def make_prompt(batch: list) -> str:
        # NOTE: the prompt text is part of the cache key; changing it
        # invalidates every cached answer.
        mini = [{"cluster_id": c["cluster_id"], "paper_count": c["paper_count"],
                 "top3_titles": c["top3_titles"], "top3_abstracts": c["top3_abstracts"]}
                for c in batch]
        return (
            "You are an Information Systems research expert conducting a systematic "
            "literature review. Label each cluster with a precise 4-7 word noun-phrase "
            "that reflects its core IS research theme.\n\n"
            "Cluster IDs in this batch: " + str([c["cluster_id"] for c in batch]) + "\n\n"
            "CLUSTERS:\n" + json.dumps(mini, indent=2) + "\n\n"
            "Return ONLY a raw JSON array — no markdown, no preamble.\n"
            "Each element: cluster_id (int), label (4-7 words), "
            "confidence (High/Medium/Low), reasoning (one sentence)."
        )

    def _build_council() -> list:
        specs = [
            ("MISTRAL", ChatMistralAI, "mistral-small-latest", 0),
            ("GEMINI", ChatGoogleGenerativeAI, "gemini-2.5-flash", 1),
            ("GROQ", ChatGroq, "llama-3.3-70b-versatile", 2),
        ]
        council = []
        for name, client_cls, model_id, stagger in specs:
            try:
                council.append({"name": name,
                                "model": client_cls(model=model_id, temperature=0.2),
                                "stagger": stagger})
            except Exception as e:
                print("[SKIP] {} client could not be created: {}".format(name, e))
        return council

    def run_one_member(member: dict) -> tuple[str, dict]:
        name, llm, stagger = member["name"], member["model"], member["stagger"]
        results = {}
        if stagger:
            time.sleep(stagger)  # offset the members' start times
        batch_starts = list(range(0, len(clusters), batch_size))
        for bi, start in enumerate(batch_starts):
            batch = clusters[start: start + batch_size]
            prompt = make_prompt(batch)
            cached = _cache_get(name, prompt)
            if cached is not None:
                try:
                    parsed_cached = _parse_batch(cached)
                except (TypeError, ValueError):
                    print("  [{}] batch {}/{} → unusable cache entry, re-querying".format(
                        name, bi + 1, len(batch_starts)))
                else:
                    print("  [{}] batch {}/{} → CACHE HIT".format(name, bi + 1, len(batch_starts)))
                    results.update(parsed_cached)
                    continue
            for attempt in range(MAX_RETRIES):
                try:
                    print("  [{}] batch {}/{} attempt {}".format(
                        name, bi + 1, len(batch_starts), attempt + 1))
                    batch_result = _call_llm_json(llm, prompt)
                    parsed = _parse_batch(batch_result)
                except Exception as e:
                    print("  [{}] batch {} attempt {} FAILED: {}".format(
                        name, bi + 1, attempt + 1, e))
                    if attempt < MAX_RETRIES - 1:
                        time.sleep((2 ** attempt) * 15)
                    else:
                        for c in batch:
                            cid = c["cluster_id"]
                            results[cid] = {
                                "cluster_id": cid,
                                "label": "Cluster {} ({} error)".format(cid, name),
                                "confidence": "Low",
                                "reasoning": "Fallback — {} failed: {}".format(name, str(e)[:80]),
                            }
                    continue
                results.update(parsed)
                try:
                    _cache_set(name, prompt, batch_result)  # only validated replies are cached
                except OSError as cache_err:
                    print("  [{}] could not write cache entry: {}".format(name, cache_err))
                break
            if bi < len(batch_starts) - 1:
                time.sleep(BATCH_DELAYS.get(name, 12))
        return name, results

    council = _build_council()
    persona_results = {name: {} for name in LLM_NAMES}
    if council:
        print("Dispatching {} LLMs in parallel...".format(len(council)))
        with ThreadPoolExecutor(max_workers=len(council)) as executor:
            futures = {executor.submit(run_one_member, m): m["name"] for m in council}
            for future in as_completed(futures):
                member_name = futures[future]
                try:
                    name, result_dict = future.result()
                    persona_results[name] = result_dict
                    print("[DONE] {} — {} labels".format(name, len(result_dict)))
                except Exception as e:
                    print("[ERROR] {} crashed: {}".format(member_name, e))
    else:
        print("No council member could be created; every vote will be missing.")

    try:
        consolidation_llm = ChatMistralAI(model="mistral-small-latest", temperature=0.1)
    except Exception as e:
        print("Consolidation LLM unavailable ({}); split votes keep the shortest label.".format(e))
        # _semantic_vote catches the failing .invoke and keeps the shortest label.
        consolidation_llm = None

    def member_field(name: str, cid: int, key: str):
        return persona_results.get(name, {}).get(cid, {}).get(key, "")

    def enrich(cluster: dict) -> dict:
        cid = cluster["cluster_id"]
        raw_votes = [str(member_field(n, cid, "label")).strip() for n in LLM_NAMES]
        final, vote_type = _semantic_vote(raw_votes, consolidation_llm, cid)
        return {
            **cluster,
            "label": final,
            "llm_vote_1_MISTRAL": raw_votes[0],
            "llm_vote_2_GEMINI": raw_votes[1],
            "llm_vote_3_GROQ": raw_votes[2],
            "confidence_1": member_field("MISTRAL", cid, "confidence"),
            "confidence_2": member_field("GEMINI", cid, "confidence"),
            "confidence_3": member_field("GROQ", cid, "confidence"),
            "reasoning_1": member_field("MISTRAL", cid, "reasoning"),
            "reasoning_2": member_field("GEMINI", cid, "reasoning"),
            "reasoning_3": member_field("GROQ", cid, "reasoning"),
            "vote_agreement": vote_type,
        }

    enriched = [enrich(c) for c in clusters]
    p["summaries"].write_text(json.dumps(enriched, indent=2, ensure_ascii=False))

    # Fixed column list so the CSV keeps its header even when there are no rows.
    audit_columns = [
        "cluster_id", "final_label", "vote_agreement",
        "llm1_MISTRAL_label", "llm2_GEMINI_label", "llm3_GROQ_label",
        "llm1_confidence", "llm2_confidence", "llm3_confidence",
        "llm1_reasoning", "llm2_reasoning", "llm3_reasoning",
        "paper_doi", "paper_title", "paper_year", "paper_journal",
        "abstract_preview", "combined_preview",
        "centroid_cosine_sim", "hdbscan_probability", "is_top3_centroid",
    ]
    rows = []
    for c in enriched:
        top3 = set(c["top3_paper_idx"])
        sims, cprobs = c["centroid_sims"], c["hdbscan_probs"]
        for li, paper in enumerate(c["papers"]):
            rows.append({
                "cluster_id": c["cluster_id"],
                "final_label": c["label"],
                "vote_agreement": c["vote_agreement"],
                "llm1_MISTRAL_label": c["llm_vote_1_MISTRAL"],
                "llm2_GEMINI_label": c["llm_vote_2_GEMINI"],
                "llm3_GROQ_label": c["llm_vote_3_GROQ"],
                "llm1_confidence": c["confidence_1"],
                "llm2_confidence": c["confidence_2"],
                "llm3_confidence": c["confidence_3"],
                "llm1_reasoning": c["reasoning_1"],
                "llm2_reasoning": c["reasoning_2"],
                "llm3_reasoning": c["reasoning_3"],
                "paper_doi": paper.get("doi", ""),
                "paper_title": paper.get("title", ""),
                "paper_year": paper.get("year", ""),
                "paper_journal": paper.get("journal", ""),
                "abstract_preview": str(paper.get("abstract", ""))[:300],
                "combined_preview": str(paper.get("combined", ""))[:200],
                "centroid_cosine_sim": round(float(sims[li] if li < len(sims) else 0.0), 4),
                "hdbscan_probability": round(float(cprobs[li] if li < len(cprobs) else 0.0), 4),
                "is_top3_centroid": "YES" if li in top3 else "no",
            })
    pd.DataFrame(rows, columns=audit_columns).to_csv(p["audit_csv"], index=False, encoding="utf-8-sig")

    cached_files = len(list(CACHE_DIR.glob("*.json")))
    agreement = Counter(c["vote_agreement"] for c in enriched)
    available = {m["name"] for m in council}
    return json.dumps({
        "clusters_labeled": len(enriched),
        "unanimous": agreement["unanimous"],
        "majority": agreement["semantic_majority"],
        "split": agreement["semantic_split"],
        "error_fallback": agreement["error_fallback"],
        "audit_csv_rows": len(rows),
        "council_members": LLM_NAMES,
        "skipped_members": [n for n in LLM_NAMES if n not in available],
        "execution": "parallel (ThreadPoolExecutor, {} workers)".format(len(council)),
        "cache_files_on_disk": cached_files,
        "cache_dir": str(CACHE_DIR),
        "note": (
            "Parallel 3-LLM ensemble done. "
            "Cache has {} entries — re-runs use these for free. "
            "Audit CSV ready ({} rows).".format(cached_files, len(rows))
        ),
    })
# =============================================================================
# V2 TOOL 4 — map_clusters_to_pajais_v2
# =============================================================================
def map_clusters_to_pajais_v2() -> str:
    """Map v2 cluster labels to PAJAIS 25 IS research categories via Mistral LLM.

    Clusters are sent in batches of 10, with a 10 s pause between batches.
    - Each batch is tried up to 3 times, with 15 s / 30 s backoff.
    - If a batch still fails, its clusters are recorded with pajais_category
      "Unknown" and confidence "Low", so one bad reply does not abort the run.
    - Non-object items in a reply are dropped.
    - A returned category that matches a PAJAIS name case-insensitively is
      snapped to the canonical spelling.

    Saves taxonomy to data/v2/taxonomy.json. Independent of v1 taxonomy.
    """
    p = _p2()
    summaries = json.loads(p["summaries"].read_text())
    llm = ChatMistralAI(model="mistral-small-latest", temperature=0.1)
    canonical = {c.lower(): c for c in PAJAIS_CATEGORIES}
    mini = [{"cluster_id": s["cluster_id"], "name": s["label"],
             "sample": s["top3_titles"][:2]} for s in summaries]
    BATCH = 10
    MAX_RETRIES = 3
    starts = list(range(0, len(mini), BATCH))
    results = []
    failed_batches = 0
    for bi, start in enumerate(starts):
        batch = mini[start: start + BATCH]
        prompt = (
            "Map each IS research cluster to the single most relevant PAJAIS category.\n\n"
            "CLUSTERS:\n" + json.dumps(batch, indent=2) + "\n\n"
            "PAJAIS CATEGORIES:\n" + json.dumps(PAJAIS_CATEGORIES, indent=2) + "\n\n"
            "Return ONLY a raw JSON array. Each element: "
            "cluster_id (int), name (str), pajais_category (str), "
            "confidence (High/Medium/Low), rationale (one sentence). No markdown."
        )
        items = None
        for attempt in range(MAX_RETRIES):
            try:
                reply = _call_llm_json(llm, prompt)
                if not isinstance(reply, list):
                    raise TypeError("expected a JSON array, got {}".format(type(reply).__name__))
                items = [item for item in reply if isinstance(item, dict)]
                break
            except Exception as e:
                print("  PAJAIS batch {} attempt {} FAILED: {}".format(bi + 1, attempt + 1, e))
                if attempt < MAX_RETRIES - 1:
                    time.sleep((2 ** attempt) * 15)
        if items is None:
            failed_batches += 1
            items = [{
                "cluster_id": c["cluster_id"],
                "name": c["name"],
                "pajais_category": "Unknown",
                "confidence": "Low",
                "rationale": "Fallback — PAJAIS mapping failed for this batch.",
            } for c in batch]
        for item in items:
            category = item.get("pajais_category")
            if isinstance(category, str):
                item["pajais_category"] = canonical.get(category.strip().lower(), category)
        results.extend(items)
        if bi < len(starts) - 1:
            time.sleep(10)
    p["taxonomy"].write_text(json.dumps(results, indent=2, ensure_ascii=False))
    return json.dumps({"mapped_clusters": len(results),
                       "failed_batches": failed_batches,
                       "note": "PAJAIS taxonomy saved to data/v2/taxonomy.json"})
# =============================================================================
# V2 TOOL 5 — export_v2_outputs
# =============================================================================
def export_v2_outputs() -> str:
    """Generate final comparison_v2.csv and narrative_v2.txt for the SPECTER2 run.

    Outputs:
    - comparison_v2.csv: the per-paper audit CSV (data/v2/cluster_audit.csv)
      with a pajais_category column added. Clusters missing from the taxonomy
      get "Unknown". Saved to data/comparison_v2.csv.
    - narrative_v2.txt: a ~500-word Section 7 academic discussion written by
      Mistral. Saved to data/v2/narrative_v2.txt.

    Requires summaries.json, taxonomy.json and cluster_audit.csv from the
    earlier V2 tools.
    """
    p = _p2()
    summaries = json.loads(p["summaries"].read_text())
    taxonomy = json.loads(p["taxonomy"].read_text())

    def cid_key(value) -> str:
        # Normalise 3, "3", 3.0 and "3.0" to the same key "3"; anything that is
        # not numeric (including NaN) is kept as its string form.
        try:
            return str(int(float(str(value))))
        except (TypeError, ValueError, OverflowError):
            return str(value)

    tax_map = {
        cid_key(item.get("cluster_id", "")): item.get("pajais_category", "Unknown")
        for item in taxonomy
        if isinstance(item, dict)
    }
    try:
        audit_df = pd.read_csv(p["audit_csv"], encoding="utf-8-sig")
    except pd.errors.EmptyDataError:
        audit_df = pd.DataFrame()
    if "cluster_id" in audit_df.columns:
        audit_df["pajais_category"] = [
            tax_map.get(cid_key(cid), "Unknown") for cid in audit_df["cluster_id"].tolist()
        ]
    else:
        audit_df["pajais_category"] = "Unknown"
    out_path = p["comparison"]
    audit_df.to_csv(out_path, index=False, encoding="utf-8-sig")

    llm = ChatMistralAI(model="mistral-small-latest", temperature=0.4)
    cluster_summary = [{"cluster": s["cluster_id"], "label": s["label"],
                        "papers": s["paper_count"], "agreement": s["vote_agreement"]}
                       for s in summaries]
    prompt = (
        "Write Section 7 (Discussion and Thematic Synthesis) for a systematic "
        "IS literature review. ~500 words, formal academic prose.\n"
        "Method: SPECTER2 document embeddings + UMAP + HDBSCAN + council-of-3-LLMs labeling.\n"
        "Cover: (a) overview of clusters/themes, (b) dominant PAJAIS categories, "
        "(c) inter-cluster relationships, (d) implications for IS research, "
        "(e) methodological contribution vs traditional BERTopic, (f) limitations.\n\n"
        "CLUSTERS:\n" + json.dumps(cluster_summary, indent=2) + "\n\n"
        "PAJAIS MAPPING:\n" + json.dumps(taxonomy, indent=2) + "\n\n"
        "Continuous academic paragraphs only. No bullet points or headers."
    )
    response = llm.invoke([HumanMessage(content=prompt)])
    narrative = response.content
    p["narrative"].write_text(narrative, encoding="utf-8")
    return json.dumps({
        "comparison_csv_rows": len(audit_df),
        "comparison_csv_path": str(out_path),
        "narrative_words": len(narrative.split()),
        "narrative_path": str(p["narrative"]),
        "note": "comparison_v2.csv + narrative_v2.txt ready in Download tab.",
    })