"""
tools_v2.py - SPECTER2 + UMAP + HDBSCAN thematic analysis tools.

COMPLETELY INDEPENDENT from tools.py (v1): no shared state, no ordering
dependency. V2 can be run before, after, or without ever running V1.

SPECTER2 is allenai/specter2_base — a local HuggingFace model. NO API KEY
required. Downloads once, cached automatically.

Pipeline:
  1. Combined Title+Abstract per paper → SPECTER2 embedding (768-dim).
  2. UMAP (cosine, 5-D) → compact document space for density clustering.
  3. HDBSCAN → clusters.
       - Initial run: min_cluster_size=5 (unoptimized; parameters reported).
       - "optimize": re-runs HDBSCAN on the saved UMAP output with
         OPTIMIZED_MIN_CLUSTER_SIZE (default 10), targeting ~15-30 clusters.
  4. Council of 3 LLMs (Mistral, Gemini, Groq) → semantic consensus voting.
  5. PAJAIS mapping + audit CSV + narrative.

Artefacts are written under data/v2/, except data/comparison_v2.csv.
"""
from __future__ import annotations

import hashlib
import io
import json
import os
import re
import string
import threading
import time
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

import numpy as np
import pandas as pd
import plotly.express as px
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_groq import ChatGroq
from langchain_mistralai import ChatMistralAI

# The Gemini client is configured through GOOGLE_API_KEY (see the council
# tool docstring); accept GEMINI_API_KEY as an alias for it.
if not os.getenv("GOOGLE_API_KEY") and os.getenv("GEMINI_API_KEY"):
    os.environ["GOOGLE_API_KEY"] = os.environ["GEMINI_API_KEY"]

DATA_DIR = Path("data")
DATA_DIR.mkdir(exist_ok=True)

# ─────────────────────────────────────────────────────────────────────────────
# OPTIMIZATION SETTING — change this value to adjust what "optimize" produces
OPTIMIZED_MIN_CLUSTER_SIZE = 10
# ─────────────────────────────────────────────────────────────────────────────
# HDBSCAN min_samples used by the optimizer (same as the initial-run default).
OPTIMIZED_MIN_SAMPLES = 3
# Clusters smaller than this are discarded by _build_clusters.
MIN_PAPERS_PER_CLUSTER = 5
# Fixed UMAP seed so the same dataset always yields the same projection.
UMAP_RANDOM_STATE = 42

PAJAIS_CATEGORIES = [
    "Information Systems Theory",
    "IS Strategy & Governance",
    "Digital Innovation",
    "Enterprise Systems",
    "AI & Intelligent Systems",
    "Big Data & Analytics",
    "Cybersecurity & Privacy",
    "Cloud Computing",
    "IS in Healthcare",
    "IS in Education",
    "E-Commerce & Digital Markets",
    "Social Media & Platforms",
    "Human-Computer Interaction",
    "IS Project Management",
    "IT Outsourcing",
    "Knowledge Management",
    "IS Development Methodologies",
    "Digital Transformation",
    "IS Ethics & Society",
    "IS in Developing Countries",
    "Mobile Computing",
    "IT Infrastructure",
    "IS Adoption & Diffusion",
    "IS Evaluation",
    "Organizational IS & Change",
]

# Column order of data/v2/cluster_audit.csv (one row per clustered paper).
_AUDIT_COLUMNS = [
    "cluster_id", "final_label", "vote_agreement",
    "llm1_MISTRAL_label", "llm2_GEMINI_label", "llm3_GROQ_label",
    "llm1_confidence", "llm2_confidence", "llm3_confidence",
    "llm1_reasoning", "llm2_reasoning", "llm3_reasoning",
    "paper_doi", "paper_title", "paper_year", "paper_journal",
    "abstract_preview", "combined_preview",
    "centroid_cosine_sim", "hdbscan_probability", "is_top3_centroid",
]


# ── Generic helpers ──────────────────────────────────────────────────────────

def _read_json(path):
    """Load a JSON artefact written by this module (UTF-8)."""
    return json.loads(Path(path).read_text(encoding="utf-8"))


def _write_json(path, obj, indent=2) -> None:
    """Write *obj* as UTF-8 JSON, keeping non-ASCII text unescaped.

    The encoding is explicit because the platform default (e.g. cp1252 on
    Windows) cannot encode every character found in paper metadata.
    """
    Path(path).write_text(
        json.dumps(obj, indent=indent, ensure_ascii=False), encoding="utf-8")


def _response_text(response) -> str:
    """Return the text of a LangChain chat response.

    ``message.content`` may be a plain string or a list of content parts
    (strings, or dicts carrying a "text" key); both are flattened to a str.
    """
    content = getattr(response, "content", response)
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for part in content:
            if isinstance(part, str):
                parts.append(part)
            elif isinstance(part, dict) and isinstance(part.get("text"), str):
                parts.append(part["text"])
        return "".join(parts)
    return str(content)


def _as_item_list(result) -> list:
    """Coerce a parsed LLM JSON reply into a list of dict items.

    Accepts a bare array, or an object that wraps the array under some key
    (the first list-valued entry is used; otherwise the object itself is
    treated as a single item). Non-dict elements are dropped.
    """
    if isinstance(result, dict):
        result = next((v for v in result.values() if isinstance(v, list)), [result])
    if not isinstance(result, list):
        return []
    return [item for item in result if isinstance(item, dict)]


def _cluster_key(value) -> str:
    """Normalise a cluster id (3, "3", 3.0, "3.0") to the string "3".

    Values that are not numeric (including NaN) fall back to ``str(value)``.
    """
    try:
        return str(int(float(str(value))))
    except (TypeError, ValueError):
        return str(value)


def _cell_to_str(value) -> str:
    """Render one CSV cell as clean text: NaN → "", 2020.0 → "2020"."""
    if pd.isna(value):
        return ""
    if isinstance(value, float) and value.is_integer():
        return str(int(value))
    return str(value).strip()


# ── Semantic consensus voting helpers ────────────────────────────────────────

