""" tools.py — 7 LangChain tool functions for BERTopic thematic analysis pipeline. Constraints: ZERO if/else, ZERO for/while, ZERO try/except. """ from __future__ import annotations import json import re import numpy as np import pandas as pd import plotly.express as px import plotly.graph_objects as go from pathlib import Path from langchain_core.tools import tool from sentence_transformers import SentenceTransformer from sklearn.cluster import AgglomerativeClustering from sklearn.metrics.pairwise import cosine_similarity from langchain_core.prompts import PromptTemplate from langchain_core.output_parsers import JsonOutputParser from langchain_mistralai import ChatMistralAI from dotenv import load_dotenv load_dotenv() # add this right after the imports # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- BOILERPLATE_PATTERNS = [ r"©\s*\d{4}", r"all rights reserved", r"published by elsevier", r"doi:\s*10\.\S+", r"this article is protected", r"www\.\S+\.com", r"^\s*abstract\s*$", r"please cite this article", r"accepted manuscript", ] RUN_CONFIGS = { "abstract": ["Abstract"], "title": ["Title"], } PAJAIS_CATEGORIES = [ "Artificial Intelligence", "Machine Learning", "Deep Learning", "Natural Language Processing", "Computer Vision", "Robotics", "Knowledge Representation", "Expert Systems", "Decision Support", "Data Mining", "Information Retrieval", "Human-Computer Interaction", "Ethics in AI", "Explainable AI", "Fairness and Bias", "AI in Healthcare", "AI in Education", "AI in Finance", "AI in Manufacturing", "AI in Agriculture", "AI Governance", "Neural Networks", "Reinforcement Learning", "Federated Learning", "AI Safety", ] _MISTRAL = ChatMistralAI(model="mistral-large-latest", temperature=0) # --------------------------------------------------------------------------- # Helper — pure functions, no loops # 
# ---------------------------------------------------------------------------

# Broad "everything after the copyright mark" pattern used by load_scopus_csv.
# It must run BEFORE _clean_text: _clean_text removes "© <year>" on its own,
# after which this pattern can no longer find the publisher tail.
_TRAILING_NOTICE_PATTERN = r"©.*|all rights reserved|copyright.*|palgrave.*"


def _clean_text(text: str) -> str:
    """Remove every BOILERPLATE_PATTERNS match (case-insensitive) and trim."""
    combined = "|".join(BOILERPLATE_PATTERNS)
    return re.sub(combined, "", text, flags=re.IGNORECASE).strip()


def _strip_notices(text: str) -> str:
    """Drop trailing copyright/publisher notices, then apply _clean_text."""
    without_tail = re.sub(_TRAILING_NOTICE_PATTERN, "", text, flags=re.I)
    return _clean_text(without_tail)


def _config_columns(df: pd.DataFrame, run_config: str) -> list[str]:
    """Return the RUN_CONFIGS[run_config] columns that exist in *df*.

    Raises KeyError when run_config is not a key of RUN_CONFIGS.
    """
    return list(filter(lambda c: c in df.columns, RUN_CONFIGS[run_config]))


def _texts_for_config(df: pd.DataFrame, columns: list[str]) -> pd.Series:
    """Join *columns* of each row into one space-separated string (NaN -> '')."""
    return df[columns].fillna("").apply(
        lambda row: " ".join(row.values.astype(str)), axis=1
    )


def _read_json(path: str, default):
    """Return the parsed JSON at *path*, or *default* when the file is absent."""
    target = Path(path)
    return json.loads(target.read_text(encoding="utf-8")) if target.exists() else default


def _write_json(path: str, payload) -> None:
    """Write *payload* to *path* as indented JSON (UTF-8)."""
    Path(path).write_text(json.dumps(payload, indent=2), encoding="utf-8")


def _sentences_from_series(series: pd.Series) -> list[str]:
    """Concatenate the non-null entries and split them into sentences.

    A split happens at whitespace that follows '.', '!' or '?'; empty
    fragments are discarded.
    """
    raw = series.dropna().str.cat(sep=" ")
    return list(filter(None, map(str.strip, re.split(r"(?<=[.!?])\s+", raw))))


