"""
tools.py — 7 LangChain tool functions for the BERTopic thematic analysis pipeline.

Constraints: ZERO if/else statements, ZERO for/while statements, ZERO try/except
(conditional expressions and comprehensions are used instead).
"""
|
|
|
| from __future__ import annotations
|
|
|
| import json
|
| import re
|
| import numpy as np
|
| import pandas as pd
|
| import plotly.express as px
|
| import plotly.graph_objects as go
|
|
|
| from pathlib import Path
|
| from langchain_core.tools import tool
|
| from sentence_transformers import SentenceTransformer
|
| from sklearn.cluster import AgglomerativeClustering
|
| from sklearn.metrics.pairwise import cosine_similarity
|
| from langchain_core.prompts import PromptTemplate
|
| from langchain_core.output_parsers import JsonOutputParser
|
| from langchain_mistralai import ChatMistralAI
|
| from dotenv import load_dotenv
|
| load_dotenv()
|
|
|
|
|
|
|
|
|
|
|
# Regex fragments for publisher boilerplate. _clean_text joins them into one
# alternation and removes every match case-insensitively.
BOILERPLATE_PATTERNS = [
    r"©\s*\d{4}",                  # copyright sign followed by a year
    r"all rights reserved",
    r"published by elsevier",
    r"doi:\s*10\.\S+",             # inline DOI
    r"this article is protected",
    r"www\.\S+\.com",              # bare web address
    r"^\s*abstract\s*$",           # text consisting only of the word "abstract"
    r"please cite this article",
    r"accepted manuscript",
]
|
|
|
# Maps a run_config name to the CSV/parquet columns whose text is analysed.
RUN_CONFIGS = dict(
    abstract=["Abstract"],
    title=["Title"],
)
|
|
|
# The 25 PAJAIS category names offered to the LLM by compare_with_taxonomy.
PAJAIS_CATEGORIES = [
    "Artificial Intelligence",
    "Machine Learning",
    "Deep Learning",
    "Natural Language Processing",
    "Computer Vision",
    "Robotics",
    "Knowledge Representation",
    "Expert Systems",
    "Decision Support",
    "Data Mining",
    "Information Retrieval",
    "Human-Computer Interaction",
    "Ethics in AI",
    "Explainable AI",
    "Fairness and Bias",
    "AI in Healthcare",
    "AI in Education",
    "AI in Finance",
    "AI in Manufacturing",
    "AI in Agriculture",
    "AI Governance",
    "Neural Networks",
    "Reinforcement Learning",
    "Federated Learning",
    "AI Safety",
]
|
|
|
# Shared Mistral chat client piped into every LLM chain in this module;
# created once at import time with temperature 0.
_MISTRAL = ChatMistralAI(
    model="mistral-large-latest",
    temperature=0,
)
|
|
|
|
|
|
|
|
|
|
|
def _clean_text(text: str) -> str:
    """Remove every BOILERPLATE_PATTERNS match from *text*, ignoring case.

    The patterns are OR-ed into a single alternation, each match is replaced
    by the empty string, and leading/trailing whitespace is trimmed.
    """
    boilerplate = re.compile("|".join(BOILERPLATE_PATTERNS), flags=re.IGNORECASE)
    without_boilerplate = boilerplate.sub("", text)
    return without_boilerplate.strip()
|
|
|
|
|
| def _sentences_from_series(series: pd.Series) -> list[str]:
|
| raw = series.dropna().str.cat(sep=" ")
|
| return list(filter(None, map(str.strip, re.split(r"(?<=[.!?])\s+", raw))))
|
|
|
|
|
def _nearest_centroids(embeddings: np.ndarray, labels: np.ndarray, n: int = 5):
    """Map each cluster label to the labels of its *n* most similar other clusters.

    A cluster's centroid is the mean of the embedding rows carrying its label;
    clusters are compared by cosine similarity between centroids.

    Args:
        embeddings: 2-D array, one row per item.
        labels: array of cluster labels, one per row of *embeddings*.
        n: maximum number of neighbours to return per cluster.

    Returns:
        dict of ``label -> list of neighbour labels`` ordered from most to
        least similar. A cluster is never listed as its own neighbour, so the
        list is shorter than *n* when fewer than *n* other clusters exist.
    """
    unique_labels = np.unique(labels)
    centroids = np.array([
        embeddings[labels == label].mean(axis=0) for label in unique_labels
    ])
    similarity = cosine_similarity(centroids)

    def _neighbours(row: int) -> list:
        # Rank all clusters by descending similarity, then drop the cluster's
        # own position explicitly before truncating to n.
        ranked = np.argsort(similarity[row])[::-1]
        return unique_labels[ranked[ranked != row][:n]].tolist()

    return dict(zip(unique_labels.tolist(), map(_neighbours, range(len(unique_labels)))))