_MINILM = None
# Created at import time so concurrent first calls share one lock.
_MINILM_LOCK = threading.Lock()

_PUNCT_TABLE = str.maketrans("", "", string.punctuation)
# Placeholder labels produced by the council when a member fails, e.g.
# "Cluster 3 (MISTRAL error)". Only these are treated as failed votes, so
# genuine labels such as "Medical Error Reporting Systems" are kept.
_FAILED_LABEL_RE = re.compile(r"^cluster \d+ \([^)]*\berror\)$", re.IGNORECASE)
_EMPTY_LABELS = {"", "none", "null"}
# Minimum MiniLM cosine similarity for two labels to count as the same vote.
_SEMANTIC_VOTE_THRESHOLD = 0.60


def _get_minilm():
    """Return the shared all-MiniLM-L6-v2 encoder, loading it on first use."""
    global _MINILM
    with _MINILM_LOCK:
        if _MINILM is None:
            from sentence_transformers import SentenceTransformer
            print("Loading all-MiniLM-L6-v2 for semantic voting...")
            _MINILM = SentenceTransformer("all-MiniLM-L6-v2")
            print("MiniLM loaded OK.")
    return _MINILM


def _normalize_label(label: str) -> str:
    """Lower-case, trim and strip ASCII punctuation for label comparison."""
    return label.lower().strip().translate(_PUNCT_TABLE)


def _is_real_vote(label) -> bool:
    """True when *label* is an actual LLM answer, not empty or a failure placeholder."""
    text = (label or "").strip()
    return text.lower() not in _EMPTY_LABELS and not _FAILED_LABEL_RE.match(text)


def _group_by_similarity(sim, threshold: float) -> list:
    """Greedily group items by pairwise similarity.

    Each still-unassigned item seeds a group that absorbs every later
    unassigned item whose similarity to the seed is >= *threshold*.
    """
    n = len(sim)
    assigned = [False] * n
    groups = []
    for i in range(n):
        if assigned[i]:
            continue
        group = [i] + [j for j in range(i + 1, n)
                       if not assigned[j] and sim[i][j] >= threshold]
        for idx in group:
            assigned[idx] = True
        groups.append(group)
    return groups


def _consolidate_with_llm(labels: list, llm) -> str:
    """Ask *llm* to merge disagreeing labels into one; fall back to the shortest."""
    if llm is None:
        return min(labels, key=len)
    header = (
        "You are an IS research expert. Given these {} different cluster labels "
        "produced by different LLMs, produce ONE concise unified label "
        "(4-7 words, noun-phrase, IS-specific). "
        "Return ONLY the label — no explanation, no markdown.\n\nLabels:\n"
    ).format(len(labels))
    numbered = "\n".join("{}. {}".format(i + 1, v) for i, v in enumerate(labels))
    try:
        response = llm.invoke([HumanMessage(content=header + numbered)])
        unified = _response_text(response).strip().strip('"').strip("'")
    except Exception as llm_err:
        print("  LLM consolidation failed: {}".format(llm_err))
        unified = ""
    # An empty reply is not a usable label.
    return unified or min(labels, key=len)


def _semantic_vote(votes: list[str], fallback_llm, cluster_id: int) -> tuple[str, str]:
    """Merge the council's labels for one cluster into a consensus label.

    Args:
        votes: One label per council member ("" when a member gave none).
        fallback_llm: Chat model used to merge labels when no two agree;
            may be None, in which case the shortest label wins.
        cluster_id: Used only in the "all models failed" placeholder.

    Returns:
        (label, vote_type) where vote_type is "unanimous",
        "semantic_majority", "semantic_split" or "error_fallback" (fewer
        than two usable votes, or the embedding model failed).
    """
    real_votes = [v for v in votes if _is_real_vote(v)]
    if not real_votes:
        return "Cluster {} (all models failed)".format(cluster_id), "error_fallback"
    if len(real_votes) == 1:
        return real_votes[0], "error_fallback"

    normalized = [_normalize_label(v) for v in real_votes]
    if len(set(normalized)) == 1:
        return min(real_votes, key=len), "unanimous"

    try:
        embs = _get_minilm().encode(normalized, normalize_embeddings=True)
        sim = np.inner(embs, embs)  # cosine similarity: rows are unit-normalised
    except Exception as embed_err:
        print("  Semantic voting failed ({}), using mode fallback.".format(embed_err))
        return Counter(real_votes).most_common(1)[0][0], "error_fallback"

    best_group = max(_group_by_similarity(sim, _SEMANTIC_VOTE_THRESHOLD), key=len)
    if len(best_group) >= 2:
        winner = min((real_votes[i] for i in best_group), key=len)
        vote_type = "unanimous" if len(best_group) == len(real_votes) else "semantic_majority"
        return winner, vote_type

    return _consolidate_with_llm(real_votes, fallback_llm), "semantic_split"


# ── lazy-loaded SPECTER2 ─────────────────────────────────────────────────────

_SPECTER_TOKENIZER = None
_SPECTER_MODEL_OBJ = None


def _get_specter():
    """Return (tokenizer, model), loading SPECTER2 on the first call."""
    if _SPECTER_TOKENIZER is None or _SPECTER_MODEL_OBJ is None:
        return _load_specter_fresh()
    return _SPECTER_TOKENIZER, _SPECTER_MODEL_OBJ


def _load_specter_fresh():
    """Download/load allenai/specter2_base and cache it in module globals."""
    global _SPECTER_TOKENIZER, _SPECTER_MODEL_OBJ
    from transformers import AutoModel, AutoTokenizer
    MODEL_ID = "allenai/specter2_base"
    print("Loading SPECTER2 — one-time HuggingFace download, then cached...")
    _SPECTER_TOKENIZER = AutoTokenizer.from_pretrained(MODEL_ID)
    _SPECTER_MODEL_OBJ = AutoModel.from_pretrained(MODEL_ID)
    _SPECTER_MODEL_OBJ.eval()
    print("SPECTER2 loaded OK.")
    return _SPECTER_TOKENIZER, _SPECTER_MODEL_OBJ