def _nearest_centroids(embeddings: np.ndarray, labels: np.ndarray, n: int = 5):
    """Map each cluster label to the labels of its *n* most similar clusters.

    Similarity is cosine similarity between per-cluster mean embeddings. A
    cluster is never listed as its own neighbour, so fewer than *n* labels are
    returned when fewer than n + 1 clusters exist.
    """
    unique_labels = np.unique(labels)
    centroids = np.array(list(map(
        lambda lbl: embeddings[labels == lbl].mean(axis=0),
        unique_labels,
    )))
    sim_matrix = cosine_similarity(centroids)

    def _neighbours(i: int) -> list:
        ranked = np.argsort(sim_matrix[i])[::-1]
        # Mask out the cluster itself instead of relying on a sentinel score.
        return unique_labels[ranked[ranked != i][:n]].tolist()

    nearest = list(map(_neighbours, range(len(unique_labels))))
    return dict(zip(unique_labels.tolist(), nearest))


def _top_sentences(sentences: list[str], embeddings: np.ndarray,
                   centroid: np.ndarray, k: int = 5) -> list[str]:
    """Return the *k* sentences whose embeddings are most similar to *centroid*.

    sentences[i] must correspond to embeddings[i].
    """
    sims = cosine_similarity([centroid], embeddings)[0]
    top_idx = np.argsort(sims)[::-1][:k]
    return list(map(lambda i: sentences[i], top_idx))


def _build_charts(top100: list[dict], embeddings: np.ndarray,
                  labels: np.ndarray) -> dict[str, str]:
    """Build the four Plotly figures and return them serialised as JSON strings."""
    from sklearn.decomposition import PCA

    # ---- Chart 1: Bar chart — top 20 topics by sentence count ----
    top20 = top100[:20]
    bar = px.bar(
        x=[f"T{t['topic_id']}" for t in top20],
        y=[t["sentence_count"] for t in top20],
        labels={"x": "Topic", "y": "Sentences"},
        title="Top 20 Topics by Sentence Count",
    )

    # ---- Chart 2: Treemap ----
    treemap = px.treemap(
        names=[f"Topic {t['topic_id']}" for t in top100],
        parents=["All"] * len(top100),
        values=[t["sentence_count"] for t in top100],
        title="Topic Distribution Treemap",
    )

    # ---- Chart 3: Scatter (PCA 2D projection) ----
    coords = PCA(n_components=2).fit_transform(embeddings)
    scatter = go.Figure(go.Scatter(
        x=coords[:, 0],
        y=coords[:, 1],
        mode="markers",
        marker=dict(color=labels, colorscale="Viridis", size=4, opacity=0.6),
    ))
    scatter.update_layout(title="Sentence Clusters (PCA 2D)")

    # ---- Chart 4: Heatmap — top 10 topic cosine similarity ----
    top10_ids = [t["topic_id"] for t in top100[:10]]
    centroids10 = np.array(list(map(
        lambda lbl: embeddings[labels == lbl].mean(axis=0),
        top10_ids,
    )))
    sim10 = cosine_similarity(centroids10)
    heatmap = px.imshow(
        sim10,
        x=[f"T{i}" for i in top10_ids],
        y=[f"T{i}" for i in top10_ids],
        color_continuous_scale="Blues",
        title="Top-10 Topic Cosine Similarity Heatmap",
    )

    return {
        "bar_top20": bar.to_json(),
        "treemap": treemap.to_json(),
        "scatter_pca": scatter.to_json(),
        "heatmap": heatmap.to_json(),
    }


# ---------------------------------------------------------------------------
# Tool 1 — load_scopus_csv
# ---------------------------------------------------------------------------
# NOTE: the docstrings of the @tool functions are passed to the tool decorator
# as well as read by humans, so they are kept short and close to the originals.

