# topic_modelling/tools.py
# Uploaded by luqman2520 ("Upload 4 files"), commit ccab3d4 (verified).
"""
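tools.py — 7 stateless LangChain tools for BERTopic-style agentic thematic analysis.
All tools are exposed via LangChain's @tool decorator (handle_tool_error is not set in this module).
Written in a mostly functional style (map/filter/comprehensions); a few conditionals,
loops and try/except blocks are used where they are clearer.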
"""
import functools
import json
import os
import re

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from langchain_core.tools import tool
from sentence_transformers import SentenceTransformer
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import normalize
os.makedirs("outputs", exist_ok=True)
# Regexes for publisher boilerplate removed from titles/abstracts by _clean_text,
# which OR-joins them into one pattern and applies it case-insensitively.
BOILERPLATE_PATTERNS = [
    r"©\s*\d{4}.*",  # copyright notice, e.g. "© 2023 Elsevier B.V. ..."
    r"all rights reserved.*",
    r"published by elsevier.*",
    r"this paper (proposes|presents|investigates|aims)",
    r"in this (paper|study|article|work)",
    r"the purpose of this (paper|study)",
]
def _clean_text(text: str) -> str:
    """Lower-case *text* and delete every BOILERPLATE_PATTERNS match from it.

    Non-string input is coerced with ``str()`` first. Surrounding whitespace is
    trimmed both before and after the boilerplate substitution.
    """
    combined_pattern = "|".join(BOILERPLATE_PATTERNS)
    normalised = str(text).lower().strip()
    stripped = re.sub(combined_pattern, "", normalised, flags=re.IGNORECASE)
    return stripped.strip()
def _split_sentences(text: str) -> list:
"""Split text into sentences on '. ', '? ', '! '."""
raw = re.split(r"(?<=[.!?])\s+", str(text).strip())
return list(filter(lambda s: len(s.split()) > 4, raw))
@tool
def load_scopus_csv(file_path: str) -> str:
    """
    Load a Scopus CSV file and return a summary of its contents.
    Args:
        file_path: Absolute or relative path to the Scopus CSV file.
    Returns:
        JSON string with keys: papers, abstract_sentences, title_sentences,
        columns, sample_titles, status.
    """
    # Report unreadable input as a structured JSON error (like the other
    # tools) instead of letting the exception escape the tool.
    try:
        # utf-8-sig transparently drops a leading byte-order mark if present.
        df = pd.read_csv(file_path, encoding="utf-8-sig")
    except FileNotFoundError:
        return json.dumps({
            "status": "error",
            "message": f"File not found: {file_path}",
        }, indent=2)
    except pd.errors.EmptyDataError:
        return json.dumps({
            "status": "error",
            "message": f"CSV file is empty: {file_path}",
        }, indent=2)

    missing_columns = [col for col in ("Title", "Abstract") if col not in df.columns]
    if missing_columns:
        return json.dumps({
            "status": "error",
            "message": f"Missing required columns: {', '.join(missing_columns)}",
            "columns": list(df.columns),
        }, indent=2)

    # Keep only rows whose Title and Abstract are present and not just whitespace.
    title_ok = df["Title"].notna() & df["Title"].astype(str).str.strip().ne("")
    abstract_ok = df["Abstract"].notna() & df["Abstract"].astype(str).str.strip().ne("")
    df = df[title_ok & abstract_ok].reset_index(drop=True)

    df["Abstract_Clean"] = df["Abstract"].map(_clean_text)
    df["Title_Clean"] = df["Title"].map(_clean_text)
    abstract_sentences = sum(df["Abstract_Clean"].map(_split_sentences).map(len))
    title_sentences = sum(df["Title_Clean"].map(_split_sentences).map(len))

    # Persisted for run_bertopic_discovery, which reads the *_Clean columns.
    df.to_json("outputs/cleaned_data.json", orient="records", indent=2)
    return json.dumps({
        "status": "loaded",
        "papers": int(len(df)),
        "abstract_sentences": int(abstract_sentences),
        "title_sentences": int(title_sentences),
        "columns": list(df.columns),
        # str() keeps json.dumps safe if a title was parsed as a number.
        "sample_titles": [str(title) for title in df["Title"].head(5)],
    }, indent=2)
@functools.lru_cache(maxsize=2)
def _load_encoder(model_name: str) -> SentenceTransformer:
    """Load a SentenceTransformer once per model name and reuse it across calls."""
    return SentenceTransformer(model_name)


def _collect_sentences(df: pd.DataFrame, use_cols: list) -> tuple:
    """Split each paper's selected columns into sentences, tracking the source paper index."""
    sentences, paper_ids = [], []
    for idx, row in df.iterrows():
        text = " ".join(str(row[c]) for c in use_cols)
        for sentence in _split_sentences(text):
            sentences.append(sentence)
            paper_ids.append(idx)
    return sentences, paper_ids


def _cluster_labels(embeddings: np.ndarray) -> np.ndarray:
    """Assign each embedding row a cluster id via average-linkage agglomerative clustering."""
    # sklearn rejects fewer than two samples; a lone sentence is its own cluster.
    if embeddings.shape[0] < 2:
        return np.zeros(embeddings.shape[0], dtype=np.intp)
    clusterer = AgglomerativeClustering(
        n_clusters=None,
        metric="cosine",
        linkage="average",
        distance_threshold=0.3,  # cosine distance = 1 - similarity; 0.3 ≈ similarity 0.7
    )
    return np.asarray(clusterer.fit_predict(embeddings))


def _write_cluster_charts(summaries: list, centroids: np.ndarray, tag: str) -> None:
    """Write the cluster-size bar chart and the centroid-similarity heatmap to outputs/."""
    sizes = [s["size"] for s in summaries]
    cids = [f"C{s['cluster_id']}" for s in summaries]
    fig_dist = px.bar(x=cids, y=sizes, labels={"x": "Cluster", "y": "Sentences"},
                      title=f"Topic Distribution ({tag})", color=sizes,
                      color_continuous_scale="Blues")
    fig_dist.write_html("outputs/chart_distribution.html")

    # An n_clusters x n_clusters heatmap grows quadratically and can OOM.
    heatmap_max = 300
    if centroids.shape[0] > heatmap_max:
        with open("outputs/chart_heatmap.html", "w", encoding="utf-8") as f:
            f.write(f"<p style='color:grey'>Heatmap skipped: {centroids.shape[0]} clusters exceeds safe limit ({heatmap_max}).</p>")
        return
    sim_matrix = cosine_similarity(centroids)
    fig_heat = go.Figure(go.Heatmap(z=sim_matrix, x=cids, y=cids,
                                    colorscale="Viridis"))
    fig_heat.update_layout(title=f"Cluster Similarity Heatmap ({tag})")
    fig_heat.write_html("outputs/chart_heatmap.html")


@tool
def run_bertopic_discovery(run_config: str) -> str:
    """
    Embed sentences, cluster with AgglomerativeClustering (cosine, threshold=0.7),
    extract top-5 evidence sentences per cluster, generate Plotly charts, and
    save summaries.json and embeddings.npy.
    Args:
        run_config: JSON string with key 'columns' — list of column names to use,
            e.g. '{"columns": ["Abstract"]}' or '{"columns": ["Title"]}'
            or '{"columns": ["Abstract", "Title"]}'.
    Returns:
        JSON string summarising clusters found.
    """
    config = json.loads(run_config)
    # Accept a bare column name as well as a list; a missing or empty value
    # falls back to the Abstract column.
    columns = config.get("columns") or ["Abstract"]
    if isinstance(columns, str):
        columns = [columns]
    tag = "_".join(columns).lower()
    summaries_path = f"outputs/summaries_{tag}.json"

    cleaned_data_path = "outputs/cleaned_data.json"
    if not os.path.exists(cleaned_data_path):
        return json.dumps({
            "status": "error",
            "message": "Cleaned data file not found. Run load_scopus_csv first.",
        }, indent=2)
    df = pd.read_json(cleaned_data_path)

    # User-facing names map to the boilerplate-stripped columns from load_scopus_csv.
    col_map = {"Abstract": "Abstract_Clean", "Title": "Title_Clean"}
    use_cols = [col_map.get(c, c) for c in columns]
    missing_columns = [c for c in use_cols if c not in df.columns]
    if missing_columns:
        return json.dumps({
            "status": "error",
            "message": f"Missing cleaned columns: {', '.join(missing_columns)}",
            "available_columns": list(df.columns),
        }, indent=2)

    sentences, paper_ids = _collect_sentences(df, use_cols)
    if not sentences:
        with open(summaries_path, "w", encoding="utf-8") as f:
            json.dump([], f, indent=2)
        return json.dumps({
            "status": "completed",
            "tag": tag,
            "n_clusters": 0,
            "total_sentences": 0,
            "summaries_file": summaries_path,
            "message": "No sentences available after preprocessing.",
        }, indent=2)

    raw = _load_encoder("all-MiniLM-L6-v2").encode(sentences, show_progress_bar=True, batch_size=64)
    # L2-normalise in float32 (the epsilon avoids division by zero) so cluster
    # means and cosine similarities operate on unit vectors at half the memory.
    embeddings = np.asarray(raw, dtype=np.float32)
    embeddings = embeddings / (np.linalg.norm(embeddings, axis=1, keepdims=True) + 1e-12)
    embeddings = embeddings.astype(np.float32, copy=False)
    np.save(f"outputs/embeddings_{tag}.npy", embeddings)

    labels = _cluster_labels(embeddings)
    n_clusters = int(labels.max() + 1)

    # Group row indices by cluster in one pass instead of one full scan per
    # cluster; the stable sort keeps each group in ascending row order.
    order = np.argsort(labels, kind="stable")
    boundaries = np.cumsum(np.bincount(labels, minlength=n_clusters))[:-1]
    members_by_cluster = np.split(order, boundaries)

    dim = embeddings.shape[1]
    centroids = np.vstack([
        embeddings[members].mean(axis=0) if members.size else np.zeros(dim, dtype=np.float32)
        for members in members_by_cluster
    ]).astype(np.float32)

    def _summarise_cluster(cid):
        members = members_by_cluster[cid]
        top_sents, top_pids = [], []
        if members.size:
            # Evidence = the five member sentences closest to the cluster centroid.
            sims = cosine_similarity(centroids[cid:cid + 1], embeddings[members])[0]
            top5_idx = members[np.argsort(sims)[::-1][:5]]
            top_sents = [sentences[i] for i in top5_idx]
            top_pids = sorted({int(paper_ids[i]) for i in top5_idx})
        return {
            "cluster_id": cid,
            "size": int(members.size),
            "papers": top_pids,
            "top_sentences": top_sents,
            "label": f"Cluster_{cid}",
            "approved": False,
            "rename_to": "",
            "reasoning": "",
        }

    summaries = [_summarise_cluster(cid) for cid in range(n_clusters)]
    with open(summaries_path, "w", encoding="utf-8") as f:
        json.dump(summaries, f, indent=2)

    _write_cluster_charts(summaries, centroids, tag)
    return json.dumps({
        "status": "completed",
        "tag": tag,
        "n_clusters": n_clusters,
        "total_sentences": len(sentences),
        "summaries_file": summaries_path,
    }, indent=2)
@tool
def label_topics_with_llm(labelling_input: str) -> str:
    """
    Use the LLM to generate a human-readable label, category, confidence score,
    and reasoning for each cluster based on its top evidence sentences.
    Args:
        labelling_input: JSON string with keys:
            - 'tag': run tag (e.g. 'abstract' or 'title')
            - 'llm_labels': list of dicts, each with keys
              'cluster_id', 'label', 'category', 'confidence', 'reasoning'
              as returned by the LLM's own analysis.
    Returns:
        JSON string confirming labels saved.
    """
    data = json.loads(labelling_input)
    tag = data.get("tag", "abstract")
    llm_labels = data.get("llm_labels") or []
    summaries_path = f"outputs/summaries_{tag}.json"
    if not os.path.exists(summaries_path):
        return json.dumps({
            "status": "error",
            "message": f"Summaries file not found: {summaries_path}. Run run_bertopic_discovery first.",
        }, indent=2)
    with open(summaries_path, encoding="utf-8") as f:
        summaries = json.load(f)

    def _cluster_key(value):
        # LLM output may carry ids as strings ("3"); normalise decimal strings
        # to int so they match the integer ids stored in the summaries file.
        text = str(value).strip()
        return int(text) if text.isdecimal() else value

    label_map = {
        _cluster_key(item["cluster_id"]): item
        for item in llm_labels
        if isinstance(item, dict) and "cluster_id" in item
    }

    def _apply_label(s):
        update = label_map.get(_cluster_key(s["cluster_id"]), {})
        # Fields the LLM did not supply keep their previously saved values, so
        # labelling in several partial batches does not erase earlier results.
        return {**s,
                "label": update.get("label", s["label"]),
                "category": update.get("category", s.get("category", "")),
                "confidence": update.get("confidence", s.get("confidence", 0.0)),
                "reasoning": update.get("reasoning", s.get("reasoning", ""))}

    updated = [_apply_label(s) for s in summaries]
    with open(summaries_path, "w", encoding="utf-8") as f:
        json.dump(updated, f, indent=2)
    labels_applied = sum(_cluster_key(s["cluster_id"]) in label_map for s in summaries)
    return json.dumps({
        "status": "labelled",
        "tag": tag,
        "topics_labelled": len(updated),
        # How many clusters actually received an LLM label in this call.
        "labels_applied": labels_applied,
    }, indent=2)
# ══════════════════════════════════════════════════════════════════════════════
# TOOL 4 — consolidate_into_themes
# ══════════════════════════════════════════════════════════════════════════════
@tool
def consolidate_into_themes(consolidation_input: str) -> str:
    """
    Consolidate approved clusters into final themes based on user review table.
    Only approved clusters are kept; each theme's final label is 'rename_to'
    when non-empty, otherwise the cluster's current label. Saves
    themes_{tag}.json and a top-keywords chart.
    Args:
        consolidation_input: JSON string with keys:
            - 'tag': run tag
            - 'approvals': list of dicts with keys
              'cluster_id', 'approved' (bool), 'rename_to' (str), 'reasoning' (str)
    Returns:
        JSON string summarising final themes.
    """
    from collections import Counter

    data = json.loads(consolidation_input)
    tag = data.get("tag", "abstract")
    approvals = data.get("approvals") or []
    summaries_path = f"outputs/summaries_{tag}.json"
    if not os.path.exists(summaries_path):
        return json.dumps({
            "status": "error",
            "message": f"Summaries file not found: {summaries_path}. Run run_bertopic_discovery first.",
        }, indent=2)
    with open(summaries_path, encoding="utf-8") as f:
        summaries = json.load(f)

    def _cluster_key(value):
        # Normalise decimal-string ids ("3") to the int ids in the summaries file.
        text = str(value).strip()
        return int(text) if text.isdecimal() else value

    def _as_bool(value):
        # bool("false") is True, so decode string booleans explicitly.
        if isinstance(value, str):
            return value.strip().lower() in {"true", "yes", "y", "1"}
        return bool(value)

    approval_map = {
        _cluster_key(a["cluster_id"]): a
        for a in approvals
        if isinstance(a, dict) and "cluster_id" in a
    }

    def _apply_approval(s):
        a = approval_map.get(_cluster_key(s["cluster_id"]), {})
        return {**s,
                "approved": _as_bool(a.get("approved", False)),
                # `or ""` turns an explicit null into an empty string.
                "rename_to": str(a.get("rename_to") or ""),
                "reasoning": a.get("reasoning") or ""}

    approved = [s for s in map(_apply_approval, summaries) if s["approved"]]
    # A blank rename_to keeps the label assigned during labelling.
    themes = [{**s, "final_label": s["rename_to"].strip() or s["label"]} for s in approved]
    with open(f"outputs/themes_{tag}.json", "w", encoding="utf-8") as f:
        json.dump(themes, f, indent=2)

    # ── Keyword chart per theme ────────────────────────────────────────────────
    stop = frozenset({"the", "a", "an", "of", "in", "and", "to", "is", "for", "with", "that",
                      "this", "on", "are", "by", "as", "from", "be", "was", "at", "it", "or",
                      "has", "have", "been", "which", "their"})

    def _top_words(theme):
        # Five most frequent 4+-letter non-stop words in the theme's evidence sentences.
        text = " ".join(theme.get("top_sentences") or []).lower()
        words = [w for w in re.findall(r"\b[a-z]{4,}\b", text) if w not in stop]
        return [{"theme": theme["final_label"], "word": word, "count": count}
                for word, count in Counter(words).most_common(5)]

    kw_rows = [row for theme in themes for row in _top_words(theme)]
    kw_df = pd.DataFrame(kw_rows)
    if len(kw_df) > 0:
        fig_kw = px.bar(kw_df, x="count", y="word", color="theme",
                        orientation="h", title="Top Keywords per Theme",
                        barmode="group")
        fig_kw.write_html("outputs/chart_keywords.html")
    else:
        with open("outputs/chart_keywords.html", "w", encoding="utf-8") as f:
            f.write("<p style='color:grey'>No approved themes available yet.</p>")
    return json.dumps({
        "status": "consolidated",
        "tag": tag,
        "themes_count": len(themes),
        "themes": [t["final_label"] for t in themes],
    }, indent=2)
# ══════════════════════════════════════════════════════════════════════════════
# TOOL 5 — compare_with_taxonomy
# ══════════════════════════════════════════════════════════════════════════════
@tool
def compare_with_taxonomy(taxonomy_input: str) -> str:
    """
    Map final themes to the PAJAIS taxonomy. Identify MAPPED vs NOVEL themes.
    Args:
        taxonomy_input: JSON string with keys:
            - 'tag': run tag
            - 'mappings': list of dicts with keys
              'final_label', 'pajais_category' (str or ''), 'mapped' (bool)
    Returns:
        JSON string with mapping results saved to taxonomy_mapping.json.
    """
    PAJAIS_TAXONOMY = [
        "IS Strategy & Governance", "AI & Machine Learning Applications",
        "Digital Transformation", "Human-Computer Interaction",
        "Knowledge Management", "Information Security & Privacy",
        "Business Intelligence & Analytics", "Enterprise Systems",
        "E-Commerce & Digital Markets", "IT Adoption & Acceptance",
        "Social Media & Collaboration", "Healthcare IS",
        "IS Research Methods", "Emerging Technologies",
    ]
    data = json.loads(taxonomy_input)
    tag = data.get("tag", "abstract")
    mappings = data.get("mappings") or []
    themes_path = f"outputs/themes_{tag}.json"
    if not os.path.exists(themes_path):
        return json.dumps({
            "status": "error",
            "message": f"Themes file not found: {themes_path}. Run consolidate_into_themes first.",
        }, indent=2)
    with open(themes_path, encoding="utf-8") as f:
        themes = json.load(f)

    def _label_key(label):
        # Match labels case-insensitively and ignoring extra whitespace, since
        # the LLM may echo them back with small formatting differences.
        return " ".join(str(label).split()).casefold()

    def _as_bool(value):
        # bool("false") is True, so decode string booleans explicitly.
        if isinstance(value, str):
            return value.strip().lower() in {"true", "yes", "y", "1"}
        return bool(value)

    mapping_map = {
        _label_key(m["final_label"]): m
        for m in mappings
        if isinstance(m, dict) and "final_label" in m
    }

    def _map_theme(t):
        m = mapping_map.get(_label_key(t["final_label"]), {})
        status = "MAPPED" if _as_bool(m.get("mapped", False)) else "NOVEL"
        return {**t,
                "pajais_category": str(m.get("pajais_category") or ""),
                "mapping_status": status}

    mapped_themes = [_map_theme(t) for t in themes]
    output_file = f"outputs/taxonomy_mapping_{tag}.json"
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(mapped_themes, f, indent=2)

    mapped_count = sum(t["mapping_status"] == "MAPPED" for t in mapped_themes)
    # Surface categories the LLM invented outside the fixed taxonomy list.
    unknown_categories = sorted({
        t["pajais_category"] for t in mapped_themes
        if t["pajais_category"] and t["pajais_category"] not in PAJAIS_TAXONOMY
    })
    return json.dumps({
        "status": "mapped",
        "tag": tag,
        "total_themes": len(mapped_themes),
        "mapped_count": mapped_count,
        "novel_count": len(mapped_themes) - mapped_count,
        "pajais_taxonomy": PAJAIS_TAXONOMY,
        "unknown_categories": unknown_categories,
        "output_file": output_file,
    }, indent=2)
# ══════════════════════════════════════════════════════════════════════════════
# TOOL 6 — generate_comparison_csv
# ══════════════════════════════════════════════════════════════════════════════
@tool
def generate_comparison_csv(comparison_input: str) -> str:
    """
    Compare Abstract-derived themes vs Title-derived themes.
    Produce a side-by-side CSV and a Plotly comparison chart.
    Args:
        comparison_input: JSON string with key 'tags' — list of two run tags,
            e.g. '{"tags": ["abstract", "title"]}'.
    Returns:
        JSON string with path to comparison CSV.
    """
    data = json.loads(comparison_input)
    tags = data.get("tags") or ["abstract", "title"]
    if isinstance(tags, str):
        tags = [tags]

    rows, missing_tags = [], []
    for tag in tags:
        path = f"outputs/themes_{tag}.json"
        # A tag whose themes were never consolidated is reported, not fatal.
        if not os.path.exists(path):
            missing_tags.append(tag)
            continue
        with open(path, encoding="utf-8") as f:
            themes = json.load(f)
        rows.extend({
            "tag": tag,
            "final_label": t["final_label"],
            "size": t["size"],
            "papers": len(t.get("papers") or []),
        } for t in themes)

    # Explicit columns keep a header row in the CSV even when there are no themes.
    df = pd.DataFrame(rows, columns=["tag", "final_label", "size", "papers"])
    df.to_csv("outputs/theme_comparison.csv", index=False)
    if len(df) > 0:
        # e.g. ["abstract", "title"] -> "Abstract vs Title Theme Comparison"
        title = " vs ".join(str(tag).title() for tag in tags) + " Theme Comparison"
        fig = px.bar(df, x="final_label", y="size", color="tag", barmode="group",
                     title=title,
                     labels={"final_label": "Theme", "size": "Sentences", "tag": "Source"})
        fig.write_html("outputs/chart_comparison.html")
    else:
        with open("outputs/chart_comparison.html", "w", encoding="utf-8") as f:
            f.write("<p style='color:grey'>No theme comparison available yet.</p>")
    return json.dumps({
        "status": "comparison_generated",
        "csv_path": "outputs/theme_comparison.csv",
        "chart_path": "outputs/chart_comparison.html",
        "total_rows": len(df),
        "missing_tags": missing_tags,
    }, indent=2)
# ══════════════════════════════════════════════════════════════════════════════
# TOOL 7 — export_narrative
# ══════════════════════════════════════════════════════════════════════════════
@tool
def export_narrative(narrative_input: str) -> str:
    """
    Generate a ~500-word Section 7 narrative report summarising all themes,
    their PAJAIS mapping, and key insights. Save as narrative_report.txt.
    Args:
        narrative_input: JSON string with keys:
            - 'tag': run tag to base report on
            - 'narrative': the 500-word narrative text (written by the LLM)
            - 'researcher_name': optional researcher name
            - 'max_words': optional word cap for the narrative (default 500)
    Returns:
        JSON string confirming report saved.
    """
    default_max_words = 500
    data = json.loads(narrative_input)
    tag = data.get("tag", "abstract")
    # `or` also replaces explicit nulls, which would otherwise crash .split().
    narrative_text = str(data.get("narrative") or "")
    researcher_name = data.get("researcher_name") or "Researcher"

    # Cap the narrative length; unparseable (incl. NaN/Infinity) or
    # non-positive caps fall back to the default.
    try:
        max_words = int(data.get("max_words", default_max_words))
    except (TypeError, ValueError, OverflowError):
        max_words = default_max_words
    if max_words < 1:
        max_words = default_max_words

    words = narrative_text.split()
    trimmed = len(words) > max_words
    if trimmed:
        narrative_text = " ".join(words[:max_words]) + " ..."
    # Count only narrative words, not the " ..." truncation marker.
    word_count = min(len(words), max_words)

    mapping_path = f"outputs/taxonomy_mapping_{tag}.json"
    if not os.path.exists(mapping_path):
        return json.dumps({
            "status": "error",
            "message": f"Taxonomy mapping file not found: {mapping_path}. Run compare_with_taxonomy first.",
        }, indent=2)
    with open(mapping_path, encoding="utf-8") as f:
        themes = json.load(f)

    # NOVEL themes carry an empty pajais_category; show N/A instead of a blank.
    theme_lines = [
        f" • {t['final_label']} [{t.get('mapping_status', '?')}]"
        f" — PAJAIS: {t.get('pajais_category') or 'N/A'}"
        for t in themes
    ]
    full_report = "\n".join([
        "=" * 60,
        "SECTION 7: THEMATIC ANALYSIS NARRATIVE REPORT",
        f"Researcher: {researcher_name}",
        f"Source: {tag.upper()} columns",
        "=" * 60,
        "",
        narrative_text,
        "",
        "─" * 60,
        "THEME SUMMARY TABLE",
        "─" * 60,
        "\n".join(theme_lines),
        "",
        "=" * 60,
    ])
    report_path = "outputs/narrative_report.txt"
    with open(report_path, "w", encoding="utf-8") as f:
        f.write(full_report)
    return json.dumps({
        "status": "report_saved",
        "report_path": report_path,
        "word_count": word_count,
        "trimmed": trimmed,
        "themes_in_report": len(themes),
    }, indent=2)