def _embed_specter(texts: list, batch_size: int = 8) -> np.ndarray:
    """Embed *texts* with SPECTER2 and return L2-normalised vectors.

    The first-token (position 0) hidden state of each text is used. Texts
    are truncated to 512 tokens. Returns one row per text; an empty input
    returns an empty (0, 0) array instead of raising.
    """
    import torch

    if not texts:
        return np.empty((0, 0), dtype=np.float32)
    tokenizer, model = _get_specter()
    chunks = []
    for start in range(0, len(texts), max(1, batch_size)):
        batch = texts[start:start + batch_size]
        inputs = tokenizer(batch, padding=True, truncation=True,
                           max_length=512, return_tensors="pt")
        with torch.no_grad():
            out = model(**inputs)
        emb = out.last_hidden_state[:, 0, :].cpu().numpy()
        norms = np.linalg.norm(emb, axis=1, keepdims=True)
        chunks.append(emb / np.maximum(norms, 1e-9))
    return np.vstack(chunks)


# ── Paths, CSV and LLM-JSON helpers ──────────────────────────────────────────

def _p2() -> dict:
    """Return the v2 artefact paths, creating data/v2/ if needed."""
    d = DATA_DIR / "v2"
    d.mkdir(parents=True, exist_ok=True)
    return {
        "dir": d,
        "papers": d / "papers.json",
        "embeddings": d / "embeddings.npy",
        "umap_emb": d / "umap_emb.npy",
        "umap_2d_emb": d / "umap_2d_emb.npy",
        "clusters": d / "clusters.json",
        "clusters_original": d / "clusters_original.json",
        "summaries": d / "summaries.json",
        "taxonomy": d / "taxonomy.json",
        "charts": d / "charts.json",
        "audit_csv": d / "cluster_audit.csv",
        "narrative": d / "narrative_v2.txt",
        "comparison": DATA_DIR / "comparison_v2.csv",
        "optimization_log": d / "optimization_log.json",
    }


def _read_csv_robust(path) -> pd.DataFrame:
    """Read a CSV whose text encoding is unknown.

    Decodes strictly with UTF-8 (with or without BOM), then cp1252, then
    latin-1, and parses the first decoding that succeeds. latin-1 maps every
    byte, so a result is always produced.
    """
    raw = Path(path).read_bytes()
    for enc in ("utf-8-sig", "cp1252", "latin-1"):
        try:
            text = raw.decode(enc)
        except UnicodeDecodeError:
            continue
        return pd.read_csv(io.StringIO(text))
    # Defensive only: latin-1 above cannot fail.
    return pd.read_csv(io.StringIO(raw.decode("utf-8", errors="replace")))


_JSON_FENCE_RE = re.compile(r"```(?:json)?\s*(.*?)\s*```", re.IGNORECASE | re.DOTALL)


def _extract_json_text(raw: str) -> str:
    """Strip Markdown code fences (```json or bare ```) around an LLM JSON reply."""
    text = raw.strip()
    match = _JSON_FENCE_RE.search(text)
    if match:
        return match.group(1).strip()
    if "```" in text:
        # A single, unterminated fence: prefer what follows it.
        before, after = text.split("```", 1)
        after = re.sub(r"^json", "", after, flags=re.IGNORECASE).strip()
        return after or before.strip()
    return text


def _call_llm_json(llm, prompt: str):
    """Send *prompt* to *llm* and parse the reply as JSON.

    Raises:
        json.JSONDecodeError: if the reply is not valid JSON.
    """
    response = llm.invoke([HumanMessage(content=prompt)])
    return json.loads(_extract_json_text(_response_text(response)))


# ── Clustering helpers ───────────────────────────────────────────────────────

def _fit_umap(embs: np.ndarray, n_components: int, n_neighbors: int, min_dist: float):
    """Fit a seeded cosine UMAP and return the projected coordinates."""
    import umap as umap_mod
    reducer = umap_mod.UMAP(
        n_components=n_components, n_neighbors=n_neighbors, min_dist=min_dist,
        metric="cosine", random_state=UMAP_RANDOM_STATE, verbose=False,
    )
    return reducer.fit_transform(embs)


def _run_hdbscan(umap_embs: np.ndarray, mcs: int, min_samples: int = 3):
    """Run HDBSCAN on fixed UMAP embeddings. Deterministic for same inputs.

    Returns:
        (labels, probabilities, sorted non-noise label ids, noise count);
        HDBSCAN marks noise points with label -1.
    """
    import hdbscan as hdbscan_mod
    clusterer = hdbscan_mod.HDBSCAN(
        min_cluster_size=mcs,
        min_samples=min_samples,
        metric="euclidean",
        cluster_selection_method="eom",
        prediction_data=True,
    )
    labels = clusterer.fit_predict(umap_embs)
    probs = clusterer.probabilities_
    unique = sorted(set(labels.tolist()) - {-1})
    noise = int((labels == -1).sum())
    return labels, probs, unique, noise


def _build_clusters(labels, probs, embs, papers, min_papers: int = MIN_PAPERS_PER_CLUSTER):
    """Build cluster dicts from HDBSCAN output.

    Clusters with fewer than *min_papers* members are dropped. The rest are
    sorted by size (largest first) and renumbered 1..N as "cluster_id".
    Centroid similarities are computed on the SPECTER2 embeddings *embs*,
    whose rows align with *papers* and *labels*.
    """
    clusters = []
    for raw_cid in sorted(set(labels.tolist()) - {-1}):
        mask = labels == raw_cid
        indices = np.flatnonzero(mask).tolist()
        if len(indices) < min_papers:
            continue
        cpaps = [papers[i] for i in indices]
        cembs = embs[mask]
        centroid = cembs.mean(axis=0)
        c_norm = centroid / max(float(np.linalg.norm(centroid)), 1e-9)
        norms = np.linalg.norm(cembs, axis=1, keepdims=True)
        sims = ((cembs / np.maximum(norms, 1e-9)) @ c_norm).tolist()
        top3 = sorted(range(len(sims)), key=lambda k: -sims[k])[:3]
        clusters.append({
            "cluster_id": 0,  # final sequential id assigned below
            "paper_count": len(indices),
            "papers": cpaps,
            "hdbscan_probs": probs[mask].tolist(),
            "centroid_sims": sims,
            "centroid": centroid.tolist(),
            "top3_paper_idx": top3,
            "top3_titles": [str(cpaps[i]["title"]) for i in top3],
            "top3_abstracts": [str(cpaps[i]["abstract"])[:200] for i in top3],
        })
    clusters.sort(key=lambda c: -c["paper_count"])  # stable: ties keep label order
    for seq, cluster in enumerate(clusters, start=1):
        cluster["cluster_id"] = seq
    return clusters


