# topic-modelling / tools.py
# nethra815's picture
# Initial Commit
# 8bd2709 verified
"""
tools.py — 7 LangChain tool functions for BERTopic thematic analysis pipeline.
Constraints: ZERO if/else, ZERO for/while, ZERO try/except.
"""
from __future__ import annotations
import json
import re
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from pathlib import Path
from langchain_core.tools import tool
from sentence_transformers import SentenceTransformer
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics.pairwise import cosine_similarity
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_mistralai import ChatMistralAI
from dotenv import load_dotenv
load_dotenv() # add this right after the imports
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Regex fragments for publisher/copyright boilerplate. _clean_text joins them
# with "|" into one alternation and deletes every match, case-insensitively.
# _clean_text does not pass re.MULTILINE, so the ^...$ "abstract" pattern is
# anchored to the whole input string rather than to individual lines.
BOILERPLATE_PATTERNS = [
r"©\s*\d{4}",
r"all rights reserved",
r"published by elsevier",
r"doi:\s*10\.\S+",
r"this article is protected",
r"www\.\S+\.com",
r"^\s*abstract\s*$",
r"please cite this article",
r"accepted manuscript",
]
# Maps a run_config name (a tool argument) to the dataframe columns whose text
# is analysed for that run. Unknown names raise KeyError at lookup time.
RUN_CONFIGS = {
"abstract": ["Abstract"],
"title": ["Title"],
}
# Category names offered to the LLM by compare_with_taxonomy (25 entries).
PAJAIS_CATEGORIES = [
"Artificial Intelligence", "Machine Learning", "Deep Learning",
"Natural Language Processing", "Computer Vision", "Robotics",
"Knowledge Representation", "Expert Systems", "Decision Support",
"Data Mining", "Information Retrieval", "Human-Computer Interaction",
"Ethics in AI", "Explainable AI", "Fairness and Bias",
"AI in Healthcare", "AI in Education", "AI in Finance",
"AI in Manufacturing", "AI in Agriculture", "AI Governance",
"Neural Networks", "Reinforcement Learning", "Federated Learning",
"AI Safety",
]
# Chat-model client shared by the LLM-backed tools. It is constructed when the
# module is imported, after load_dotenv() has run.
# NOTE(review): presumably the API key comes from the environment/.env — confirm.
_MISTRAL = ChatMistralAI(model="mistral-large-latest", temperature=0)
# ---------------------------------------------------------------------------
# Helper — pure functions, no loops
# ---------------------------------------------------------------------------
def _clean_text(text: str) -> str:
    """Return *text* with every BOILERPLATE_PATTERNS match removed.

    The patterns are OR-ed into a single case-insensitive alternation, each
    match is replaced by the empty string, and leading/trailing whitespace
    is trimmed from the result.
    """
    boilerplate_re = re.compile("|".join(BOILERPLATE_PATTERNS), re.IGNORECASE)
    without_boilerplate = boilerplate_re.sub("", text)
    return without_boilerplate.strip()
def _sentences_from_series(series: pd.Series) -> list[str]:
    """Flatten a text Series into a list of non-empty, stripped sentences.

    Missing entries are dropped, the remaining strings are joined with single
    spaces, and the joined text is split at whitespace that follows ``.``,
    ``!`` or ``?``.
    """
    present = series.dropna()
    joined = present.str.cat(sep=" ")
    pieces = re.split(r"(?<=[.!?])\s+", joined)
    stripped = (piece.strip() for piece in pieces)
    return [sentence for sentence in stripped if sentence]
