Spaces:
Sleeping
Sleeping
| """ | |
tools.py — 7 Stateless LangChain Tools for BERTopic Agentic Thematic Analysis
All tools are decorated with @tool and use handle_tool_error=True.
No if/elif/else, no for/while loops, no try/except blocks.
| """ | |
| import json | |
| import os | |
| import re | |
| import numpy as np | |
| import pandas as pd | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| from langchain_core.tools import tool | |
| from sentence_transformers import SentenceTransformer | |
| from sklearn.cluster import AgglomerativeClustering | |
| from sklearn.preprocessing import normalize | |
| from sklearn.metrics.pairwise import cosine_similarity | |
# Every tool in this module writes its artefacts under ./outputs, so make sure
# the directory exists as soon as the module is imported.
os.makedirs("outputs", exist_ok=True)

# Regex fragments for publisher boilerplate and stock abstract openers.
# _clean_text ORs them together and deletes every match.
BOILERPLATE_PATTERNS = [
    # \u00a9 is the copyright sign. It is written as a regex escape so the
    # pattern keeps working even if this source file is re-encoded.
    r"\u00a9\s*\d{4}.*",
    r"all rights reserved.*",
    r"published by elsevier.*",
    r"this paper (proposes|presents|investigates|aims)",
    r"in this (paper|study|article|work)",
    r"the purpose of this (paper|study)",
]
def _clean_text(text: str) -> str:
    """Lower-case *text* and delete every BOILERPLATE_PATTERNS match from it.

    The input is coerced with ``str()`` first, so non-string cell values are
    accepted. Leading and trailing whitespace is stripped both before and
    after the boilerplate is removed.
    """
    lowered = str(text).lower().strip()
    # One alternation of all fragments, so a single pass removes them all.
    boilerplate = re.compile("|".join(BOILERPLATE_PATTERNS), flags=re.IGNORECASE)
    stripped = boilerplate.sub("", lowered)
    return stripped.strip()
| def _split_sentences(text: str) -> list: | |
| """Split text into sentences on '. ', '? ', '! '.""" | |
| raw = re.split(r"(?<=[.!?])\s+", str(text).strip()) | |
| return list(filter(lambda s: len(s.split()) > 4, raw)) | |
def load_scopus_csv(file_path: str) -> str:
    """
    Read a Scopus CSV export, clean it, and report what was loaded.

    Rows lacking a Title or an Abstract are dropped. The remaining rows get
    two extra columns, 'Abstract_Clean' and 'Title_Clean', and the resulting
    table is written to outputs/cleaned_data.json.

    Args:
        file_path: Absolute or relative path to the Scopus CSV file.

    Returns:
        JSON string. On success it has keys: status ("loaded"), papers,
        abstract_sentences, title_sentences, columns, sample_titles.
        If 'Title' or 'Abstract' is missing from the CSV it has keys:
        status ("error"), message, columns.
    """
    required = ["Title", "Abstract"]
    df = pd.read_csv(file_path, encoding="utf-8-sig")

    absent = [name for name in required if name not in df.columns]
    if absent:
        error_payload = {
            "status": "error",
            "message": f"Missing required columns: {', '.join(absent)}",
            "columns": list(df.columns),
        }
        return json.dumps(error_payload, indent=2)

    # Keep only the papers that have both a title and an abstract.
    has_both = df["Title"].notna() & df["Abstract"].notna()
    df = df[has_both].reset_index(drop=True)

    df["Abstract_Clean"] = df["Abstract"].map(_clean_text)
    df["Title_Clean"] = df["Title"].map(_clean_text)

    # Sentence totals are counted on the cleaned text, using the same
    # splitter the clustering step uses.
    abstract_total = sum(len(_split_sentences(text)) for text in df["Abstract_Clean"])
    title_total = sum(len(_split_sentences(text)) for text in df["Title_Clean"])

    df.to_json("outputs/cleaned_data.json", orient="records", indent=2)

    summary = {
        "status": "loaded",
        "papers": int(len(df)),
        "abstract_sentences": int(abstract_total),
        "title_sentences": int(title_total),
        "columns": list(df.columns),
        "sample_titles": list(df["Title"].head(5)),
    }
    return json.dumps(summary, indent=2)