def _scatter_cluster_names(labels, papers, clusters) -> list:
    """Name each paper's scatter point with its final cluster id (e.g. "C3").

    HDBSCAN noise is named "noise"; members of clusters that _build_clusters
    dropped for being too small are named "unassigned". This keeps the
    scatter legend consistent with the bar chart and the cluster labels.
    """
    by_paper = {pp.get("paper_idx"): "C{}".format(c["cluster_id"])
                for c in clusters for pp in c["papers"]}
    return [
        "noise" if lb == -1 else by_paper.get(pp.get("paper_idx"), "unassigned")
        for lb, pp in zip(labels.tolist(), papers)
    ]


def _write_charts(p: dict, umap_2d, labels, probs, papers, clusters,
                  scatter_title: str, bar_title: str) -> None:
    """Render the 2-D UMAP scatter and cluster-size bar chart into charts.json."""
    cdf = pd.DataFrame({
        "x": umap_2d[:, 0].tolist(),
        "y": umap_2d[:, 1].tolist(),
        "cluster": _scatter_cluster_names(labels, papers, clusters),
        "title": [str(pp["title"])[:50] for pp in papers],
        "prob": probs.tolist(),
    })
    fig_s = px.scatter(cdf, x="x", y="y", color="cluster",
                       hover_data=["title", "prob"], title=scatter_title)
    fig_b = px.bar(
        x=["C{}".format(c["cluster_id"]) for c in clusters],
        y=[c["paper_count"] for c in clusters],
        title=bar_title,
    )
    # json.dumps default escaping keeps this file pure ASCII.
    p["charts"].write_text(json.dumps({
        "scatter": fig_s.to_html(full_html=False, include_plotlyjs="cdn"),
        "bar": fig_b.to_html(full_html=False, include_plotlyjs=False),
    }), encoding="utf-8")


def _find_column(columns, exact_names: tuple, fragment: str):
    """Return the column whose trimmed lower-cased name is in *exact_names*,
    else the first column whose name contains *fragment*, else None."""
    lookup = {str(c).strip().lower(): c for c in columns}
    for name in exact_names:
        if name in lookup:
            return lookup[name]
    return next((c for c in columns if fragment in str(c).lower()), None)


# =============================================================================
# V2 TOOL 1 — load_and_embed_specter2
# =============================================================================

@tool
def load_and_embed_specter2(csv_path: str = "data/uploaded.csv") -> str:
    """Load Scopus CSV, build one combined Title+Abstract text per paper, embed with SPECTER2.

    SPECTER2 (allenai/specter2_base) is a LOCAL HuggingFace model — NO API key needed.
    First call downloads ~440 MB and caches; subsequent calls are instant.
    Output saved to data/v2/ only — completely independent of Classic (v1) run.

    Args:
        csv_path: Path to uploaded Scopus CSV.
    """
    p = _p2()
    df = _read_csv_robust(csv_path)
    n = len(df)

    def column_values(col) -> list:
        # Every value becomes a clean str so papers.json is always serializable.
        return [_cell_to_str(v) for v in df[col].tolist()] if col is not None else [""] * n

    titles = column_values(_find_column(df.columns, ("title",), "title"))
    abstracts = column_values(_find_column(df.columns, ("abstract",), "abstract"))
    dois = column_values(_find_column(df.columns, ("doi",), "doi"))
    years = column_values(_find_column(df.columns, ("year",), "year"))
    journals = column_values(_find_column(df.columns, ("source title",), "source"))

    combined = ["{} {}".format(t, a).strip() for t, a in zip(titles, abstracts)]
    # Texts of 5 words or fewer carry too little signal to embed.
    valid_idx = [i for i, t in enumerate(combined) if len(t.split()) > 5]
    if not valid_idx:
        return json.dumps({
            "total_papers": n,
            "valid_papers": 0,
            "error": "No paper has a Title+Abstract longer than 5 words; nothing to embed.",
        })

    papers = [{
        "paper_idx": i,
        "title": titles[i],
        "abstract": abstracts[i],
        "doi": dois[i],
        "year": years[i],
        "journal": journals[i],
        "combined": combined[i],
    } for i in valid_idx]
    _write_json(p["papers"], papers)

    valid_texts = [combined[i] for i in valid_idx]
    print("Embedding {} papers with SPECTER2...".format(len(valid_texts)))
    embs = _embed_specter(valid_texts)
    np.save(p["embeddings"], embs)

    return json.dumps({
        "total_papers": n,
        "valid_papers": len(papers),
        "embedding_dim": int(embs.shape[1]),
        "note": "SPECTER2 embeddings saved to data/v2/. No API key needed.",
    })


# =============================================================================
# V2 TOOL 2 — cluster_with_umap_hdbscan (UNOPTIMIZED initial run)
# =============================================================================