def _nearest_centroids(embeddings: np.ndarray, labels: np.ndarray, n: int = 5):
    """Map each cluster label to the labels of its most similar other clusters.

    Every cluster is summarised by the mean of its member embeddings, and
    clusters are ranked by the cosine similarity between those means.

    Args:
        embeddings: Array with one embedding row per item.
        labels: Cluster label for each row of ``embeddings``.
        n: Maximum number of neighbouring clusters to report per cluster.

    Returns:
        Dict from cluster label to a list of up to ``n`` other cluster
        labels, most similar first. A cluster never appears in its own
        list, so the list is shorter than ``n`` when fewer than ``n + 1``
        clusters exist (and empty when there is only one cluster).
    """
    unique_labels = np.unique(labels)
    centroids = np.array(
        [embeddings[labels == lbl].mean(axis=0) for lbl in unique_labels]
    )
    sim_matrix = cosine_similarity(centroids)
    # -inf rather than -1: a cluster's own entry must sort strictly last even
    # if some other centroid has a similarity of exactly -1.
    np.fill_diagonal(sim_matrix, -np.inf)
    ranked = np.argsort(sim_matrix, axis=1)[:, ::-1]
    # The self entry is now the final column of every row. Dropping it before
    # truncating keeps a cluster out of its own neighbour list when there are
    # n or fewer clusters (previously the slice [:n] still reached it).
    neighbours = ranked[:, :-1][:, :n]
    return dict(zip(unique_labels.tolist(), unique_labels[neighbours].tolist()))
def _top_sentences(sentences: list[str], embeddings: np.ndarray,
                   centroid: np.ndarray, k: int = 5) -> list[str]:
    """Return the ``k`` sentences whose embeddings are closest to *centroid*.

    Closeness is cosine similarity between *centroid* and every row of
    *embeddings*; results are ordered from most to least similar.
    ``sentences[i]`` is taken to correspond to ``embeddings[i]``.
    """
    similarity = cosine_similarity([centroid], embeddings)[0]
    ranked_desc = np.argsort(similarity)[::-1]
    return [sentences[idx] for idx in ranked_desc[:k]]
# ---------------------------------------------------------------------------
# Tool 1 — load_scopus_csv
# ---------------------------------------------------------------------------
@tool
def load_scopus_csv(csv_path: str, run_config: str = "abstract") -> str:
    """Load a Scopus CSV file, count papers/sentences, apply boilerplate regex
    filter, and return a JSON summary. run_config must be 'abstract' or 'title'."""
    # The docstring above doubles as the tool description (LangChain's @tool
    # reads it), so it is kept verbatim; maintainer notes live in comments.
    #
    # Side effects:
    #   * writes "<csv stem>_cleaned.parquet" next to the CSV, holding the
    #     original columns plus a "_cleaned_text" column;
    #   * overwrites "summaries.json" in the working directory.
    # Raises KeyError when run_config is not a key of RUN_CONFIGS.
    df = pd.read_csv(csv_path)
    columns = RUN_CONFIGS[run_config]
    available_cols = [c for c in columns if c in df.columns]
    # One text per paper: the selected columns joined with spaces.
    texts = df[available_cols].fillna("").apply(
        lambda row: " ".join(row.values.astype(str)), axis=1
    )

    # Step 1: remove the shared BOILERPLATE_PATTERNS.
    cleaned = [_clean_text(text) for text in texts]
    # Step 2: remove copyright/publisher tails up to the end of the line.
    # NOTE(review): step 1 has already deleted "© <4-digit year>", so "©.*"
    # only fires here when © is not directly followed by a year.
    cleaned = [
        re.sub(
            r"©.*|all rights reserved|copyright.*|palgrave.*",
            "",
            text,
            flags=re.I,
        )
        for text in cleaned
    ]
    sentences = _sentences_from_series(pd.Series(cleaned))
    df["_cleaned_text"] = cleaned

    # Derive the output name from the path's stem instead of a textual
    # ".csv" replacement: the replacement was a no-op for names such as
    # "DATA.CSV" (so the parquet overwrote the source file) and also rewrote
    # ".csv" occurring inside directory names.
    source = Path(csv_path)
    df.to_parquet(source.with_name(f"{source.stem}_cleaned.parquet"), index=False)

    summary = {
        "csv_path": csv_path,
        "run_config": run_config,
        "columns_used": available_cols,
        "total_papers": int(len(df)),
        "total_sentences": len(sentences),
        "sample_titles": df["Title"].head(5).tolist() if "Title" in df.columns else [],
    }
    Path("summaries.json").write_text(json.dumps(summary, indent=2))
    return json.dumps(summary)