def run_bertopic_discovery(run_config: str) -> str:
    """
    Embed sentences, cluster them, and write per-cluster summaries and charts.

    Sentences come from the cleaned columns of outputs/cleaned_data.json.
    They are embedded with the "all-MiniLM-L6-v2" SentenceTransformer,
    L2-normalised, and grouped by average-linkage AgglomerativeClustering
    with a cosine distance threshold of 0.3 (cosine similarity of 0.7).

    Files written:
        outputs/embeddings_{tag}.npy   - normalised float32 embeddings
        outputs/summaries_{tag}.json   - one record per cluster
        outputs/chart_distribution.html
        outputs/chart_heatmap.html     - a placeholder note above 300 clusters

    Each summary record holds the cluster size, the five sentences closest to
    the cluster centroid ('top_sentences'), and the paper indices those five
    sentences came from ('papers').

    Args:
        run_config: JSON string with key 'columns', a list of column names,
            e.g. '{"columns": ["Abstract"]}' or '{"columns": ["Title"]}'
            or '{"columns": ["Abstract", "Title"]}'. The tag is the
            lower-cased column names joined with '_'.

    Returns:
        JSON string with status, tag, n_clusters, total_sentences and
        summaries_file, or a status "error" payload when the cleaned data
        file or a requested column is missing.
    """
    config = json.loads(run_config)
    columns = config.get("columns", ["Abstract"])
    tag = "_".join(columns).lower()
    summaries_path = f"outputs/summaries_{tag}.json"

    cleaned_data_path = "outputs/cleaned_data.json"
    if not os.path.exists(cleaned_data_path):
        return json.dumps({
            "status": "error",
            "message": "Cleaned data file not found. Run load_scopus_csv first.",
        }, indent=2)
    df = pd.read_json(cleaned_data_path)

    # 'Abstract' and 'Title' are mapped to their cleaned counterparts; any
    # other column name is used as given.
    col_map = {"Abstract": "Abstract_Clean", "Title": "Title_Clean"}
    use_cols = [col_map.get(c, c) for c in columns]
    missing_columns = [c for c in use_cols if c not in df.columns]
    if missing_columns:
        return json.dumps({
            "status": "error",
            "message": f"Missing cleaned columns: {', '.join(missing_columns)}",
            "available_columns": list(df.columns),
        }, indent=2)

    # Collect (sentence, paper_index) pairs in one linear pass.
    all_pairs = [
        (sentence, idx)
        for idx, row in df.iterrows()
        for sentence in _split_sentences(" ".join(str(row[c]) for c in use_cols))
    ]
    sentences = [sentence for sentence, _ in all_pairs]
    paper_ids = [idx for _, idx in all_pairs]

    if not sentences:
        with open(summaries_path, "w", encoding="utf-8") as f:
            json.dump([], f, indent=2)
        return json.dumps({
            "status": "completed",
            "tag": tag,
            "n_clusters": 0,
            "total_sentences": 0,
            "summaries_file": summaries_path,
            "message": "No sentences available after preprocessing.",
        }, indent=2)

    model = SentenceTransformer("all-MiniLM-L6-v2")
    embeddings = model.encode(sentences, show_progress_bar=True, batch_size=64)
    # Work in float32 and L2-normalise each row; the epsilon guards against a
    # zero-length vector.
    embeddings = np.asarray(embeddings, dtype=np.float32)
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    embeddings = (embeddings / (norms + 1e-12)).astype(np.float32, copy=False)
    np.save(f"outputs/embeddings_{tag}.npy", embeddings)

    if len(sentences) < 2:
        # AgglomerativeClustering refuses to fit fewer than two samples, so a
        # lone sentence simply forms cluster 0.
        labels = np.zeros(len(sentences), dtype=int)
    else:
        clusterer = AgglomerativeClustering(
            n_clusters=None,
            metric="cosine",
            linkage="average",
            distance_threshold=0.3,  # cosine distance = 1 - similarity, so 0.3 means similarity 0.7
        )
        labels = np.asarray(clusterer.fit_predict(embeddings))
    n_clusters = int(labels.max() + 1)

    # Build each cluster's summary and centroid together. Cluster labels are
    # contiguous from 0, so every cluster has at least one member.
    summaries = []
    centroid_rows = []
    for cid in range(n_clusters):
        member_idx = np.flatnonzero(labels == cid)
        vecs = embeddings[member_idx]
        centroid = vecs.mean(axis=0, keepdims=True)
        sims = cosine_similarity(centroid, vecs)[0]
        # Indices (into `sentences`) of the five members closest to the centroid.
        top_idx = member_idx[np.argsort(sims)[::-1][:5]]
        centroid_rows.append(centroid[0].astype(np.float32))
        summaries.append({
            "cluster_id": cid,
            "size": int(len(member_idx)),
            "papers": sorted({int(paper_ids[i]) for i in top_idx}),
            "top_sentences": [sentences[i] for i in top_idx],
            "label": f"Cluster_{cid}",
            "approved": False,
            "rename_to": "",
            "reasoning": "",
        })

    with open(summaries_path, "w", encoding="utf-8") as f:
        json.dump(summaries, f, indent=2)

    sizes = [s["size"] for s in summaries]
    cids = [f"C{s['cluster_id']}" for s in summaries]
    fig_dist = px.bar(x=cids, y=sizes, labels={"x": "Cluster", "y": "Sentences"},
                      title=f"Topic Distribution ({tag})", color=sizes,
                      color_continuous_scale="Blues")
    fig_dist.write_html("outputs/chart_distribution.html")

    centroids = np.vstack(centroid_rows).astype(np.float32)

    # Skip the n_clusters x n_clusters heatmap when it would be very large.
    heatmap_max = 300
    if centroids.shape[0] > heatmap_max:
        with open("outputs/chart_heatmap.html", "w", encoding="utf-8") as f:
            f.write(f"<p style='color:grey'>Heatmap skipped: {centroids.shape[0]} clusters exceeds safe limit ({heatmap_max}).</p>")
    else:
        sim_matrix = cosine_similarity(centroids)
        fig_heat = go.Figure(go.Heatmap(z=sim_matrix, x=cids, y=cids,
                                        colorscale="Viridis"))
        fig_heat.update_layout(title=f"Cluster Similarity Heatmap ({tag})")
        fig_heat.write_html("outputs/chart_heatmap.html")

    return json.dumps({
        "status": "completed",
        "tag": tag,
        "n_clusters": n_clusters,
        "total_sentences": len(sentences),
        "summaries_file": summaries_path,
    }, indent=2)