|
|
|
|
|
def _top_sentences(sentences: list[str], embeddings: np.ndarray,
                   centroid: np.ndarray, k: int = 5) -> list[str]:
    """Return up to *k* sentences whose embeddings are closest to *centroid*.

    Closeness is cosine similarity between *centroid* and each row of
    *embeddings*; ``sentences[i]`` is taken to belong to ``embeddings[i]``.
    Results are ordered from most to least similar.
    """
    scores = cosine_similarity([centroid], embeddings)[0]
    best_first = np.argsort(scores)[::-1]
    return [sentences[position] for position in best_first[:k]]
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool
def load_scopus_csv(csv_path: str, run_config: str = "abstract") -> str:
    """Load a Scopus CSV file, count papers/sentences, apply boilerplate regex
    filter, and return a JSON summary. run_config must be 'abstract' or 'title'.

    The cleaned text is stored in a ``_cleaned_text`` column and the whole
    table is written next to the CSV as ``<csv stem>_cleaned.parquet``; that
    location is reported under the ``parquet_path`` key of the summary. The
    summary is also written to summaries.json in the working directory.
    """
    df = pd.read_csv(csv_path)
    columns = RUN_CONFIGS[run_config]
    available_cols = [column for column in columns if column in df.columns]
    texts = df[available_cols].fillna("").apply(
        lambda row: " ".join(row.values.astype(str)), axis=1
    )

    # Second cleaning pass: cut everything from a copyright/publisher marker to
    # the end of the line, after the shared BOILERPLATE_PATTERNS pass.
    publisher_tail = re.compile(
        r"©.*|all rights reserved|copyright.*|palgrave.*", flags=re.I
    )
    cleaned = [publisher_tail.sub("", _clean_text(text)) for text in texts]

    sentences = _sentences_from_series(pd.Series(cleaned))
    df["_cleaned_text"] = cleaned

    # Derive the output name from the file stem, so the input file is never
    # overwritten when the path lacks a lowercase ".csv" and ".csv" inside a
    # directory name is left alone.
    source = Path(csv_path)
    parquet_path = source.with_name(f"{source.stem}_cleaned.parquet")
    df.to_parquet(parquet_path, index=False)

    summary = {
        "csv_path": csv_path,
        "parquet_path": str(parquet_path),
        "run_config": run_config,
        "columns_used": available_cols,
        "total_papers": int(len(df)),
        "total_sentences": len(sentences),
        "sample_titles": df["Title"].head(5).tolist() if "Title" in df.columns else [],
    }
    Path("summaries.json").write_text(json.dumps(summary, indent=2))
    return json.dumps(summary)
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool
def run_bertopic_discovery(parquet_path: str, run_config: str = "abstract") -> str:
    """Embed sentences with all-MiniLM-L6-v2, cluster with AgglomerativeClustering
    (cosine, threshold=0.7), find 5 nearest centroids per cluster, generate 4
    Plotly charts. Saves summaries.json + emb.npy. Returns topic summaries JSON.

    Text source: the ``_cleaned_text`` column is used when the parquet has one
    and summaries.json records the same run_config; otherwise the raw columns
    named by run_config are joined. Also writes labels.npy (one cluster label
    per row of emb.npy), charts.json and topics.json (the 100 largest topics).
    The returned JSON has the keys total_clusters, top100_count, charts_saved.
    """
    # Only this tool needs PCA, so it is imported locally.
    from sklearn.decomposition import PCA

    df = pd.read_parquet(parquet_path)
    columns = RUN_CONFIGS[run_config]
    available_cols = [column for column in columns if column in df.columns]

    # summaries.json may be absent when this tool runs on its own.
    summary_file = Path("summaries.json")
    summaries = json.loads(summary_file.read_text()) if summary_file.exists() else {}

    # Prefer the boilerplate-free text, but only when it was produced for the
    # same run_config; otherwise run_config would be silently ignored.
    use_cleaned = (
        "_cleaned_text" in df.columns and summaries.get("run_config") == run_config
    )
    texts = (
        df["_cleaned_text"].fillna("")
        if use_cleaned
        else df[available_cols].fillna("").apply(
            lambda row: " ".join(row.values.astype(str)), axis=1
        )
    )
    sentences = _sentences_from_series(texts)

    model = SentenceTransformer("all-MiniLM-L6-v2")
    embeddings = model.encode(sentences, normalize_embeddings=True, show_progress_bar=False)
    np.save("emb.npy", embeddings)

    clustering = AgglomerativeClustering(
        metric="cosine",
        linkage="average",
        distance_threshold=0.7,
        n_clusters=None,
    )
    labels = clustering.fit_predict(embeddings)
    # Persist the sentence -> cluster assignment alongside the embeddings so
    # later steps can select a topic's rows from emb.npy.
    np.save("labels.npy", labels)

    unique_labels, counts = np.unique(labels, return_counts=True)
    nearest = _nearest_centroids(embeddings, labels)

    # Row indices and centroid of every cluster, computed once and reused for
    # the summaries and the heatmap.
    members = {int(label): np.flatnonzero(labels == label) for label in unique_labels}
    centroids = {label: embeddings[rows].mean(axis=0) for label, rows in members.items()}

    topic_summaries = [
        {
            "topic_id": label,
            "sentence_count": int(count),
            "nearest_topics": nearest.get(label, []),
            # Representative sentences are chosen among the cluster's own
            # members, not among all sentences.
            "top_sentences": _top_sentences(
                [sentences[row] for row in members[label]],
                embeddings[members[label]],
                centroids[label],
            ),
        }
        for label, count in zip(map(int, unique_labels), counts)
    ]

    topic_summaries.sort(key=lambda t: t["sentence_count"], reverse=True)
    top100 = topic_summaries[:100]

    # Chart 1: bar chart of the 20 largest topics.
    top20 = top100[:20]
    fig1 = px.bar(
        x=[f"T{t['topic_id']}" for t in top20],
        y=[t["sentence_count"] for t in top20],
        labels={"x": "Topic", "y": "Sentences"},
        title="Top 20 Topics by Sentence Count",
    )

    # Chart 2: treemap of the top-100 topic sizes.
    fig2 = px.treemap(
        names=[f"Topic {t['topic_id']}" for t in top100],
        parents=["All"] * len(top100),
        values=[t["sentence_count"] for t in top100],
        title="Topic Distribution Treemap",
    )

    # Chart 3: 2-D PCA projection of all sentence embeddings, coloured by cluster.
    pca = PCA(n_components=2)
    coords = pca.fit_transform(embeddings)
    fig3 = go.Figure(go.Scatter(
        x=coords[:, 0], y=coords[:, 1],
        mode="markers",
        marker=dict(color=labels, colorscale="Viridis", size=4, opacity=0.6),
    ))
    fig3.update_layout(title="Sentence Clusters (PCA 2D)")

    # Chart 4: centroid cosine-similarity heatmap of the 10 largest topics.
    top10_ids = [t["topic_id"] for t in top100[:10]]
    centroids10 = np.array([centroids[topic_id] for topic_id in top10_ids])
    sim10 = cosine_similarity(centroids10)
    fig4 = px.imshow(
        sim10,
        x=[f"T{i}" for i in top10_ids],
        y=[f"T{i}" for i in top10_ids],
        color_continuous_scale="Blues",
        title="Top-10 Topic Cosine Similarity Heatmap",
    )

    charts = {
        "bar_top20": fig1.to_json(),
        "treemap": fig2.to_json(),
        "scatter_pca": fig3.to_json(),
        "heatmap": fig4.to_json(),
    }

    total_clusters = int(len(unique_labels))
    summaries.update({"bertopic": {"total_clusters": total_clusters}})
    summary_file.write_text(json.dumps(summaries, indent=2))
    Path("charts.json").write_text(json.dumps(charts, indent=2))
    Path("topics.json").write_text(json.dumps(top100, indent=2))

    return json.dumps({
        "total_clusters": total_clusters,
        "top100_count": len(top100),
        "charts_saved": list(charts.keys()),
    })
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool
def label_topics_with_llm(topics_json_path: str = "topics.json") -> str:
    """Send top-100 topics to Mistral via PromptTemplate + JsonOutputParser to
    generate human-readable labels. Returns labelled topics JSON.

    Each topic from topics_json_path is written to labelled_topics.json with
    added ``label`` and ``reasoning`` keys; a topic the model did not label
    gets the fallback label ``Topic <id>`` and empty reasoning. The return
    value is a JSON string with the keys labelled_count and path.
    """
    topics = json.loads(Path(topics_json_path).read_text())
    batch = topics[:100]

    prompt = PromptTemplate.from_template(
        "You are a qualitative research expert. Below are topic clusters from a "
        "systematic literature review. For EACH topic assign a concise label "
        "(3-6 words) and one sentence of reasoning.\n\n"
        "Topics:\n{topics_text}\n\n"
        "Return ONLY valid JSON: a list of objects with keys: "
        "topic_id, label, reasoning. No markdown fences."
    )
    parser = JsonOutputParser()
    chain = prompt | _MISTRAL | parser

    # One line per topic: id, size and its two most representative sentences.
    topics_text = "\n".join(
        f"Topic {t['topic_id']} ({t['sentence_count']} sentences): "
        + " | ".join(t["top_sentences"][:2])
        for t in batch
    )

    labelled = chain.invoke({"topics_text": topics_text})

    # Key by str(topic_id): the model may return the id as "5" instead of 5,
    # which would otherwise make every lookup miss.
    label_map = {str(item.get("topic_id")): item for item in labelled}

    def _enrich(topic: dict) -> dict:
        # Copy only label/reasoning so the model's topic_id can never
        # overwrite the numeric id that later steps look topics up by.
        answer = label_map.get(str(topic["topic_id"]), {})
        return {
            **topic,
            "label": answer.get("label", f"Topic {topic['topic_id']}"),
            "reasoning": answer.get("reasoning", ""),
        }

    enriched = list(map(_enrich, batch))

    Path("labelled_topics.json").write_text(json.dumps(enriched, indent=2))
    return json.dumps({"labelled_count": len(enriched), "path": "labelled_topics.json"})
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool
def consolidate_into_themes(approved_groups_json: str) -> str:
    """Merge approved topic groups into themes, recompute centroids from emb.npy.
    approved_groups_json: JSON list of {theme_name, topic_ids: [...]} objects.

    A theme's centroid is the mean embedding of every sentence whose cluster
    label is one of the theme's topic_ids (null when no sentence matches).
    Themes are written to themes.json; the return value is a JSON string with
    the keys themes_count and theme_names.
    """
    groups = json.loads(approved_groups_json)
    embeddings = np.load("emb.npy")

    # Per-sentence cluster labels are needed to find a topic's rows in
    # emb.npy. Use labels.npy when it exists and matches emb.npy row for row;
    # otherwise re-derive the labels with the clustering settings used by
    # run_bertopic_discovery.
    labels_file = Path("labels.npy")
    cached_labels = np.load(labels_file) if labels_file.exists() else np.empty(0, dtype=int)
    labels = (
        cached_labels
        if len(cached_labels) == len(embeddings)
        else AgglomerativeClustering(
            metric="cosine",
            linkage="average",
            distance_threshold=0.7,
            n_clusters=None,
        ).fit_predict(embeddings)
    )

    topics = json.loads(Path("labelled_topics.json").read_text())
    sentences_by_topic = {t["topic_id"]: t["top_sentences"] for t in topics}

    def _centroid(topic_ids: list):
        # Select rows by cluster label, not by row index.
        member_rows = embeddings[np.isin(labels, topic_ids)]
        return member_rows.mean(axis=0).tolist() if len(member_rows) else None

    themes = [
        {
            "theme_name": group["theme_name"],
            "topic_ids": group["topic_ids"],
            # Concatenate the member topics' sentences in order, keep the first ten.
            "top_sentences": [
                sentence
                for topic_id in group["topic_ids"]
                for sentence in sentences_by_topic.get(topic_id, [])
            ][:10],
            "centroid": _centroid(group["topic_ids"]),
        }
        for group in groups
    ]

    Path("themes.json").write_text(json.dumps(themes, indent=2))
    return json.dumps({"themes_count": len(themes), "theme_names": [t["theme_name"] for t in themes]})
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool
def compare_with_taxonomy(themes_json_path: str = "themes.json") -> str:
    """Map consolidated themes to PAJAIS 25 categories via Mistral.
    Returns a mapping JSON."""
    # Reads the themes file, asks the model for one category per theme, and
    # stores the parsed answer in taxonomy_mapping.json. The returned JSON
    # string carries mapped_count and path.
    theme_records = json.loads(Path(themes_json_path).read_text())

    # Prompt inputs: one bullet per category, and one line per theme made of
    # its name plus up to two of its sentences.
    category_block = "\n".join(f" • {category}" for category in PAJAIS_CATEGORIES)
    theme_block = "\n".join(
        f"- {record['theme_name']}: " + "; ".join(record["top_sentences"][:2])
        for record in theme_records
    )

    template = PromptTemplate.from_template(
        "You are an AI research taxonomist. Map each theme to the most relevant "
        "PAJAIS category.\n\n"
        "PAJAIS Categories:\n{categories}\n\n"
        "Themes:\n{themes_text}\n\n"
        "Return ONLY valid JSON: a list of objects with keys: "
        "theme_name, pajais_category, confidence (0-1), rationale. No markdown."
    )
    pipeline = template | _MISTRAL | JsonOutputParser()
    mapping = pipeline.invoke({
        "categories": category_block,
        "themes_text": theme_block,
    })

    serialized = json.dumps(mapping, indent=2)
    Path("taxonomy_mapping.json").write_text(serialized)
    return json.dumps({"mapped_count": len(mapping), "path": "taxonomy_mapping.json"})
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool
def generate_comparison_csv(original_csv_path: str) -> str:
    """Generate a side-by-side comparison CSV of abstract vs title clustering
    results for each paper. Returns path to output CSV.

    The CSV (comparison_abstract_vs_title.csv) has a 1-based Paper_ID, the
    Title/Abstract text columns that exist in the input (renamed with a
    ``_text`` suffix) and a Theme_Assignment column. Theme_Assignment is
    currently a fixed placeholder pointing to themes.json, not a per-paper
    assignment. The return value is a JSON string with the keys output_csv,
    rows and columns.
    """
    df = pd.read_csv(original_csv_path)

    # Title first, then Abstract; columns missing from the input are skipped.
    present = [column for column in ("Title", "Abstract") if column in df.columns]
    comparison = df[present].copy()
    comparison.columns = [f"{column}_text" for column in present]
    comparison.insert(0, "Paper_ID", range(1, len(df) + 1))

    # Same placeholder value on every row.
    comparison["Theme_Assignment"] = "See themes.json for full mapping"

    out_path = "comparison_abstract_vs_title.csv"
    comparison.to_csv(out_path, index=False)
    return json.dumps({"output_csv": out_path, "rows": len(comparison), "columns": comparison.columns.tolist()})