@tool
def load_scopus_csv(csv_path: str, run_config: str = "abstract") -> str:
    """Load a Scopus CSV file, count papers/sentences, apply boilerplate regex
    filter, and return a JSON summary.

    run_config must be 'abstract' or 'title'. The cleaned table is written next
    to the CSV as '<name>_cleaned.parquet'; the summary's 'parquet_path' key
    holds that location. The summary is also written to summaries.json."""
    df = pd.read_csv(csv_path)
    available_cols = _config_columns(df, run_config)
    cleaned = list(map(_strip_notices, _texts_for_config(df, available_cols)))
    sentences = _sentences_from_series(pd.Series(cleaned))

    df["_cleaned_text"] = cleaned
    # Derive the output name from the file stem: a plain str.replace(".csv", ...)
    # also rewrites directory names and, when the suffix is not exactly ".csv",
    # leaves the path unchanged so the parquet would overwrite the source file.
    source = Path(csv_path)
    parquet_path = str(source.with_name(source.stem + "_cleaned.parquet"))
    df.to_parquet(parquet_path, index=False)

    summary = {
        "csv_path": csv_path,
        "parquet_path": parquet_path,
        "run_config": run_config,
        "columns_used": available_cols,
        "total_papers": int(len(df)),
        "total_sentences": len(sentences),
        "sample_titles": df["Title"].head(5).tolist() if "Title" in df.columns else [],
    }
    _write_json("summaries.json", summary)
    return json.dumps(summary)


# ---------------------------------------------------------------------------
# Tool 2 — run_bertopic_discovery
# ---------------------------------------------------------------------------

@tool
def run_bertopic_discovery(parquet_path: str, run_config: str = "abstract") -> str:
    """Embed sentences with all-MiniLM-L6-v2, cluster with AgglomerativeClustering
    (cosine, threshold=0.7), find 5 nearest centroids per cluster, generate
    4 Plotly charts. Saves summaries.json + emb.npy + labels.npy, plus
    charts.json and topics.json.

    Returns topic summaries JSON."""
    df = pd.read_parquet(parquet_path)
    available_cols = _config_columns(df, run_config)
    # Clean the columns selected by *this* run_config rather than reading the
    # stored '_cleaned_text', which reflects whatever config the loader used.
    texts = _texts_for_config(df, available_cols).map(_strip_notices)
    sentences = _sentences_from_series(texts)

    model = SentenceTransformer("all-MiniLM-L6-v2")
    embeddings = model.encode(sentences, normalize_embeddings=True, show_progress_bar=False)
    np.save("emb.npy", embeddings)

    clustering = AgglomerativeClustering(
        metric="cosine",
        linkage="average",
        distance_threshold=0.7,
        n_clusters=None,
    )
    labels = clustering.fit_predict(embeddings)
    # Persist the labels next to emb.npy: consolidate_into_themes needs them to
    # find which embedding rows belong to which topic.
    np.save("labels.npy", labels)

    unique_labels, counts = np.unique(labels, return_counts=True)
    nearest = _nearest_centroids(embeddings, labels)
    sentence_array = np.array(sentences, dtype=object)

    def _summarise(pair) -> dict:
        topic_id, size = pair
        members = labels == topic_id
        member_embeddings = embeddings[members]
        return {
            "topic_id": int(topic_id),
            "sentence_count": int(size),
            "nearest_topics": nearest.get(int(topic_id), []),
            # Rank only the cluster's own sentences so a representative
            # sentence can never come from a different topic.
            "top_sentences": _top_sentences(
                sentence_array[members].tolist(),
                member_embeddings,
                member_embeddings.mean(axis=0),
            ),
        }

    # Sort by sentence count desc
    topic_summaries = sorted(
        map(_summarise, zip(unique_labels, counts)),
        key=lambda t: t["sentence_count"],
        reverse=True,
    )
    top100 = topic_summaries[:100]
    charts = _build_charts(top100, embeddings, labels)
    total_clusters = int(len(unique_labels))

    # summaries.json may be absent when the loader ran in another directory.
    existing = _read_json("summaries.json", {})
    existing.update({"bertopic": {"total_clusters": total_clusters}})
    _write_json("summaries.json", existing)
    _write_json("charts.json", charts)
    _write_json("topics.json", top100)

    return json.dumps({
        "total_clusters": total_clusters,
        "top100_count": len(top100),
        "charts_saved": list(charts.keys()),
    })