def label_topics_with_llm(labelling_input: str) -> str:
    """
    Merge LLM-written labels into the saved cluster summaries for a run.

    This tool does not call an LLM itself. It takes the labels the LLM has
    already produced and writes them into outputs/summaries_{tag}.json.
    Clusters absent from 'llm_labels', and fields absent from an individual
    label entry, keep whatever value is already stored, so the tool can be
    called repeatedly with partial updates.

    Args:
        labelling_input: JSON string with keys:
            - 'tag': run tag (e.g. 'abstract' or 'title')
            - 'llm_labels': list of dicts, each with keys
              'cluster_id', 'label', 'category', 'confidence', 'reasoning'
              as returned by the LLM's own analysis.

    Returns:
        JSON string with status, tag and 'topics_labelled' (the number of
        cluster records written back, not only those that were changed).

    Raises:
        FileNotFoundError: if the summaries file for the tag does not exist.
    """
    data = json.loads(labelling_input)
    tag = data.get("tag", "abstract")
    llm_labels = data.get("llm_labels") or []
    summaries_path = f"outputs/summaries_{tag}.json"
    with open(summaries_path, encoding="utf-8") as f:
        summaries = json.load(f)

    # Key on the string form so "3" from the LLM still matches integer 3.
    label_map = {str(item["cluster_id"]): item for item in llm_labels}

    def _apply_label(summary):
        update = label_map.get(str(summary["cluster_id"]), {})
        return {
            **summary,
            "label": update.get("label", summary["label"]),
            # Fall back to the stored value so an earlier labelling pass is
            # not wiped by a later partial one.
            "category": update.get("category", summary.get("category", "")),
            "confidence": update.get("confidence", summary.get("confidence", 0.0)),
            "reasoning": update.get("reasoning", summary.get("reasoning", "")),
        }

    updated = [_apply_label(summary) for summary in summaries]
    with open(summaries_path, "w", encoding="utf-8") as f:
        json.dump(updated, f, indent=2)
    return json.dumps({
        "status": "labelled",
        "tag": tag,
        "topics_labelled": len(updated),
    }, indent=2)
