Spaces:
Configuration error
Configuration error
| """ | |
| tools.py — 7 LangChain tool functions for BERTopic thematic analysis pipeline. | |
| Constraints: ZERO if/else, ZERO for/while, ZERO try/except. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import re | |
| import numpy as np | |
| import pandas as pd | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| from pathlib import Path | |
| from langchain_core.tools import tool | |
| from sentence_transformers import SentenceTransformer | |
| from sklearn.cluster import AgglomerativeClustering | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from langchain_core.prompts import PromptTemplate | |
| from langchain_core.output_parsers import JsonOutputParser | |
| from langchain_mistralai import ChatMistralAI | |
| from dotenv import load_dotenv | |
| load_dotenv() # add this right after the imports | |
| # --------------------------------------------------------------------------- | |
| # Constants | |
| # --------------------------------------------------------------------------- | |
# Regex fragments describing publisher boilerplate. _clean_text joins them
# into a single alternation and applies it case-insensitively.
BOILERPLATE_PATTERNS = [
    r"©\s*\d{4}",                  # copyright sign followed by a year
    r"all rights reserved",
    r"published by elsevier",
    r"doi:\s*10\.\S+",             # inline DOI reference
    r"this article is protected",
    r"www\.\S+\.com",              # bare publisher URL
    r"^\s*abstract\s*$",           # text consisting solely of the word "abstract"
    r"please cite this article",
    r"accepted manuscript",
]

# Run configuration name -> CSV columns whose text is analysed for that run.
RUN_CONFIGS = dict(
    abstract=["Abstract"],
    title=["Title"],
)

# Target taxonomy (25 labels) offered to the LLM in compare_with_taxonomy.
PAJAIS_CATEGORIES = [
    "Artificial Intelligence",
    "Machine Learning",
    "Deep Learning",
    "Natural Language Processing",
    "Computer Vision",
    "Robotics",
    "Knowledge Representation",
    "Expert Systems",
    "Decision Support",
    "Data Mining",
    "Information Retrieval",
    "Human-Computer Interaction",
    "Ethics in AI",
    "Explainable AI",
    "Fairness and Bias",
    "AI in Healthcare",
    "AI in Education",
    "AI in Finance",
    "AI in Manufacturing",
    "AI in Agriculture",
    "AI Governance",
    "Neural Networks",
    "Reinforcement Learning",
    "Federated Learning",
    "AI Safety",
]
# Chat model shared by every LLM-backed tool in this module. It is built at
# import time; temperature=0 is requested for all calls.
_MISTRAL = ChatMistralAI(
    model="mistral-large-latest",
    temperature=0,
)
| # --------------------------------------------------------------------------- | |
| # Helper — pure functions, no loops | |
| # --------------------------------------------------------------------------- | |
def _clean_text(text: str) -> str:
    """Delete every boilerplate match from *text* and trim the result.

    Args:
        text: Raw text of one paper.

    Returns:
        *text* with all BOILERPLATE_PATTERNS matches removed and leading and
        trailing whitespace stripped.
    """
    # One alternation over all fragments, matched case-insensitively.
    # re.MULTILINE is not set, so the anchored "abstract" fragment can only
    # match when the whole text is that single word.
    boilerplate = re.compile("|".join(BOILERPLATE_PATTERNS), flags=re.IGNORECASE)
    stripped_of_boilerplate = boilerplate.sub("", text)
    return stripped_of_boilerplate.strip()
| def _sentences_from_series(series: pd.Series) -> list[str]: | |
| raw = series.dropna().str.cat(sep=" ") | |
| return list(filter(None, map(str.strip, re.split(r"(?<=[.!?])\s+", raw)))) | |
def _nearest_centroids(embeddings: np.ndarray, labels: np.ndarray, n: int = 5):
    """Map each cluster label to the labels of its most similar other clusters.

    A centroid is the mean embedding of the rows carrying a label; clusters
    are ranked by cosine similarity between centroids, most similar first.

    Args:
        embeddings: One embedding row per item.
        labels: Cluster label for each row of *embeddings*.
        n: Maximum number of neighbours to report per cluster.

    Returns:
        dict of label -> list of up to *n* neighbouring labels. A cluster is
        never listed as its own neighbour, so the list is shorter than *n*
        when fewer than *n* other clusters exist (empty for a single cluster).
    """
    unique_labels = np.unique(labels)
    centroids = np.array(list(map(
        lambda lbl: embeddings[labels == lbl].mean(axis=0),
        unique_labels,
    )))
    sim_matrix = cosine_similarity(centroids)
    # Push self-similarity to the bottom of each ranking ...
    np.fill_diagonal(sim_matrix, -1)
    positions = range(len(unique_labels))
    rankings = map(lambda i: np.argsort(sim_matrix[i])[::-1], positions)
    # ... and drop the cluster's own index explicitly: with n or fewer other
    # clusters the -1 entry alone would still land inside the first n slots.
    nearest = list(map(
        lambda i, order: unique_labels[order[order != i][:n]].tolist(),
        positions,
        rankings,
    ))
    return dict(zip(unique_labels.tolist(), nearest))
def _top_sentences(sentences: list[str], embeddings: np.ndarray,
                   centroid: np.ndarray, k: int = 5) -> list[str]:
    """Return the *k* sentences whose embeddings are most similar to *centroid*.

    Args:
        sentences: Sentences aligned row-for-row with *embeddings*.
        embeddings: One embedding row per sentence.
        centroid: Reference vector to compare against.
        k: Number of sentences to return.

    Returns:
        Up to *k* sentences, most similar first (cosine similarity).
    """
    similarity_to_centroid = cosine_similarity([centroid], embeddings)[0]
    best_first = np.argsort(similarity_to_centroid)[::-1]
    return list(map(sentences.__getitem__, best_first[:k]))
| # --------------------------------------------------------------------------- | |
| # Tool 1 — load_scopus_csv | |
| # --------------------------------------------------------------------------- | |
def load_scopus_csv(csv_path: str, run_config: str = "abstract") -> str:
    """Load a Scopus CSV export, clean its text and summarise it.

    The columns selected by *run_config* are joined per paper, stripped of
    publisher boilerplate in two passes and stored in a new ``_cleaned_text``
    column. The augmented frame is written next to the input as
    ``<stem>_cleaned.parquet`` and the summary is written to
    ``summaries.json`` in the working directory.

    Args:
        csv_path: Path to the Scopus CSV file.
        run_config: Key of RUN_CONFIGS ('abstract' or 'title').

    Returns:
        JSON string with csv_path, run_config, columns_used, total_papers,
        total_sentences and sample_titles (first five titles, or [] when the
        CSV has no 'Title' column).

    Raises:
        KeyError: If *run_config* is not a key of RUN_CONFIGS.
    """
    df = pd.read_csv(csv_path)
    columns = RUN_CONFIGS[run_config]
    available_cols = list(filter(lambda c: c in df.columns, columns))
    texts = df[available_cols].fillna("").apply(
        lambda row: " ".join(row.values.astype(str)), axis=1
    )
    # Pass 1: shared boilerplate fragments.
    cleaned = list(map(_clean_text, texts))
    # Pass 2: cut copyright/publisher notices together with the rest of the
    # line they start on.
    trailing_notice = re.compile(
        r"©.*|all rights reserved|copyright.*|palgrave.*",
        flags=re.I,
    )
    cleaned = list(map(lambda x: trailing_notice.sub("", x), cleaned))
    sentences = _sentences_from_series(pd.Series(cleaned))
    df["_cleaned_text"] = cleaned
    # Derive the output name from the file stem. A plain str.replace(".csv", ...)
    # rewrites every ".csv" occurrence in the path and leaves it unchanged
    # when the suffix differs (e.g. ".CSV"), so the Parquet would overwrite
    # the input file.
    source = Path(csv_path)
    parquet_path = source.with_name(f"{source.stem}_cleaned.parquet")
    df.to_parquet(str(parquet_path), index=False)
    summary = {
        "csv_path": csv_path,
        "run_config": run_config,
        "columns_used": available_cols,
        "total_papers": int(len(df)),
        "total_sentences": len(sentences),
        "sample_titles": df["Title"].head(5).tolist() if "Title" in df.columns else [],
    }
    Path("summaries.json").write_text(json.dumps(summary, indent=2))
    return json.dumps(summary)
| # --------------------------------------------------------------------------- | |
| # Tool 2 — run_bertopic_discovery | |
| # --------------------------------------------------------------------------- | |
def _build_topic_charts(top100, embeddings, labels):
    """Render the four overview figures and return them as Plotly JSON strings."""
    from sklearn.decomposition import PCA

    # Chart 1: bar chart of the 20 largest topics.
    top20 = top100[:20]
    fig_bar = px.bar(
        x=[f"T{t['topic_id']}" for t in top20],
        y=[t["sentence_count"] for t in top20],
        labels={"x": "Topic", "y": "Sentences"},
        title="Top 20 Topics by Sentence Count",
    )
    # Chart 2: treemap of all retained topics.
    fig_treemap = px.treemap(
        names=[f"Topic {t['topic_id']}" for t in top100],
        parents=["All"] * len(top100),
        values=[t["sentence_count"] for t in top100],
        title="Topic Distribution Treemap",
    )
    # Chart 3: every sentence projected to 2D with PCA, coloured by cluster.
    coords = PCA(n_components=2).fit_transform(embeddings)
    fig_scatter = go.Figure(go.Scatter(
        x=coords[:, 0], y=coords[:, 1],
        mode="markers",
        marker=dict(color=labels, colorscale="Viridis", size=4, opacity=0.6),
    ))
    fig_scatter.update_layout(title="Sentence Clusters (PCA 2D)")
    # Chart 4: centroid-to-centroid cosine similarity of the 10 largest topics.
    top10_ids = [t["topic_id"] for t in top100[:10]]
    centroids10 = np.array(list(map(
        lambda lbl: embeddings[labels == lbl].mean(axis=0),
        top10_ids,
    )))
    sim10 = cosine_similarity(centroids10)
    fig_heatmap = px.imshow(
        sim10,
        x=[f"T{i}" for i in top10_ids],
        y=[f"T{i}" for i in top10_ids],
        color_continuous_scale="Blues",
        title="Top-10 Topic Cosine Similarity Heatmap",
    )
    return {
        "bar_top20": fig_bar.to_json(),
        "treemap": fig_treemap.to_json(),
        "scatter_pca": fig_scatter.to_json(),
        "heatmap": fig_heatmap.to_json(),
    }


def run_bertopic_discovery(parquet_path: str, run_config: str = "abstract") -> str:
    """Embed sentences, cluster them into topics and render overview charts.

    Sentences are embedded with all-MiniLM-L6-v2 (normalised) and clustered
    with AgglomerativeClustering (cosine metric, average linkage,
    distance_threshold=0.7). Each topic records its sentence count, its
    nearest topics and the sentences closest to its centroid; the 100 largest
    topics are kept.

    Text source: the ``_cleaned_text`` column is used when the Parquet file
    has one, so the boilerplate removal done at load time is not discarded.
    That column reflects the run_config used when it was produced. Without
    it, the raw columns selected by *run_config* are used.

    Files written in the working directory: emb.npy (sentence embeddings),
    charts.json (four Plotly figures as JSON), topics.json (top-100 topics)
    and summaries.json (existing content plus a "bertopic" entry; created if
    absent).

    Args:
        parquet_path: Parquet file to read.
        run_config: Key of RUN_CONFIGS ('abstract' or 'title').

    Returns:
        JSON string with total_clusters, top100_count and charts_saved.

    Raises:
        KeyError: If *run_config* is not a key of RUN_CONFIGS.
    """
    df = pd.read_parquet(parquet_path)
    columns = RUN_CONFIGS[run_config]
    available_cols = list(filter(lambda c: c in df.columns, columns))
    texts = (
        df["_cleaned_text"].fillna("").astype(str)
        if "_cleaned_text" in df.columns
        else df[available_cols].fillna("").apply(
            lambda row: " ".join(row.values.astype(str)), axis=1
        )
    )
    sentences = _sentences_from_series(texts)

    model = SentenceTransformer("all-MiniLM-L6-v2")
    embeddings = model.encode(sentences, normalize_embeddings=True, show_progress_bar=False)
    np.save("emb.npy", embeddings)

    clustering = AgglomerativeClustering(
        metric="cosine",
        linkage="average",
        distance_threshold=0.7,
        n_clusters=None,
    )
    labels = clustering.fit_predict(embeddings)
    unique_labels, counts = np.unique(labels, return_counts=True)
    nearest = _nearest_centroids(embeddings, labels)

    topic_summaries = list(map(
        lambda pair: {
            "topic_id": int(pair[0]),
            "sentence_count": int(pair[1]),
            "nearest_topics": nearest.get(int(pair[0]), []),
            "top_sentences": _top_sentences(
                sentences, embeddings,
                embeddings[labels == pair[0]].mean(axis=0),
            ),
        },
        zip(unique_labels, counts),
    ))
    # Largest topics first; only the top 100 are kept downstream.
    topic_summaries.sort(key=lambda t: t["sentence_count"], reverse=True)
    top100 = topic_summaries[:100]
    total_clusters = int(len(unique_labels))

    charts = _build_topic_charts(top100, embeddings, labels)

    # Tolerate a missing summaries.json instead of failing after the
    # expensive embedding/clustering work has already been done.
    summaries_path = Path("summaries.json")
    existing = json.loads(summaries_path.read_text()) if summaries_path.exists() else {}
    existing.update({"bertopic": {"total_clusters": total_clusters}})
    summaries_path.write_text(json.dumps(existing, indent=2))
    Path("charts.json").write_text(json.dumps(charts, indent=2))
    Path("topics.json").write_text(json.dumps(top100, indent=2))
    return json.dumps({
        "total_clusters": total_clusters,
        "top100_count": len(top100),
        "charts_saved": list(charts.keys()),
    })
| # --------------------------------------------------------------------------- | |
| # Tool 3 — label_topics_with_llm | |
| # --------------------------------------------------------------------------- | |
def label_topics_with_llm(topics_json_path: str = "topics.json") -> str:
    """Ask Mistral for a short label and one-sentence reasoning per topic.

    The first 100 topics of *topics_json_path* are sent in a single prompt.
    Each topic is merged with the model's answer and the result is written
    to ``labelled_topics.json``. Topics the model did not answer for get the
    fallback label "Topic <id>" and an empty reasoning.

    Args:
        topics_json_path: JSON file holding a list of topic dicts with
            topic_id, sentence_count and top_sentences.

    Returns:
        JSON string with labelled_count and the output path.
    """
    topics = json.loads(Path(topics_json_path).read_text())
    batch = topics[:100]
    prompt = PromptTemplate.from_template(
        "You are a qualitative research expert. Below are topic clusters from a "
        "systematic literature review. For EACH topic assign a concise label "
        "(3-6 words) and one sentence of reasoning.\n\n"
        "Topics:\n{topics_text}\n\n"
        "Return ONLY valid JSON: a list of objects with keys: "
        "topic_id, label, reasoning. No markdown fences."
    )
    parser = JsonOutputParser()
    chain = prompt | _MISTRAL | parser
    # Each topic is described by its size and its two most central sentences.
    topics_text = "\n".join(map(
        lambda t: f"Topic {t['topic_id']} ({t['sentence_count']} sentences): "
        + " | ".join(t["top_sentences"][:2]),
        batch,
    ))
    labelled = chain.invoke({"topics_text": topics_text})
    # Key on the string form of the id: the model may answer with "3" instead
    # of 3, which would make every lookup miss and silently fall back to the
    # default label.
    label_map = {str(item["topic_id"]): item for item in labelled}
    enriched = list(map(
        lambda t: {
            **t,
            **label_map.get(
                str(t["topic_id"]),
                {"label": f"Topic {t['topic_id']}", "reasoning": ""},
            ),
            # Keep the original id so a string id from the model cannot
            # replace the integer one.
            "topic_id": t["topic_id"],
        },
        batch,
    ))
    Path("labelled_topics.json").write_text(json.dumps(enriched, indent=2))
    return json.dumps({"labelled_count": len(enriched), "path": "labelled_topics.json"})
| # --------------------------------------------------------------------------- | |
| # Tool 4 — consolidate_into_themes | |
| # --------------------------------------------------------------------------- | |
def consolidate_into_themes(approved_groups_json: str) -> str:
    """Merge approved topic groups into themes and recompute their centroids.

    For every group the theme collects up to ten representative sentences
    (taken from ``labelled_topics.json`` in the order of topic_ids) and a
    centroid: the mean embedding of all sentences belonging to the group's
    topics. The themes are written to ``themes.json``.

    Args:
        approved_groups_json: JSON list of {theme_name, topic_ids: [...]}
            objects.

    Returns:
        JSON string with themes_count and theme_names.
    """
    groups = json.loads(approved_groups_json)
    embeddings = np.load("emb.npy")
    topics = json.loads(Path("labelled_topics.json").read_text())
    topic_id_to_sentences = {t["topic_id"]: t["top_sentences"] for t in topics}
    # emb.npy holds one row per sentence and topic ids are cluster labels, so
    # rows must be selected by label, not by comparing row indices with topic
    # ids. The labels are not stored on disk; they are re-derived with the
    # same clustering settings run_bertopic_discovery uses (keep the two in
    # sync). This repeats the clustering cost once per call.
    labels = AgglomerativeClustering(
        metric="cosine",
        linkage="average",
        distance_threshold=0.7,
        n_clusters=None,
    ).fit_predict(embeddings)
    themes = list(map(
        lambda g: {
            "theme_name": g["theme_name"],
            "topic_ids": g["topic_ids"],
            "top_sentences": sum(
                map(lambda tid: topic_id_to_sentences.get(tid, []), g["topic_ids"]),
                [],
            )[:10],
            "centroid": embeddings[
                np.isin(labels, g["topic_ids"])
            ].mean(axis=0).tolist(),
        },
        groups,
    ))
    Path("themes.json").write_text(json.dumps(themes, indent=2))
    return json.dumps({"themes_count": len(themes), "theme_names": [t["theme_name"] for t in themes]})
| # --------------------------------------------------------------------------- | |
| # Tool 5 — compare_with_taxonomy | |
| # --------------------------------------------------------------------------- | |
def compare_with_taxonomy(themes_json_path: str = "themes.json") -> str:
    """Ask Mistral to map each consolidated theme to a PAJAIS category.

    Every theme is presented with its name and its first two representative
    sentences, alongside the full PAJAIS_CATEGORIES list. The parsed answer
    is written to ``taxonomy_mapping.json``.

    Args:
        themes_json_path: JSON file holding a list of theme dicts with
            theme_name and top_sentences.

    Returns:
        JSON string with mapped_count and the output path.
    """
    themes = json.loads(Path(themes_json_path).read_text())
    template = (
        "You are an AI research taxonomist. Map each theme to the most relevant "
        "PAJAIS category.\n\n"
        "PAJAIS Categories:\n{categories}\n\n"
        "Themes:\n{themes_text}\n\n"
        "Return ONLY valid JSON: a list of objects with keys: "
        "theme_name, pajais_category, confidence (0-1), rationale. No markdown."
    )
    chain = PromptTemplate.from_template(template) | _MISTRAL | JsonOutputParser()
    theme_lines = map(
        lambda t: f"- {t['theme_name']}: " + "; ".join(t["top_sentences"][:2]),
        themes,
    )
    category_lines = map(lambda c: f" • {c}", PAJAIS_CATEGORIES)
    prompt_inputs = {
        "categories": "\n".join(category_lines),
        "themes_text": "\n".join(theme_lines),
    }
    mapping = chain.invoke(prompt_inputs)
    Path("taxonomy_mapping.json").write_text(json.dumps(mapping, indent=2))
    return json.dumps({"mapped_count": len(mapping), "path": "taxonomy_mapping.json"})
| # --------------------------------------------------------------------------- | |
| # Tool 6 — generate_comparison_csv | |
| # --------------------------------------------------------------------------- | |
def generate_comparison_csv(original_csv_path: str) -> str:
    """Write a per-paper CSV holding title and abstract text side by side.

    The output, ``comparison_abstract_vs_title.csv`` in the working
    directory, has a 1-based Paper_ID, the ``Title_text`` / ``Abstract_text``
    columns (only those present in the input) and a ``Theme_Assignment``
    column. The latter currently carries a fixed pointer to themes.json
    rather than a per-paper assignment.

    Args:
        original_csv_path: Path to the original Scopus CSV.

    Returns:
        JSON string with output_csv, rows and columns.
    """
    df = pd.read_csv(original_csv_path)
    # Title first, then abstract; columns missing from the input are skipped.
    text_cols = list(filter(lambda c: c in df.columns, ["Title", "Abstract"]))
    comparison = df[text_cols].copy()
    comparison.columns = list(map(lambda c: c + "_text", text_cols))
    comparison.insert(0, "Paper_ID", range(1, len(df) + 1))
    # The same placeholder goes on every row, so let pandas broadcast it.
    comparison["Theme_Assignment"] = "See themes.json for full mapping"
    out_path = "comparison_abstract_vs_title.csv"
    comparison.to_csv(out_path, index=False)
    return json.dumps({"output_csv": out_path, "rows": len(comparison), "columns": comparison.columns.tolist()})
| # --------------------------------------------------------------------------- | |
| # Tool 7 — export_narrative | |
| # --------------------------------------------------------------------------- | |
def _read_json_if_present(path: str, default):
    """Return the parsed JSON content of *path*, or *default* when it is absent."""
    target = Path(path)
    return json.loads(target.read_text()) if target.exists() else default


def export_narrative(context_json: str = "{}") -> str:
    """Generate the ~500-word Section 7 narrative with Mistral.

    The prompt is assembled from whichever of themes.json,
    taxonomy_mapping.json and summaries.json exist in the working directory;
    missing files contribute empty content. The model's text is saved to
    ``narrative.md``.

    Args:
        context_json: JSON object; its optional "extra_instructions" value is
            passed to the model as extra context ("None" when absent).

    Returns:
        JSON string with narrative_path and word_count.
    """
    context = json.loads(context_json)
    themes = _read_json_if_present("themes.json", [])
    mapping = _read_json_if_present("taxonomy_mapping.json", [])
    summaries = _read_json_if_present("summaries.json", {})
    # One bullet per theme, illustrated by its first representative sentence.
    themes_summary = "\n".join(map(
        lambda t: f"- **{t['theme_name']}**: " + "; ".join(t["top_sentences"][:1]),
        themes,
    ))
    mapping_summary = "\n".join(map(
        lambda m: f"- {m.get('theme_name','?')} → {m.get('pajais_category','?')} "
        f"(confidence: {m.get('confidence', '?')})",
        mapping,
    ))
    prompt = PromptTemplate.from_template(
        "You are a senior academic researcher writing a systematic literature review. "
        "Write Section 7 (Discussion & Synthesis) of approximately 500 words. "
        "Use an academic tone, Braun & Clarke (2006) thematic analysis framing, "
        "and reference the themes and PAJAIS taxonomy mappings provided.\n\n"
        "Dataset summary:\n{summaries}\n\n"
        "Themes identified:\n{themes}\n\n"
        "PAJAIS taxonomy mapping:\n{mapping}\n\n"
        "Extra context: {extra}\n\n"
        "Write the section now. Use markdown headings."
    )
    chain = prompt | _MISTRAL
    result = chain.invoke({
        "summaries": json.dumps(summaries, indent=2),
        "themes": themes_summary,
        "mapping": mapping_summary,
        "extra": context.get("extra_instructions", "None"),
    })
    narrative = result.content
    # Model output routinely contains non-ASCII characters; write UTF-8
    # explicitly so the result does not depend on the platform's default
    # encoding.
    Path("narrative.md").write_text(narrative, encoding="utf-8")
    return json.dumps({"narrative_path": "narrative.md", "word_count": len(narrative.split())})