"""
app.py — Gradio web application for SPECTER2-based scientific topic modelling.
Pipeline:
CSV Upload → Preprocessing → SPECTER2 Embeddings → UMAP → HDBSCAN →
Top Papers → LLM Label Generation (3 approaches) → AI Council →
TCCM Classification → KeyBERT Keywords → Results
PARALLELIZATION:
Per-cluster processing (labeling + AI Council + TCCM + keywords) is
executed in a ThreadPoolExecutor (max_workers=3 in run_full_pipeline),
reducing the label generation phase from ~60 min sequential to roughly
5-8 min parallel (estimate; depends on worker count and LLM latency).
"""
import os
import io
import sys
import traceback
import numpy as np
import pandas as pd
import gradio as gr
import plotly.express as px
import plotly.graph_objects as go
from concurrent.futures import ThreadPoolExecutor, as_completed
# Local imports
from utils import (
load_env, build_paper_results, build_cluster_summary,
print_metrics_report, build_metrics_summary, build_council_summary
)
from preprocessing import load_and_preprocess
from embedding import load_or_generate_embeddings
from clustering import auto_cluster, get_top_papers, compute_silhouette, compute_cluster_coherence
from labeling import generate_all_labels
from ai_council import run_council, compute_label_confidence
from tccm_classifier import run_tccm_for_all_clusters, classify_tccm, extract_keywords
# Executed once at import time, before any pipeline function below is called.
# NOTE(review): presumably loads environment settings (e.g. API keys) via
# utils.load_env — confirm against that module.
load_env()
# ─── PER-CLUSTER WORKER ──────────────────────────────────────────────────────
def _process_cluster(cid, papers, labels, df, np_labels):
"""
Worker function executed in parallel for each cluster.
Runs: generate_all_labels → run_council → compute_label_confidence
→ classify_tccm → extract_keywords
Returns (cid, cluster_result, tccm_result)
"""
try:
# Labels (3 approaches) — each approach calls LLM once
candidates = generate_all_labels(cid, papers)
# AI Council — 3 candidates × 3 agents = 9 LLM calls, all parallel inside
council = run_council(cid, candidates, papers)
label_conf = compute_label_confidence(council)
n_papers = int(np.sum(np_labels == cid))
cluster_result = {
**council,
"label_confidence": label_conf,
"n_papers": n_papers,
}
# TCCM classification
tccm = classify_tccm(cid, papers)
# KeyBERT keywords from clean texts of this cluster
mask = np_labels == cid
clean_texts = df[mask]["combined_text_clean"].tolist()
keywords = extract_keywords(clean_texts)
tccm_result = {**tccm, "keywords": keywords}
return cid, cluster_result, tccm_result
except Exception as e:
tb = traceback.format_exc()
print(f"[Worker] Cluster {cid} FAILED: {e}\n{tb}")
# Return safe fallback values so the pipeline doesn't crash
return cid, {
"final_label": f"Cluster {cid}",
"winning_approach": "error",
"candidates": {},
"justification": f"Error: {e}",
"label_confidence": 0.0,
"n_papers": int(np.sum(np_labels == cid)),
}, {
"theory": "Not specified", "context": "Not specified",
"characteristics": "Not specified", "methodology": "Not specified",
"keywords": [],
}
# ─── PIPELINE ────────────────────────────────────────────────────────────────
def run_full_pipeline(csv_file, progress=gr.Progress(track_tqdm=True)):
    """Run the complete topic-modelling pipeline for one uploaded CSV.

    Called by Gradio. Stages: preprocessing → embeddings → UMAP + HDBSCAN →
    top papers → metrics → per-cluster labeling / council / TCCM / keywords
    (thread pool) → result tables, scatter plot and CSV export.

    Args:
        csv_file: The uploaded file. Either an object exposing its path as
            ``.name``, or a plain path (``str`` / ``os.PathLike``).
        progress: Gradio progress tracker, called as
            ``progress(fraction, desc=...)``.

    Returns:
        A 12-tuple, in this order: cluster summary DataFrame, paper results
        DataFrame, scatter figure, metrics markdown, overview markdown,
        council markdown, an update for the cluster filter choices, an update
        for the status message, and four updates pointing at the CSV files
        written to the working directory.

    Raises:
        gr.Error: If no file was provided, or if any pipeline stage fails
            (the message then includes the traceback).
    """
    # Fail fast with a readable message; otherwise `None.name` below would
    # surface as "'NoneType' object has no attribute 'name'".
    if csv_file is None:
        raise gr.Error("Please upload a CSV file before running the pipeline.")

    # Resolve the path. PathLike objects are handled first because
    # `pathlib.Path.name` is only the basename, not the full path.
    if isinstance(csv_file, os.PathLike):
        csv_path = os.fspath(csv_file)
    else:
        csv_path = getattr(csv_file, "name", csv_file)

    try:
        # ── Step 1: Preprocessing
        progress(0.05, desc="🔍 Preprocessing CSV...")
        df, preprocess_stats = load_and_preprocess(csv_path)

        # ── Step 2: Embeddings
        progress(0.15, desc="🧬 Generating SPECTER2 embeddings (may take a few minutes)...")
        embeddings = load_or_generate_embeddings(df, batch_size=64)

        # ── Step 3+4: UMAP + HDBSCAN (with strict 15 clusters and noise absorption)
        progress(0.38, desc="📐 Running UMAP + HDBSCAN (targeting exactly 15 clusters)...")
        reduced_nd, reduced_2d, labels, probs = auto_cluster(embeddings)

        # ── Step 5: Top Papers
        progress(0.52, desc="📄 Selecting top papers per cluster...")
        top_papers = get_top_papers(df, reduced_nd, labels, probs)

        # ── Metrics
        progress(0.56, desc="📊 Computing research metrics...")
        silhouette = compute_silhouette(reduced_nd, labels)
        coherence = compute_cluster_coherence(embeddings, labels)

        # ── Step 6+7+8: Labeling + AI Council + TCCM — ALL IN PARALLEL
        cluster_ids = sorted(top_papers.keys())
        n_total = len(cluster_ids)
        progress(0.58, desc=f"🤖 Labeling & classifying {n_total} clusters in parallel...")

        cluster_results: dict = {}
        tccm_results: dict = {}
        completed = 0

        with ThreadPoolExecutor(max_workers=3) as executor:
            futures = {
                executor.submit(
                    _process_cluster,
                    cid, top_papers[cid], labels, df, labels
                ): cid
                for cid in cluster_ids
            }
            for future in as_completed(futures):
                cid_done = futures[future]
                try:
                    cid, cluster_result, tccm_result = future.result()
                    cluster_results[cid] = cluster_result
                    tccm_results[cid] = tccm_result
                except Exception as e:
                    # Last-resort guard: _process_cluster handles its own
                    # errors. A cluster that ends up here is absent from
                    # both result dicts.
                    print(f"[Pipeline] Unexpected error for cluster {cid_done}: {e}")
                completed += 1
                # Map cluster progress onto the 0.58–0.95 range of the bar.
                pct = 0.58 + 0.37 * (completed / max(n_total, 1))
                progress(pct, desc=f"✅ Cluster {completed}/{n_total} done...")

        # ── Step 9: Build outputs
        progress(0.97, desc="📋 Compiling results...")
        paper_df = build_paper_results(df, labels, cluster_results)
        cluster_df = build_cluster_summary(
            cluster_results, top_papers, coherence, silhouette, tccm_results
        )
        metrics_df = build_metrics_summary(silhouette, coherence, cluster_results, labels)
        council_df = build_council_summary(cluster_results)
        print_metrics_report(silhouette, coherence, cluster_results, labels)

        # ── Scatter plot
        fig = _make_scatter(df, reduced_2d, labels, cluster_results)

        # ── Dataset Overview
        overview_md = _build_overview_md(preprocess_stats)

        # ── Metrics string (keep for UI but add DF for download)
        avg_coherence = float(np.mean(list(coherence.values()))) if coherence else 0
        avg_confidence = float(np.mean([
            r.get("label_confidence", 0) for r in cluster_results.values()
        ])) if cluster_results else 0
        n_noise = int(np.sum(labels == -1))
        noise_pct = 100 * n_noise / max(len(labels), 1)
        metrics_md = (
            f"### 📊 Research Metrics\n"
            f"| Metric | Value |\n|---|---|\n"
            f"| Total Clusters | **{len(cluster_results)}** |\n"
            f"| Total Papers | **{len(df)}** |\n"
            f"| Noise Points | **{n_noise} ({noise_pct:.1f}%)** |\n"
            f"| Silhouette Score | **{silhouette:.4f}** |\n"
            f"| Avg Cluster Coherence | **{avg_coherence:.4f}** |\n"
            f"| Avg Label Confidence | **{avg_confidence:.4f}** |\n"
        )

        # ── Council comparison table
        council_md = _build_council_md(cluster_results)

        # ── Save CSV files to disk (relative paths: current working directory)
        paper_df.to_csv("paper_results.csv", index=False)
        cluster_df.to_csv("cluster_summary.csv", index=False)
        metrics_df.to_csv("metrics_summary.csv", index=False)
        council_df.to_csv("council_scores.csv", index=False)

        # Cluster options for filtering
        cids = sorted(int(c) for c in cluster_results)
        cluster_choices = ["All Clusters"] + [f"Cluster {c}" for c in cids]

        progress(1.0, desc="✅ Done! (Results saved to project folder)")
        return (
            cluster_df,
            paper_df,
            fig,
            metrics_md,
            overview_md,
            council_md,
            gr.update(choices=cluster_choices, value="All Clusters"),
            gr.update(value="✅ **Pipeline complete.** Results saved as CSV files in the project folder.", visible=True),
            gr.update(value="cluster_summary.csv", interactive=True),
            gr.update(value="paper_results.csv", interactive=True),
            gr.update(value="metrics_summary.csv", interactive=True),
            gr.update(value="council_scores.csv", interactive=True),
        )
    except Exception as e:
        tb = traceback.format_exc()
        print(f"[Pipeline Error] {tb}")
        # Chain the original exception so its cause is preserved.
        raise gr.Error(f"Pipeline failed: {str(e)}\n\nDetails:\n{tb}") from e