@tool
def cluster_with_umap_hdbscan(
    umap_neighbors: int = 15,
    umap_min_dist: float = 0.05,
    hdbscan_min_cluster_size: int = 5,
    hdbscan_min_samples: int = 3,
) -> str:
    """Reduce SPECTER2 embeddings with UMAP (cosine) then cluster with HDBSCAN.

    INITIAL RUN (unoptimized): uses min_cluster_size=5, may give 30-50 clusters.
    Parameters are shown in output. User can then type "optimize".

    DETERMINISTIC: UMAP saved with random_state=42. Same dataset = same result every run.

    Args:
        umap_neighbors: UMAP n_neighbors (default 15).
        umap_min_dist: UMAP min_dist (default 0.05).
        hdbscan_min_cluster_size: Min papers per cluster (default 5, unoptimized).
        hdbscan_min_samples: HDBSCAN min_samples (default 3).
    """
    p = _p2()
    if not (p["embeddings"].exists() and p["papers"].exists()):
        return json.dumps({
            "error": "SPECTER2 embeddings not found. Run load_and_embed_specter2() first."
        })
    embs = np.load(p["embeddings"])
    papers = _read_json(p["papers"])

    # ── UMAP 5-D — computed once, saved, reused by the optimizer ─────────────
    print("UMAP 5-D (n_neighbors={}, min_dist={}, random_state={})...".format(
        umap_neighbors, umap_min_dist, UMAP_RANDOM_STATE))
    umap_embs = _fit_umap(embs, 5, umap_neighbors, umap_min_dist)
    np.save(p["umap_emb"], umap_embs)

    # ── UMAP 2-D for the scatter plot — same fixed seed ──────────────────────
    umap_2d = _fit_umap(embs, 2, umap_neighbors, umap_min_dist)
    np.save(p["umap_2d_emb"], umap_2d)

    # ── Initial HDBSCAN ──────────────────────────────────────────────────────
    labels, probs, unique, noise = _run_hdbscan(
        umap_embs, hdbscan_min_cluster_size, hdbscan_min_samples)
    print("Raw clusters: {}, noise: {}".format(len(unique), noise))
    valid = _build_clusters(labels, probs, embs, papers)

    # clusters_original is the optimizer's baseline; clusters is the working set.
    _write_json(p["clusters_original"], valid)
    _write_json(p["clusters"], valid)

    _write_charts(
        p, umap_2d, labels, probs, papers, valid,
        scatter_title="UMAP+HDBSCAN — {} clusters (unoptimized), {} noise".format(
            len(valid), noise),
        bar_title="Papers per Cluster (UNOPTIMIZED — min_cluster_size={})".format(
            hdbscan_min_cluster_size),
    )

    return json.dumps({
        "status": "UNOPTIMIZED_CLUSTERING_COMPLETE",
        "parameters_used": {
            "umap_n_neighbors": umap_neighbors,
            "umap_min_dist": umap_min_dist,
            "umap_n_components": 5,
            "umap_metric": "cosine",
            "umap_random_state": UMAP_RANDOM_STATE,
            "hdbscan_min_cluster_size": hdbscan_min_cluster_size,
            "hdbscan_min_samples": hdbscan_min_samples,
            "hdbscan_metric": "euclidean",
            "hdbscan_cluster_selection": "eom",
        },
        "clusters_found": len(valid),
        "noise_papers": noise,
        "total_papers": len(papers),
        "cluster_sizes": [c["paper_count"] for c in valid],
        "within_15_30": 15 <= len(valid) <= 30,
        "note": (
            "Unoptimized run complete: {} clusters with min_cluster_size={}. "
            "Type 'optimize' to reduce to an optimal cluster count.".format(
                len(valid), hdbscan_min_cluster_size)
        ),
        "next_step": "Type 'optimize' to run cluster optimization.",
    })


# =============================================================================
# V2 TOOL 2B — optimize_clusters_hardcoded
# =============================================================================

@tool
def optimize_clusters_hardcoded() -> str:
    """Optimize the clustering: re-run HDBSCAN with a larger min_cluster_size.

    Reuses the UMAP embeddings saved by cluster_with_umap_hdbscan() (fixed
    random_state=42, so the result is deterministic) and re-clusters with the
    module setting OPTIMIZED_MIN_CLUSTER_SIZE (default 10) and min_samples=3.
    Overwrites data/v2/clusters.json and the charts, and writes an
    optimization log. Call cluster_with_umap_hdbscan() first.
    """
    p = _p2()
    required = ("embeddings", "papers", "umap_emb", "umap_2d_emb", "clusters_original")
    missing = [key for key in required if not p[key].exists()]
    if missing:
        return json.dumps({
            "error": "UMAP embeddings not found. Run cluster_with_umap_hdbscan() first.",
            "missing_files": [str(p[key]) for key in missing],
        })

    embs = np.load(p["embeddings"])
    papers = _read_json(p["papers"])
    umap_embs = np.load(p["umap_emb"])    # fixed, random_state=42
    umap_2d = np.load(p["umap_2d_emb"])   # fixed, random_state=42
    if not (len(embs) == len(papers) == len(umap_embs) == len(umap_2d)):
        # Papers were re-embedded after clustering; the saved UMAP is stale.
        return json.dumps({
            "error": "Saved embeddings and UMAP projections are out of sync. "
                     "Run cluster_with_umap_hdbscan() again first."
        })

    original_count = len(_read_json(p["clusters_original"]))
    mcs = OPTIMIZED_MIN_CLUSTER_SIZE
    min_samples = OPTIMIZED_MIN_SAMPLES

    labels, probs, _unique, noise_count = _run_hdbscan(umap_embs, mcs, min_samples=min_samples)
    valid = _build_clusters(labels, probs, embs, papers)
    optimized_count = len(valid)
    reduction = original_count - optimized_count
    all_above_min = all(c["paper_count"] >= MIN_PAPERS_PER_CLUSTER for c in valid)
    in_target = 15 <= optimized_count <= 30
    print("Optimized: {} clusters, {} noise".format(optimized_count, noise_count))

    _write_json(p["clusters"], valid)

    # ── Optimization log ─────────────────────────────────────────────────────
    _write_json(p["optimization_log"], {
        "original_clusters": original_count,
        "optimized_clusters": optimized_count,
        "chosen_min_cluster_size": mcs,
        "hdbscan_min_samples": min_samples,
        "hdbscan_metric": "euclidean",
        "hdbscan_cluster_selection": "eom",
        "umap_random_state": UMAP_RANDOM_STATE,
        "noise_papers": noise_count,
        "reduction": reduction,
        "timestamp": str(pd.Timestamp.now()),
    })

    _write_charts(
        p, umap_2d, labels, probs, papers, valid,
        scatter_title="OPTIMIZED UMAP+HDBSCAN — {} clusters, {} noise".format(
            optimized_count, noise_count),
        bar_title="Papers per Cluster (OPTIMIZED: {} clusters, min_cluster_size={})".format(
            optimized_count, mcs),
    )

    return json.dumps({
        "status": "OPTIMIZATION_COMPLETE",
        "optimization_parameters": {
            "hdbscan_min_cluster_size": mcs,
            "hdbscan_min_samples": min_samples,
            "hdbscan_metric": "euclidean",
            "hdbscan_cluster_selection": "eom",
            "umap_n_components": 5,
            "umap_metric": "cosine",
            "umap_random_state": UMAP_RANDOM_STATE,
            "note": "UMAP reused from initial run (random_state=42, fully deterministic).",
        },
        "results": {
            "original_clusters": original_count,
            "optimized_clusters": optimized_count,
            "reduction": reduction,
            "noise_papers": noise_count,
            "cluster_sizes": [c["paper_count"] for c in valid],
            "within_15_30": in_target,
            "all_clusters_above_5_papers": all_above_min,
        },
        "determinism_note": (
            "Same dataset will always produce the same optimized output. "
            "UMAP is fixed (random_state=42). "
            "HDBSCAN on the same UMAP array with min_cluster_size={} is deterministic.".format(mcs)
        ),
        "bot_message": (
            "Optimization complete.\n"
            "Parameters: min_cluster_size={}, min_samples={}, metric=euclidean, "
            "cluster_selection=eom\n"
            "Original: {} clusters → Optimized: {} clusters\n"
            "Reduction: {} clusters removed\n"
            "All clusters have >= {} papers: {}\n"
            "Within 15-30 target range: {}\n"
            "Ready for labeling.".format(
                mcs, min_samples, original_count, optimized_count, reduction,
                MIN_PAPERS_PER_CLUSTER, all_above_min, in_target,
            )
        ),
        "next_step": "Call label_clusters_council_of_3() to label the {} optimized clusters.".format(
            optimized_count),
    })