# ---------------------------------------------------------------------------
# Tool 3 — label_topics_with_llm
# ---------------------------------------------------------------------------

@tool
def label_topics_with_llm(topics_json_path: str = "topics.json") -> str:
    """Send top-100 topics to Mistral via PromptTemplate + JsonOutputParser to
    generate human-readable labels. Saves labelled_topics.json and returns the
    labelled count and that path as JSON."""
    topics = json.loads(Path(topics_json_path).read_text(encoding="utf-8"))
    batch = topics[:100]

    prompt = PromptTemplate.from_template(
        "You are a qualitative research expert. Below are topic clusters from a "
        "systematic literature review. For EACH topic assign a concise label "
        "(3-6 words) and one sentence of reasoning.\n\n"
        "Topics:\n{topics_text}\n\n"
        "Return ONLY valid JSON: a list of objects with keys: "
        "topic_id, label, reasoning. No markdown fences."
    )
    parser = JsonOutputParser()
    chain = prompt | _MISTRAL | parser

    topics_text = "\n".join(map(
        lambda t: f"Topic {t['topic_id']} ({t['sentence_count']} sentences): "
                  + " | ".join(t["top_sentences"][:2]),
        batch,
    ))
    labelled = chain.invoke({"topics_text": topics_text})

    # Key by str(topic_id): the model may echo the id as 12 or "12".
    label_map = dict(map(lambda item: (str(item.get("topic_id")), item), labelled))

    def _enrich(topic: dict) -> dict:
        hit = label_map.get(str(topic["topic_id"]), {})
        # Copy only the two generated fields so model output cannot overwrite
        # topic_id or any other computed value.
        return {
            **topic,
            "label": hit.get("label", f"Topic {topic['topic_id']}"),
            "reasoning": hit.get("reasoning", ""),
        }

    enriched = list(map(_enrich, batch))
    _write_json("labelled_topics.json", enriched)
    return json.dumps({"labelled_count": len(enriched), "path": "labelled_topics.json"})


# ---------------------------------------------------------------------------
# Tool 4 — consolidate_into_themes
# ---------------------------------------------------------------------------

@tool
def consolidate_into_themes(approved_groups_json: str) -> str:
    """Merge approved topic groups into themes, recompute centroids from emb.npy
    (using the cluster labels in labels.npy).

    approved_groups_json: JSON list of {theme_name, topic_ids: [...]} objects."""
    groups = json.loads(approved_groups_json)
    # emb.npy and labels.npy must come from the same run_bertopic_discovery run
    # so that row i of both files describes the same sentence.
    embeddings = np.load("emb.npy")
    labels = np.load("labels.npy")
    topics = json.loads(Path("labelled_topics.json").read_text(encoding="utf-8"))
    topic_id_to_sentences = {t["topic_id"]: t["top_sentences"] for t in topics}

    def _theme(group: dict) -> dict:
        topic_ids = group["topic_ids"]
        return {
            "theme_name": group["theme_name"],
            "topic_ids": topic_ids,
            "top_sentences": [
                sentence
                for tid in topic_ids
                for sentence in topic_id_to_sentences.get(tid, [])
            ][:10],
            # Select rows by cluster label. Comparing topic ids against row
            # indices would average unrelated sentences.
            "centroid": embeddings[np.isin(labels, topic_ids)].mean(axis=0).tolist(),
        }

    themes = list(map(_theme, groups))
    _write_json("themes.json", themes)
    return json.dumps({"themes_count": len(themes),
                       "theme_names": [t["theme_name"] for t in themes]})


# ---------------------------------------------------------------------------
# Tool 5 — compare_with_taxonomy
# ---------------------------------------------------------------------------