# ---------------------------------------------------------------------------
# Tool 2 — run_bertopic_discovery
# ---------------------------------------------------------------------------
@tool
def run_bertopic_discovery(parquet_path: str, run_config: str = "abstract") -> str:
    """Embed sentences with all-MiniLM-L6-v2, cluster with AgglomerativeClustering
    (cosine, threshold=0.7), find 5 nearest centroids per cluster, generate 4
    Plotly charts. Saves summaries.json + emb.npy. Returns topic summaries JSON."""
    # The docstring above doubles as the tool description (LangChain's @tool
    # reads it), so it is kept verbatim; maintainer notes live in comments.
    #
    # Files written in the working directory:
    #   emb.npy        sentence embeddings, one row per sentence
    #   labels.npy     cluster label for each row of emb.npy
    #   topics.json    the 100 largest topics (id, size, neighbours, sentences)
    #   charts.json    the four Plotly figures serialised with to_json()
    #   summaries.json updated with {"bertopic": {"total_clusters": ...}}
    # Raises KeyError when run_config is not a key of RUN_CONFIGS.
    df = pd.read_parquet(parquet_path)
    columns = RUN_CONFIGS[run_config]
    available_cols = [c for c in columns if c in df.columns]
    # NOTE(review): only the RUN_CONFIGS columns are read here; a
    # "_cleaned_text" column in the parquet is not used, so boilerplate
    # filtering done upstream does not reach these sentences — confirm intent.
    texts = df[available_cols].fillna("").apply(
        lambda row: " ".join(row.values.astype(str)), axis=1
    )
    sentences = _sentences_from_series(texts)

    model = SentenceTransformer("all-MiniLM-L6-v2")
    embeddings = model.encode(sentences, normalize_embeddings=True, show_progress_bar=False)
    np.save("emb.npy", embeddings)

    clustering = AgglomerativeClustering(
        metric="cosine",
        linkage="average",
        distance_threshold=0.7,
        n_clusters=None,
    )
    labels = clustering.fit_predict(embeddings)
    # Persist the row -> cluster assignment next to the embeddings; without it
    # the rows of emb.npy cannot be mapped back to topics later.
    np.save("labels.npy", labels)

    unique_labels, counts = np.unique(labels, return_counts=True)
    total_clusters = int(len(unique_labels))
    nearest = _nearest_centroids(embeddings, labels)
    # Each centroid is needed for the summaries and again for the heatmap,
    # so compute it once per cluster.
    centroid_of = {
        int(lbl): embeddings[labels == lbl].mean(axis=0) for lbl in unique_labels
    }
    topic_summaries = [
        {
            "topic_id": int(lbl),
            "sentence_count": int(count),
            "nearest_topics": nearest.get(int(lbl), []),
            "top_sentences": _top_sentences(
                sentences, embeddings, centroid_of[int(lbl)],
            ),
        }
        for lbl, count in zip(unique_labels, counts)
    ]
    # Sort by sentence count desc
    topic_summaries.sort(key=lambda t: t["sentence_count"], reverse=True)
    top100 = topic_summaries[:100]

    # ---- Chart 1: Bar chart — top 20 topics by sentence count ----
    top20 = top100[:20]
    fig1 = px.bar(
        x=[f"T{t['topic_id']}" for t in top20],
        y=[t["sentence_count"] for t in top20],
        labels={"x": "Topic", "y": "Sentences"},
        title="Top 20 Topics by Sentence Count",
    )

    # ---- Chart 2: Treemap ----
    fig2 = px.treemap(
        names=[f"Topic {t['topic_id']}" for t in top100],
        parents=["All"] * len(top100),
        values=[t["sentence_count"] for t in top100],
        title="Topic Distribution Treemap",
    )

    # ---- Chart 3: Scatter (PCA 2D projection) ----
    from sklearn.decomposition import PCA
    pca = PCA(n_components=2)
    coords = pca.fit_transform(embeddings)
    fig3 = go.Figure(go.Scatter(
        x=coords[:, 0], y=coords[:, 1],
        mode="markers",
        marker=dict(color=labels, colorscale="Viridis", size=4, opacity=0.6),
    ))
    fig3.update_layout(title="Sentence Clusters (PCA 2D)")

    # ---- Chart 4: Heatmap — top 10 topic cosine similarity ----
    top10_ids = [t["topic_id"] for t in top100[:10]]
    centroids10 = np.array([centroid_of[topic_id] for topic_id in top10_ids])
    sim10 = cosine_similarity(centroids10)
    fig4 = px.imshow(
        sim10,
        x=[f"T{i}" for i in top10_ids],
        y=[f"T{i}" for i in top10_ids],
        color_continuous_scale="Blues",
        title="Top-10 Topic Cosine Similarity Heatmap",
    )

    charts = {
        "bar_top20": fig1.to_json(),
        "treemap": fig2.to_json(),
        "scatter_pca": fig3.to_json(),
        "heatmap": fig4.to_json(),
    }

    # Start from an empty summary when summaries.json has not been written
    # yet, instead of failing on the read.
    summaries_path = Path("summaries.json")
    existing = json.loads(summaries_path.read_text()) if summaries_path.exists() else {}
    existing.update({"bertopic": {"total_clusters": total_clusters}})
    summaries_path.write_text(json.dumps(existing, indent=2))
    Path("charts.json").write_text(json.dumps(charts, indent=2))
    Path("topics.json").write_text(json.dumps(top100, indent=2))
    return json.dumps({
        "total_clusters": total_clusters,
        "top100_count": len(top100),
        "charts_saved": list(charts.keys()),
    })
