Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """ | |
| tools.py — 10 @tool functions for Braun & Clarke (2006) computational | |
| thematic analysis. | |
| Pipeline (called in this order by the LLM agent): | |
| 1. load_scopus_csv — ingest CSV, strip boilerplate, save .parquet | |
| 2. run_bertopic_discovery — embed → cosine agglomerative cluster (min 5 | |
| members) → centroids → orphan report → 4 charts | |
| 3. label_topics_with_llm — Mistral labels top 100 clusters | |
| 4. reassign_sentences — move orphan/misplaced sentences between clusters | |
| 5. consolidate_into_themes — merge reviewer-approved groups | |
| 6. compute_saturation — coverage %, coherence, balance per theme | |
| 7. generate_theme_profiles — top 5 nearest sentences per theme centroid | |
| 8. compare_with_taxonomy — map themes to PAJAIS 25 categories | |
| 9. generate_comparison_csv — abstract vs title side-by-side | |
| 10. export_narrative — 500-word Section 7 via Mistral | |
| Design rules: | |
| Every number, percentage, score, or list of sentences presented to the | |
| reviewer MUST come from a tool β never from the LLM's imagination. | |
| Deterministic tools (1,2,4,5,6,7,9): same input → same output, every run. | |
| LLM-dependent tools (3,8,10): grounded in real data passed via prompt, | |
| but labels/mappings/narrative may vary slightly between runs. | |
| All LLM-dependent outputs require reviewer approval before advancing. | |
| ZERO if/elif/else — all decisions by the LLM | |
| ZERO for/while — list(map(...)) and numpy vectorised ops | |
| ZERO try/except — errors surface to the LLM via ToolNode | |
| Constants reference: | |
| EMBED_MODEL = "all-MiniLM-L6-v2" | |
| 384d sentence embeddings. Runs locally, no API calls. | |
| normalize_embeddings=True β cosine similarity = dot product. | |
| CLUSTER_THRESHOLD = 0.50 | |
| Cosine distance threshold for Agglomerative Clustering. | |
| Two sentences must have cosine similarity >= 0.50 to share a code. | |
| Follows the BERTopic Agglomerative Clustering configuration | |
| (Grootendorst, 2022) with distance_threshold=0.5 as documented | |
| in the BERTopic framework. Operationalises Braun & Clarke (2006) | |
| Phase 2 'Generating Initial Codes' as a reproducible computation. | |
| Tighter (e.g. 0.40) → more, finer codes (closer to B&C ideal) | |
| Looser (e.g. 0.60) → fewer, broader codes | |
| At 0.50 → balanced granularity following BERTopic docs example. | |
| MIN_CLUSTER_SIZE = 5 | |
| Clusters with fewer than 5 members are dissolved. Their sentences | |
| become orphans (label=-1) reported to the reviewer for reassignment. | |
| N_CENTROIDS = 200 | |
| Maximum number of clusters saved to summaries.json (and therefore | |
| labelled and shown in the review table). Set high enough to capture | |
| all clusters in typical Scopus datasets (1k-5k papers). | |
| Top clusters extracted for initial discovery report and charts. | |
| TOP_TOPICS_LLM = 100 | |
| Maximum clusters sent to Mistral for labelling. | |
| NARRATIVE_WORDS = 500 | |
| Target word count for Section 7 narrative. | |
| PAJAIS_25 | |
| 25 IS research categories from Jiang et al. (2019). | |
| Used in Phase 5.5 for taxonomy alignment. | |
| BOILERPLATE_PATTERNS (9 regexes) | |
| Strip publisher noise: copyright, DOI, Elsevier, Springer, | |
| IEEE, Wiley, Taylor & Francis. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import re | |
| import numpy as np | |
| import pandas as pd | |
| import plotly.graph_objects as go | |
| from pathlib import Path | |
| from langchain_core.tools import tool | |
| from langchain_mistralai import ChatMistralAI | |
| from langchain_core.prompts import PromptTemplate | |
| from langchain_core.output_parsers import JsonOutputParser | |
| from sentence_transformers import SentenceTransformer | |
| from sklearn.cluster import AgglomerativeClustering | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from sklearn.preprocessing import normalize | |
| from sklearn.decomposition import PCA | |
# Run mode -> CSV column(s) analysed in that mode. The first entry of each
# list is the column the tools read and clean.
RUN_CONFIGS = {"abstract": ["Abstract"], "title": ["Title"]}

# The 25 IS research categories (attributed to Jiang et al., 2019 in the
# module notes) used for taxonomy alignment.
PAJAIS_25 = [
    "Accounting Information Systems",
    "Artificial Intelligence & Expert Systems",
    "Big Data & Analytics",
    "Business Intelligence & Decision Support",
    "Cloud Computing",
    "Cybersecurity & Privacy",
    "Database Management",
    "Digital Transformation",
    "E-Business & E-Commerce",
    "Enterprise Resource Planning",
    "Fintech & Digital Finance",
    "Geographic Information Systems",
    "Health Informatics",
    "Human-Computer Interaction",
    "Information Systems Development",
    "IT Governance & Management",
    "IT Strategy & Competitive Advantage",
    "Knowledge Management",
    "Machine Learning & Deep Learning",
    "Mobile Computing",
    "Natural Language Processing",
    "Recommender Systems",
    "Social Media & Web 2.0",
    "Supply Chain & Logistics IS",
    "Virtual Reality & Augmented Reality",
]
# Publisher noise that is stripped from every text before embedding.
# Patterns are OR-ed together and matched case-insensitively.
BOILERPLATE_PATTERNS = [
    # Copyright sign followed by a four-digit year. Written as an escape so
    # the pattern cannot be corrupted if this file is re-encoded.
    r"\u00a9\s*\d{4}",
    r"all rights reserved",
    r"published by elsevier",
    r"this article is protected",
    r"doi:\s*10\.\d{4,}",
    r"springer nature",
    r"ieee xplore",
    r"wiley online library",
    r"taylor & francis",
]
BOILERPLATE_RE = re.compile("|".join(BOILERPLATE_PATTERNS), flags=re.IGNORECASE)

# Split after '.', '!' or '?' when followed by whitespace (punctuation is kept).
SENTENCE_SPLIT_RE = re.compile(r"(?<=[.!?])\s+")

EMBED_MODEL = "all-MiniLM-L6-v2"  # sentence-transformers model name
N_CENTROIDS = 200                 # max clusters written to summaries.json
CLUSTER_THRESHOLD = 0.50          # cosine distance threshold for merging
MIN_CLUSTER_SIZE = 5              # smaller clusters are dissolved into orphans
TOP_TOPICS_LLM = 100              # max clusters sent to the LLM for labelling
LABEL_BATCH_SIZE = 20             # clusters per labelling request
NARRATIVE_WORDS = 500             # target length of the narrative section
def _clean_text(text: str) -> str:
    """Strip publisher boilerplate from one text.

    The value is coerced with ``str()``, every match of BOILERPLATE_RE is
    deleted, and leading/trailing whitespace is trimmed.

    Args:
        text: Raw abstract or title.

    Returns:
        The text without boilerplate matches and without surrounding
        whitespace.
    """
    without_noise = BOILERPLATE_RE.sub("", str(text))
    return without_noise.strip()
def _sentence_count(text: str) -> int:
    """Count sentences by splitting on terminal punctuation.

    The text is stripped and split with SENTENCE_SPLIT_RE. Empty pieces are
    discarded, so a text with no content counts as 0 sentences rather than 1
    (``re.split`` returns ``[""]`` for an empty string).

    Args:
        text: Cleaned abstract or title text.

    Returns:
        Number of non-empty sentence pieces; at least 1 for any non-blank
        input, 0 for blank input.
    """
    pieces = SENTENCE_SPLIT_RE.split(text.strip())
    return len([piece for piece in pieces if piece])
def _embed(texts: list[str]) -> np.ndarray:
    """Encode texts as float32 sentence embeddings.

    A SentenceTransformer is instantiated for EMBED_MODEL on every call and
    asked for normalised embeddings (``normalize_embeddings=True``) without a
    progress bar.

    Args:
        texts: Cleaned text strings to encode.

    Returns:
        np.ndarray of dtype float32 with one row per input text (384 columns
        according to the module notes for this model).
    """
    encoder = SentenceTransformer(EMBED_MODEL)
    return np.array(
        encoder.encode(texts, show_progress_bar=False, normalize_embeddings=True),
        dtype=np.float32,
    )
def _cosine_cluster(matrix: np.ndarray, threshold: float, min_size: int) -> np.ndarray:
    """Cluster embeddings with average-linkage cosine agglomerative clustering.

    Rows are re-normalised to unit length and clustered directly in the
    embedding space (no dimensionality reduction). The number of clusters is
    left open; merging is governed by ``threshold`` alone. Afterwards every
    cluster with fewer than ``min_size`` rows is dissolved and its rows are
    relabelled -1 (orphans).

    Args:
        matrix: (N, D) embedding matrix.
        threshold: Cosine-distance threshold passed to the clusterer as
            ``distance_threshold``.
        min_size: Minimum number of rows a cluster needs to survive.

    Returns:
        np.ndarray of shape (N,) with integer labels; -1 marks an orphan.
    """
    unit_rows = normalize(matrix, norm="l2")
    clusterer = AgglomerativeClustering(
        n_clusters=None,
        metric="cosine",
        linkage="average",
        distance_threshold=threshold,
    )
    assigned = clusterer.fit_predict(unit_rows).astype(int)
    # Cluster ids from fit_predict are non-negative, so bincount yields the
    # size of every cluster and indexing by label gives each row's cluster size.
    cluster_sizes = np.bincount(assigned)
    undersized = cluster_sizes[assigned] < min_size
    return np.where(undersized, -1, assigned)
def _centroid(vecs: np.ndarray) -> np.ndarray:
    """Return the unit-length mean vector of a set of embeddings.

    Args:
        vecs: (M, D) matrix holding the member embeddings of one cluster.

    Returns:
        1-d np.ndarray of length D: the column-wise mean, L2-normalised.
    """
    mean_row = vecs.mean(axis=0, keepdims=True)  # kept 2-d for normalize()
    unit_row = normalize(mean_row, norm="l2")
    return unit_row[0]
def _top_n_centroids(matrix: np.ndarray, labels: np.ndarray, n: int) -> list[dict]:
    """Describe the ``n`` largest clusters, largest first.

    Orphans (label -1) never take part in the ranking.

    Args:
        matrix: (N, D) embedding matrix, row-aligned with ``labels``.
        labels: (N,) integer cluster labels, -1 for orphans.
        n: Maximum number of clusters to return.

    Returns:
        One dict per selected cluster with keys ``label`` (int), ``size``
        (member count), ``indices`` (row positions as a list) and
        ``centroid`` (unit-length mean vector from ``_centroid``).
    """
    cluster_ids, sizes = np.unique(labels[labels >= 0], return_counts=True)
    largest_first = np.argsort(sizes)[::-1][:n]

    def _describe(cluster_id: int) -> dict:
        """Collect members and centroid of a single cluster."""
        members = np.where(labels == cluster_id)[0].tolist()
        return dict(
            label=int(cluster_id),
            size=len(members),
            indices=members,
            centroid=_centroid(matrix[members]),
        )

    return [_describe(cluster_id) for cluster_id in cluster_ids[largest_first]]
def _mistral_chain(template_str: str):
    """Build a prompt -> ChatMistralAI -> JSON-parser chain.

    Args:
        template_str: Prompt template text with ``{variable}`` placeholders.

    Returns:
        The composed runnable (prompt piped into the model piped into
        JsonOutputParser).
    """
    prompt = PromptTemplate.from_template(template_str)
    model_settings = {
        "model": "mistral-large-latest",
        "temperature": 0,
        "timeout": 240,
        "max_retries": 3,
    }
    prompt_to_model = prompt | ChatMistralAI(**model_settings)
    return prompt_to_model | JsonOutputParser()
def _dark_layout(title: str) -> dict:
    """Build the shared dark-theme Plotly layout settings.

    Args:
        title: Chart title.

    Returns:
        Dict meant to be unpacked into ``fig.update_layout(**...)``: title,
        paper/plot background colour, font and margins.
    """
    background = "#0F172A"
    return {
        "title": title,
        "paper_bgcolor": background,
        "plot_bgcolor": background,
        "font": {"color": "#CBD5E1", "family": "Sora,sans-serif"},
        "margin": {"t": 50, "b": 40, "l": 40, "r": 20},
    }
def load_scopus_csv(csv_path: str, run_mode: str = "abstract") -> str:
    """Load a Scopus CSV, clean the target column and save it as parquet.

    Phase 1 — Familiarisation with the Data. DETERMINISTIC.

    Steps:
        1. Read the CSV and drop rows whose target column is null.
        2. Clean each text with ``_clean_text`` (boilerplate regex + trim).
        3. Count sentences per paper with ``_sentence_count``.
        4. Save the DataFrame, with the added ``<target>_clean`` and
           ``sentence_count`` columns, next to the CSV as ``.clean.parquet``.

    Args:
        csv_path: Path to the raw Scopus CSV.
        run_mode: Key of RUN_CONFIGS ('abstract' or 'title'); an unknown key
            raises KeyError.

    Returns:
        JSON string with total_papers, total_sentences, columns_used,
        boilerplate_removed, cleaned_parquet and run_mode.
    """
    cols = RUN_CONFIGS[run_mode]
    target = cols[0]
    df = pd.read_csv(csv_path).dropna(subset=[target]).reset_index(drop=True)
    raw_texts = df[target].tolist()
    cleaned_texts = list(map(_clean_text, raw_texts))
    # Count texts in which a boilerplate pattern really matched. Comparing
    # raw and cleaned strings would also count texts that were merely
    # whitespace-trimmed or str()-coerced by _clean_text.
    boilerplate_removed = sum(map(
        lambda raw: int(BOILERPLATE_RE.search(str(raw)) is not None),
        raw_texts,
    ))
    df[f"{target}_clean"] = cleaned_texts
    df["sentence_count"] = list(map(_sentence_count, cleaned_texts))
    out_path = Path(csv_path).with_suffix(".clean.parquet")
    df.to_parquet(out_path, index=False)
    return json.dumps({
        "total_papers": len(df),
        "total_sentences": int(df["sentence_count"].sum()),
        "columns_used": cols,
        "boilerplate_removed": boilerplate_removed,
        "cleaned_parquet": str(out_path),
        "run_mode": run_mode,
    }, indent=2)
def run_bertopic_discovery(parquet_path: str, run_mode: str = "abstract") -> str:
    """Embed sentences, cluster them, report orphans and write four charts.

    Phase 2 — Generating Initial Codes. DETERMINISTIC.

    Steps:
        1. Load the cleaned parquet and drop every column whose name matches
           ``keyword`` or ``author`` (case-insensitive).
        2. Split each cleaned text into sentences; keep those with >= 5 words.
        3. Embed the sentences and save the matrix as ``<stem>.emb.npy``;
           save the sentence texts and their paper index as sentences.json.
        4. Cluster with ``_cosine_cluster`` using CLUSTER_THRESHOLD and
           MIN_CLUSTER_SIZE; dissolved clusters become orphans (label -1).
        5. Take the N_CENTROIDS largest clusters and write summaries.json
           (clusters + orphan list).
        6. Write four Plotly HTML charts.

    All files are written to the directory that contains ``parquet_path``.

    Args:
        parquet_path: Path to the .clean.parquet written by load_scopus_csv.
        run_mode: Key of RUN_CONFIGS ('abstract' or 'title').

    Returns:
        JSON string with cluster/orphan/sentence/paper counts, the paths of
        summaries.json and the embeddings file, needs_review=True and a
        ``charts`` dict of chart paths.
    """
    cols = RUN_CONFIGS[run_mode]
    target = f"{cols[0]}_clean"
    # Read the file once; the column filter reuses the loaded frame.
    loaded = pd.read_parquet(parquet_path)
    df = loaded.drop(
        columns=[c for c in loaded.columns if re.search(r"keyword|author", c, re.I)],
        errors="ignore",
    )
    paper_texts = df[target].tolist()
    sentence_records = list(filter(
        lambda r: len(r["text"].split()) >= 5,
        [
            {"paper_idx": paper_i, "sent_idx": sent_i, "text": sent.strip()}
            for paper_i, paper_text in enumerate(paper_texts)
            for sent_i, sent in enumerate(SENTENCE_SPLIT_RE.split(paper_text or ""))
            if sent.strip()
        ],
    ))
    texts = [r["text"] for r in sentence_records]
    paper_idx = [r["paper_idx"] for r in sentence_records]
    embeddings = _embed(texts)

    base = Path(parquet_path).parent
    emb_path = str(base / Path(parquet_path).stem) + ".emb.npy"
    np.save(emb_path, embeddings)
    (base / "sentences.json").write_text(json.dumps({
        "texts": texts,
        "paper_idx": paper_idx,
    }))

    labels = _cosine_cluster(embeddings, CLUSTER_THRESHOLD, MIN_CLUSTER_SIZE)
    orphan_idx = np.where(labels == -1)[0].tolist()
    orphan_count = len(orphan_idx)
    valid_count = int((labels >= 0).sum())
    n_clusters = int(np.unique(labels[labels >= 0]).shape[0])
    n_papers = len(set(paper_idx))
    n_sentences = len(texts)
    top_centroids = _top_n_centroids(embeddings, labels, N_CENTROIDS)

    def _topic_row(tc: dict) -> dict:
        """Convert a centroid dict into a summaries.json row."""
        return {
            "topic_id": tc["label"],
            "size": tc["size"],
            # The first member's text (truncated) stands in for the cluster.
            "representative": texts[tc["indices"][0]][:200],
            "indices": tc["indices"],
        }

    summaries = list(map(_topic_row, top_centroids))
    orphans = list(map(
        lambda i: {"sentence_idx": int(i), "text": texts[i][:200]},
        orphan_idx,
    ))
    output = {"clusters": summaries, "orphans": orphans}
    (base / "summaries.json").write_text(json.dumps(output, indent=2))

    # Chart 1: sizes of the 20 largest clusters.
    unique, counts = np.unique(labels[labels >= 0], return_counts=True)
    order = np.argsort(counts)[::-1][:20]
    c1 = go.Figure(go.Bar(
        x=list(map(str, unique[order])), y=counts[order].tolist(),
        marker_color="#3B82F6", text=counts[order].tolist(), textposition="outside",
    ))
    c1.update_layout(**_dark_layout("Topic Size Distribution (Top 20)"),
                     xaxis=dict(showgrid=False),
                     yaxis=dict(showgrid=True, gridcolor="#1E293B"))
    c1.write_html(str(base / "chart_topic_sizes.html"))

    # Chart 2: pairwise cosine similarity of the extracted centroids. The
    # title reports how many centroids are actually plotted.
    n_plotted = len(top_centroids)
    centroid_matrix = np.vstack([tc["centroid"] for tc in top_centroids])
    sim_matrix = cosine_similarity(centroid_matrix)
    clabels = list(map(lambda tc: f"T{tc['label']}", top_centroids))
    c2 = go.Figure(go.Heatmap(z=sim_matrix, x=clabels, y=clabels, colorscale="Blues"))
    c2.update_layout(**_dark_layout(f"Top-{n_plotted} Centroid Cosine Similarity"))
    c2.write_html(str(base / "chart_centroid_heatmap.html"))

    # Chart 3: sentences per paper (zeros if the column is missing).
    sc = df.get("sentence_count", pd.Series([0] * len(df))).tolist()
    c3 = go.Figure(go.Histogram(x=sc, nbinsx=40, marker_color="#22D3EE"))
    c3.update_layout(**_dark_layout("Sentence Count Distribution"),
                     xaxis=dict(showgrid=False),
                     yaxis=dict(showgrid=True, gridcolor="#1E293B"))
    c3.write_html(str(base / "chart_sentence_distribution.html"))

    # Chart 4: centroids projected to two principal components.
    coords = PCA(n_components=2).fit_transform(centroid_matrix)
    point_text = list(map(lambda tc: f"T{tc['label']}({tc['size']})", top_centroids))
    c4 = go.Figure(go.Scatter(
        x=coords[:, 0].tolist(), y=coords[:, 1].tolist(),
        mode="markers+text", text=point_text, textposition="top center",
        marker=dict(size=12, color="#F59E0B", line=dict(width=1, color="#0F172A")),
    ))
    c4.update_layout(**_dark_layout(f"Top-{n_plotted} Centroids — PCA Projection"))
    c4.write_html(str(base / "chart_centroid_pca.html"))

    return json.dumps({
        "total_clusters": n_clusters,
        "orphan_count": orphan_count,
        "valid_sentences": valid_count,
        "total_sentences": n_sentences,
        "total_papers": n_papers,
        "top_centroids": N_CENTROIDS,
        "summaries_json": str(base / "summaries.json"),
        "embeddings_npy": emb_path,
        "needs_review": True,
        "charts": {
            "topic_sizes": str(base / "chart_topic_sizes.html"),
            "centroid_heatmap": str(base / "chart_centroid_heatmap.html"),
            "sentence_dist": str(base / "chart_sentence_distribution.html"),
            "centroid_pca": str(base / "chart_centroid_pca.html"),
        },
    }, indent=2)
def label_topics_with_llm(summaries_json_path: str) -> str:
    """Send up to TOP_TOPICS_LLM cluster summaries to Mistral for labelling.

    Phase 2 — Naming Initial Codes. LLM-DEPENDENT (grounded in real data
    extracts).

    NOTE: run_phase_1_and_2 is the standard Phase 2 entry point; this tool
    covers re-labelling of an existing summaries.json.

    The per-cluster ``indices`` (and ``centroid``, when present) are removed
    before prompting and re-attached afterwards, so topic_labels.json has the
    same shape as the file written by run_phase_1_and_2.

    Args:
        summaries_json_path: Path to summaries.json. Either an object with a
            ``clusters`` list or a bare list of cluster dicts.

    Returns:
        JSON string with labelled_topics (count), output (path of
        topic_labels.json) and needs_review=True.
    """
    data = json.loads(Path(summaries_json_path).read_text())
    # Dispatch on the payload shape instead of calling .get on a list.
    pick_clusters = {
        True: lambda payload: payload["clusters"],
        False: lambda payload: payload,
    }
    summaries = pick_clusters[isinstance(data, dict)](data)[:TOP_TOPICS_LLM]

    # Member indices and centroid vectors are bulky and carry no meaning for
    # the model; keep them out of the prompt.
    labelling_input = [
        {k: v for k, v in s.items() if k not in ("indices", "centroid")}
        for s in summaries
    ]
    labelled = _label_summaries_with_mistral(labelling_input)

    indices_by_id = {int(s["topic_id"]): s.get("indices", []) for s in summaries}

    def _enrich(item: dict) -> dict:
        """Attach integer topic_id, size and member indices to one label."""
        topic_id = int(item.get("topic_id", -1))
        members = indices_by_id.get(topic_id, [])
        return {**item, "topic_id": topic_id, "size": len(members), "indices": members}

    result = list(map(_enrich, labelled))
    out_path = Path(summaries_json_path).parent / "topic_labels.json"
    out_path.write_text(json.dumps(result, indent=2))
    return json.dumps({
        "labelled_topics": len(result),
        "output": str(out_path),
        "needs_review": True,
    }, indent=2)
def _label_summaries_with_mistral(summaries: list[dict]) -> list[dict]:
    """Label cluster summaries with Mistral, one request per batch.

    The summaries are cut into slices of LABEL_BATCH_SIZE; each slice is sent
    as its own request (to avoid timeouts on very large prompts) and the
    per-batch results are concatenated in order.

    Args:
        summaries: Cluster summary dicts (JSON-serialisable).

    Returns:
        Concatenation of the parsed JSON arrays returned for each batch. The
        prompt asks for elements with topic_id, label, rationale and
        confidence; the model's compliance is not validated here.
    """
    template = (
        "You are a scientific topic labelling expert.\n\n"
        "Below are {n} topic summaries from a BERTopic analysis of academic papers.\n"
        "Each summary has: topic_id, size, representative text.\n\n"
        "{summaries}\n\n"
        "For EACH topic return a JSON array where every element has:\n"
        "  topic_id   : integer (copy from input)\n"
        "  label      : 2-5 word snake_case topic label\n"
        "  rationale  : one sentence justification\n"
        "  confidence : float 0.0-1.0\n\n"
        "Return ONLY the JSON array — no markdown, no preamble."
    )
    chain = _mistral_chain(template)
    batches = [
        summaries[start:start + LABEL_BATCH_SIZE]
        for start in range(0, len(summaries), LABEL_BATCH_SIZE)
    ]
    results = [
        chain.invoke({"n": len(batch), "summaries": json.dumps(batch, indent=2)})
        for batch in batches
    ]
    # List concatenation raises if a batch result is not a list, so a
    # malformed model reply surfaces as an error.
    return sum(results, [])
def run_phase_1_and_2(csv_path: str, run_mode: str = "abstract") -> str:
    """Run Phase 1 (Familiarisation) and Phase 2 (Initial Codes) in one call.

    Canonical entry point for "Run analysis on abstracts" / "on titles".

    Internally performs:
        1. Phase 1 — load the Scopus CSV, drop rows with an empty target
           column, clean boilerplate, save ``<stem>.clean.parquet``.
        2. Phase 2a — split each cleaned text into sentences, keep those with
           >= 5 words, embed them, save ``<stem>.emb.npy`` and sentences.json.
        3. Phase 2b — cosine agglomerative clustering via ``_cosine_cluster``
           with CLUSTER_THRESHOLD and MIN_CLUSTER_SIZE (smaller clusters
           become orphans); save summaries.json with up to N_CENTROIDS
           clusters plus the orphan list.
        4. Phase 2c — send up to TOP_TOPICS_LLM clusters (without their member
           indices) to Mistral for labels; save topic_labels.json with size
           and indices re-attached.

    Every checkpoint file is written to the directory containing ``csv_path``.

    Args:
        csv_path: Path to the raw Scopus CSV.
        run_mode: Key of RUN_CONFIGS ('abstract' or 'title').

    Returns:
        JSON string with:
            phase_1: data_items, data_extracts, boilerplate_removed
            phase_2: initial_codes, labelled_count, orphan_extracts, min_cluster
            workspace_dir and the paths of the checkpoint files
            needs_review: True
    """
    cols = RUN_CONFIGS[run_mode]
    target = cols[0]
    df = pd.read_csv(csv_path).dropna(subset=[target]).reset_index(drop=True)
    raw_texts = df[target].tolist()
    cleaned_texts = list(map(_clean_text, raw_texts))
    # Count texts in which a boilerplate pattern really matched. Comparing
    # raw and cleaned strings would also count texts that were merely
    # whitespace-trimmed or str()-coerced by _clean_text.
    boilerplate_removed = sum(map(
        lambda raw: int(BOILERPLATE_RE.search(str(raw)) is not None),
        raw_texts,
    ))
    df[f"{target}_clean"] = cleaned_texts
    df["sentence_count"] = list(map(_sentence_count, cleaned_texts))

    workspace = Path(csv_path).parent
    stem = Path(csv_path).stem
    parquet_path = workspace / (stem + ".clean.parquet")
    df.to_parquet(parquet_path, index=False)

    sentence_records = list(filter(
        lambda r: len(r["text"].split()) >= 5,
        [
            {"paper_idx": paper_i, "sent_idx": sent_i, "text": sent.strip()}
            for paper_i, paper_text in enumerate(cleaned_texts)
            for sent_i, sent in enumerate(SENTENCE_SPLIT_RE.split(paper_text or ""))
            if sent.strip()
        ],
    ))
    texts = [r["text"] for r in sentence_records]
    paper_idx = [r["paper_idx"] for r in sentence_records]
    embeddings = _embed(texts)
    emb_path = str(workspace / stem) + ".emb.npy"
    np.save(emb_path, embeddings)
    (workspace / "sentences.json").write_text(json.dumps({
        "texts": texts,
        "paper_idx": paper_idx,
    }))

    labels = _cosine_cluster(embeddings, CLUSTER_THRESHOLD, MIN_CLUSTER_SIZE)
    orphan_idx = np.where(labels == -1)[0].tolist()
    orphan_count = len(orphan_idx)
    n_clusters = int(np.unique(labels[labels >= 0]).shape[0])
    top_centroids = _top_n_centroids(embeddings, labels, N_CENTROIDS)
    summaries = [
        {
            "topic_id": int(tc["label"]),
            "size": tc["size"],
            # The first member's text (truncated) stands in for the cluster.
            "representative": texts[tc["indices"][0]][:200],
            "indices": tc["indices"],
        }
        for tc in top_centroids
    ]
    orphans = [
        {"sentence_idx": int(i), "text": texts[i][:200]}
        for i in orphan_idx
    ]
    (workspace / "summaries.json").write_text(json.dumps({
        "clusters": summaries,
        "orphans": orphans,
    }, indent=2))

    # Member indices are left out of the prompt and re-attached afterwards.
    labelling_input = [
        {k: v for k, v in s.items() if k != "indices"}
        for s in summaries[:TOP_TOPICS_LLM]
    ]
    labelled = _label_summaries_with_mistral(labelling_input)
    indices_by_id = {s["topic_id"]: s["indices"] for s in summaries}

    def _enrich(item: dict) -> dict:
        """Attach integer topic_id, size and member indices to one label."""
        topic_id = int(item.get("topic_id", -1))
        members = indices_by_id.get(topic_id, [])
        return {**item, "topic_id": topic_id, "size": len(members), "indices": members}

    enriched = list(map(_enrich, labelled))
    (workspace / "topic_labels.json").write_text(json.dumps(enriched, indent=2))

    return json.dumps({
        "phase_1": {
            "data_items": len(df),
            "data_extracts": len(texts),
            "boilerplate_removed": boilerplate_removed,
        },
        "phase_2": {
            "initial_codes": n_clusters,
            "labelled_count": len(enriched),
            "orphan_extracts": orphan_count,
            "min_cluster": MIN_CLUSTER_SIZE,
        },
        "workspace_dir": str(workspace),
        "summaries_json": str(workspace / "summaries.json"),
        "labels_json": str(workspace / "topic_labels.json"),
        "embeddings_npy": emb_path,
        "sentences_json": str(workspace / "sentences.json"),
        "needs_review": True,
    }, indent=2)
def reassign_sentences(
    summaries_json_path: str,
    embeddings_npy_path: str,
    move_instructions: list[dict],
) -> str:
    """Move orphan or misplaced sentences between clusters.

    Phase 2 — Reassigning orphan data extracts. DETERMINISTIC.

    The reviewer specifies moves as a list of dicts:
        [{"sentence_idx": 42, "to_cluster": 3},
         {"sentence_idx": 99, "to_cluster": "new"}]
    Every "new" instruction receives its own fresh cluster id, larger than
    any existing or explicitly targeted id. To put several sentences into the
    same new cluster, create it with one "new" move and target the returned
    id in a later call.

    Steps:
        1. Load summaries.json and the embedding matrix.
        2. Apply the moves to the sentence -> cluster map.
        3. Rebuild every non-empty cluster and recompute its centroid.
        4. Overwrite summaries.json with the updated clusters and the
           orphans that are still unassigned.

    A cluster keeps its previous ``representative`` text; a brand-new
    cluster takes the stored text of its first member when that member was
    an orphan. The representative is not re-derived, so it may refer to a
    sentence that has since been moved out.

    Args:
        summaries_json_path: Path to summaries.json (rewritten in place).
        embeddings_npy_path: Path to the .emb.npy matrix.
        move_instructions: Dicts with ``sentence_idx`` (int) and
            ``to_cluster`` (int or "new").

    Returns:
        JSON string with moves_applied, orphans_remaining, assignments (the
        resolved cluster id per move), summaries_json and needs_review=True.
    """
    data = json.loads(Path(summaries_json_path).read_text())
    embeddings = np.load(embeddings_npy_path)
    clusters = data.get("clusters", [])
    orphans = data.get("orphans", [])

    # sentence index -> cluster id for every sentence currently in a cluster
    assignment = {
        idx: c["topic_id"]
        for c in clusters
        for idx in c.get("indices", [])
    }

    # Fresh ids start above every id that exists or is named in this call.
    known_ids = [c.get("topic_id", 0) for c in clusters] + [
        int(m["to_cluster"]) for m in move_instructions if m["to_cluster"] != "new"
    ]
    first_free = max(known_ids, default=0) + 1
    fresh_ids = iter(range(first_free, first_free + len(move_instructions)))
    # Dispatch table: "new" draws the next fresh id, anything else is
    # interpreted as an integer cluster id.
    resolve_target = {
        True: lambda target: next(fresh_ids),
        False: lambda target: int(target),
    }

    def _apply_move(m: dict) -> dict:
        """Apply one move instruction and report the resolved cluster id."""
        s_idx = int(m["sentence_idx"])
        target = m["to_cluster"]
        assignment[s_idx] = resolve_target[target == "new"](target)
        return {"sentence_idx": s_idx, "assigned_to": assignment[s_idx]}

    applied = list(map(_apply_move, move_instructions))

    # Group sentence indices by cluster in a single pass.
    members_by_cluster = {}
    list(map(
        lambda pair: members_by_cluster.setdefault(pair[1], []).append(pair[0]),
        assignment.items(),
    ))

    previous_rep = {c["topic_id"]: c.get("representative", "") for c in clusters}
    orphan_text = {o["sentence_idx"]: o.get("text", "") for o in orphans}

    def _rebuild_cluster(cid: int) -> dict:
        """Rebuild one cluster dict from the updated membership."""
        idx = members_by_cluster[cid]
        return {
            "topic_id": int(cid),
            "size": len(idx),
            "representative": previous_rep.get(cid, orphan_text.get(idx[0], "")),
            "indices": idx,
            "centroid": _centroid(embeddings[idx]).tolist(),
        }

    updated_clusters = list(map(_rebuild_cluster, sorted(members_by_cluster)))
    # Keep the stored text of orphans that are still unassigned.
    remaining_orphans = [
        {"sentence_idx": o["sentence_idx"], "text": o.get("text", "")}
        for o in orphans
        if o["sentence_idx"] not in assignment
    ]
    output = {
        "clusters": updated_clusters,
        "orphans": remaining_orphans,
    }
    Path(summaries_json_path).write_text(json.dumps(output, indent=2))
    return json.dumps({
        "moves_applied": len(applied),
        "orphans_remaining": len(remaining_orphans),
        "assignments": applied,
        "summaries_json": summaries_json_path,
        "needs_review": True,
    }, indent=2)
def consolidate_into_themes(
    labels_json_path: str,
    embeddings_npy_path: str,
    approved_topic_ids: list[list[int]],
) -> str:
    """Merge approved topic groups into consolidated themes.

    Phase 3 — Searching for Themes. DETERMINISTIC.

    Steps:
        1. Load topic_labels.json and the embedding matrix.
        2. Pool the member indices of every topic in a group.
        3. Compute a fresh L2-normalised centroid per group.
        4. Build the theme label from the de-duplicated words of the
           sub-labels (joined with "_", cut to 60 characters).
        5. Save themes.json next to topic_labels.json.

    ``total_papers`` counts distinct papers when a sentences.json with a
    ``paper_idx`` list exists next to topic_labels.json (it is written there
    by the discovery tools). Without that file the member indices are
    counted directly, i.e. distinct extracts.

    Args:
        labels_json_path: Path to topic_labels.json.
        embeddings_npy_path: Path to the .emb.npy matrix.
        approved_topic_ids: List of lists of initial-code ids; each inner
            list is one candidate theme, e.g. [[0, 1, 2], [3, 4], [5]].
            Ids are coerced with int(), so numeric strings are accepted.

    Returns:
        JSON string with themes_created, themes_json and needs_review=True.
    """
    labels_data = json.loads(Path(labels_json_path).read_text())
    embeddings = np.load(embeddings_npy_path)
    label_map = {int(item["topic_id"]): item for item in labels_data}

    # Member indices address sentence embeddings; map them to papers when
    # the sentence -> paper table is available.
    sentences_path = Path(labels_json_path).parent / "sentences.json"
    load_paper_of = {
        True: lambda: json.loads(sentences_path.read_text())["paper_idx"],
        False: lambda: list(range(len(embeddings))),
    }
    paper_of = load_paper_of[sentences_path.exists()]()

    def _merge_group(group_ids: list[int]) -> dict:
        """Merge the topics of one group into a theme dict."""
        ids = list(map(int, group_ids))
        members = [label_map[i] for i in ids if i in label_map]
        all_idx = [idx for m in members for idx in m.get("indices", [])]
        # Falls back to row 0 when the group has no members, as before.
        centroid = _centroid(embeddings[all_idx or [0]])
        words = [word for m in members for word in m.get("label", "").split("_")]
        theme_name = "_".join(dict.fromkeys(words))[:60]
        return {
            "theme_id": ids[0],
            "theme_label": theme_name,
            "merged_ids": ids,
            "total_papers": len({paper_of[i] for i in all_idx}),
            "indices": all_idx,
            "centroid": centroid.tolist(),
        }

    themes = list(map(_merge_group, approved_topic_ids))
    out_path = Path(labels_json_path).parent / "themes.json"
    out_path.write_text(json.dumps(themes, indent=2))
    return json.dumps({
        "themes_created": len(themes),
        "themes_json": str(out_path),
        "needs_review": True,
    }, indent=2)
def compute_saturation(
    themes_json_path: str,
    embeddings_npy_path: str,
    total_papers: int,
) -> str:
    """Compute saturation metrics per theme: coverage, coherence, balance.

    Phase 4 — Reviewing Themes. DETERMINISTIC.

    Theme ``indices`` address sentence embeddings, while ``total_papers`` is
    a paper count. The indices are therefore mapped to papers through the
    ``paper_idx`` list of sentences.json (expected next to themes.json, where
    the discovery tools write it) before coverage is computed. If that file
    is missing, indices are counted as-is and ``coverage_unit`` reports
    "extracts" instead of "papers".

    Metrics per theme:
        papers       = distinct papers contributing to the theme
        coverage_pct = papers / total_papers * 100
        coherence    = mean pairwise cosine similarity of member embeddings
                       (0.0 for a theme with fewer than two members)
    Global metrics:
        total_coverage_pct = papers in at least one theme / total_papers * 100
        balance_ratio      = largest theme / smallest theme (in papers)
        mean_coherence     = average of the per-theme coherence scores

    Args:
        themes_json_path: Path to themes.json.
        embeddings_npy_path: Path to the .emb.npy matrix.
        total_papers: Total papers in the corpus (from Phase 1 stats).

    Returns:
        JSON string with the global metrics, per_theme list, saturation_json
        path and needs_review=True. The metrics are also saved as
        saturation.json next to themes.json.
    """
    themes = json.loads(Path(themes_json_path).read_text())
    embeddings = np.load(embeddings_npy_path)

    sentences_path = Path(themes_json_path).parent / "sentences.json"
    has_paper_map = sentences_path.exists()
    load_paper_of = {
        True: lambda: json.loads(sentences_path.read_text())["paper_idx"],
        False: lambda: list(range(len(embeddings))),
    }
    paper_of = load_paper_of[has_paper_map]()

    def _theme_metrics(t: dict) -> dict:
        """Compute paper coverage and coherence for one theme."""
        idx = t.get("indices", [])
        papers = len({paper_of[i] for i in idx})
        # Falls back to row 0 for an empty theme so the similarity call works.
        vecs = embeddings[idx or [0]]
        sim = cosine_similarity(vecs)
        n = len(vecs)
        # Subtract the n self-similarities on the diagonal, then average the
        # remaining n * (n - 1) ordered pairs.
        coherence = float((sim.sum() - n) / max(n * (n - 1), 1))
        return {
            "theme_id": t.get("theme_id", 0),
            "theme_label": t.get("theme_label", ""),
            "papers": papers,
            "coverage_pct": round(papers / max(total_papers, 1) * 100, 2),
            "coherence": round(coherence, 4),
        }

    per_theme = list(map(_theme_metrics, themes))
    covered_papers = {paper_of[i] for t in themes for i in t.get("indices", [])}
    sizes = [m["papers"] for m in per_theme]
    coherences = [m["coherence"] for m in per_theme]
    global_metrics = {
        "total_coverage_pct": round(len(covered_papers) / max(total_papers, 1) * 100, 2),
        "balance_ratio": round(max(sizes, default=1) / max(min(sizes, default=1), 1), 2),
        "mean_coherence": round(sum(coherences) / max(len(coherences), 1), 4),
        "theme_count": len(themes),
        "coverage_unit": {True: "papers", False: "extracts"}[has_paper_map],
    }
    out_path = Path(themes_json_path).parent / "saturation.json"
    result = {"per_theme": per_theme, "global": global_metrics}
    out_path.write_text(json.dumps(result, indent=2))
    return json.dumps({
        **global_metrics,
        "per_theme": per_theme,
        "saturation_json": str(out_path),
        "needs_review": True,
    }, indent=2)
def generate_theme_profiles(
    themes_json_path: str,
    embeddings_npy_path: str,
    texts_parquet_path: str,
    run_mode: str = "abstract",
) -> str:
    """Build one profile card per theme from its five nearest sentences.

    Phase 5 - Defining and Naming Themes. DETERMINISTIC.

    Each theme centroid is compared (cosine similarity) against EVERY row
    of the embedding matrix, not only the theme's own members, and the
    five highest-scoring rows are reported. The sentence text is read
    verbatim from sentences.json, so the reviewer sees real corpus
    sentences rather than generated ones.

    Steps:
        1. Load themes.json (each theme must carry a "centroid").
        2. Load the embedding matrix.
        3. Load sentences.json from the same directory as themes.json
           and take its "texts" list.
        4. Score all embeddings against each centroid.
        5. Keep the five best matches, text truncated to 300 characters.
        6. Save profiles.json next to themes.json.

    Args:
        themes_json_path: Path to themes.json.
        embeddings_npy_path: Path to .emb.npy.
        texts_parquet_path: Accepted for compatibility; not read here.
        run_mode: 'abstract' or 'title'. Accepted but not used here.

    Returns:
        JSON: profiles list with top-5 sentences per theme. needs_review=True.
    """
    work_dir = Path(themes_json_path).parent
    theme_list = json.loads(Path(themes_json_path).read_text())
    matrix = np.load(embeddings_npy_path)
    corpus = json.loads((work_dir / "sentences.json").read_text())["texts"]

    def _card(theme: dict) -> dict:
        """Rank all sentences against one centroid and keep the best five."""
        query = np.array(theme["centroid"]).reshape(1, -1)
        scores = cosine_similarity(query, matrix)[0]
        # argsort is ascending, so flip it to get best-first order.
        ranked = np.flip(np.argsort(scores))[:5].tolist()

        def _hit(pos: int) -> dict:
            """Describe one matched sentence."""
            return {
                "sentence_idx": pos,
                "text": corpus[pos][:300],
                "similarity": round(float(scores[pos]), 4),
            }

        return {
            "theme_id": theme.get("theme_id", 0),
            "theme_label": theme.get("theme_label", ""),
            "total_papers": theme.get("total_papers", 0),
            "top_5_sentences": list(map(_hit, ranked)),
        }

    cards = list(map(_card, theme_list))
    target = work_dir / "profiles.json"
    target.write_text(json.dumps(cards, indent=2))

    summary = {
        "profiles_count": len(cards),
        "profiles_json": str(target),
        "profiles": cards,
        "needs_review": True,
    }
    return json.dumps(summary, indent=2)
def compare_with_taxonomy(themes_json_path: str) -> str:
    """Map each theme to PAJAIS 25 IS research categories via Mistral.

    Phase 5.5 - Taxonomy Alignment (extension). LLM-DEPENDENT.

    The themes are sent to the model without their "centroid" and
    "indices" fields, so the prompt carries only the descriptive metadata.
    An alignment_score below 0.50 is the convention for a potentially
    NOVEL theme. This function stores the scores exactly as the model
    returns them and does not apply that cut-off itself.

    Args:
        themes_json_path: Path to themes.json.

    Returns:
        JSON: themes_aligned count + taxonomy_file path. needs_review=True.
        The model output is written to taxonomy_alignment.json next to
        themes.json.
    """
    themes = json.loads(Path(themes_json_path).read_text())

    # Large numeric fields add nothing to the prompt; drop them.
    hidden_keys = ("centroid", "indices")
    safe_themes = list(map(
        lambda t: dict(filter(lambda kv: kv[0] not in hidden_keys, t.items())),
        themes,
    ))

    template = (
        "You are an IS research taxonomy expert.\n\n"
        "PAJAIS 25 Categories:\n{pajais}\n\n"
        "Research themes:\n{themes}\n\n"
        "For EACH theme return a JSON array where every element has:\n"
        "  theme_label       : string\n"
        "  pajais_categories : list of 1-3 matching PAJAIS category names\n"
        "  alignment_score   : float 0.0-1.0\n"
        "  notes             : one sentence justification\n\n"
        # Dash restored: the literal contained a mis-encoded character.
        "Return ONLY the JSON array — no markdown, no preamble."
    )
    # NOTE(review): _mistral_chain is defined elsewhere; presumably it
    # returns the parsed JSON array - confirm against its definition.
    result = _mistral_chain(template).invoke({
        "pajais": "\n".join(map(lambda c: f"- {c}", PAJAIS_25)),
        "themes": json.dumps(safe_themes, indent=2),
    })

    out_path = Path(themes_json_path).parent / "taxonomy_alignment.json"
    out_path.write_text(json.dumps(result, indent=2))
    return json.dumps({
        "themes_aligned": len(result),
        "taxonomy_file": str(out_path),
        "needs_review": True,
    }, indent=2)
def generate_comparison_csv(
    abstract_themes_path: str,
    title_themes_path: str,
    taxonomy_abstract_path: str,
    taxonomy_title_path: str,
) -> str:
    """Build side-by-side abstract vs title comparison CSV.

    Phase 6 - Report. DETERMINISTIC.

    Each taxonomy alignment file is flattened to one row per
    (theme, PAJAIS category) pair. The two runs are then outer-joined on
    PAJAIS_Category, and Delta_Score = Abstract_Score - Title_Score.
    Categories present in only one run get an empty theme and a score of
    0.0 for the other run.

    Robustness:
        * An alignment file with no rows (empty array, or themes without
          categories) yields an empty frame with the expected columns
          instead of failing on the column rename.
        * alignment_score is coerced to float; missing, null or
          non-numeric scores count as 0.0.
        * A null pajais_categories value is treated as an empty list.

    Args:
        abstract_themes_path: themes.json of the abstract run. Only its
            parent directory is used, as the output location.
        title_themes_path: themes.json of the title run. Not read here.
        taxonomy_abstract_path: taxonomy_alignment.json of the abstract run.
        taxonomy_title_path: taxonomy_alignment.json of the title run.

    Returns:
        JSON: comparison_csv path, total_rows, columns. needs_review=True.
    """
    base_columns = ["pajais_category", "theme_label", "alignment_score"]

    def _rows_for_item(item: dict) -> list:
        """Expand one aligned theme into one row per PAJAIS category."""
        return list(map(
            lambda cat: {
                "pajais_category": cat,
                "theme_label": item.get("theme_label", ""),
                "alignment_score": item.get("alignment_score", 0.0),
            },
            item.get("pajais_categories", []) or [],
        ))

    def _explode_taxonomy(path: str, prefix: str) -> pd.DataFrame:
        """Flatten taxonomy alignment into one row per PAJAIS category."""
        data = json.loads(Path(path).read_text())
        rows = sum(map(_rows_for_item, data), [])
        # Explicit columns keep the frame well-formed when rows is empty.
        frame = pd.DataFrame(rows, columns=base_columns)
        scores = (
            pd.to_numeric(frame["alignment_score"], errors="coerce")
            .fillna(0.0)
            .astype(float)
        )
        frame = frame.assign(alignment_score=scores)
        frame.columns = ["PAJAIS_Category", f"{prefix}_Theme", f"{prefix}_Score"]
        return frame

    df_abs = _explode_taxonomy(taxonomy_abstract_path, "Abstract")
    df_title = _explode_taxonomy(taxonomy_title_path, "Title")

    merged = (
        pd.merge(df_abs, df_title, on="PAJAIS_Category", how="outer")
        .fillna({"Abstract_Score": 0.0, "Title_Score": 0.0,
                 "Abstract_Theme": "", "Title_Theme": ""})
        .assign(Delta_Score=lambda d: (d["Abstract_Score"] - d["Title_Score"]).round(4))
        .sort_values("PAJAIS_Category")
        .reset_index(drop=True)
    )

    out_csv = Path(abstract_themes_path).parent / "abstract_vs_title_comparison.csv"
    merged.to_csv(out_csv, index=False)
    return json.dumps({
        "comparison_csv": str(out_csv),
        "total_rows": len(merged),
        "columns": list(merged.columns),
        "needs_review": True,
    }, indent=2)
def export_narrative(
    taxonomy_alignment_path: str,
    comparison_csv_path: str,
    run_mode: str = "abstract",
) -> str:
    """Generate 500-word Section 7: Discussion & Implications via Mistral.

    Phase 6 - Report. LLM-DEPENDENT (grounded in taxonomy + comparison data).

    The prompt is grounded in two inputs: the first ten entries of the
    taxonomy alignment file (in file order), and the five comparison rows
    with the largest absolute Delta_Score. The prose is saved as
    narrative.md next to the taxonomy alignment file.

    Args:
        taxonomy_alignment_path: Path to taxonomy_alignment.json.
        comparison_csv_path: Path to comparison CSV.
        run_mode: 'abstract' or 'title'.

    Returns:
        JSON: narrative_path, word_count, narrative text. needs_review=True.
    """
    alignment = json.loads(Path(taxonomy_alignment_path).read_text())
    top_delta = (
        pd.read_csv(comparison_csv_path)
        .assign(_abs=lambda d: d["Delta_Score"].abs())
        .sort_values("_abs", ascending=False)
        .drop(columns=["_abs"])
        .head(5)
    )

    template = (
        "You are a senior IS researcher writing a systematic literature review.\n\n"
        "Write Section 7: Discussion & Implications in exactly {word_count} words.\n\n"
        "Run mode: {run_mode}\n\n"
        "Taxonomy alignment (top 10):\n{alignment}\n\n"
        "Top 5 divergent PAJAIS categories (abstract vs title):\n{divergence}\n\n"
        "Requirements:\n"
        "1. Discuss dominant themes and PAJAIS alignment.\n"
        "2. Interpret divergence between abstract- and title-based models.\n"
        "3. Highlight implications for IS research practice and future agenda.\n"
        # Dash restored: the literal contained a mis-encoded character.
        "4. Use formal academic register — no bullet points.\n"
        "5. Return a JSON object with a single key 'narrative' containing the prose.\n\n"
        "Return ONLY valid JSON."
    )
    result = _mistral_chain(template).invoke({
        "word_count": NARRATIVE_WORDS,
        "run_mode": run_mode,
        "alignment": json.dumps(alignment[:10], indent=2),
        "divergence": top_delta.to_json(orient="records", indent=2),
    })

    # The model may answer with a list or a bare string instead of the
    # requested object. Objects without .get() fall back to their string
    # form, which is what the str(result) default was meant to provide.
    fallback_text = str(result)
    lookup = getattr(result, "get", lambda _key, default: default)
    # str() keeps .split() and the markdown write safe if "narrative"
    # holds something other than a string.
    narrative_text = str(lookup("narrative", fallback_text))

    out_path = Path(taxonomy_alignment_path).parent / "narrative.md"
    out_path.write_text(
        f"## Section 7: Discussion & Implications\n\n{narrative_text}\n",
        encoding="utf-8",
    )
    return json.dumps({
        "narrative_path": str(out_path),
        "word_count": len(narrative_text.split()),
        "narrative": narrative_text,
        "needs_review": True,
    }, indent=2)
# Registry of the tool callables, listed in pipeline order.
# NOTE(review): run_phase_1_and_2 is defined outside this section;
# presumably a combined wrapper for the first steps - confirm before
# reordering or removing it.
ALL_TOOLS = [
    run_phase_1_and_2,
    # Steps 1-2: ingest and discover
    load_scopus_csv, run_bertopic_discovery,
    # Steps 3-5: label, reassign, consolidate
    label_topics_with_llm, reassign_sentences, consolidate_into_themes,
    # Steps 6-8: review, profile, align with taxonomy
    compute_saturation, generate_theme_profiles, compare_with_taxonomy,
    # Steps 9-10: report
    generate_comparison_csv, export_narrative,
]