# ──────────────────────────────────────────────────────────────────────────────
# TOOL 4 — consolidate_into_themes
# ──────────────────────────────────────────────────────────────────────────────
def consolidate_into_themes(consolidation_input: str) -> str:
    """
    Turn the user-approved clusters of a run into the final list of themes.

    Reads outputs/summaries_{tag}.json, applies the review decisions, keeps
    only the approved clusters, and gives each a 'final_label' (the
    'rename_to' value when it is non-blank, otherwise the existing label).
    The result is saved to outputs/themes_{tag}.json. A keyword bar chart
    built from each theme's top sentences is written to
    outputs/chart_keywords.html, or a placeholder note when there is nothing
    to plot.

    Each approved cluster becomes exactly one theme; clusters are not merged
    and no centroids are recomputed here.

    Args:
        consolidation_input: JSON string with keys:
            - 'tag': run tag
            - 'approvals': list of dicts with keys
              'cluster_id', 'approved' (bool), 'rename_to' (str), 'reasoning' (str)

    Returns:
        JSON string with status, tag, themes_count and the list of final
        theme labels.

    Raises:
        FileNotFoundError: if the summaries file for the tag does not exist.
    """
    from collections import Counter

    data = json.loads(consolidation_input)
    tag = data.get("tag", "abstract")
    approvals = data.get("approvals") or []
    summaries_path = f"outputs/summaries_{tag}.json"
    with open(summaries_path, encoding="utf-8") as f:
        summaries = json.load(f)

    # Key on the string form so "3" in the review table matches integer 3.
    approval_map = {str(a["cluster_id"]): a for a in approvals}

    def _apply_approval(summary):
        decision = approval_map.get(str(summary["cluster_id"]), {})
        return {
            **summary,
            "approved": decision.get("approved", False),
            "rename_to": decision.get("rename_to", ""),
            "reasoning": decision.get("reasoning", ""),
        }

    def _finalise(summary):
        # rename_to may be missing, null or blank; all mean "keep the label".
        rename = str(summary.get("rename_to") or "").strip()
        return {**summary, "final_label": rename or summary["label"]}

    reviewed = [_apply_approval(summary) for summary in summaries]
    themes = [_finalise(summary) for summary in reviewed if summary["approved"]]
    with open(f"outputs/themes_{tag}.json", "w", encoding="utf-8") as f:
        json.dump(themes, f, indent=2)

    # Keyword chart per theme.
    stop = {"the", "a", "an", "of", "in", "and", "to", "is", "for", "with", "that", "this", "on", "are",
            "by", "as", "from", "be", "was", "at", "it", "or", "has", "have", "been", "which", "their"}

    def _top_words(theme):
        # The five most frequent words of four or more letters in the
        # theme's evidence sentences, stop words excluded.
        words = re.findall(r"\b[a-z]{4,}\b",
                           " ".join(theme.get("top_sentences", [])).lower())
        counted = Counter(w for w in words if w not in stop).most_common(5)
        return [{"theme": theme["final_label"], "word": word, "count": count}
                for word, count in counted]

    kw_rows = [row for theme in themes for row in _top_words(theme)]
    kw_df = pd.DataFrame(kw_rows)
    if len(kw_df) > 0:
        fig_kw = px.bar(kw_df, x="count", y="word", color="theme",
                        orientation="h", title="Top Keywords per Theme",
                        barmode="group")
        fig_kw.write_html("outputs/chart_keywords.html")
    else:
        with open("outputs/chart_keywords.html", "w", encoding="utf-8") as f:
            f.write("<p style='color:grey'>No approved themes available yet.</p>")

    return json.dumps({
        "status": "consolidated",
        "tag": tag,
        "themes_count": len(themes),
        "themes": [theme["final_label"] for theme in themes],
    }, indent=2)