# ---------------------------------------------------------------------------
# Tool 3 — label_topics_with_llm
# ---------------------------------------------------------------------------
@tool
def label_topics_with_llm(topics_json_path: str = "topics.json") -> str:
    """Send top-100 topics to Mistral via PromptTemplate + JsonOutputParser to
    generate human-readable labels. Returns labelled topics JSON."""
    # Reads the topic list from topics_json_path, asks the model for one
    # {topic_id, label, reasoning} object per topic, merges those fields into
    # the topic dicts and writes the result to labelled_topics.json.
    # Topics the model did not return keep a "Topic <id>" placeholder label.
    batch = json.loads(Path(topics_json_path).read_text())[:100]

    # One prompt line per topic: id, size and its first two top sentences.
    topic_lines = [
        f"Topic {t['topic_id']} ({t['sentence_count']} sentences): "
        + " | ".join(t["top_sentences"][:2])
        for t in batch
    ]

    labelling_prompt = PromptTemplate.from_template(
        "You are a qualitative research expert. Below are topic clusters from a "
        "systematic literature review. For EACH topic assign a concise label "
        "(3-6 words) and one sentence of reasoning.\n\n"
        "Topics:\n{topics_text}\n\n"
        "Return ONLY valid JSON: a list of objects with keys: "
        "topic_id, label, reasoning. No markdown fences."
    )
    chain = labelling_prompt | _MISTRAL | JsonOutputParser()
    labelled = chain.invoke({"topics_text": "\n".join(topic_lines)})

    # Index the model's answers by topic id; fields from the model are
    # layered on top of the existing topic dict.
    label_map = {item["topic_id"]: item for item in labelled}
    enriched = [
        {
            **t,
            **label_map.get(
                t["topic_id"],
                {"label": f"Topic {t['topic_id']}", "reasoning": ""},
            ),
        }
        for t in batch
    ]

    Path("labelled_topics.json").write_text(json.dumps(enriched, indent=2))
    return json.dumps({"labelled_count": len(enriched), "path": "labelled_topics.json"})
# ---------------------------------------------------------------------------
# Tool 4 — consolidate_into_themes
# ---------------------------------------------------------------------------
def _theme_centroid(embeddings: np.ndarray, labels: np.ndarray, topic_ids: list) -> list:
    """Mean embedding of all rows whose cluster label is in *topic_ids* ([] if none)."""
    members = embeddings[np.isin(labels, topic_ids)]
    # An empty selection would average to NaN (with a runtime warning) and put
    # NaN into the JSON output; report "no centroid" as an empty list instead.
    return members.mean(axis=0).tolist() if len(members) else []


