# tools.py — Scientific Document Topic Analyzer # Built on the Braun & Clarke (2006) Thematic Analysis Framework. # Implementation: Zero-loop, zero-exception, functional-first logic. from dotenv import load_dotenv load_dotenv() import re import json import os import numpy as np import pandas as pd import plotly.express as px import plotly.graph_objects as go from langchain_core.tools import tool from langchain_core.prompts import PromptTemplate from langchain_core.output_parsers import JsonOutputParser from langchain_mistralai import ChatMistralAI from sentence_transformers import SentenceTransformer from sklearn.cluster import AgglomerativeClustering, DBSCAN from sklearn.metrics.pairwise import cosine_similarity from sklearn.decomposition import PCA import nltk # Initialize NLP resources nltk.download("punkt", quiet=True) nltk.download("punkt_tab", quiet=True) from nltk.tokenize import sent_tokenize # --- Global Configuration & Taxonomy --- COLUMN_MAP = { "abstract": ["Abstract"], "title": ["Title"], } EMBEDDING_MODEL_ID = "all-MiniLM-L6-v2" NEAREST_NEIGHBORS_K = 5 MAX_TOPIC_BATCH_SIZE = 60 SENTENCE_HARD_LIMIT = 3000 DEFAULT_CLUSTERING_THRESHOLD = 0.7 LLM_GATEWAY_TIMEOUT = 120 # Regex patterns to filter out standard academic publishing noise JUNK_TEXT_REGEXES = [ r"©\s*\d{4}", r"elsevier\s*(b\.v\.)?", r"springer\s*(nature)?", r"wiley\s*(online\s*library)?", r"all\s+rights\s+reserved", r"published\s+by\s+[a-z\s]+", r"doi:\s*10\.", r"www\.[a-z]+\.[a-z]+", r"https?://", r"copyright\s*\d{4}", r"taylor\s*&\s*francis", r"sage\s+publications", r"emerald\s+publishing", r"journal\s+of\s+[a-z\s]+issn", r"volume\s+\d+,?\s+issue\s+\d+", r"pp\.\s*\d+[-–]\d+", r"received\s+\d+\s+\w+\s+\d{4}", r"accepted\s+\d+\s+\w+\s+\d{4}", r"available\s+online", r"this\s+is\s+an\s+open\s+access", r"creative\s+commons", r"please\s+cite\s+this\s+article", ] CATEGORY_HIERARCHY_PAJAIS = [ "Artificial Intelligence Methods", "Natural Language Processing", "Machine Learning", "Deep Learning", "Knowledge Representation", "Ontologies & Semantic Web", "Information Retrieval", "Recommender Systems", "Decision Support Systems", "Human-Computer Interaction", "Explainability & Transparency", "Fairness, Accountability & Ethics", "Data Management & Integration", "Text Mining & Analytics", "Sentiment Analysis", "Social Media Analysis", "Business Intelligence", "Process Automation & RPA", "Computer Vision", "Speech & Audio Processing", "Multi-Agent Systems", "Robotics & Autonomous Systems", "Healthcare & Biomedical AI", "Finance & Risk Analytics", "Education & E-Learning", ] # --- Internal Utility Logic --- def _is_unwanted_metadata(text_segment: str) -> bool: """Identifies if a string matches academic boilerplate patterns.""" return any(map(lambda pattern: bool(re.search(pattern, text_segment, re.IGNORECASE)), JUNK_TEXT_REGEXES)) def _refine_sentence_list(raw_list: list) -> list: """Filters out boilerplate and very short segments from the corpus.""" clean_collection = list(filter(lambda s: not _is_unwanted_metadata(s), raw_list)) meaningful_subs = list(filter(lambda s: len(s.split()) >= 6, clean_collection)) return meaningful_subs def _extract_sentences_from_corpus(text_blocks: list) -> list: """Tokenizes multiple text blocks into a flat list of clean sentences.""" tokenized_nested = list(map(sent_tokenize, text_blocks)) flat_list = [item for sublist in tokenized_nested for item in sublist] return _refine_sentence_list(flat_list) def _generate_vector_embeddings(sentence_list: list) -> np.ndarray: """Converts text into normalized vector representations using SBERT.""" vector_engine = SentenceTransformer(EMBEDDING_MODEL_ID) return vector_engine.encode(sentence_list, normalize_embeddings=True, show_progress_bar=False) def _perform_hierarchical_clustering(vectors: np.ndarray, distance_cutoff: float) -> np.ndarray: """Clusters vectors using Agglomerative Clustering with a cosine metric.""" return AgglomerativeClustering( metric="cosine", linkage="average", distance_threshold=distance_cutoff, n_clusters=None, ).fit_predict(vectors) def _perform_dbscan_clustering(vectors: np.ndarray, eps: float = 0.3, min_samples: int = 5) -> np.ndarray: """Clusters vectors using DBSCAN with cosine metric. Returns cluster assignments (-1 for noise).""" return DBSCAN(eps=eps, min_samples=min_samples, metric="cosine").fit_predict(vectors) def _get_cluster_centroids(vectors: np.ndarray, group_labels: np.ndarray) -> dict: """Calculates the mean vector for each discovered cluster.""" active_groups = sorted(set(group_labels.tolist()) - {-1}) return dict(map(lambda g: (g, vectors[group_labels == g].mean(axis=0)), active_groups)) def _find_exemplary_sentences(midpoint_vector: np.ndarray, all_texts: list, all_vectors: np.ndarray, top_k: int) -> list: """Finds sentences whose vectors are closest to the cluster centroid.""" closeness_scores = cosine_similarity([midpoint_vector], all_vectors)[0] best_indices = np.argsort(closeness_scores)[::-1][:top_k].tolist() return list(map(lambda idx: all_texts[idx], best_indices)) def _assemble_cluster_summaries(group_labels: np.ndarray, text_source: list, vector_source: np.ndarray) -> list: """Builds a JSON-ready summary for every identified topic cluster.""" midpoints = _get_cluster_centroids(vector_source, group_labels) def _format_node(cluster_id): membership_mask = group_labels == cluster_id return { "topic_id": cluster_id, "count": int(membership_mask.sum()), "centroid": midpoints[cluster_id].tolist(), "nearest_sentences": _find_exemplary_sentences( midpoints[cluster_id], text_source, vector_source, NEAREST_NEIGHBORS_K), } return list(map(_format_node, sorted(midpoints.keys()))) def _initialize_llm_client() -> ChatMistralAI: """Configures the Mistral AI interface for thematic labeling.""" return ChatMistralAI( model="mistral-large-latest", temperature=0.2, timeout=LLM_GATEWAY_TIMEOUT, max_retries=0, ) # --- Primary Analysis Tools --- @tool def load_scopus_csv(file_path: str) -> str: """ Ingests a Scopus CSV, cleans the data, and prepares it for analysis. Saves 'loaded_data.csv' as a local cache. """ source_df = pd.read_csv( file_path, encoding="utf-8-sig", quoting=0, engine="python", on_bad_lines="skip", ) source_df.to_csv("loaded_data.csv", index=False, encoding="utf-8") total_records = len(source_df) header_list = list(source_df.columns) raw_abstracts = list(source_df["Abstract"].dropna().astype(str)) if "Abstract" in header_list else [] raw_titles = list(source_df["Title"].dropna().astype(str)) if "Title" in header_list else [] processed_abstracts = _extract_sentences_from_corpus(raw_abstracts) processed_titles = _extract_sentences_from_corpus(raw_titles) publication_years = pd.to_numeric(source_df["Year"], errors="coerce").dropna() if "Year" in header_list else pd.Series([], dtype=float) period_string = f"{int(publication_years.min())} – {int(publication_years.max())}" if len(publication_years) > 0 else "N/A" return json.dumps({ "papers": total_records, "abstract_sentences": len(processed_abstracts), "title_sentences": len(processed_titles), "year_range": period_string, "columns": header_list, "abstract_coverage_pct": round(len(raw_abstracts) / total_records * 100, 1) if total_records else 0, "title_coverage_pct": round(len(raw_titles) / total_records * 100, 1) if total_records else 0, "sample_titles": list(source_df["Title"].dropna().head(5)) if "Title" in header_list else [], "file_saved": "loaded_data.csv", "note": f"Clustering cap set to {SENTENCE_HARD_LIMIT} entries for efficiency.", }, indent=2) @tool def run_bertopic_discovery(run_key: str = "abstract", threshold: float = 0.7, method: str = "hierarchical") -> str: """ Executes the BERTopic discovery logic: Embedding -> Clustering -> Visualization. Outputs interactive Plotly charts and cluster summaries. Supports both Hierarchical and DBSCAN clustering methods. """ cached_df = pd.read_csv("loaded_data.csv") target_col = COLUMN_MAP[run_key][0] unstructured_texts = list(cached_df[target_col].dropna().astype(str)) global_sentence_pool = _extract_sentences_from_corpus(unstructured_texts) # Apply sentence limit to prevent memory overflow optimized_sentence_pool = global_sentence_pool[:SENTENCE_HARD_LIMIT] print(f"[Core Discovery] Processing {len(optimized_sentence_pool)} sentences from total pool of {len(global_sentence_pool)}.") semantic_vectors = _generate_vector_embeddings(optimized_sentence_pool) np.save(f"emb_{run_key}.npy", semantic_vectors) if method == "dbscan": cluster_assignments = _perform_dbscan_clustering(semantic_vectors, eps=threshold, min_samples=5) else: cluster_assignments = _perform_hierarchical_clustering(semantic_vectors, threshold) thematic_summaries = _assemble_cluster_summaries(cluster_assignments, optimized_sentence_pool, semantic_vectors) with open(f"summaries_{run_key}.json", "w") as storage_file: json.dump(thematic_summaries, storage_file, indent=2) entry_counts = [node["count"] for node in thematic_summaries] node_identifiers = [node["topic_id"] for node in thematic_summaries] centroid_stack = np.array([node["centroid"] for node in thematic_summaries]) # Visual 1: Inter-topic mapping via PCA dimension_count = min(2, len(centroid_stack), centroid_stack.shape[1]) reduced_coords = PCA(n_components=dimension_count).fit_transform(centroid_stack) dimension_x = reduced_coords[:, 0].tolist() dimension_y = (reduced_coords[:, 1].tolist() if reduced_coords.shape[1] > 1 else [0] * len(dimension_x)) map_fig = px.scatter( x=dimension_x, y=dimension_y, size=entry_counts, text=list(map(str, node_identifiers)), title=f"Thematic Landscape ({run_key})", labels={"x": "Factor 1", "y": "Factor 2"}, size_max=40, color=entry_counts, color_continuous_scale="Viridis", ) map_fig.update_traces(textposition="top center") map_fig.update_layout(template="plotly_white") v_file_1 = f"chart_{run_key}_intertopic.html" map_fig.write_html(v_file_1, include_plotlyjs="cdn") # Visual 2: Sentence distribution bar chart top_nodes = thematic_summaries[:30] bar_fig = px.bar( x=list(map(lambda n: f"Topic {n['topic_id']}", top_nodes)), y=list(map(lambda n: n["count"], top_nodes)), title=f"Thematic Weight Distribution ({run_key}) — Top 30", labels={"x": "Theme ID", "y": "Sentence Count"}, color=list(map(lambda n: n["count"], top_nodes)), color_continuous_scale="Aggrnyl", ) bar_fig.update_layout(template="plotly_white") v_file_2 = f"chart_{run_key}_bars.html" bar_fig.write_html(v_file_2, include_plotlyjs="cdn") # Visual 3: Hierarchical Treemap tree_fig = px.treemap( names=list(map(lambda n: f"ID:{n['topic_id']}", thematic_summaries)), parents=["Corpus"] * len(thematic_summaries), values=entry_counts, title=f"Topological Hierarchy ({run_key})", ) tree_fig.update_layout(template="plotly_white") v_file_3 = f"chart_{run_key}_hierarchy.html" tree_fig.write_html(v_file_3, include_plotlyjs="cdn") # Visual 4: Semantic Connectivity Matrix preview_nodes = thematic_summaries[:20] preview_vectors = np.array([n["centroid"] for n in preview_nodes]) similarity_grid = cosine_similarity(preview_vectors).tolist() axis_labels = list(map(lambda n: f"T{n['topic_id']}", preview_nodes)) heat_fig = go.Figure(data=go.Heatmap(z=similarity_grid, x=axis_labels, y=axis_labels, colorscale="YlGnBu")) heat_fig.update_layout(title=f"Semantic Proximity Heatmap ({run_key})", template="plotly_white") v_file_4 = f"chart_{run_key}_heatmap.html" heat_fig.write_html(v_file_4, include_plotlyjs="cdn") return json.dumps({ "run_key": run_key, "total_topics": len(thematic_summaries), "total_sentences": len(global_sentence_pool), "sentences_used": len(optimized_sentence_pool), "sentences_capped": len(global_sentence_pool) > SENTENCE_HARD_LIMIT, "threshold_used": threshold, "summaries_file": f"summaries_{run_key}.json", "embeddings_file": f"emb_{run_key}.npy", "charts": [v_file_1, v_file_2, v_file_3, v_file_4], "topics_preview": thematic_summaries[:3], }, indent=2) @tool def label_topics_with_llm(run_key: str = "abstract") -> str: """ Queries Mistral AI to provide human-readable labels and metadata for clusters. Uses batch processing to minimize API overhead and latency. """ with open(f"summaries_{run_key}.json", encoding="utf-8") as raw_json: cluster_list = json.load(raw_json) active_subset = cluster_list[:MAX_TOPIC_BATCH_SIZE] # Structure data for the LLM's consumption llm_payload = list(map( lambda node: { "topic_id": node["topic_id"], "count": node["count"], "sentences": node["nearest_sentences"][:2], }, active_subset, )) llm_handler = _initialize_llm_client() json_interpreter = JsonOutputParser() label_prompt = PromptTemplate( input_variables=["input_json"], template=( "You are a specialized thematic coder for academic literature.\n\n" "Analyze the following clusters discovered through BERTopic. " "For each cluster, derive a research-oriented label with AI Council-style reasoning.\\n\\n" "{input_json}\\n\\n" "Respond ONLY with a JSON array containing these keys for each entry:\\n" " topic_id (int), label (3-6 words), category (methodology/theory/application/context/empirical), " " confidence (float), reasoning (object with keys: method, data, impact), niche (bool).\\n\\n" "Reasoning structure (use brief, focused explanations):\\n" " method: Explain the methodological or theoretical lens applied to this cluster (1-2 sentences)\\n" " data: Describe the empirical patterns or evidence supporting this grouping (1-2 sentences)\\n" " impact: Articulate the research or practice implications of this theme (1-2 sentences)\\n\\n" "Generate entries for ALL {total_count} topics provided." ), ) inference_chain = label_prompt | llm_handler | json_interpreter ai_response = inference_chain.invoke({ "input_json": json.dumps(llm_payload, indent=2), "total_count": len(active_subset), }) # Map AI results back to the original database response_directory = {str(item["topic_id"]): item for item in ai_response} def _format_reasoning(reasoning_obj): """Converts multi-part reasoning structure into a readable string.""" if isinstance(reasoning_obj, dict): parts = [] if "method" in reasoning_obj: parts.append(f"Method: {reasoning_obj['method']}") if "data" in reasoning_obj: parts.append(f"Data: {reasoning_obj['data']}") if "impact" in reasoning_obj: parts.append(f"Impact: {reasoning_obj['impact']}") return " | ".join(parts) if parts else "" return str(reasoning_obj) if reasoning_obj else "" final_labels = list(map( lambda original: { "topic_id": original["topic_id"], "count": original["count"], "nearest_sentences": original["nearest_sentences"], "label": response_directory.get(str(original["topic_id"]), {}).get("label", f"Concept Group {original['topic_id']}"), "category": response_directory.get(str(original["topic_id"]), {}).get("category", "application"), "confidence": response_directory.get(str(original["topic_id"]), {}).get("confidence", 0.5), "reasoning": _format_reasoning(response_directory.get(str(original["topic_id"]), {}).get("reasoning", "")), "niche": response_directory.get(str(original["topic_id"]), {}).get("niche", False), }, active_subset, )) export_path = f"labels_{run_key}.json" with open(export_path, "w") as out_file: json.dump(final_labels, out_file, indent=2) return json.dumps({ "run_key": run_key, "total_labelled": len(final_labels), "output_file": export_path, "preview": final_labels[:5], }, indent=2) @tool def consolidate_into_themes(run_key: str = "abstract", theme_map: str = "") -> str: """ Groups individual topic clusters into broader research themes. Employs LLM-driven synthesis if no manual mapping is provided. """ with open(f"labels_{run_key}.json", encoding="utf-8") as raw_data: labeled_topics = json.load(raw_data) topic_lookup_table = {str(t["topic_id"]): t for t in labeled_topics} manual_theme_design = json.loads(theme_map) if theme_map.strip() else {} def _build_from_manual(name_id_pair): theme_title, topic_id_list = name_id_pair matching_topics = list(filter(lambda t: str(t["topic_id"]) in map(str, topic_id_list), labeled_topics)) aggregate_docs = sum(map(lambda t: t["count"], matching_topics)) sample_quotes = [s for t in matching_topics for s in t.get("nearest_sentences", [])][:5] return { "theme_name": theme_title, "topic_ids": list(map(int, topic_id_list)), "total_sentences": aggregate_docs, "representative_sentences": sample_quotes, "constituent_labels": list(map(lambda t: t.get("label", ""), matching_topics)), } def _build_from_intelligence(): llm_client = _initialize_llm_client() json_output_mod = JsonOutputParser() synthesis_prompt = PromptTemplate( input_variables=["topic_definitions"], template=( "You are performing Phase 3 & 4 of thematic analysis (Braun & Clarke).\n\n" "Data Clusters:\n{topic_definitions}\n\n" "Consolidate these into 4-8 broad research themes.\n" "Format: JSON array of objects with theme_name, topic_ids (list), rationale, representative_sentences (list).\n" ), ) flow = synthesis_prompt | llm_client | json_output_mod compact_definitions = list(map( lambda t: {"topic_id": t["topic_id"], "label": t.get("label", ""), "sample": t.get("nearest_sentences", [""])[0][:100]}, labeled_topics[:MAX_TOPIC_BATCH_SIZE], )) generated_themes = flow.invoke({"topic_definitions": json.dumps(compact_definitions, indent=2)}) return list(map( lambda th: { **th, "total_sentences": sum(map(lambda tid: topic_lookup_table.get(str(tid), {}).get("count", 0), th.get("topic_ids", []))), "constituent_labels": list(map(lambda tid: topic_lookup_table.get(str(tid), {}).get("label", ""), th.get("topic_ids", []))), }, generated_themes, )) final_thematic_set = ( list(map(_build_from_manual, manual_theme_design.items())) if manual_theme_design else _build_from_intelligence() ) theme_store_1 = f"themes_{run_key}.json" with open(theme_store_1, "w", encoding="utf-8") as f1: json.dump(final_thematic_set, f1, indent=2) with open("themes.json", "w", encoding="utf-8") as f_canonical: json.dump(final_thematic_set, f_canonical, indent=2) return json.dumps({ "run_key": run_key, "total_themes": len(final_thematic_set), "output_file": theme_store_1, "themes_preview": [{"name": th["theme_name"], "size": th.get("total_sentences", 0)} for th in final_thematic_set], }, indent=2) @tool def compare_with_taxonomy(run_key: str = "abstract") -> str: """ Aligns discovered themes with the PAJAIS research taxonomy. Flags 'NOVEL' themes that represent potential scientific gaps. """ specific_themes_file = f"themes_{run_key}.json" active_themes_file = specific_themes_file if os.path.exists(specific_themes_file) else "themes.json" with open(active_themes_file, encoding="utf-8") as theme_io: theme_collection = json.load(theme_io) llm_bridge = _initialize_llm_client() json_processor = JsonOutputParser() alignment_prompt = PromptTemplate( input_variables=["theme_input", "taxonomy_str"], template=( "You are a taxonomy alignment specialist.\n\n" "Official Categories:\n{taxonomy_str}\n\n" "User Themes:\n{theme_input}\n\n" "Map each theme to the closest official category. If it is a completely new direction, mark as NOVEL.\n" "Format: JSON array with theme_name, pajais_match, match_confidence, reasoning, is_novel.\n" ), ) mapping_chain = alignment_prompt | llm_bridge | json_processor theme_metadata = list(map( lambda t: { "theme_name": t["theme_name"], "constituent_labels": t.get("constituent_labels", []), "evidence": (t.get("representative_sentences", [""])[0][:100] if t.get("representative_sentences") else ""), }, theme_collection, )) alignment_results = mapping_chain.invoke({ "theme_input": json.dumps(theme_metadata, indent=2), "taxonomy_str": "\n".join(f"- {cat}" for cat in CATEGORY_HIERARCHY_PAJAIS), }) with open("taxonomy_map.json", "w", encoding="utf-8") as map_io: json.dump(alignment_results, map_io, indent=2) novel_count = sum(1 for entry in alignment_results if entry.get("is_novel", False)) return json.dumps({ "run_key": run_key, "total_mapped": len(alignment_results), "novel_entries": novel_count, "standard_entries": len(alignment_results) - novel_count, "mapping_file": "taxonomy_map.json", "detailed_mapping": alignment_results, }, indent=2) @tool def generate_comparison_csv() -> str: """ Aggregates results from Abstract and Title analyses into a single comparative report. """ def _read_theme_data(key): path = f"themes_{key}.json" return json.loads(open(path, encoding="utf-8").read()) if os.path.exists(path) else [] abstract_run_data = _read_theme_data("abstract") title_run_data = _read_theme_data("title") max_count = max(len(abstract_run_data), len(title_run_data), 1) abs_padded = abstract_run_data + [{}] * (max_count - len(abstract_run_data)) ttl_padded = title_run_data + [{}] * (max_count - len(title_run_data)) comparative_rows = list(map( lambda triple: { "ID": triple[0] + 1, "Abstract Theme": triple[1].get("theme_name", ""), "Abstract Count": triple[1].get("total_sentences", 0), "Title Theme": triple[2].get("theme_name", ""), "Title Count": triple[2].get("total_sentences", 0), "Consistency": "Matched" if str(triple[1].get("theme_name", ""))[:5].lower() == str(triple[2].get("theme_name", ""))[:5].lower() else "Distinct", }, zip(range(max_count), abs_padded, ttl_padded) )) report_df = pd.DataFrame(comparative_rows) report_df.to_csv("comparison.csv", index=False) return json.dumps({ "result_file": "comparison.csv", "row_count": len(report_df), "data_peek": comparative_rows[:3], }, indent=2) @tool def export_narrative(run_key: str = "abstract") -> str: """ Generates a formal research narrative based on the thematic analysis results. Produces a 500-word Section 7 draft. """ with open("themes.json", encoding="utf-8") as t_in: thematic_data = json.load(t_in) mapping_raw = open("taxonomy_map.json", encoding="utf-8").read() if os.path.exists("taxonomy_map.json") else "[]" mapping_data = json.loads(mapping_raw) narrative_llm = _initialize_llm_client() narrative_llm.temperature = 0.4 narrative_prompt = PromptTemplate( input_variables=["key", "themes", "mapping"], template=( "Write a formal academic Section 7 Discussion (approx 500 words).\n" "Context: {key} analysis run.\n" "Themes Found:\n{themes}\n\n" "Taxonomy Alignment:\n{mapping}\n\n" "Requirements:\n" "1. Discuss the methodology (BERTopic + Braun & Clarke).\n" "2. Interpret the key themes and their implications.\n" "3. Analyze the NOVEL vs MAPPED categories.\n" "4. Suggest future work. Use professional, scholarly language.\n" ), ) composition_flow = narrative_prompt | narrative_llm story_response = composition_flow.invoke({ "key": run_key, "themes": json.dumps(thematic_data, indent=2), "mapping": json.dumps(mapping_data, indent=2), }) final_text = story_response.content if hasattr(story_response, "content") else str(story_response) with open("narrative.txt", "w", encoding="utf-8") as narrative_io: narrative_io.write(final_text) return json.dumps({ "output_file": "narrative.txt", "word_stats": len(final_text.split()), "content_start": final_text[:400], }, indent=2)