# Source: topic_modelling / tools_v2.py
# Uploaded by aadisawant2912 — commit "Update tools_v2.py" (800e948, verified)
"""
tools_v2.py - SPECTER2 + HDBSCAN + UMAP thematic analysis tools.
COMPLETELY INDEPENDENT from tools.py (v1). No shared state, no ordering dependency.
V2 can be run before, after, or without ever running V1.
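OPTIMIZATION:
- Initial clustering uses min_cluster_size=5 (unoptimized; the parameters used are reported in the tool output).
- The "optimize" step re-runs HDBSCAN with min_cluster_size=10 on the UMAP embeddings saved by the initial run.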
SPECTER2 is allenai/specter2_base — a local HuggingFace model.
NO API KEY required. Downloads once, cached automatically.
Pipeline:
1. Combined Title+Abstract per paper → SPECTER2 embedding (768-dim)
2. UMAP (cosine, 5D) → tight document clusters
3. HDBSCAN (min_cluster_size=10 after optimization) → 15-30 clusters
4. Council-of-3-LLMs (Mistral, Gemini, Groq — same prompt, independent votes) → semantic consensus voting
5. PAJAIS mapping + audit CSV + narrative
"""
from __future__ import annotations

import io
import json
import os
import threading
import time
from pathlib import Path

import numpy as np
import pandas as pd
import plotly.express as px
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_groq import ChatGroq
from langchain_mistralai import ChatMistralAI
# Accept GEMINI_API_KEY as an alias: copy it into GOOGLE_API_KEY when the
# latter is unset or empty.
if os.getenv("GEMINI_API_KEY") and not os.getenv("GOOGLE_API_KEY"):
    os.environ["GOOGLE_API_KEY"] = os.environ["GEMINI_API_KEY"]

# Root folder for every artefact this module writes; created at import time.
DATA_DIR = Path("data")
DATA_DIR.mkdir(parents=False, exist_ok=True)
# ─────────────────────────────────────────────────────────────────────────────
# OPTIMIZATION SETTING — HDBSCAN min_cluster_size intended for the "optimize" pass
OPTIMIZED_MIN_CLUSTER_SIZE = 10
# ─────────────────────────────────────────────────────────────────────────────

# The 25 PAJAIS research categories. The list is embedded verbatim (in this
# order) in the category-mapping prompt.
PAJAIS_CATEGORIES = [
    "Information Systems Theory",
    "IS Strategy & Governance",
    "Digital Innovation",
    "Enterprise Systems",
    "AI & Intelligent Systems",
    "Big Data & Analytics",
    "Cybersecurity & Privacy",
    "Cloud Computing",
    "IS in Healthcare",
    "IS in Education",
    "E-Commerce & Digital Markets",
    "Social Media & Platforms",
    "Human-Computer Interaction",
    "IS Project Management",
    "IT Outsourcing",
    "Knowledge Management",
    "IS Development Methodologies",
    "Digital Transformation",
    "IS Ethics & Society",
    "IS in Developing Countries",
    "Mobile Computing",
    "IT Infrastructure",
    "IS Adoption & Diffusion",
    "IS Evaluation",
    "Organizational IS & Change",
]
# ── Semantic consensus voting helpers ─────────────────────────────────────────
# Process-wide all-MiniLM-L6-v2 encoder; populated on the first _get_minilm() call.
_MINILM = None
# Created at import time. Building the lock lazily inside _get_minilm() with an
# unsynchronised "is None" check would let two threads create two different
# locks, so the lock would no longer serialise the model load.
_MINILM_LOCK = threading.Lock()


def _get_minilm():
    """Return the shared all-MiniLM-L6-v2 SentenceTransformer, loading it once.

    The model is constructed on first use while holding ``_MINILM_LOCK`` and
    stored in the module-level ``_MINILM``; later calls return that object.

    Returns:
        The cached ``SentenceTransformer("all-MiniLM-L6-v2")`` instance.
    """
    global _MINILM
    with _MINILM_LOCK:
        if _MINILM is None:
            # Imported lazily so the dependency is only needed when semantic
            # voting actually has to embed labels.
            from sentence_transformers import SentenceTransformer
            print("Loading all-MiniLM-L6-v2 for semantic voting...")
            _MINILM = SentenceTransformer("all-MiniLM-L6-v2")
            print("MiniLM loaded OK.")
    return _MINILM
def _normalize_label(label: str) -> str:
    """Canonicalise a label for comparison.

    Lower-cases the text, trims surrounding whitespace, then deletes every
    ASCII punctuation character (in that order, so whitespace exposed by
    removing punctuation is left in place).
    """
    import string

    drop_punctuation = str.maketrans("", "", string.punctuation)
    trimmed = label.lower().strip()
    return trimmed.translate(drop_punctuation)
def _semantic_vote(votes: list[str], fallback_llm, cluster_id: int) -> tuple[str, str]:
    """Reduce the council's labels for one cluster to a single label.

    Decision ladder:
      1. Drop unusable votes: empty / "none" / "null" strings and the
         placeholder labels of the form ``Cluster <id> (<MODEL> error)``.
      2. No usable vote  -> placeholder label, ``"error_fallback"``.
      3. One usable vote -> that vote, ``"error_fallback"``.
      4. All usable votes equal after ``_normalize_label`` -> shortest vote,
         ``"unanimous"``.
      5. Embed the normalised votes with MiniLM and greedily group votes whose
         cosine similarity to the group's first member is >= 0.60. If the
         largest group has at least two members, return its shortest vote as
         ``"unanimous"`` (group covers every usable vote) or
         ``"semantic_majority"``.
      6. Otherwise ask ``fallback_llm`` to merge the labels
         (``"semantic_split"``); if that call fails or returns an empty label,
         the shortest usable vote is returned instead.
      If the embedding step itself fails, the most common raw vote is returned
      with ``"error_fallback"``.

    Args:
        votes: One label string per council member (may contain empty or
            placeholder entries).
        fallback_llm: Chat model exposing ``invoke``; only used in step 6.
        cluster_id: Cluster identifier, used in the all-failed placeholder.

    Returns:
        ``(label, vote_type)`` where ``vote_type`` is one of ``"unanimous"``,
        ``"semantic_majority"``, ``"semantic_split"`` or ``"error_fallback"``.
    """
    import re
    from collections import Counter

    # Match only the generated failure placeholder. A plain substring test for
    # "error"/"fallback" would also throw away genuine labels such as
    # "Human Error in Cybersecurity Incidents".
    placeholder = re.compile(r"^cluster\s+\d+\s+\(.*\berror\)$", re.IGNORECASE)
    real_votes = [
        v for v in votes
        if v
        and v.strip().lower() not in ("", "none", "null")
        and not placeholder.match(v.strip())
    ]
    if not real_votes:
        return "Cluster {} (all models failed)".format(cluster_id), "error_fallback"
    if len(real_votes) == 1:
        return real_votes[0], "error_fallback"

    normalized = [_normalize_label(v) for v in real_votes]
    shortest = min(real_votes, key=len)
    if len(set(normalized)) == 1:
        return shortest, "unanimous"

    n = len(real_votes)
    try:
        model = _get_minilm()
        embs = model.encode(normalized, normalize_embeddings=True)
        # Embeddings are unit-normalised, so the inner product is cosine similarity.
        sim = np.inner(embs, embs)
        THRESHOLD = 0.60
        assigned = [False] * n
        groups = []
        for i in range(n):
            if assigned[i]:
                continue
            # Greedy grouping: membership is decided against the anchor vote i only.
            group = [i] + [
                j for j in range(i + 1, n)
                if not assigned[j] and sim[i][j] >= THRESHOLD
            ]
            for idx in group:
                assigned[idx] = True
            groups.append(group)
        best_group = max(groups, key=len)
    except Exception as embed_err:
        print(" Semantic voting failed ({}), using mode fallback.".format(embed_err))
        return Counter(real_votes).most_common(1)[0][0], "error_fallback"

    if len(best_group) >= 2:
        winner = min((real_votes[i] for i in best_group), key=len)
        vote_type = "unanimous" if len(best_group) == n else "semantic_majority"
        return winner, vote_type

    numbered = "\n".join("{}. {}".format(i + 1, v) for i, v in enumerate(real_votes))
    prompt = (
        # State the real number of labels: fewer than 3 may remain after filtering.
        "You are an IS research expert. Given these {} different cluster labels ".format(n)
        + "produced by different LLMs, produce ONE concise unified label "
        "(4-7 words, noun-phrase, IS-specific). "
        "Return ONLY the label — no explanation, no markdown.\n\nLabels:\n" + numbered
    )
    try:
        response = fallback_llm.invoke([HumanMessage(content=prompt)])
        unified = response.content.strip().strip('"').strip("'").strip()
    except Exception as llm_err:
        print(" LLM consolidation failed: {}".format(llm_err))
        return shortest, "semantic_split"
    # An empty reply must not become the cluster's final label.
    return (unified or shortest), "semantic_split"
# ── lazy-loaded SPECTER2 ──────────────────────────────────────────────────────
# Module-level cache for the SPECTER2 tokenizer/model pair (filled on first load).
_SPECTER_TOKENIZER = None
_SPECTER_MODEL_OBJ = None


def _get_specter():
    """Return ``(tokenizer, model)`` for SPECTER2.

    Serves the cached pair when both halves are present; otherwise delegates
    to ``_load_specter_fresh()`` and returns whatever it yields.
    """
    global _SPECTER_TOKENIZER, _SPECTER_MODEL_OBJ
    have_both = _SPECTER_TOKENIZER is not None and _SPECTER_MODEL_OBJ is not None
    if have_both:
        return _SPECTER_TOKENIZER, _SPECTER_MODEL_OBJ
    return _load_specter_fresh()
def _load_specter_fresh():
    """Load ``allenai/specter2_base`` and store it in the module-level cache.

    Fetches the tokenizer and model via ``from_pretrained``, switches the
    model to eval mode, and returns ``(tokenizer, model)``.
    """
    global _SPECTER_TOKENIZER, _SPECTER_MODEL_OBJ
    from transformers import AutoModel, AutoTokenizer

    checkpoint = "allenai/specter2_base"
    print("Loading SPECTER2 — one-time HuggingFace download, then cached...")
    _SPECTER_TOKENIZER = AutoTokenizer.from_pretrained(checkpoint)
    encoder = AutoModel.from_pretrained(checkpoint)
    _SPECTER_MODEL_OBJ = encoder
    encoder.eval()
    print("SPECTER2 loaded OK.")
    return _SPECTER_TOKENIZER, _SPECTER_MODEL_OBJ
def _embed_specter(texts: list) -> np.ndarray:
    """Embed ``texts`` with SPECTER2 and return row-wise L2-normalised vectors.

    Texts are tokenised in chunks of 8 (padded, truncated to 512 tokens); the
    hidden state at position 0 of each sequence is taken as the embedding and
    scaled to unit length. Chunk results are stacked in input order.
    """
    import torch

    tokenizer, model = _get_specter()
    chunk_size = 8

    def encode_chunk(chunk):
        encoded = tokenizer(
            chunk,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="pt",
        )
        with torch.no_grad():
            output = model(**encoded)
        first_token = output.last_hidden_state[:, 0, :].numpy()
        lengths = np.linalg.norm(first_token, axis=1, keepdims=True)
        # Floor the norm so an all-zero vector does not divide by zero.
        return first_token / np.maximum(lengths, 1e-9)

    return np.vstack([
        encode_chunk(texts[offset: offset + chunk_size])
        for offset in range(0, len(texts), chunk_size)
    ])
def _p2() -> dict:
    """Return the path registry for the v2 pipeline, creating ``DATA_DIR/v2``.

    Every entry lives under ``DATA_DIR / "v2"`` except ``"comparison"``, which
    sits directly in ``DATA_DIR``. ``"dir"`` is the v2 directory itself.
    """
    v2_dir = DATA_DIR / "v2"
    v2_dir.mkdir(parents=True, exist_ok=True)

    paths = {"dir": v2_dir}
    for key, filename in (
        ("papers", "papers.json"),
        ("embeddings", "embeddings.npy"),
        ("umap_emb", "umap_emb.npy"),
        ("umap_2d_emb", "umap_2d_emb.npy"),
        ("clusters", "clusters.json"),
        ("clusters_original", "clusters_original.json"),
        ("summaries", "summaries.json"),
        ("taxonomy", "taxonomy.json"),
        ("charts", "charts.json"),
        ("audit_csv", "cluster_audit.csv"),
        ("narrative", "narrative_v2.txt"),
    ):
        paths[key] = v2_dir / filename
    paths["comparison"] = DATA_DIR / "comparison_v2.csv"
    paths["optimization_log"] = v2_dir / "optimization_log.json"
    return paths
def _read_csv_robust(path) -> pd.DataFrame:
    """Read a CSV whose text encoding is unknown.

    The file's bytes are decoded strictly with each candidate encoding in
    turn; the first one that decodes without error is parsed by pandas.

    Candidate order:
      * ``utf-8-sig`` — UTF-8 with or without a byte-order mark (the BOM is
        removed so it cannot end up glued to the first column name).
      * ``cp1252``    — tried before latin-1 because latin-1 accepts every
        byte sequence and would otherwise make cp1252 unreachable.
      * ``latin-1``   — last resort; decoding cannot fail.

    Decoding with ``errors="replace"`` is deliberately avoided: it never
    raises, so the first encoding would always "win" and non-UTF-8 characters
    would be silently replaced with U+FFFD.

    Args:
        path: Filesystem path of the CSV file.

    Returns:
        The parsed DataFrame.
    """
    raw = Path(path).read_bytes()
    for enc in ("utf-8-sig", "cp1252"):
        try:
            decoded = raw.decode(enc)
        except UnicodeDecodeError:
            continue
        return pd.read_csv(io.StringIO(decoded))
    return pd.read_csv(io.StringIO(raw.decode("latin-1")))
def _extract_json_text(raw: str) -> str:
    """Strip an optional Markdown code fence from an LLM reply.

    Handles tagged fences (```json), untagged fences (```), and an opening
    fence that is never closed. When several fenced blocks are present the
    last one is returned. Text without any fence is returned stripped.
    """
    import re

    text = raw.strip()
    if "```" not in text:
        return text
    fenced = re.findall(r"```[A-Za-z]*\s*(.*?)```", text, flags=re.DOTALL)
    if fenced:
        return fenced[-1].strip()
    # Opening fence without a closing one: drop everything up to and
    # including the fence marker and keep the remainder.
    return re.sub(r"^.*?```[A-Za-z]*\s*", "", text, count=1, flags=re.DOTALL).strip()


def _call_llm_json(llm, prompt: str):
    """Send ``prompt`` to ``llm`` as a single human message and parse the reply as JSON.

    Args:
        llm: Chat model exposing ``invoke``; the reply's ``content`` is
            expected to be a string.
        prompt: Prompt text.

    Returns:
        The decoded JSON value.

    Raises:
        json.JSONDecodeError: If the (de-fenced) reply is not valid JSON.
    """
    response = llm.invoke([HumanMessage(content=prompt)])
    return json.loads(_extract_json_text(response.content))
def _run_hdbscan(umap_embs: np.ndarray, mcs: int, min_samples: int = 3):
    """Cluster the given (UMAP-reduced) points with HDBSCAN.

    Uses euclidean distance and the "eom" cluster-selection method.

    Args:
        umap_embs: Points to cluster, one row per paper.
        mcs: HDBSCAN ``min_cluster_size``.
        min_samples: HDBSCAN ``min_samples``.

    Returns:
        ``(labels, probabilities, cluster_ids, noise_count)`` where
        ``cluster_ids`` is the sorted list of distinct labels excluding the
        noise label ``-1`` and ``noise_count`` is the number of ``-1`` labels.
    """
    import hdbscan as hdbscan_mod

    settings = dict(
        min_cluster_size=mcs,
        min_samples=min_samples,
        metric="euclidean",
        cluster_selection_method="eom",
        prediction_data=True,
    )
    model = hdbscan_mod.HDBSCAN(**settings)
    assignments = model.fit_predict(umap_embs)
    membership = model.probabilities_
    cluster_ids = sorted(set(assignments.tolist()) - {-1})
    noise_total = int((assignments == -1).sum())
    return assignments, membership, cluster_ids, noise_total
def _build_clusters(labels, probs, embs, papers):
    """Assemble per-cluster records from HDBSCAN labels.

    For every non-noise label the record holds the member papers, their
    HDBSCAN probabilities, each member's cosine similarity to the cluster
    centroid (computed on ``embs``), the centroid itself, and the three
    members closest to the centroid (positions within the cluster, plus their
    titles and the first 200 characters of their abstracts).

    Clusters with fewer than 5 papers are dropped; the rest are ordered by
    descending size and numbered ``cluster_id`` 1..k in that order.
    """
    records = []
    for raw_cid in sorted(set(labels.tolist()) - {-1}):
        member_mask = labels == raw_cid
        member_rows = np.flatnonzero(member_mask).tolist()
        members = [papers[row] for row in member_rows]
        vectors = embs[member_mask]

        centroid = vectors.mean(axis=0)
        unit_centroid = centroid / max(float(np.linalg.norm(centroid)), 1e-9)
        row_norms = np.linalg.norm(vectors, axis=1, keepdims=True)
        sims = ((vectors / np.maximum(row_norms, 1e-9)) @ unit_centroid).tolist()

        ranked = sorted(range(len(sims)), key=lambda pos: -sims[pos])
        closest = ranked[:3]

        records.append({
            "cluster_id": None,  # assigned after filtering and size-ordering
            "paper_count": len(member_rows),
            "papers": members,
            "hdbscan_probs": probs[member_mask].tolist(),
            "centroid_sims": sims,
            "centroid": centroid.tolist(),
            "top3_paper_idx": closest,
            "top3_titles": [members[pos]["title"] for pos in closest],
            "top3_abstracts": [members[pos]["abstract"][:200] for pos in closest],
        })

    kept = [rec for rec in records if rec["paper_count"] >= 5]
    # Stable sort: equally sized clusters keep their original label order.
    kept.sort(key=lambda rec: rec["paper_count"], reverse=True)
    for position, rec in enumerate(kept, start=1):
        rec["cluster_id"] = position
    return kept
# =============================================================================
# V2 TOOL 1 — load_and_embed_specter2
# =============================================================================
@tool
def load_and_embed_specter2(csv_path: str = "data/uploaded.csv") -> str:
    """Load Scopus CSV, build one combined Title+Abstract text per paper, embed with SPECTER2.
    SPECTER2 (allenai/specter2_base) is a LOCAL HuggingFace model — NO API key needed.
    First call downloads ~440 MB and caches; subsequent calls are instant.
    Output saved to data/v2/ only — completely independent of Classic (v1) run.
    Args:
        csv_path: Path to uploaded Scopus CSV.
    Returns:
        JSON string with total_papers, valid_papers, embedding_dim and a note,
        or a JSON object with an "error" key when no paper has usable text.
    """
    # NOTE: the docstring above is also the tool description LangChain shows
    # to the agent, so its wording is part of runtime behaviour.
    p = _p2()
    df = _read_csv_robust(csv_path)
    n = len(df)
    col_map = {c.strip().lower(): c for c in df.columns}

    def find_column(keyword: str, allow_exact: bool = True):
        """Prefer a column named exactly ``keyword``; else the first containing it."""
        if allow_exact and keyword in col_map:
            return col_map[keyword]
        return next((c for c in df.columns if keyword in c.lower()), None)

    def column_values(col) -> list:
        """Column values with NaN replaced by "", or blanks if the column is absent."""
        return list(df[col].fillna("")) if col else [""] * n

    titles = column_values(find_column("title"))
    abstracts = column_values(find_column("abstract"))
    dois = column_values(find_column("doi"))
    years = column_values(find_column("year"))
    # Journal is matched by substring only (the first column containing "source").
    journals = column_values(find_column("source", allow_exact=False))

    combined = [
        "{} {}".format(str(title).strip(), str(abstract).strip()).strip()
        for title, abstract in zip(titles, abstracts)
    ]
    # Keep only papers whose combined text has more than 5 words.
    valid_idx = [i for i, text in enumerate(combined) if len(text.split()) > 5]

    if not valid_idx:
        # Embedding an empty list would fail inside np.vstack with an opaque
        # "need at least one array" error; report the real problem instead.
        return json.dumps({
            "error": (
                "No papers with usable Title+Abstract text (more than 5 words) "
                "were found in {}.".format(csv_path)
            ),
            "total_papers": n,
            "valid_papers": 0,
        })

    # Coerce text fields to str: downstream tools slice title/abstract, and
    # pandas may hand back numeric scalars that json.dumps cannot serialise.
    papers = [{
        "paper_idx": i,
        "title": str(titles[i]),
        "abstract": str(abstracts[i]),
        "doi": str(dois[i]),
        "year": str(years[i]),
        "journal": str(journals[i]),
        "combined": combined[i],
    } for i in valid_idx]
    p["papers"].write_text(json.dumps(papers, indent=2, ensure_ascii=False))

    valid_texts = [combined[i] for i in valid_idx]
    print("Embedding {} papers with SPECTER2...".format(len(valid_texts)))
    embs = _embed_specter(valid_texts)
    np.save(p["embeddings"], embs)

    return json.dumps({
        "total_papers": n,
        "valid_papers": len(papers),
        "embedding_dim": int(embs.shape[1]),
        "note": "SPECTER2 embeddings saved to data/v2/. No API key needed.",
    })
# =============================================================================
# V2 TOOL 2 — cluster_with_umap_hdbscan (UNOPTIMIZED initial run)
# =============================================================================
@tool
def cluster_with_umap_hdbscan(
    umap_neighbors: int = 15,
    umap_min_dist: float = 0.05,
    hdbscan_min_cluster_size: int = 5,
    hdbscan_min_samples: int = 3,
) -> str:
    """Reduce SPECTER2 embeddings with UMAP (cosine) then cluster with HDBSCAN.
    INITIAL RUN (unoptimized): uses min_cluster_size=5, may give 30-50 clusters.
    Parameters are shown in output. User can then type "optimize".
    DETERMINISTIC: UMAP saved with random_state=42. Same dataset = same result every run.
    Args:
        umap_neighbors: UMAP n_neighbors (default 15).
        umap_min_dist: UMAP min_dist (default 0.05).
        hdbscan_min_cluster_size: Min papers per cluster (default 5, unoptimized).
        hdbscan_min_samples: HDBSCAN min_samples (default 3).
    """
    # NOTE: the docstring doubles as the LangChain tool description, so its
    # text is kept as-is.
    import umap as umap_mod

    paths = _p2()
    embs = np.load(paths["embeddings"])
    papers = json.loads(paths["papers"].read_text())

    def project(n_components):
        """Fit a fixed-seed cosine UMAP on the embeddings and return the projection."""
        return umap_mod.UMAP(
            n_components=n_components,
            n_neighbors=umap_neighbors,
            min_dist=umap_min_dist,
            metric="cosine",
            random_state=42,
            verbose=False,
        ).fit_transform(embs)

    # 5-D projection: clustered now and saved for the optimizer to reuse.
    print("UMAP 5-D (n_neighbors={}, min_dist={}, random_state=42)...".format(
        umap_neighbors, umap_min_dist))
    umap_embs = project(5)
    np.save(paths["umap_emb"], umap_embs)

    # 2-D projection: only used for the scatter plot.
    umap_2d = project(2)
    np.save(paths["umap_2d_emb"], umap_2d)

    # Initial HDBSCAN pass on the 5-D projection.
    labels, probs, unique, noise = _run_hdbscan(
        umap_embs, hdbscan_min_cluster_size, hdbscan_min_samples)
    print("Raw clusters: {}, noise: {}".format(len(unique), noise))

    valid = _build_clusters(labels, probs, embs, papers)
    serialized = json.dumps(valid, indent=2, ensure_ascii=False)
    # Same content goes to both files: the "original" snapshot and the
    # working copy that the optimizer later overwrites.
    paths["clusters_original"].write_text(serialized)
    paths["clusters"].write_text(serialized)

    cluster_count = len(valid)
    sizes = [c["paper_count"] for c in valid]

    # Charts: scatter of the 2-D projection coloured by raw HDBSCAN label,
    # and a bar chart of the retained clusters' sizes.
    scatter_frame = pd.DataFrame({
        "x": umap_2d[:, 0].tolist(),
        "y": umap_2d[:, 1].tolist(),
        "cluster": list(map(str, labels.tolist())),
        "title": [paper["title"][:50] for paper in papers],
        "prob": probs.tolist(),
    })
    scatter_title = "UMAP+HDBSCAN — {} clusters (unoptimized), {} noise".format(
        cluster_count, noise)
    fig_s = px.scatter(
        scatter_frame, x="x", y="y", color="cluster",
        hover_data=["title", "prob"], title=scatter_title,
    )
    bar_title = "Papers per Cluster (UNOPTIMIZED — min_cluster_size={})".format(
        hdbscan_min_cluster_size)
    fig_b = px.bar(
        x=["C{}".format(c["cluster_id"]) for c in valid],
        y=sizes,
        title=bar_title,
    )
    chart_html = {
        "scatter": fig_s.to_html(full_html=False, include_plotlyjs="cdn"),
        "bar": fig_b.to_html(full_html=False, include_plotlyjs=False),
    }
    paths["charts"].write_text(json.dumps(chart_html))

    parameters_used = {
        "umap_n_neighbors": umap_neighbors,
        "umap_min_dist": umap_min_dist,
        "umap_n_components": 5,
        "umap_metric": "cosine",
        "umap_random_state": 42,
        "hdbscan_min_cluster_size": hdbscan_min_cluster_size,
        "hdbscan_min_samples": hdbscan_min_samples,
        "hdbscan_metric": "euclidean",
        "hdbscan_cluster_selection": "eom",
    }
    note = (
        "Unoptimized run complete: {} clusters with min_cluster_size={}. "
        "Type 'optimize' to reduce to an optimal cluster count.".format(
            cluster_count, hdbscan_min_cluster_size)
    )
    return json.dumps({
        "status": "UNOPTIMIZED_CLUSTERING_COMPLETE",
        "parameters_used": parameters_used,
        "clusters_found": cluster_count,
        "noise_papers": noise,
        "total_papers": len(papers),
        "cluster_sizes": sizes,
        "within_15_30": 15 <= cluster_count <= 30,
        "note": note,
        "next_step": "Type 'optimize' to run cluster optimization.",
    })
# =============================================================================
# V2 TOOL 2B — optimize_clusters_hardcoded
# =============================================================================
@tool
def optimize_clusters_hardcoded() -> str:
    """Re-cluster the saved UMAP embeddings with a larger HDBSCAN min_cluster_size.
    Reuses the 5-D and 2-D UMAP arrays saved by cluster_with_umap_hdbscan(), so
    only HDBSCAN is re-run (min_cluster_size=OPTIMIZED_MIN_CLUSTER_SIZE, min_samples=3).
    Overwrites data/v2/clusters.json and data/v2/charts.json and writes
    data/v2/optimization_log.json. clusters_original.json is left untouched.
    Returns:
        JSON string with the parameters used, original vs optimized cluster
        counts, cluster sizes and a ready-to-show bot_message; or a JSON
        object with an "error" key when a prerequisite file is missing.
    """
    # The docstring is required: LangChain's @tool raises at decoration time
    # when a function has neither a docstring nor an explicit description.
    p = _p2()

    # Check prerequisites before loading anything, so a missing earlier step
    # yields a readable error instead of a FileNotFoundError.
    if not (p["embeddings"].exists() and p["papers"].exists()):
        return json.dumps({
            "error": "SPECTER2 embeddings not found. Run load_and_embed_specter2() first."
        })
    if not (p["umap_emb"].exists() and p["umap_2d_emb"].exists()
            and p["clusters_original"].exists()):
        return json.dumps({
            "error": "UMAP embeddings not found. Run cluster_with_umap_hdbscan() first."
        })

    embs = np.load(p["embeddings"])
    papers = json.loads(p["papers"].read_text())
    umap_embs = np.load(p["umap_emb"])      # 5-D projection from the initial run
    umap_2d = np.load(p["umap_2d_emb"])     # 2-D projection from the initial run
    original_clusters = json.loads(p["clusters_original"].read_text())
    original_count = len(original_clusters)

    # Single source of truth for the optimized setting (module-level constant).
    MCS = OPTIMIZED_MIN_CLUSTER_SIZE
    MIN_SAMPLES = 3

    labels, probs, unique, noise_count = _run_hdbscan(umap_embs, MCS, min_samples=MIN_SAMPLES)
    valid = _build_clusters(labels, probs, embs, papers)
    optimized_count = len(valid)
    print("Optimized: {} clusters, {} noise".format(optimized_count, noise_count))
    p["clusters"].write_text(json.dumps(valid, indent=2, ensure_ascii=False))

    reduction = original_count - optimized_count
    sizes = [c["paper_count"] for c in valid]
    within_range = 15 <= optimized_count <= 30
    all_above_5 = all(size >= 5 for size in sizes)

    # ── Optimization log ──────────────────────────────────────────────────────
    p["optimization_log"].write_text(json.dumps({
        "original_clusters": original_count,
        "optimized_clusters": optimized_count,
        "chosen_min_cluster_size": MCS,
        "hdbscan_min_samples": MIN_SAMPLES,
        "hdbscan_metric": "euclidean",
        "hdbscan_cluster_selection": "eom",
        "umap_random_state": 42,
        "noise_papers": noise_count,
        "reduction": reduction,
        "timestamp": str(pd.Timestamp.now()),
    }, indent=2, ensure_ascii=False))

    # ── Charts ────────────────────────────────────────────────────────────────
    cdf = pd.DataFrame({
        "x": umap_2d[:, 0].tolist(),
        "y": umap_2d[:, 1].tolist(),
        "cluster": [str(lb) for lb in labels.tolist()],
        "title": [pp["title"][:50] for pp in papers],
        "prob": probs.tolist(),
    })
    fig_s = px.scatter(
        cdf, x="x", y="y", color="cluster",
        hover_data=["title", "prob"],
        title="OPTIMIZED UMAP+HDBSCAN — {} clusters, {} noise".format(
            optimized_count, noise_count),
    )
    fig_b = px.bar(
        x=["C{}".format(c["cluster_id"]) for c in valid],
        y=sizes,
        title="Papers per Cluster (OPTIMIZED: {} clusters, min_cluster_size={})".format(
            optimized_count, MCS),
    )
    p["charts"].write_text(json.dumps({
        "scatter": fig_s.to_html(full_html=False, include_plotlyjs="cdn"),
        "bar": fig_b.to_html(full_html=False, include_plotlyjs=False),
    }))

    return json.dumps({
        "status": "OPTIMIZATION_COMPLETE",
        "optimization_parameters": {
            "hdbscan_min_cluster_size": MCS,
            "hdbscan_min_samples": MIN_SAMPLES,
            "hdbscan_metric": "euclidean",
            "hdbscan_cluster_selection": "eom",
            "umap_n_components": 5,
            "umap_metric": "cosine",
            "umap_random_state": 42,
            "note": "UMAP reused from initial run (random_state=42, fully deterministic).",
        },
        "results": {
            "original_clusters": original_count,
            "optimized_clusters": optimized_count,
            "reduction": reduction,
            "noise_papers": noise_count,
            "cluster_sizes": sizes,
            "within_15_30": within_range,
            "all_clusters_above_5_papers": all_above_5,
        },
        "determinism_note": (
            "Same dataset will always produce the same optimized output. "
            "UMAP is fixed (random_state=42). "
            "HDBSCAN on the same UMAP array with min_cluster_size={} is deterministic.".format(MCS)
        ),
        "bot_message": (
            "Optimization complete.\n"
            "Parameters: min_cluster_size={}, min_samples={}, metric=euclidean, "
            "cluster_selection=eom\n"
            "Original: {} clusters → Optimized: {} clusters\n"
            "Reduction: {} clusters removed\n"
            "All clusters have >= 5 papers: {}\n"
            "Within 15-30 target range: {}\n"
            "Ready for labeling.".format(
                MCS, MIN_SAMPLES,
                original_count, optimized_count,
                reduction,
                all_above_5,
                within_range,
            )
        ),
        "next_step": "Call label_clusters_council_of_3() to label the {} optimized clusters.".format(
            optimized_count),
    })
# =============================================================================
# V2 TOOL 3 — label_clusters_council_of_3 (parallel + cached multi-LLM)
# =============================================================================
@tool
def label_clusters_council_of_3(batch_size: int = 5) -> str:
    """Label clusters using a TRUE council of 3 LLMs running IN PARALLEL:
    1. Mistral (mistral-small-latest)
    2. Gemini (gemini-2.5-flash)
    3. Groq (llama-3.3-70b-versatile)
    SPEED: All 3 LLMs run concurrently via ThreadPoolExecutor.
    COST: SHA-256 disk cache — identical prompts are NEVER sent twice.
    LIMITS: Per-model retry with exponential backoff.
    API keys auto-read from env: MISTRAL_API_KEY, GOOGLE_API_KEY, GROQ_API_KEY
    Cache lives at: data/v2/llm_cache/
    Args:
        batch_size: Clusters per LLM call (default 5).
    Returns:
        JSON string with label/vote counts, audit CSV row count and cache info.
    """
    # NOTE: the docstring doubles as the LangChain tool description.
    import hashlib
    import threading
    from concurrent.futures import ThreadPoolExecutor, as_completed

    p = _p2()
    clusters = json.loads(p["clusters"].read_text())
    # range() with a step of 0 raises, which would crash every worker thread.
    batch_size = max(1, int(batch_size))

    CACHE_DIR = p["dir"] / "llm_cache"
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    cache_lock = threading.Lock()

    def _cache_path(model_name: str, prompt: str):
        """Cache file for a (model, prompt) pair, named by the SHA-256 of both."""
        digest = hashlib.sha256("{}::{}".format(model_name, prompt).encode()).hexdigest()
        return CACHE_DIR / "{}.json".format(digest)

    def _index_by_cluster(payload) -> dict:
        """Validate a label payload and key its items by integer cluster_id.

        Raises ValueError/TypeError when the payload is not a list of dicts
        with an int-convertible ``cluster_id`` (a missing id maps to 0).
        """
        if not isinstance(payload, list):
            raise ValueError("expected a JSON array of labels, got {}".format(
                type(payload).__name__))
        indexed = {}
        for item in payload:
            if not isinstance(item, dict):
                raise ValueError("expected JSON objects in the array, got {}".format(
                    type(item).__name__))
            indexed[int(item.get("cluster_id", 0))] = item
        return indexed

    def _cache_get(model_name: str, prompt: str):
        """Return the cached labels keyed by cluster id, or None on a miss.

        A corrupt or malformed cache file counts as a miss so that it is
        regenerated instead of crashing the worker.
        """
        path = _cache_path(model_name, prompt)
        with cache_lock:
            if not path.exists():
                return None
            try:
                return _index_by_cluster(json.loads(path.read_text(encoding="utf-8")))
            except (ValueError, TypeError) as cache_err:
                print(" [{}] ignoring unusable cache entry {}: {}".format(
                    model_name, path.name, cache_err))
                return None

    def _cache_set(model_name: str, prompt: str, result):
        path = _cache_path(model_name, prompt)
        with cache_lock:
            path.write_text(json.dumps(result, ensure_ascii=False), encoding="utf-8")

    COUNCIL = [
        {"name": "MISTRAL", "model": ChatMistralAI(model="mistral-small-latest", temperature=0.2), "stagger": 0},
        {"name": "GEMINI", "model": ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0.2), "stagger": 1},
        {"name": "GROQ", "model": ChatGroq(model="llama-3.3-70b-versatile", temperature=0.2), "stagger": 2},
    ]
    # Pause (seconds) between consecutive uncached batches of the same member.
    BATCH_DELAYS = {"MISTRAL": 12, "GEMINI": 8, "GROQ": 15}
    MAX_RETRIES = 4

    def make_prompt(batch: list) -> str:
        """Build the labelling prompt for one batch (identical for every member)."""
        mini = [{"cluster_id": c["cluster_id"], "paper_count": c["paper_count"],
                 "top3_titles": c["top3_titles"], "top3_abstracts": c["top3_abstracts"]}
                for c in batch]
        return (
            "You are an Information Systems research expert conducting a systematic "
            "literature review. Label each cluster with a precise 4-7 word noun-phrase "
            "that reflects its core IS research theme.\n\n"
            "Cluster IDs in this batch: " + str([c["cluster_id"] for c in batch]) + "\n\n"
            "CLUSTERS:\n" + json.dumps(mini, indent=2) + "\n\n"
            "Return ONLY a raw JSON array — no markdown, no preamble.\n"
            "Each element: cluster_id (int), label (4-7 words), "
            "confidence (High/Medium/Low), reasoning (one sentence)."
        )

    def run_one_member(member: dict) -> tuple[str, dict]:
        """Label every batch with one council member; returns (name, {cluster_id: item})."""
        name, llm, stagger = member["name"], member["model"], member["stagger"]
        results = {}
        if stagger:
            # Offset the members' start times by a second each.
            time.sleep(stagger)
        batch_starts = list(range(0, len(clusters), batch_size))
        total = len(batch_starts)
        for bi, start in enumerate(batch_starts):
            batch = clusters[start: start + batch_size]
            prompt = make_prompt(batch)
            cached = _cache_get(name, prompt)
            if cached is not None:
                print(" [{}] batch {}/{} → CACHE HIT".format(name, bi + 1, total))
                results.update(cached)
                continue
            for attempt in range(MAX_RETRIES):
                try:
                    print(" [{}] batch {}/{} attempt {}".format(
                        name, bi + 1, total, attempt + 1))
                    batch_result = _call_llm_json(llm, prompt)
                    # Validate BEFORE caching: a malformed reply must trigger
                    # a retry, not be stored and replayed on every later run.
                    indexed = _index_by_cluster(batch_result)
                    _cache_set(name, prompt, batch_result)
                    results.update(indexed)
                    break
                except Exception as e:
                    wait = (2 ** attempt) * 15
                    print(" [{}] batch {} attempt {} FAILED: {}".format(
                        name, bi + 1, attempt + 1, e))
                    if attempt < MAX_RETRIES - 1:
                        time.sleep(wait)
                    else:
                        # Out of retries: record placeholder labels, which
                        # the voting step recognises and discards.
                        for c in batch:
                            cid = c["cluster_id"]
                            results[cid] = {
                                "cluster_id": cid,
                                "label": "Cluster {} ({} error)".format(cid, name),
                                "confidence": "Low",
                                "reasoning": "Fallback — {} failed: {}".format(name, str(e)[:80]),
                            }
            if bi < total - 1:
                time.sleep(BATCH_DELAYS.get(name, 12))
        return name, results

    persona_results = {}
    print("Dispatching 3 LLMs in parallel...")
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = {executor.submit(run_one_member, m): m["name"] for m in COUNCIL}
        for future in as_completed(futures):
            member_name = futures[future]
            try:
                name, result_dict = future.result()
                persona_results[name] = result_dict
                print("[DONE] {} — {} labels".format(name, len(result_dict)))
            except Exception as e:
                print("[ERROR] {} crashed: {}".format(member_name, e))
                persona_results[member_name] = {}

    LLM_NAMES = ["MISTRAL", "GEMINI", "GROQ"]
    _consolidation_llm = ChatMistralAI(model="mistral-small-latest", temperature=0.1)

    def enrich(cluster):
        """Attach the final label, each member's vote and the agreement type."""
        cid = cluster["cluster_id"]
        ballots = [persona_results.get(n, {}).get(cid, {}) for n in LLM_NAMES]
        raw_votes = [str(b.get("label", "")).strip() for b in ballots]
        final, vote_type = _semantic_vote(raw_votes, _consolidation_llm, cid)
        return {
            **cluster,
            "label": final,
            "llm_vote_1_MISTRAL": raw_votes[0],
            "llm_vote_2_GEMINI": raw_votes[1],
            "llm_vote_3_GROQ": raw_votes[2],
            "confidence_1": ballots[0].get("confidence", ""),
            "confidence_2": ballots[1].get("confidence", ""),
            "confidence_3": ballots[2].get("confidence", ""),
            "reasoning_1": ballots[0].get("reasoning", ""),
            "reasoning_2": ballots[1].get("reasoning", ""),
            "reasoning_3": ballots[2].get("reasoning", ""),
            "vote_agreement": vote_type,
        }

    enriched = [enrich(c) for c in clusters]
    p["summaries"].write_text(json.dumps(enriched, indent=2, ensure_ascii=False))

    # One audit row per (cluster, paper).
    rows = []
    for c in enriched:
        cid = c["cluster_id"]
        for li, paper in enumerate(c["papers"]):
            rows.append({
                "cluster_id": cid,
                "final_label": c["label"],
                "vote_agreement": c["vote_agreement"],
                "llm1_MISTRAL_label": c["llm_vote_1_MISTRAL"],
                "llm2_GEMINI_label": c["llm_vote_2_GEMINI"],
                "llm3_GROQ_label": c["llm_vote_3_GROQ"],
                "llm1_confidence": c["confidence_1"],
                "llm2_confidence": c["confidence_2"],
                "llm3_confidence": c["confidence_3"],
                "llm1_reasoning": c["reasoning_1"],
                "llm2_reasoning": c["reasoning_2"],
                "llm3_reasoning": c["reasoning_3"],
                "paper_doi": paper.get("doi", ""),
                "paper_title": paper.get("title", ""),
                "paper_year": paper.get("year", ""),
                "paper_journal": paper.get("journal", ""),
                "abstract_preview": paper.get("abstract", "")[:300],
                "combined_preview": paper.get("combined", "")[:200],
                "centroid_cosine_sim": round(float(
                    c["centroid_sims"][li] if li < len(c["centroid_sims"]) else 0.0), 4),
                "hdbscan_probability": round(float(
                    c["hdbscan_probs"][li] if li < len(c["hdbscan_probs"]) else 0.0), 4),
                "is_top3_centroid": "YES" if li in c["top3_paper_idx"] else "no",
            })
    pd.DataFrame(rows).to_csv(p["audit_csv"], index=False, encoding="utf-8-sig")

    cached_files = len(list(CACHE_DIR.glob("*.json")))
    unanimous = sum(1 for c in enriched if c["vote_agreement"] == "unanimous")
    majority = sum(1 for c in enriched if c["vote_agreement"] == "semantic_majority")
    return json.dumps({
        "clusters_labeled": len(enriched),
        "unanimous": unanimous,
        "majority": majority,
        # Everything that is neither unanimous nor majority (includes fallbacks).
        "split": len(enriched) - unanimous - majority,
        "audit_csv_rows": len(rows),
        "council_members": LLM_NAMES,
        "execution": "parallel (ThreadPoolExecutor, 3 workers)",
        "cache_files_on_disk": cached_files,
        "cache_dir": str(CACHE_DIR),
        "note": (
            "Parallel 3-LLM ensemble done. "
            "Cache has {} entries — re-runs use these for free. "
            "Audit CSV ready ({} rows).".format(cached_files, len(rows))
        ),
    })
# =============================================================================
# V2 TOOL 4 — map_clusters_to_pajais_v2
# =============================================================================
@tool
def map_clusters_to_pajais_v2() -> str:
    """Map v2 cluster labels to PAJAIS 25 IS research categories via Mistral LLM.
    Saves taxonomy to data/v2/taxonomy.json. Independent of v1 taxonomy.
    """
    # NOTE: the docstring doubles as the LangChain tool description.
    paths = _p2()
    summaries = json.loads(paths["summaries"].read_text())
    llm = ChatMistralAI(model="mistral-small-latest", temperature=0.1)

    # Compact view of each cluster: id, label and two sample titles.
    compact = []
    for summary in summaries:
        compact.append({
            "cluster_id": summary["cluster_id"],
            "name": summary["label"],
            "sample": summary["top3_titles"][:2],
        })

    chunk = 10
    offsets = list(range(0, len(compact), chunk))
    last_position = len(offsets) - 1
    categories_json = json.dumps(PAJAIS_CATEGORIES, indent=2)

    mapped = []
    for position, offset in enumerate(offsets):
        group = compact[offset: offset + chunk]
        prompt = "".join([
            "Map each IS research cluster to the single most relevant PAJAIS category.\n\n",
            "CLUSTERS:\n", json.dumps(group, indent=2), "\n\n",
            "PAJAIS CATEGORIES:\n", categories_json, "\n\n",
            "Return ONLY a raw JSON array. Each element: ",
            "cluster_id (int), name (str), pajais_category (str), ",
            "confidence (High/Medium/Low), rationale (one sentence). No markdown.",
        ])
        mapped.extend(_call_llm_json(llm, prompt))
        # Pause between calls, but not after the final batch.
        if position < last_position:
            time.sleep(10)

    paths["taxonomy"].write_text(json.dumps(mapped, indent=2, ensure_ascii=False))
    return json.dumps({
        "mapped_clusters": len(mapped),
        "note": "PAJAIS taxonomy saved to data/v2/taxonomy.json",
    })
# =============================================================================
# V2 TOOL 5 — export_v2_outputs
# =============================================================================
@tool
def export_v2_outputs() -> str:
    """Generate final comparison_v2.csv and narrative_v2.txt for the SPECTER2 run.
    comparison_v2.csv: enriched audit CSV with PAJAIS column added (saved to data/comparison_v2.csv).
    narrative_v2.txt: 500-word Section 7 academic discussion (saved to data/v2/narrative_v2.txt).
    Returns:
        JSON string with the CSV row count, both output paths and the narrative word count.
    """
    # NOTE: the docstring doubles as the LangChain tool description.
    p = _p2()
    summaries = json.loads(p["summaries"].read_text())
    taxonomy = json.loads(p["taxonomy"].read_text())

    def cid_key(value) -> str:
        """Canonical string form of a cluster id ("3", 3, 3.0, "3.0" -> "3").

        Values that are not numeric are returned as their plain ``str()``.
        """
        try:
            return str(int(float(str(value).strip())))
        except (TypeError, ValueError, OverflowError):
            return str(value)

    # Normalise ids on BOTH sides of the join: the taxonomy comes from an LLM
    # and may carry ids as "1" or 1.0, which would otherwise never match the
    # integer ids read back from the audit CSV.
    tax_map = {
        cid_key(item.get("cluster_id", "")): item.get("pajais_category", "Unknown")
        for item in taxonomy
        if isinstance(item, dict)
    }

    audit_df = pd.read_csv(p["audit_csv"], encoding="utf-8-sig")
    audit_df["pajais_category"] = [
        tax_map.get(cid_key(cluster_id), "Unknown")
        for cluster_id in audit_df["cluster_id"]
    ]
    out_path = p["comparison"]
    audit_df.to_csv(out_path, index=False, encoding="utf-8-sig")

    llm = ChatMistralAI(model="mistral-small-latest", temperature=0.4)
    cluster_summary = [{"cluster": s["cluster_id"], "label": s["label"],
                        "papers": s["paper_count"], "agreement": s["vote_agreement"]}
                       for s in summaries]
    prompt = (
        "Write Section 7 (Discussion and Thematic Synthesis) for a systematic "
        "IS literature review. ~500 words, formal academic prose.\n"
        "Method: SPECTER2 document embeddings + UMAP + HDBSCAN + council-of-3-LLMs labeling.\n"
        "Cover: (a) overview of clusters/themes, (b) dominant PAJAIS categories, "
        "(c) inter-cluster relationships, (d) implications for IS research, "
        "(e) methodological contribution vs traditional BERTopic, (f) limitations.\n\n"
        "CLUSTERS:\n" + json.dumps(cluster_summary, indent=2) + "\n\n"
        "PAJAIS MAPPING:\n" + json.dumps(taxonomy, indent=2) + "\n\n"
        "Continuous academic paragraphs only. No bullet points or headers."
    )
    response = llm.invoke([HumanMessage(content=prompt)])
    narrative = response.content
    p["narrative"].write_text(narrative, encoding="utf-8")

    return json.dumps({
        "comparison_csv_rows": len(audit_df),
        "comparison_csv_path": str(out_path),
        "narrative_words": len(narrative.split()),
        "narrative_path": str(p["narrative"]),
        "note": "comparison_v2.csv + narrative_v2.txt ready in Download tab.",
    })