# ─── HELPER BUILDERS ─────────────────────────────────────────────────────────
def _build_overview_md(stats: dict) -> str:
"""Build a markdown table summarising dataset preprocessing statistics."""
total = stats.get("total", 0)
missing_abs = stats.get("missing_abstracts", 0)
dupes = stats.get("duplicates_removed", 0)
final = stats.get("final_count", 0)
cleaned = total - final - dupes
return (
f"### 📂 Dataset Overview\n"
f"| Stage | Count |\n|---|---|\n"
f"| Papers in CSV | **{total}** |\n"
f"| Missing abstracts | **{missing_abs}** |\n"
f"| Duplicates removed | **{dupes}** |\n"
f"| Short / invalid texts removed | **{max(0, cleaned)}** |\n"
f"| **Papers used for analysis** | **{final}** |\n"
)
def _build_council_md(cluster_results: dict) -> str:
"""Build a markdown comparison table of AI Council scores per cluster."""
if not cluster_results:
return ""
rows = []
for cid, result in sorted(cluster_results.items()):
candidates = result.get("candidates", {})
winner = result.get("winning_approach", "")
for approach, eval_data in candidates.items():
sc = eval_data.get("scores", {})
is_winner = "✅" if approach == winner else ""
rows.append({
"Cluster": cid,
"Approach": approach,
"Label (truncated)": eval_data.get("label", "")[:45],
"Semantic": f"{sc.get('semantic', 0):.2f}",
"Keyword": f"{sc.get('keyword', 0):.2f}",
"Clarity": f"{sc.get('clarity', 0):.2f}",
"Final": f"{sc.get('final', 0):.3f}",
"Winner": is_winner,
})
if not rows:
return ""
lines = ["### 🏛️ AI Council Score Comparison\n"]
lines.append("| Cluster | Approach | Label | Semantic | Keyword | Clarity | Final | Winner |")
lines.append("|---|---|---|---|---|---|---|---|")
for r in rows:
lines.append(
f"| {r['Cluster']} | {r['Approach']} | {r['Label (truncated)']} "
f"| {r['Semantic']} | {r['Keyword']} | {r['Clarity']} | {r['Final']} | {r['Winner']} |"
)
return "\n".join(lines)
def _make_scatter(df, reduced_2d, labels, cluster_results):
"""Create a Plotly 2D scatter plot with cluster colors."""
n = len(df)
cluster_labels_list = []
for i in range(n):
cid = int(labels[i])
if cid == -1:
cluster_labels_list.append("Noise")
elif cid in cluster_results:
cluster_labels_list.append(f"[{cid}] {cluster_results[cid]['final_label'][:40]}")
else:
cluster_labels_list.append(f"Cluster {cid}")
plot_df = pd.DataFrame({
"x": reduced_2d[:, 0],
"y": reduced_2d[:, 1],
"cluster": cluster_labels_list,
"title": df["Title"].str[:80],
})
noise_mask = plot_df["cluster"] == "Noise"
fig = go.Figure()
non_noise = plot_df[~noise_mask]
cluster_names = sorted(non_noise["cluster"].unique())
colors = px.colors.qualitative.Alphabet + px.colors.qualitative.Dark24
for i, cname in enumerate(cluster_names):
cdata = non_noise[non_noise["cluster"] == cname]
fig.add_trace(go.Scatter(
x=cdata["x"], y=cdata["y"],
mode="markers",
name=cname,
text=cdata["title"],
hovertemplate="%{text}