# =============================================================================
# V2 TOOL 3 — label_clusters_council_of_3 (parallel + cached multi-LLM)
# =============================================================================

_LLM_CACHE_LOCK = threading.Lock()
_COUNCIL_MAX_RETRIES = 4
# Pause between consecutive uncached batches for each provider (seconds).
_COUNCIL_BATCH_DELAYS = {"MISTRAL": 12, "GEMINI": 8, "GROQ": 15}


def _llm_cache_path(cache_dir: Path, model_name: str, prompt: str) -> Path:
    """Cache file for one (model, prompt) pair, keyed by SHA-256."""
    digest = hashlib.sha256("{}::{}".format(model_name, prompt).encode()).hexdigest()
    return cache_dir / "{}.json".format(digest)


def _index_label_items(result) -> dict:
    """Map cluster_id (int) → label item for one parsed council reply.

    Items without an integer-like cluster_id are skipped.

    Raises:
        ValueError: if no usable item remains, so a malformed reply is
            retried rather than cached.
    """
    indexed = {}
    for item in _as_item_list(result):
        if "cluster_id" not in item:
            continue
        try:
            indexed[int(item["cluster_id"])] = item
        except (TypeError, ValueError):
            continue
    if not indexed:
        raise ValueError("LLM reply contained no usable cluster labels")
    return indexed


def _llm_cache_get(cache_dir: Path, model_name: str, prompt: str):
    """Return the cached reply indexed by cluster id, or None on miss/corruption."""
    path = _llm_cache_path(cache_dir, model_name, prompt)
    with _LLM_CACHE_LOCK:
        if not path.exists():
            return None
        try:
            return _index_label_items(json.loads(path.read_text(encoding="utf-8")))
        except (OSError, ValueError):  # json.JSONDecodeError is a ValueError
            return None


def _llm_cache_set(cache_dir: Path, model_name: str, prompt: str, result) -> None:
    """Persist a validated raw reply; a failed write is logged, not raised."""
    path = _llm_cache_path(cache_dir, model_name, prompt)
    try:
        with _LLM_CACHE_LOCK:
            path.write_text(json.dumps(result, ensure_ascii=False), encoding="utf-8")
    except OSError as err:
        print("  cache write failed for {}: {}".format(model_name, err))


def _council_prompt(batch: list) -> str:
    """Labeling prompt for one batch of clusters.

    The text must stay byte-stable: it is part of the cache key.
    """
    mini = [{"cluster_id": c["cluster_id"],
             "paper_count": c["paper_count"],
             "top3_titles": c["top3_titles"],
             "top3_abstracts": c["top3_abstracts"]} for c in batch]
    return (
        "You are an Information Systems research expert conducting a systematic "
        "literature review. Label each cluster with a precise 4-7 word noun-phrase "
        "that reflects its core IS research theme.\n\n"
        "Cluster IDs in this batch: " + str([c["cluster_id"] for c in batch]) + "\n\n"
        "CLUSTERS:\n" + json.dumps(mini, indent=2) + "\n\n"
        "Return ONLY a raw JSON array — no markdown, no preamble.\n"
        "Each element: cluster_id (int), label (4-7 words), "
        "confidence (High/Medium/Low), reasoning (one sentence)."
    )