|
|
|
|
|
|
|
|
|
|
|
|
|
@tool
def export_narrative(context_json: str = "{}") -> str:
    """Generate a ~500-word Section 7 narrative via Mistral, synthesising all
    prior analysis. context_json may contain extra instructions. Returns the
    narrative text and saves it to narrative.md.

    Extra instructions are read from the ``extra_instructions`` key of
    context_json. themes.json, taxonomy_mapping.json and summaries.json are
    used when present and treated as empty otherwise. The narrative itself is
    written to narrative.md (UTF-8); the return value is a JSON string with
    the keys narrative_path and word_count.
    """
    def _load_or(default, filename: str):
        # Earlier pipeline steps are optional: fall back when a file is missing.
        path = Path(filename)
        return json.loads(path.read_text()) if path.exists() else default

    # An empty string is treated like the default "{}".
    context = json.loads(context_json or "{}")
    themes = _load_or([], "themes.json")
    mapping = _load_or([], "taxonomy_mapping.json")
    summaries = _load_or({}, "summaries.json")

    themes_summary = "\n".join(
        f"- **{t['theme_name']}**: " + "; ".join(t["top_sentences"][:1])
        for t in themes
    )
    mapping_summary = "\n".join(
        f"- {m.get('theme_name','?')} → {m.get('pajais_category','?')} "
        f"(confidence: {m.get('confidence', '?')})"
        for m in mapping
    )

    prompt = PromptTemplate.from_template(
        "You are a senior academic researcher writing a systematic literature review. "
        "Write Section 7 (Discussion & Synthesis) of approximately 500 words. "
        "Use an academic tone, Braun & Clarke (2006) thematic analysis framing, "
        "and reference the themes and PAJAIS taxonomy mappings provided.\n\n"
        "Dataset summary:\n{summaries}\n\n"
        "Themes identified:\n{themes}\n\n"
        "PAJAIS taxonomy mapping:\n{mapping}\n\n"
        "Extra context: {extra}\n\n"
        "Write the section now. Use markdown headings."
    )
    chain = prompt | _MISTRAL

    result = chain.invoke({
        "summaries": json.dumps(summaries, indent=2),
        "themes": themes_summary,
        "mapping": mapping_summary,
        "extra": context.get("extra_instructions", "None"),
    })

    narrative = result.content
    # Model output is free text and may contain non-ASCII characters; an
    # explicit encoding avoids failures under a non-UTF-8 platform default.
    Path("narrative.md").write_text(narrative, encoding="utf-8")
    return json.dumps({"narrative_path": "narrative.md", "word_count": len(narrative.split())})