@tool
def consolidate_into_themes(approved_groups_json: str) -> str:
    """Merge approved topic groups into themes, recompute centroids from emb.npy.
    approved_groups_json: JSON list of {theme_name, topic_ids: [...]} objects."""
    # The docstring above doubles as the tool description (LangChain's @tool
    # reads it), so it is kept verbatim; maintainer notes live in comments.
    #
    # Reads emb.npy and labelled_topics.json from the working directory and
    # writes themes.json: one object per group with its name, topic ids, up
    # to 10 top sentences and the centroid of all member sentences.
    groups = json.loads(approved_groups_json)
    embeddings = np.load("emb.npy")

    # A theme's centroid must average the sentences belonging to its topics,
    # which requires the cluster label of every embedding row. (Comparing row
    # indices against topic ids, as before, selected unrelated rows.)
    # Use labels.npy when it is available; otherwise re-derive the labels by
    # re-running the clustering on the saved embeddings.
    # NOTE(review): the fallback parameters must stay identical to the ones in
    # run_bertopic_discovery or the topic ids will not line up.
    labels_path = Path("labels.npy")
    labels = (
        np.load(labels_path)
        if labels_path.exists()
        else AgglomerativeClustering(
            metric="cosine",
            linkage="average",
            distance_threshold=0.7,
            n_clusters=None,
        ).fit_predict(embeddings)
    )

    topics = json.loads(Path("labelled_topics.json").read_text())
    topic_id_to_sentences = {t["topic_id"]: t["top_sentences"] for t in topics}
    themes = [
        {
            "theme_name": g["theme_name"],
            "topic_ids": g["topic_ids"],
            # Concatenate the member topics' sentences in group order, then
            # keep the first 10; unknown topic ids contribute nothing.
            "top_sentences": [
                sentence
                for tid in g["topic_ids"]
                for sentence in topic_id_to_sentences.get(tid, [])
            ][:10],
            "centroid": _theme_centroid(embeddings, labels, g["topic_ids"]),
        }
        for g in groups
    ]
    Path("themes.json").write_text(json.dumps(themes, indent=2))
    return json.dumps({"themes_count": len(themes), "theme_names": [t["theme_name"] for t in themes]})
# ---------------------------------------------------------------------------
# Tool 5 — compare_with_taxonomy
# ---------------------------------------------------------------------------
@tool
def compare_with_taxonomy(themes_json_path: str = "themes.json") -> str:
    """Map consolidated themes to PAJAIS 25 categories via Mistral.
    Returns a mapping JSON."""
    # Reads the themes from themes_json_path, asks the model to pick a
    # PAJAIS_CATEGORIES entry for each one, and writes the parsed answer to
    # taxonomy_mapping.json. The returned JSON reports how many items the
    # model produced and where they were saved.
    themes = json.loads(Path(themes_json_path).read_text())

    # One prompt line per theme: its name and its first two top sentences.
    theme_bullets = [
        f"- {theme['theme_name']}: " + "; ".join(theme["top_sentences"][:2])
        for theme in themes
    ]
    category_bullets = [f" • {name}" for name in PAJAIS_CATEGORIES]

    taxonomy_prompt = PromptTemplate.from_template(
        "You are an AI research taxonomist. Map each theme to the most relevant "
        "PAJAIS category.\n\n"
        "PAJAIS Categories:\n{categories}\n\n"
        "Themes:\n{themes_text}\n\n"
        "Return ONLY valid JSON: a list of objects with keys: "
        "theme_name, pajais_category, confidence (0-1), rationale. No markdown."
    )
    chain = taxonomy_prompt | _MISTRAL | JsonOutputParser()
    mapping = chain.invoke({
        "categories": "\n".join(category_bullets),
        "themes_text": "\n".join(theme_bullets),
    })

    Path("taxonomy_mapping.json").write_text(json.dumps(mapping, indent=2))
    return json.dumps({"mapped_count": len(mapping), "path": "taxonomy_mapping.json"})