@tool
def compare_with_taxonomy(themes_json_path: str = "themes.json") -> str:
    """Map consolidated themes to PAJAIS 25 categories via Mistral. Saves
    taxonomy_mapping.json and returns the mapped count and that path as JSON."""
    themes = json.loads(Path(themes_json_path).read_text(encoding="utf-8"))

    prompt = PromptTemplate.from_template(
        "You are an AI research taxonomist. Map each theme to the most relevant "
        "PAJAIS category.\n\n"
        "PAJAIS Categories:\n{categories}\n\n"
        "Themes:\n{themes_text}\n\n"
        "Return ONLY valid JSON: a list of objects with keys: "
        "theme_name, pajais_category, confidence (0-1), rationale. No markdown."
    )
    parser = JsonOutputParser()
    chain = prompt | _MISTRAL | parser

    themes_text = "\n".join(map(
        lambda t: f"- {t['theme_name']}: " + "; ".join(t["top_sentences"][:2]),
        themes,
    ))
    mapping = chain.invoke({
        "categories": "\n".join(map(lambda c: f"  • {c}", PAJAIS_CATEGORIES)),
        "themes_text": themes_text,
    })

    _write_json("taxonomy_mapping.json", mapping)
    return json.dumps({"mapped_count": len(mapping), "path": "taxonomy_mapping.json"})


# ---------------------------------------------------------------------------
# Tool 6 — generate_comparison_csv
# ---------------------------------------------------------------------------

@tool
def generate_comparison_csv(original_csv_path: str) -> str:
    """Generate a side-by-side comparison CSV of abstract vs title clustering
    results for each paper.

    Returns path to output CSV."""
    # NOTE(review): Theme_Assignment is currently a constant pointer to
    # themes.json; no per-paper theme is computed here.
    df = pd.read_csv(original_csv_path)
    present = [c for c in ("Title", "Abstract") if c in df.columns]

    comparison = df[present].copy()
    comparison.columns = list(map(lambda c: c + "_text", present))
    comparison.insert(0, "Paper_ID", range(1, len(df) + 1))
    comparison["Theme_Assignment"] = "See themes.json for full mapping"

    out_path = "comparison_abstract_vs_title.csv"
    comparison.to_csv(out_path, index=False)
    return json.dumps({"output_csv": out_path, "rows": len(comparison),
                       "columns": comparison.columns.tolist()})


# ---------------------------------------------------------------------------
# Tool 7 — export_narrative
# ---------------------------------------------------------------------------

@tool
def export_narrative(context_json: str = "{}") -> str:
    """Generate a ~500-word Section 7 narrative via Mistral, synthesising all
    prior analysis. context_json may contain extra instructions.

    Saves the narrative to narrative.md and returns that path and the word
    count as JSON."""
    context = json.loads(context_json)
    # Each input is optional: a missing file contributes an empty section.
    themes = _read_json("themes.json", [])
    mapping = _read_json("taxonomy_mapping.json", [])
    summaries = _read_json("summaries.json", {})

    themes_summary = "\n".join(map(
        lambda t: f"- **{t['theme_name']}**: " + "; ".join(t["top_sentences"][:1]),
        themes,
    ))
    mapping_summary = "\n".join(map(
        lambda m: f"- {m.get('theme_name','?')} → {m.get('pajais_category','?')} "
                  f"(confidence: {m.get('confidence', '?')})",
        mapping,
    ))

    prompt = PromptTemplate.from_template(
        "You are a senior academic researcher writing a systematic literature review. "
        "Write Section 7 (Discussion & Synthesis) of approximately 500 words. "
        "Use an academic tone, Braun & Clarke (2006) thematic analysis framing, "
        "and reference the themes and PAJAIS taxonomy mappings provided.\n\n"
        "Dataset summary:\n{summaries}\n\n"
        "Themes identified:\n{themes}\n\n"
        "PAJAIS taxonomy mapping:\n{mapping}\n\n"
        "Extra context: {extra}\n\n"
        "Write the section now. Use markdown headings."
    )
    chain = prompt | _MISTRAL
    result = chain.invoke({
        "summaries": json.dumps(summaries, indent=2),
        "themes": themes_summary,
        "mapping": mapping_summary,
        "extra": context.get("extra_instructions", "None"),
    })

    narrative = result.content
    # Explicit UTF-8: generated prose may contain characters that the platform
    # default encoding cannot represent.
    Path("narrative.md").write_text(narrative, encoding="utf-8")
    return json.dumps({"narrative_path": "narrative.md",
                       "word_count": len(narrative.split())})