def _query_council_member(name: str, llm, prompt: str, batch: list, batch_no: int,
                          n_batches: int, cache_dir: Path) -> dict:
    """Query one council member for one batch, with exponential backoff.

    Valid replies are cached. After the final failed attempt every cluster
    in the batch gets a placeholder "Cluster N (NAME error)" label.
    """
    last_err = None
    for attempt in range(_COUNCIL_MAX_RETRIES):
        try:
            print("  [{}] batch {}/{} attempt {}".format(name, batch_no, n_batches, attempt + 1))
            raw = _call_llm_json(llm, prompt)
            indexed = _index_label_items(raw)  # validate BEFORE caching
        except Exception as e:
            last_err = e
            print("  [{}] batch {} attempt {} FAILED: {}".format(name, batch_no, attempt + 1, e))
            if attempt < _COUNCIL_MAX_RETRIES - 1:
                time.sleep((2 ** attempt) * 15)
            continue
        _llm_cache_set(cache_dir, name, prompt, raw)
        return indexed
    return {
        c["cluster_id"]: {
            "cluster_id": c["cluster_id"],
            "label": "Cluster {} ({} error)".format(c["cluster_id"], name),
            "confidence": "Low",
            "reasoning": "Fallback — {} failed: {}".format(name, str(last_err)[:80]),
        }
        for c in batch
    }


@tool
def label_clusters_council_of_3(batch_size: int = 5) -> str:
    """Label clusters using a TRUE council of 3 LLMs running IN PARALLEL:
      1. Mistral  (mistral-small-latest)
      2. Gemini   (gemini-2.5-flash)
      3. Groq     (llama-3.3-70b-versatile)

    SPEED:  All 3 LLMs run concurrently via ThreadPoolExecutor.
    COST:   SHA-256 disk cache — identical prompts are NEVER sent twice.
    LIMITS: Per-model retry with exponential backoff.

    API keys auto-read from env: MISTRAL_API_KEY, GOOGLE_API_KEY, GROQ_API_KEY
    Cache lives at: data/v2/llm_cache/

    Args:
        batch_size: Clusters per LLM call (default 5).
    """
    p = _p2()
    clusters = _read_json(p["clusters"])
    batch_size = max(1, int(batch_size))  # range() rejects a step of 0
    cache_dir = p["dir"] / "llm_cache"
    cache_dir.mkdir(parents=True, exist_ok=True)

    # Clients are built lazily inside each worker, so one missing API key
    # only disables that member instead of aborting the whole council.
    council = [
        {"name": "MISTRAL", "stagger": 0,
         "factory": lambda: ChatMistralAI(model="mistral-small-latest", temperature=0.2)},
        {"name": "GEMINI", "stagger": 1,
         "factory": lambda: ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0.2)},
        {"name": "GROQ", "stagger": 2,
         "factory": lambda: ChatGroq(model="llama-3.3-70b-versatile", temperature=0.2)},
    ]
    llm_names = [m["name"] for m in council]
    batch_starts = list(range(0, len(clusters), batch_size))

    def run_one_member(member: dict) -> tuple[str, dict]:
        name = member["name"]
        if member["stagger"]:
            time.sleep(member["stagger"])  # spread the first requests out
        llm = None
        results = {}
        for bi, start in enumerate(batch_starts):
            batch = clusters[start:start + batch_size]
            prompt = _council_prompt(batch)
            cached = _llm_cache_get(cache_dir, name, prompt)
            if cached is not None:
                print("  [{}] batch {}/{} → CACHE HIT".format(name, bi + 1, len(batch_starts)))
                results.update(cached)
                continue
            if llm is None:
                llm = member["factory"]()
            results.update(_query_council_member(
                name, llm, prompt, batch, bi + 1, len(batch_starts), cache_dir))
            if bi < len(batch_starts) - 1:
                time.sleep(_COUNCIL_BATCH_DELAYS.get(name, 12))
        return name, results

    persona_results = {}
    print("Dispatching 3 LLMs in parallel...")
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = {executor.submit(run_one_member, m): m["name"] for m in council}
        for future in as_completed(futures):
            member_name = futures[future]
            try:
                name, result_dict = future.result()
                persona_results[name] = result_dict
                print("[DONE] {} — {} labels".format(name, len(result_dict)))
            except Exception as e:
                print("[ERROR] {} crashed: {}".format(member_name, e))
                persona_results[member_name] = {}

    try:
        consolidation_llm = ChatMistralAI(model="mistral-small-latest", temperature=0.1)
    except Exception as err:
        print("  Consolidation LLM unavailable: {}".format(err))
        consolidation_llm = None

    def enrich(cluster: dict) -> dict:
        cid = cluster["cluster_id"]
        answers = [persona_results.get(n, {}).get(cid, {}) for n in llm_names]
        raw_votes = [str(a.get("label", "")).strip() for a in answers]
        final, vote_type = _semantic_vote(raw_votes, consolidation_llm, cid)
        out = {**cluster, "label": final}
        for k, (name, vote) in enumerate(zip(llm_names, raw_votes), start=1):
            out["llm_vote_{}_{}".format(k, name)] = vote
        for k, answer in enumerate(answers, start=1):
            out["confidence_{}".format(k)] = answer.get("confidence", "")
        for k, answer in enumerate(answers, start=1):
            out["reasoning_{}".format(k)] = answer.get("reasoning", "")
        out["vote_agreement"] = vote_type
        return out

    enriched = [enrich(c) for c in clusters]
    _write_json(p["summaries"], enriched)

    rows = []
    for c in enriched:
        sims, cprobs, top3 = c["centroid_sims"], c["hdbscan_probs"], c["top3_paper_idx"]
        for li, paper in enumerate(c["papers"]):
            rows.append({
                "cluster_id": c["cluster_id"],
                "final_label": c["label"],
                "vote_agreement": c["vote_agreement"],
                "llm1_MISTRAL_label": c["llm_vote_1_MISTRAL"],
                "llm2_GEMINI_label": c["llm_vote_2_GEMINI"],
                "llm3_GROQ_label": c["llm_vote_3_GROQ"],
                "llm1_confidence": c["confidence_1"],
                "llm2_confidence": c["confidence_2"],
                "llm3_confidence": c["confidence_3"],
                "llm1_reasoning": c["reasoning_1"],
                "llm2_reasoning": c["reasoning_2"],
                "llm3_reasoning": c["reasoning_3"],
                "paper_doi": paper.get("doi", ""),
                "paper_title": paper.get("title", ""),
                "paper_year": paper.get("year", ""),
                "paper_journal": paper.get("journal", ""),
                "abstract_preview": str(paper.get("abstract", ""))[:300],
                "combined_preview": str(paper.get("combined", ""))[:200],
                "centroid_cosine_sim": round(float(sims[li] if li < len(sims) else 0.0), 4),
                "hdbscan_probability": round(float(cprobs[li] if li < len(cprobs) else 0.0), 4),
                "is_top3_centroid": "YES" if li in top3 else "no",
            })
    # Explicit columns keep the header even when there are no rows.
    pd.DataFrame(rows, columns=_AUDIT_COLUMNS).to_csv(
        p["audit_csv"], index=False, encoding="utf-8-sig")

    cached_files = len(list(cache_dir.glob("*.json")))
    counts = Counter(c["vote_agreement"] for c in enriched)
    unanimous = counts["unanimous"]
    majority = counts["semantic_majority"]

    return json.dumps({
        "clusters_labeled": len(enriched),
        "unanimous": unanimous,
        "majority": majority,
        "split": len(enriched) - unanimous - majority,
        "error_fallback": counts["error_fallback"],
        "audit_csv_rows": len(rows),
        "council_members": llm_names,
        "execution": "parallel (ThreadPoolExecutor, 3 workers)",
        "cache_files_on_disk": cached_files,
        "cache_dir": str(cache_dir),
        "note": (
            "Parallel 3-LLM ensemble done. "
            "Cache has {} entries — re-runs use these for free. "
            "Audit CSV ready ({} rows).".format(cached_files, len(rows))
        ),
    })