# ──────────────────────────────────────────────────────────────────────────────
# TOOL 5 — compare_with_taxonomy
# ──────────────────────────────────────────────────────────────────────────────
def compare_with_taxonomy(taxonomy_input: str) -> str:
    """
    Attach a PAJAIS category and a MAPPED/NOVEL status to each final theme.

    Themes are read from outputs/themes_{tag}.json and matched to the
    supplied mappings by 'final_label'. A theme is MAPPED only when its
    mapping entry has a truthy 'mapped' flag; themes with no entry, or with
    a falsy flag, are NOVEL. The annotated themes are saved to
    outputs/taxonomy_mapping_{tag}.json.

    Args:
        taxonomy_input: JSON string with keys:
            - 'tag': run tag
            - 'mappings': list of dicts with keys
              'final_label', 'pajais_category' (str or ''), 'mapped' (bool)

    Returns:
        JSON string with status, tag, total_themes, mapped_count,
        novel_count, the PAJAIS taxonomy list and the output file path.
    """
    request = json.loads(taxonomy_input)
    tag = request.get("tag", "abstract")
    mappings = request.get("mappings", [])

    with open(f"outputs/themes_{tag}.json") as handle:
        themes = json.load(handle)

    # Index the supplied mappings by theme label for direct lookup.
    by_label = {}
    for entry in mappings:
        by_label[entry["final_label"]] = entry

    annotated = []
    for theme in themes:
        entry = by_label.get(theme["final_label"], {})
        if entry.get("mapped", False):
            status = "MAPPED"
        else:
            status = "NOVEL"
        annotated.append({
            **theme,
            "pajais_category": entry.get("pajais_category", ""),
            "mapping_status": status,
        })

    output_file = f"outputs/taxonomy_mapping_{tag}.json"
    with open(output_file, "w") as handle:
        json.dump(annotated, handle, indent=2)

    mapped_total = sum(1 for theme in annotated if theme["mapping_status"] == "MAPPED")

    # Returned alongside the counts so the caller can see the reference taxonomy.
    pajais_taxonomy = [
        "IS Strategy & Governance", "AI & Machine Learning Applications",
        "Digital Transformation", "Human-Computer Interaction",
        "Knowledge Management", "Information Security & Privacy",
        "Business Intelligence & Analytics", "Enterprise Systems",
        "E-Commerce & Digital Markets", "IT Adoption & Acceptance",
        "Social Media & Collaboration", "Healthcare IS",
        "IS Research Methods", "Emerging Technologies",
    ]
    result = {
        "status": "mapped",
        "tag": tag,
        "total_themes": len(annotated),
        "mapped_count": mapped_total,
        "novel_count": len(annotated) - mapped_total,
        "pajais_taxonomy": pajais_taxonomy,
        "output_file": output_file,
    }
    return json.dumps(result, indent=2)
# ──────────────────────────────────────────────────────────────────────────────
# TOOL 6 — generate_comparison_csv
# ──────────────────────────────────────────────────────────────────────────────
def generate_comparison_csv(comparison_input: str) -> str:
    """
    Put the themes of several runs side by side in a CSV and a bar chart.

    For every tag, outputs/themes_{tag}.json is read and each theme becomes
    one row with columns: tag, final_label, size, papers (the length of the
    theme's 'papers' list). The rows are written to
    outputs/theme_comparison.csv and plotted to
    outputs/chart_comparison.html; when there are no rows the chart file
    holds a placeholder note instead.

    Args:
        comparison_input: JSON string with key 'tags', a list of run tags,
            e.g. '{"tags": ["abstract", "title"]}'.

    Returns:
        JSON string with status, csv_path, chart_path and total_rows.
    """
    csv_path = "outputs/theme_comparison.csv"
    chart_path = "outputs/chart_comparison.html"

    request = json.loads(comparison_input)
    run_tags = request.get("tags", ["abstract", "title"])

    rows = []
    for run_tag in run_tags:
        with open(f"outputs/themes_{run_tag}.json") as handle:
            run_themes = json.load(handle)
        for theme in run_themes:
            rows.append({
                "tag": run_tag,
                "final_label": theme["final_label"],
                "size": theme["size"],
                "papers": len(theme.get("papers", [])),
            })

    df = pd.DataFrame(rows)
    df.to_csv(csv_path, index=False)

    if len(df) == 0:
        # Nothing to plot: leave a readable note where the chart would be.
        with open(chart_path, "w", encoding="utf-8") as handle:
            handle.write("<p style='color:grey'>No theme comparison available yet.</p>")
    else:
        fig = px.bar(df, x="final_label", y="size", color="tag", barmode="group",
                     title="Abstract vs Title Theme Comparison",
                     labels={"final_label": "Theme", "size": "Sentences", "tag": "Source"})
        fig.write_html(chart_path)

    result = {
        "status": "comparison_generated",
        "csv_path": csv_path,
        "chart_path": chart_path,
        "total_rows": len(df),
    }
    return json.dumps(result, indent=2)
