""" tools.py — 7 Stateless LangChain Tools for BERTopic Agentic Thematic Analysis All tools are decorated with @tool and use handle_tool_error=True. No if/elif/else, no for/while loops, no try/except blocks. """ import json import os import re import numpy as np import pandas as pd import plotly.express as px import plotly.graph_objects as go from langchain_core.tools import tool from sentence_transformers import SentenceTransformer from sklearn.cluster import AgglomerativeClustering from sklearn.preprocessing import normalize from sklearn.metrics.pairwise import cosine_similarity os.makedirs("outputs", exist_ok=True) BOILERPLATE_PATTERNS = [ r"©\s*\d{4}.*", r"all rights reserved.*", r"published by elsevier.*", r"this paper (proposes|presents|investigates|aims)", r"in this (paper|study|article|work)", r"the purpose of this (paper|study)", ] def _clean_text(text: str) -> str: """Remove boilerplate from a single text string.""" text = str(text).lower().strip() cleaned = re.sub("|".join(BOILERPLATE_PATTERNS), "", text, flags=re.IGNORECASE) return cleaned.strip() def _split_sentences(text: str) -> list: """Split text into sentences on '. ', '? ', '! '.""" raw = re.split(r"(?<=[.!?])\s+", str(text).strip()) return list(filter(lambda s: len(s.split()) > 4, raw)) @tool def load_scopus_csv(file_path: str) -> str: """ Load a Scopus CSV file and return a summary of its contents. Args: file_path: Absolute or relative path to the Scopus CSV file. Returns: JSON string with keys: papers, abstract_sentences, title_sentences, columns, sample_titles, status. """ df = pd.read_csv(file_path, encoding="utf-8-sig") missing_columns = list(filter(lambda col: col not in df.columns, ["Title", "Abstract"])) if missing_columns: return json.dumps({ "status": "error", "message": f"Missing required columns: {', '.join(missing_columns)}", "columns": list(df.columns), }, indent=2) # Keep only rows with non-empty Title and Abstract df = df[df["Title"].notna() & df["Abstract"].notna()].reset_index(drop=True) df["Abstract_Clean"] = df["Abstract"].map(_clean_text) df["Title_Clean"] = df["Title"].map(_clean_text) abstract_sentences = sum(df["Abstract_Clean"].map(_split_sentences).map(len)) title_sentences = sum(df["Title_Clean"].map(_split_sentences).map(len)) df.to_json("outputs/cleaned_data.json", orient="records", indent=2) return json.dumps({ "status": "loaded", "papers": int(len(df)), "abstract_sentences": int(abstract_sentences), "title_sentences": int(title_sentences), "columns": list(df.columns), "sample_titles": list(df["Title"].head(5)), }, indent=2) @tool def run_bertopic_discovery(run_config: str) -> str: """ Embed sentences, cluster with AgglomerativeClustering (cosine, threshold=0.7), extract top-5 evidence sentences per cluster, generate Plotly charts, and save summaries.json and embeddings.npy. Args: run_config: JSON string with key 'columns' — list of column names to use, e.g. '{"columns": ["Abstract"]}' or '{"columns": ["Title"]}' or '{"columns": ["Abstract", "Title"]}'. Returns: JSON string summarising clusters found. """ config = json.loads(run_config) columns = config.get("columns", ["Abstract"]) tag = "_".join(columns).lower() cleaned_data_path = "outputs/cleaned_data.json" if not os.path.exists(cleaned_data_path): return json.dumps({ "status": "error", "message": "Cleaned data file not found. Run load_scopus_csv first.", }, indent=2) df = pd.read_json(cleaned_data_path) col_map = {"Abstract": "Abstract_Clean", "Title": "Title_Clean"} use_cols = list(map(lambda c: col_map.get(c, c), columns)) missing_columns = list(filter(lambda c: c not in df.columns, use_cols)) if missing_columns: return json.dumps({ "status": "error", "message": f"Missing cleaned columns: {', '.join(missing_columns)}", "available_columns": list(df.columns), }, indent=2) # Collect (sentence, paper_index) pairs pairs = [] def _extract(row_tuple): idx, row = row_tuple return list(map(lambda s: (s, idx), _split_sentences(" ".join(str(row[c]) for c in use_cols)))) all_pairs = sum(map(_extract, df.iterrows()), []) sentences = list(map(lambda p: p[0], all_pairs)) paper_ids = list(map(lambda p: p[1], all_pairs)) if not sentences: empty_summaries_path = f"outputs/summaries_{tag}.json" with open(empty_summaries_path, "w", encoding="utf-8") as f: json.dump([], f, indent=2) return json.dumps({ "status": "completed", "tag": tag, "n_clusters": 0, "total_sentences": 0, "summaries_file": empty_summaries_path, "message": "No sentences available after preprocessing.", }, indent=2) model = SentenceTransformer("all-MiniLM-L6-v2") embeddings = model.encode(sentences, show_progress_bar=True, batch_size=64) # Convert to float32 and L2-normalise in-place to avoid large float64 copies embeddings = np.asarray(embeddings, dtype=np.float32) norms = np.linalg.norm(embeddings, axis=1, keepdims=True) embeddings = embeddings / (norms + 1e-12) embeddings = embeddings.astype(np.float32, copy=False) np.save(f"outputs/embeddings_{tag}.npy", embeddings) clusterer = AgglomerativeClustering( n_clusters=None, metric="cosine", linkage="average", distance_threshold=0.3 # cosine distance = 1 – similarity; 0.3 ≈ similarity 0.7 ) labels = clusterer.fit_predict(embeddings) n_clusters = int(max(labels) + 1) def _summarise_cluster(cid): mask = np.where(np.array(labels) == cid)[0] vecs = embeddings[mask] if vecs.size == 0: top_sents = [] top_pids = [] size = 0 else: centroid = vecs.mean(axis=0, keepdims=True) sims = cosine_similarity(centroid, vecs)[0] top5_idx = mask[np.argsort(sims)[::-1][:5]] top_sents = list(map(lambda i: sentences[i], top5_idx)) top_pids = list(sorted(set(map(lambda i: int(paper_ids[i]), top5_idx)))) size = int(len(mask)) return { "cluster_id": cid, "size": size, "papers": top_pids, "top_sentences": top_sents, "label": f"Cluster_{cid}", "approved": False, "rename_to": "", "reasoning": "", } summaries = list(map(_summarise_cluster, range(n_clusters))) with open(f"outputs/summaries_{tag}.json", "w") as f: json.dump(summaries, f, indent=2) sizes = list(map(lambda s: s["size"], summaries)) cids = list(map(lambda s: f"C{s['cluster_id']}", summaries)) fig_dist = px.bar(x=cids, y=sizes, labels={"x": "Cluster", "y": "Sentences"}, title=f"Topic Distribution ({tag})", color=sizes, color_continuous_scale="Blues") fig_dist.write_html("outputs/chart_distribution.html") # Build centroids (one vector per cluster) using float32 to reduce memory centroids = [] emb_arr = embeddings labels_arr = np.array(labels) for s in summaries: mask = np.where(labels_arr == s["cluster_id"])[0] if mask.size == 0: centroids.append(np.zeros((emb_arr.shape[1],), dtype=np.float32)) else: centroids.append(emb_arr[mask].mean(axis=0).astype(np.float32)) centroids = np.vstack(centroids).astype(np.float32) # Avoid computing an enormous n_clusters x n_clusters heatmap which can OOM. HEATMAP_MAX = 300 if centroids.shape[0] > HEATMAP_MAX: with open("outputs/chart_heatmap.html", "w", encoding="utf-8") as f: f.write(f"