# ---------------------------------------------------------------------------
# Tool 6 — generate_comparison_csv
# ---------------------------------------------------------------------------
@tool
def generate_comparison_csv(original_csv_path: str) -> str:
    """Generate a side-by-side comparison CSV of abstract vs title clustering
    results for each paper. Returns path to output CSV."""
    # The docstring above doubles as the tool description (LangChain's @tool
    # reads it), so it is kept verbatim; maintainer notes live in comments.
    #
    # Output columns: Paper_ID (1-based row number), Title_text and
    # Abstract_text (whichever of the two source columns exist), and
    # Theme_Assignment.
    # NOTE(review): Theme_Assignment is a fixed placeholder string for every
    # row; no per-paper clustering result is looked up here.
    df = pd.read_csv(original_csv_path)
    # Title first, then Abstract, skipping whichever column is absent.
    text_cols = [c for c in ("Title", "Abstract") if c in df.columns]
    comparison = df[text_cols].copy()
    comparison.columns = [c + "_text" for c in text_cols]
    comparison.insert(0, "Paper_ID", range(1, len(df) + 1))
    comparison["Theme_Assignment"] = "See themes.json for full mapping"
    out_path = "comparison_abstract_vs_title.csv"
    comparison.to_csv(out_path, index=False)
    return json.dumps({"output_csv": out_path, "rows": len(comparison), "columns": comparison.columns.tolist()})
# ---------------------------------------------------------------------------
# Tool 7 — export_narrative
# ---------------------------------------------------------------------------
def _read_json_or_default(path: str, default):
    """Return the parsed JSON stored at *path*, or *default* when the file is absent."""
    target = Path(path)
    return json.loads(target.read_text()) if target.exists() else default


@tool
def export_narrative(context_json: str = "{}") -> str:
    """Generate a ~500-word Section 7 narrative via Mistral, synthesising all
    prior analysis. context_json may contain extra instructions. Returns the
    narrative text and saves it to narrative.md."""
    # The docstring above doubles as the tool description (LangChain's @tool
    # reads it), so it is kept verbatim; maintainer notes live in comments.
    #
    # Inputs are read from themes.json, taxonomy_mapping.json and
    # summaries.json when they exist; each missing file contributes an empty
    # section. The "extra_instructions" key of context_json, if present, is
    # passed to the prompt. The returned JSON holds the output path and the
    # whitespace-delimited word count of the generated text.
    context = json.loads(context_json)
    themes = _read_json_or_default("themes.json", [])
    mapping = _read_json_or_default("taxonomy_mapping.json", [])
    summaries = _read_json_or_default("summaries.json", {})

    themes_summary = "\n".join(
        f"- **{t['theme_name']}**: " + "; ".join(t["top_sentences"][:1])
        for t in themes
    )
    # Theme and category are separated by an arrow; without a separator the
    # two names ran together into one unreadable token in the prompt.
    mapping_summary = "\n".join(
        f"- {m.get('theme_name', '?')} → {m.get('pajais_category', '?')} "
        f"(confidence: {m.get('confidence', '?')})"
        for m in mapping
    )

    prompt = PromptTemplate.from_template(
        "You are a senior academic researcher writing a systematic literature review. "
        "Write Section 7 (Discussion & Synthesis) of approximately 500 words. "
        "Use an academic tone, Braun & Clarke (2006) thematic analysis framing, "
        "and reference the themes and PAJAIS taxonomy mappings provided.\n\n"
        "Dataset summary:\n{summaries}\n\n"
        "Themes identified:\n{themes}\n\n"
        "PAJAIS taxonomy mapping:\n{mapping}\n\n"
        "Extra context: {extra}\n\n"
        "Write the section now. Use markdown headings."
    )
    chain = prompt | _MISTRAL
    result = chain.invoke({
        "summaries": json.dumps(summaries, indent=2),
        "themes": themes_summary,
        "mapping": mapping_summary,
        "extra": context.get("extra_instructions", "None"),
    })
    narrative = result.content
    # Model output routinely contains non-ASCII punctuation; write UTF-8
    # explicitly so the platform's default encoding cannot make this fail.
    Path("narrative.md").write_text(narrative, encoding="utf-8")
    return json.dumps({"narrative_path": "narrative.md", "word_count": len(narrative.split())})