# ──────────────────────────────────────────────────────────────────────────────
# TOOL 7 — export_narrative
# ──────────────────────────────────────────────────────────────────────────────
def export_narrative(narrative_input: str) -> str:
    """
    Assemble the Section 7 narrative report and save it as a text file.

    The LLM-written narrative is trimmed to at most 'max_words' words (an
    ellipsis marks the cut) and followed by a summary table listing each
    theme from outputs/taxonomy_mapping_{tag}.json with its mapping status
    and PAJAIS category. The report is written to
    outputs/narrative_report.txt.

    Args:
        narrative_input: JSON string with keys:
            - 'tag': run tag to base report on
            - 'narrative': the narrative text (written by the LLM)
            - 'researcher_name': optional researcher name
            - 'max_words': optional word limit for the narrative; defaults
              to 500, which is also used when the value is not a positive
              integer

    Returns:
        JSON string with status, report_path, word_count (narrative words
        written, not counting the ellipsis), trimmed and themes_in_report.

    Raises:
        FileNotFoundError: if the taxonomy mapping file for the tag does
            not exist.
    """
    default_max_words = 500

    data = json.loads(narrative_input)
    tag = data.get("tag", "abstract")
    # A JSON null narrative is treated the same as a missing one.
    narrative_text = str(data.get("narrative") or "")
    researcher_name = data.get("researcher_name", "Researcher")

    try:
        max_words = int(data.get("max_words", default_max_words))
    except (TypeError, ValueError, OverflowError):
        max_words = default_max_words
    if max_words <= 0:
        # A zero or negative limit would slice from the wrong end of the
        # word list, so fall back to the default instead.
        max_words = default_max_words

    words = narrative_text.split()
    trimmed = len(words) > max_words
    if trimmed:
        words = words[:max_words]
        narrative_text = " ".join(words) + " ..."

    mapping_path = f"outputs/taxonomy_mapping_{tag}.json"
    with open(mapping_path, encoding="utf-8") as f:
        themes = json.load(f)

    # \u2022 is a bullet, \u2192 a right arrow, \u2500 a horizontal rule
    # character; they are written as escapes so the report stays correct even
    # if this source file is re-encoded.
    theme_lines = [
        f" \u2022 {t['final_label']} [{t.get('mapping_status', '?')}]"
        f" \u2192 PAJAIS: {t.get('pajais_category', 'N/A')}"
        for t in themes
    ]
    thin_rule = "\u2500" * 60
    thick_rule = "=" * 60

    full_report = "\n".join([
        thick_rule,
        "SECTION 7: THEMATIC ANALYSIS NARRATIVE REPORT",
        f"Researcher: {researcher_name}",
        f"Source: {tag.upper()} columns",
        thick_rule,
        "",
        narrative_text,
        "",
        thin_rule,
        "THEME SUMMARY TABLE",
        thin_rule,
        "\n".join(theme_lines),
        "",
        thick_rule,
    ])

    report_path = "outputs/narrative_report.txt"
    with open(report_path, "w", encoding="utf-8") as f:
        f.write(full_report)

    return json.dumps({
        "status": "report_saved",
        "report_path": report_path,
        "word_count": len(words),
        "trimmed": trimmed,
        "themes_in_report": len(themes),
    }, indent=2)