Heatmap skipped: {centroids.shape[0]} clusters exceeds safe limit ({HEATMAP_MAX}).

") else: sim_matrix = cosine_similarity(centroids.astype(np.float32)) fig_heat = go.Figure(go.Heatmap(z=sim_matrix, x=cids, y=cids, colorscale="Viridis")) fig_heat.update_layout(title=f"Cluster Similarity Heatmap ({tag})") fig_heat.write_html("outputs/chart_heatmap.html") return json.dumps({ "status": "completed", "tag": tag, "n_clusters": n_clusters, "total_sentences": len(sentences), "summaries_file": f"outputs/summaries_{tag}.json", }, indent=2) @tool def label_topics_with_llm(labelling_input: str) -> str: """ Use the LLM to generate a human-readable label, category, confidence score, and reasoning for each cluster based on its top evidence sentences. Args: labelling_input: JSON string with keys: - 'tag': run tag (e.g. 'abstract' or 'title') - 'llm_labels': list of dicts, each with keys 'cluster_id', 'label', 'category', 'confidence', 'reasoning' as returned by the LLM's own analysis. Returns: JSON string confirming labels saved. """ data = json.loads(labelling_input) tag = data.get("tag", "abstract") llm_labels = data.get("llm_labels", []) summaries_path = f"outputs/summaries_{tag}.json" with open(summaries_path) as f: summaries = json.load(f) label_map = {item["cluster_id"]: item for item in llm_labels} def _apply_label(s): update = label_map.get(s["cluster_id"], {}) return {**s, "label": update.get("label", s["label"]), "category": update.get("category", ""), "confidence": update.get("confidence", 0.0), "reasoning": update.get("reasoning", "")} updated = list(map(_apply_label, summaries)) with open(summaries_path, "w") as f: json.dump(updated, f, indent=2) return json.dumps({ "status": "labelled", "tag": tag, "topics_labelled": len(updated), }, indent=2) # ══════════════════════════════════════════════════════════════════════════════ # TOOL 4 — consolidate_into_themes # ══════════════════════════════════════════════════════════════════════════════ @tool def consolidate_into_themes(consolidation_input: str) -> str: """ Merge approved clusters into final themes based on user review table. Recomputes merged centroids and saves themes.json. Args: consolidation_input: JSON string with keys: - 'tag': run tag - 'approvals': list of dicts with keys 'cluster_id', 'approved' (bool), 'rename_to' (str), 'reasoning' (str) Returns: JSON string summarising final themes. """ data = json.loads(consolidation_input) tag = data.get("tag", "abstract") approvals = data.get("approvals", []) summaries_path = f"outputs/summaries_{tag}.json" with open(summaries_path) as f: summaries = json.load(f) approval_map = {a["cluster_id"]: a for a in approvals} def _apply_approval(s): a = approval_map.get(s["cluster_id"], {}) return {**s, "approved": a.get("approved", False), "rename_to": a.get("rename_to", ""), "reasoning": a.get("reasoning", "")} updated = list(map(_apply_approval, summaries)) approved = list(filter(lambda s: s["approved"], updated)) def _finalise(s): final_label = s["rename_to"].strip() if s["rename_to"].strip() else s["label"] return {**s, "final_label": final_label} themes = list(map(_finalise, approved)) with open(f"outputs/themes_{tag}.json", "w") as f: json.dump(themes, f, indent=2) # ── Keyword chart per theme ──────────────────────────────────────────────── from collections import Counter stop = {"the","a","an","of","in","and","to","is","for","with","that","this","on","are", "by","as","from","be","was","at","it","or","has","have","been","which","their"} def _top_words(s): words = re.findall(r"\b[a-z]{4,}\b", " ".join(s.get("top_sentences", [])).lower()) filtered = list(filter(lambda w: w not in stop, words)) counted = Counter(filtered).most_common(5) return list(map(lambda kv: {"theme": s["final_label"], "word": kv[0], "count": kv[1]}, counted)) kw_rows = sum(map(_top_words, themes), []) kw_df = pd.DataFrame(kw_rows) if len(kw_df) > 0: fig_kw = px.bar(kw_df, x="count", y="word", color="theme", orientation="h", title="Top Keywords per Theme", barmode="group") fig_kw.write_html("outputs/chart_keywords.html") else: with open("outputs/chart_keywords.html", "w", encoding="utf-8") as f: f.write("