# =============================================================================
# V2 TOOL 4 — map_clusters_to_pajais_v2
# =============================================================================

@tool
def map_clusters_to_pajais_v2() -> str:
    """Map v2 cluster labels to PAJAIS 25 IS research categories via Mistral LLM.
    Saves taxonomy to data/v2/taxonomy.json. Independent of v1 taxonomy.
    """
    p = _p2()
    summaries = _read_json(p["summaries"])
    llm = ChatMistralAI(model="mistral-small-latest", temperature=0.1)

    mini = [{"cluster_id": s["cluster_id"], "name": s["label"],
             "sample": s["top3_titles"][:2]} for s in summaries]
    batch_len = 10
    starts = list(range(0, len(mini), batch_len))
    results = []
    for bi, start in enumerate(starts):
        batch = mini[start:start + batch_len]
        prompt = (
            "Map each IS research cluster to the single most relevant PAJAIS category.\n\n"
            "CLUSTERS:\n" + json.dumps(batch, indent=2) + "\n\n"
            "PAJAIS CATEGORIES:\n" + json.dumps(PAJAIS_CATEGORIES, indent=2) + "\n\n"
            "Return ONLY a raw JSON array. Each element: "
            "cluster_id (int), name (str), pajais_category (str), "
            "confidence (High/Medium/Low), rationale (one sentence). No markdown."
        )
        # Keep only dict items, even if the model wraps or malforms the array.
        results.extend(_as_item_list(_call_llm_json(llm, prompt)))
        if bi < len(starts) - 1:
            time.sleep(10)  # rate-limit spacing between batches

    _write_json(p["taxonomy"], results)
    return json.dumps({"mapped_clusters": len(results),
                       "note": "PAJAIS taxonomy saved to data/v2/taxonomy.json"})


# =============================================================================
# V2 TOOL 5 — export_v2_outputs
# =============================================================================

@tool
def export_v2_outputs() -> str:
    """Generate final comparison_v2.csv and narrative_v2.txt for the SPECTER2 run.

    comparison_v2.csv: enriched audit CSV with PAJAIS column added.
    narrative_v2.txt: 500-word Section 7 academic discussion.
    Both saved to data/v2/ and data/comparison_v2.csv.
    """
    p = _p2()
    summaries = _read_json(p["summaries"])
    taxonomy = _read_json(p["taxonomy"])
    # Normalise ids on both sides so 3, "3" and 3.0 all match.
    tax_map = {_cluster_key(item.get("cluster_id", "")): item.get("pajais_category", "Unknown")
               for item in _as_item_list(taxonomy)}

    audit_df = pd.read_csv(p["audit_csv"], encoding="utf-8-sig")
    audit_df["pajais_category"] = [
        tax_map.get(_cluster_key(cid), "Unknown") for cid in audit_df["cluster_id"]
    ]
    out_path = p["comparison"]
    audit_df.to_csv(out_path, index=False, encoding="utf-8-sig")

    llm = ChatMistralAI(model="mistral-small-latest", temperature=0.4)
    cluster_summary = [{"cluster": s["cluster_id"], "label": s["label"],
                        "papers": s["paper_count"], "agreement": s["vote_agreement"]}
                       for s in summaries]
    prompt = (
        "Write Section 7 (Discussion and Thematic Synthesis) for a systematic "
        "IS literature review. ~500 words, formal academic prose.\n"
        "Method: SPECTER2 document embeddings + UMAP + HDBSCAN + council-of-3-LLMs labeling.\n"
        "Cover: (a) overview of clusters/themes, (b) dominant PAJAIS categories, "
        "(c) inter-cluster relationships, (d) implications for IS research, "
        "(e) methodological contribution vs traditional BERTopic, (f) limitations.\n\n"
        "CLUSTERS:\n" + json.dumps(cluster_summary, indent=2) + "\n\n"
        "PAJAIS MAPPING:\n" + json.dumps(taxonomy, indent=2) + "\n\n"
        "Continuous academic paragraphs only. No bullet points or headers."
    )
    narrative = _response_text(llm.invoke([HumanMessage(content=prompt)]))
    p["narrative"].write_text(narrative, encoding="utf-8")

    return json.dumps({
        "comparison_csv_rows": len(audit_df),
        "comparison_csv_path": str(out_path),
        "narrative_words": len(narrative.split()),
        "narrative_path": str(p["narrative"]),
        "note": "comparison_v2.csv + narrative_v2.txt ready in Download tab.",
    })