No approved themes available yet.

") return json.dumps({ "status": "consolidated", "tag": tag, "themes_count": len(themes), "themes": list(map(lambda t: t["final_label"], themes)), }, indent=2) # ══════════════════════════════════════════════════════════════════════════════ # TOOL 5 — compare_with_taxonomy # ══════════════════════════════════════════════════════════════════════════════ @tool def compare_with_taxonomy(taxonomy_input: str) -> str: """ Map final themes to the PAJAIS taxonomy. Identify MAPPED vs NOVEL themes. Args: taxonomy_input: JSON string with keys: - 'tag': run tag - 'mappings': list of dicts with keys 'final_label', 'pajais_category' (str or ''), 'mapped' (bool) Returns: JSON string with mapping results saved to taxonomy_mapping.json. """ PAJAIS_TAXONOMY = [ "IS Strategy & Governance", "AI & Machine Learning Applications", "Digital Transformation", "Human-Computer Interaction", "Knowledge Management", "Information Security & Privacy", "Business Intelligence & Analytics", "Enterprise Systems", "E-Commerce & Digital Markets", "IT Adoption & Acceptance", "Social Media & Collaboration", "Healthcare IS", "IS Research Methods", "Emerging Technologies", ] data = json.loads(taxonomy_input) tag = data.get("tag", "abstract") mappings = data.get("mappings", []) themes_path = f"outputs/themes_{tag}.json" with open(themes_path) as f: themes = json.load(f) mapping_map = {m["final_label"]: m for m in mappings} def _map_theme(t): m = mapping_map.get(t["final_label"], {}) status = "MAPPED" if m.get("mapped", False) else "NOVEL" return {**t, "pajais_category": m.get("pajais_category", ""), "mapping_status": status} mapped_themes = list(map(_map_theme, themes)) with open(f"outputs/taxonomy_mapping_{tag}.json", "w") as f: json.dump(mapped_themes, f, indent=2) mapped_count = len(list(filter(lambda t: t["mapping_status"] == "MAPPED", mapped_themes))) novel_count = len(mapped_themes) - mapped_count return json.dumps({ "status": "mapped", "tag": tag, "total_themes": len(mapped_themes), "mapped_count": mapped_count, "novel_count": novel_count, "pajais_taxonomy": PAJAIS_TAXONOMY, "output_file": f"outputs/taxonomy_mapping_{tag}.json", }, indent=2) # ══════════════════════════════════════════════════════════════════════════════ # TOOL 6 — generate_comparison_csv # ══════════════════════════════════════════════════════════════════════════════ @tool def generate_comparison_csv(comparison_input: str) -> str: """ Compare Abstract-derived themes vs Title-derived themes. Produce a side-by-side CSV and a Plotly comparison chart. Args: comparison_input: JSON string with key 'tags' — list of two run tags, e.g. '{"tags": ["abstract", "title"]}'. Returns: JSON string with path to comparison CSV. """ data = json.loads(comparison_input) tags = data.get("tags", ["abstract", "title"]) def _load_themes(tag): path = f"outputs/themes_{tag}.json" with open(path) as f: themes = json.load(f) return list(map(lambda t: { "tag": tag, "final_label": t["final_label"], "size": t["size"], "papers": len(t.get("papers", [])), }, themes)) all_rows = sum(map(_load_themes, tags), []) df = pd.DataFrame(all_rows) df.to_csv("outputs/theme_comparison.csv", index=False) if len(df) > 0: fig = px.bar(df, x="final_label", y="size", color="tag", barmode="group", title="Abstract vs Title Theme Comparison", labels={"final_label": "Theme", "size": "Sentences", "tag": "Source"}) fig.write_html("outputs/chart_comparison.html") else: with open("outputs/chart_comparison.html", "w", encoding="utf-8") as f: f.write("

No theme comparison available yet.

") return json.dumps({ "status": "comparison_generated", "csv_path": "outputs/theme_comparison.csv", "chart_path": "outputs/chart_comparison.html", "total_rows": len(df), }, indent=2) # ══════════════════════════════════════════════════════════════════════════════ # TOOL 7 — export_narrative # ══════════════════════════════════════════════════════════════════════════════ @tool def export_narrative(narrative_input: str) -> str: """ Generate a ~500-word Section 7 narrative report summarising all themes, their PAJAIS mapping, and key insights. Save as narrative_report.txt. Args: narrative_input: JSON string with keys: - 'tag': run tag to base report on - 'narrative': the 500-word narrative text (written by the LLM) - 'researcher_name': optional researcher name Returns: JSON string confirming report saved. """ data = json.loads(narrative_input) tag = data.get("tag", "abstract") narrative_text = data.get("narrative", "") researcher_name = data.get("researcher_name", "Researcher") # Auto-trim narrative to a maximum word count to avoid oversized reports try: max_words = int(data.get("max_words", 500)) except Exception: max_words = 500 words = narrative_text.split() trimmed = False if len(words) > max_words: narrative_text = " ".join(words[:max_words]).rstrip() + " ..." trimmed = True mapping_path = f"outputs/taxonomy_mapping_{tag}.json" with open(mapping_path) as f: themes = json.load(f) theme_lines = list(map( lambda t: f" • {t['final_label']} [{t.get('mapping_status','?')}]" f" — PAJAIS: {t.get('pajais_category','N/A')}", themes )) full_report = "\n".join([ "=" * 60, "SECTION 7: THEMATIC ANALYSIS NARRATIVE REPORT", f"Researcher: {researcher_name}", f"Source: {tag.upper()} columns", "=" * 60, "", narrative_text, "", "─" * 60, "THEME SUMMARY TABLE", "─" * 60, "\n".join(theme_lines), "", "=" * 60, ]) report_path = "outputs/narrative_report.txt" with open(report_path, "w", encoding="utf-8") as f: f.write(full_report) return json.dumps({ "status": "report_saved", "report_path": report_path, "word_count": len(narrative_text.split()), "trimmed": trimmed, "themes_in_report": len(themes), }, indent=2)