| """ | |
| tools.py — Core functions for the AI-driven topic modeling pipeline. | |
| This module provides all analytical functions used by the TopicAgent: | |
| - CSV ingestion and validation | |
| - Text preprocessing (lowercasing, stopword removal, cleaning) | |
| - Topic modeling via BERTopic (with fallback to sklearn LDA) | |
| - Automatic human-readable label generation | |
| - Cross-source theme comparison (Title vs Abstract) | |
| - Taxonomy mapping (MAPPED / NOVEL classification) | |
| """ | |
import re
import json
import logging
from typing import Dict, List, Tuple, Optional, Any

import numpy as np
import pandas as pd
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

# ---------------------------------------------------------------------------
# NLTK data download (idempotent)
# ---------------------------------------------------------------------------
for _resource in ("punkt", "punkt_tab", "stopwords"):
    try:
        nltk.data.find(f"tokenizers/{_resource}" if "punkt" in _resource else f"corpora/{_resource}")
    except LookupError:
        nltk.download(_resource, quiet=True)
# ---------------------------------------------------------------------------
# Reference taxonomy of known AI / business / research themes
# Used by create_taxonomy_map() for MAPPED vs NOVEL classification
# ---------------------------------------------------------------------------
KNOWN_THEMES: List[str] = [
    # AI / ML
    "artificial intelligence", "machine learning", "deep learning", "neural network",
    "natural language processing", "computer vision", "reinforcement learning",
    "generative ai", "large language model", "transformer", "chatbot",
    "recommendation system", "knowledge graph", "robotics", "autonomous",
    "explainable ai", "federated learning", "transfer learning", "ai ethics",
    "adversarial", "gan", "diffusion model", "prompt engineering",
    # Data science
    "data mining", "big data", "analytics", "data science", "data quality",
    "feature engineering", "dimensionality reduction", "clustering", "classification",
    "regression", "time series", "anomaly detection", "sentiment analysis",
    # Business / Management
    "digital transformation", "innovation", "strategy", "supply chain",
    "customer experience", "marketing", "e-commerce", "fintech", "blockchain",
    "sustainability", "corporate social responsibility", "knowledge management",
    "decision support", "business intelligence", "enterprise", "organizational",
    "human resource", "leadership", "entrepreneurship", "business model",
    # Information systems
    "information systems", "technology adoption", "user acceptance", "privacy",
    "security", "trust", "social media", "online community", "platform",
    "crowdsourcing", "cloud computing", "iot", "internet of things",
    "software engineering", "agile", "devops", "digital platform",
    # Healthcare / Society
    "healthcare", "telemedicine", "electronic health", "public health",
    "education", "e-learning", "smart city", "government", "policy",
    "ethics", "fairness", "bias", "misinformation", "content moderation",
    # Research methods
    "survey", "experiment", "case study", "meta-analysis", "bibliometric",
    "systematic review", "structural equation", "grounded theory",
]
# ===================================================================
# 1. load_csv — Ingest and validate the CSV dataset
# ===================================================================
def load_csv(filepath: str) -> pd.DataFrame:
    """
    Load a CSV file and ensure the required columns (Title, Abstract) exist.

    Parameters
    ----------
    filepath : str
        Path to the CSV file.

    Returns
    -------
    pd.DataFrame
        DataFrame with at least 'Title' and 'Abstract' columns.

    Raises
    ------
    FileNotFoundError
        If the specified file does not exist.
    ValueError
        If required columns are missing.
    """
    logger.info("Loading CSV from %s", filepath)
    df = pd.read_csv(filepath, encoding="utf-8", on_bad_lines="skip")
    logger.info("Loaded %d rows × %d columns", len(df), len(df.columns))

    # Validate required columns (case-insensitive match)
    col_map = {c.strip().lower(): c for c in df.columns}
    required = {"title", "abstract"}
    missing = required - set(col_map.keys())
    if missing:
        raise ValueError(f"CSV is missing required columns: {missing}. Found: {list(df.columns)}")

    # Rename to canonical form
    df = df.rename(columns={col_map["title"]: "Title", col_map["abstract"]: "Abstract"})

    # Drop rows where both Title and Abstract are empty
    df = df.dropna(subset=["Title", "Abstract"], how="all").reset_index(drop=True)
    df["Title"] = df["Title"].fillna("")
    df["Abstract"] = df["Abstract"].fillna("")

    logger.info("After cleaning: %d usable rows", len(df))
    return df
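

# Usage sketch (illustrative only — "data/papers.csv" is a hypothetical path;
# any CSV exposing Title/Abstract columns in any casing will do):
#
#     df = load_csv("data/papers.csv")
#     print(df.shape, list(df.columns))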
# ===================================================================
# 2. preprocess_text — Clean and normalise a list of text documents
# ===================================================================
def preprocess_text(documents: List[str]) -> List[str]:
    """
    Apply professional-grade text preprocessing:
    1. Lowercase
    2. Remove URLs, emails, special characters, digits
    3. Tokenize
    4. Remove stopwords (NLTK English)
    5. Remove very short tokens (length ≤ 2)
    6. Rejoin into cleaned strings

    Parameters
    ----------
    documents : list of str
        Raw text documents.

    Returns
    -------
    list of str
        Cleaned text documents.
    """
    stop_words = set(stopwords.words("english"))
    # Extended stopwords common in academic abstracts
    stop_words.update([
        "©", "elsevier", "rights", "reserved", "doi", "http", "https",
        "vol", "pp", "fig", "table", "journal", "author", "authors",
        "study", "paper", "research", "results", "findings", "however",
        "propose", "proposed", "approach", "using", "based", "also",
        "show", "shows", "shown", "may", "used", "use", "one", "two",
        "three", "new", "well", "within", "among", "across", "toward",
        "towards", "et", "al", "ie", "eg", "cf", "thus", "therefore",
        "moreover", "furthermore", "addition", "conclusion", "conclusions",
    ])

    cleaned: List[str] = []
    for doc in documents:
        if not isinstance(doc, str) or not doc.strip():
            cleaned.append("")
            continue
        text = doc.lower()
        # Remove URLs
        text = re.sub(r"https?://\S+|www\.\S+", " ", text)
        # Remove emails
        text = re.sub(r"\S+@\S+", " ", text)
        # Remove digits and special characters but keep spaces
        text = re.sub(r"[^a-z\s]", " ", text)
        # Collapse whitespace
        text = re.sub(r"\s+", " ", text).strip()
        # Tokenize and filter
        tokens = word_tokenize(text)
        tokens = [t for t in tokens if t not in stop_words and len(t) > 2]
        cleaned.append(" ".join(tokens))

    logger.info("Preprocessed %d documents", len(cleaned))
    return cleaned
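

# Usage sketch (continues the load_csv example above; "df" is assumed to be
# the DataFrame returned by load_csv):
#
#     title_docs = preprocess_text(df["Title"].tolist())
#     abstract_docs = preprocess_text(df["Abstract"].tolist())
#     # e.g. "A Study of Deep Learning in Healthcare (2021)" -> "deep learning healthcare"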
# ===================================================================
# 3. run_topic_modeling — Discover topics via BERTopic (or LDA fallback)
# ===================================================================
def run_topic_modeling(
    documents: List[str],
    source_label: str = "documents",
    min_topics: int = 100,
    use_bertopic: bool = True,
) -> Tuple[pd.DataFrame, Any]:
    """
    Perform topic modeling on a corpus of preprocessed documents.

    Strategy:
    1. Try BERTopic with UMAP + HDBSCAN. If the result has < min_topics,
       automatically fall back to sklearn LDA.
    2. LDA is configured with n_components = min_topics to guarantee the
       requested topic count.

    Parameters
    ----------
    documents : list of str
        Preprocessed text documents.
    source_label : str
        Label for logging (e.g. "Titles" or "Abstracts").
    min_topics : int
        Minimum number of topics required (default 100).
    use_bertopic : bool
        Whether to attempt BERTopic first.

    Returns
    -------
    topics_df : pd.DataFrame
        One row per topic with columns: topic_id, keywords (comma-separated), source.
    model : object
        The fitted topic model for downstream inspection.
    """
    # Filter out empty documents
    valid_docs = [d for d in documents if d.strip()]
    if len(valid_docs) < 20:
        raise ValueError(f"Not enough valid documents ({len(valid_docs)}) for topic modeling.")

    logger.info("Running topic modeling on %d %s (target ≥ %d topics)", len(valid_docs), source_label, min_topics)
    topics_df = None
    model = None

    # ------ Attempt BERTopic ------
    if use_bertopic:
        try:
            topics_df, model = _run_bertopic(valid_docs, source_label, min_topics)
        except Exception as exc:
            logger.warning("BERTopic failed (%s). Falling back to LDA.", exc)
            topics_df = None

    # ------ Fallback to LDA if needed ------
    if topics_df is None or len(topics_df) < min_topics:
        logger.info("Using LDA to guarantee ≥ %d topics for %s", min_topics, source_label)
        topics_df, model = _run_lda(valid_docs, source_label, min_topics)

    logger.info("Topic modeling complete for %s: %d topics discovered", source_label, len(topics_df))
    return topics_df, model
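

# Usage sketch (assumes "abstract_docs" from the preprocess_text example;
# with the default min_topics=100, small corpora will usually take the LDA
# fallback path):
#
#     abstract_topics, abstract_model = run_topic_modeling(
#         abstract_docs, source_label="Abstracts", min_topics=100
#     )
#     print(abstract_topics[["topic_id", "keywords"]].head())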
def _run_bertopic(docs: List[str], source_label: str, min_topics: int):
    """Run BERTopic with tuned parameters."""
    from bertopic import BERTopic
    from umap import UMAP
    from hdbscan import HDBSCAN
    from sklearn.feature_extraction.text import CountVectorizer

    umap_model = UMAP(
        n_neighbors=10,
        n_components=5,
        min_dist=0.0,
        metric="cosine",
        random_state=42,
    )
    hdbscan_model = HDBSCAN(
        min_cluster_size=5,
        min_samples=2,
        prediction_data=True,
    )
    vectorizer = CountVectorizer(
        stop_words="english",
        ngram_range=(1, 2),
        max_df=0.90,
        min_df=2,
    )
    topic_model = BERTopic(
        umap_model=umap_model,
        hdbscan_model=hdbscan_model,
        vectorizer_model=vectorizer,
        nr_topics="auto",
        top_n_words=10,
        verbose=False,
    )

    topics, _probs = topic_model.fit_transform(docs)
    info = topic_model.get_topic_info()
    # Exclude outlier topic (-1)
    info = info[info["Topic"] != -1].reset_index(drop=True)

    rows = []
    for _, row in info.iterrows():
        tid = int(row["Topic"])
        topic_words = topic_model.get_topic(tid)
        kw = ", ".join([w for w, _ in topic_words[:10]])
        rows.append({"topic_id": tid, "keywords": kw, "source": source_label})
    df = pd.DataFrame(rows)
    return df, topic_model
def _run_lda(docs: List[str], source_label: str, n_topics: int):
    """Run sklearn LDA to guarantee the requested number of topics."""
    from sklearn.decomposition import LatentDirichletAllocation
    from sklearn.feature_extraction.text import CountVectorizer

    vectorizer = CountVectorizer(
        stop_words="english",
        max_df=0.90,
        min_df=2,
        ngram_range=(1, 2),
        max_features=10000,
    )
    dtm = vectorizer.fit_transform(docs)
    feature_names = vectorizer.get_feature_names_out()

    lda = LatentDirichletAllocation(
        n_components=n_topics,
        max_iter=25,
        learning_method="online",
        random_state=42,
        n_jobs=-1,
    )
    lda.fit(dtm)

    rows = []
    for idx, component in enumerate(lda.components_):
        top_indices = component.argsort()[-10:][::-1]
        kw = ", ".join([feature_names[i] for i in top_indices])
        rows.append({"topic_id": idx, "keywords": kw, "source": source_label})
    df = pd.DataFrame(rows)
    return df, lda
# ===================================================================
# 4. generate_labels — Create human-readable labels for each topic
# ===================================================================
def generate_labels(
    topics_df: pd.DataFrame,
    use_llm: bool = False,
    groq_api_key: Optional[str] = None,
) -> pd.DataFrame:
    """
    Generate a short human-readable label for every topic.

    Strategy:
    - If use_llm=True and a Groq API key is provided, use the Groq LLM
      (llama-3.3-70b-versatile, free tier) to produce contextual labels.
    - Otherwise, apply a heuristic: capitalise the first 3–4 keywords.

    Parameters
    ----------
    topics_df : pd.DataFrame
        Must contain columns 'topic_id' and 'keywords'.
    use_llm : bool
        Whether to use the Groq LLM for label generation.
    groq_api_key : str, optional
        Groq API key, required if use_llm is True.

    Returns
    -------
    pd.DataFrame
        Same DataFrame with an additional 'label' column.
    """
    if use_llm and groq_api_key:
        logger.info("Generating labels using Groq LLM …")
        topics_df = _generate_labels_llm(topics_df, groq_api_key)
    else:
        logger.info("Generating labels using keyword heuristic …")
        topics_df = _generate_labels_heuristic(topics_df)
    return topics_df


def _generate_labels_heuristic(df: pd.DataFrame) -> pd.DataFrame:
    """Create labels from the top keywords of each topic."""
    labels = []
    for _, row in df.iterrows():
        kws = [kw.strip() for kw in row["keywords"].split(",")]
        # Take the first 3-4 non-trivial keywords and title-case them
        candidates = [kw.title() for kw in kws if len(kw) > 2][:4]
        label = " / ".join(candidates) if candidates else f"Topic {row['topic_id']}"
        labels.append(label)
    df = df.copy()
    df["label"] = labels
    return df
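

# Usage sketch (heuristic path, no API key required; "abstract_topics" is
# assumed from the run_topic_modeling example):
#
#     abstract_topics = generate_labels(abstract_topics, use_llm=False)
#     # e.g. keywords "supply chain, logistics, optimization"
#     #      -> label "Supply Chain / Logistics / Optimization"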
def _generate_labels_llm(df: pd.DataFrame, api_key: str) -> pd.DataFrame:
    """Use Groq API to generate contextual labels for topics (batched)."""
    import time
    try:
        from groq import Groq
    except ImportError:
        logger.warning("groq package not installed. Falling back to heuristic labels.")
        return _generate_labels_heuristic(df)

    client = Groq(api_key=api_key)
    labels = []
    # Process in batches to avoid rate limits
    batch_size = 10
    for batch_start in range(0, len(df), batch_size):
        batch = df.iloc[batch_start:batch_start + batch_size]
        prompt_lines = []
        for _, row in batch.iterrows():
            prompt_lines.append(f"Topic {row['topic_id']}: keywords = [{row['keywords']}]")
        prompt = (
            "You are a research taxonomy expert. For each topic below, "
            "generate a concise, descriptive label (3-6 words) that captures "
            "the theme of the keywords. Return ONLY a JSON list of objects "
            'with keys "topic_id" and "label". No extra text.\n\n'
            + "\n".join(prompt_lines)
        )
        try:
            chat = client.chat.completions.create(
                model="llama-3.3-70b-versatile",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.3,
                max_tokens=1024,
            )
            resp = chat.choices[0].message.content.strip()
            # Find the JSON array in the response and parse it
            json_match = re.search(r"\[.*\]", resp, re.DOTALL)
            if json_match:
                batch_labels = json.loads(json_match.group())
                # Coerce topic_id to int: the model may return it as a string
                label_map = {int(item["topic_id"]): item["label"] for item in batch_labels}
                for _, row in batch.iterrows():
                    labels.append(label_map.get(int(row["topic_id"]), f"Topic {row['topic_id']}"))
            else:
                # Fallback for this batch
                for _, row in batch.iterrows():
                    kws = [kw.strip().title() for kw in row["keywords"].split(",")][:4]
                    labels.append(" / ".join(kws))
        except Exception as exc:
            logger.warning("Groq API error for batch starting at %d: %s", batch_start, exc)
            for _, row in batch.iterrows():
                kws = [kw.strip().title() for kw in row["keywords"].split(",")][:4]
                labels.append(" / ".join(kws))
        # Rate-limit courtesy delay
        time.sleep(0.5)

    df = df.copy()
    df["label"] = labels
    return df
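

# Illustrative response shape parsed above — per the prompt, the model is
# asked to return a JSON list such as:
#
#     [{"topic_id": 0, "label": "Deep Learning For Medical Imaging"},
#      {"topic_id": 1, "label": "Consumer Trust In E-Commerce"}]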
# ===================================================================
# 5. compare_themes — Cross-compare title vs abstract topics
# ===================================================================
def compare_themes(
    title_topics: pd.DataFrame,
    abstract_topics: pd.DataFrame,
) -> pd.DataFrame:
    """
    Build a comparison table showing dominant themes from titles and
    abstracts side-by-side.

    Matching strategy:
    - Compute keyword overlap (Jaccard similarity) between every
      title-topic and abstract-topic pair.
    - For each title-topic, find the best matching abstract-topic.
    - Report similarity score and alignment status.

    Parameters
    ----------
    title_topics : pd.DataFrame
        Topics extracted from titles (with 'topic_id', 'keywords', 'label').
    abstract_topics : pd.DataFrame
        Topics extracted from abstracts (with 'topic_id', 'keywords', 'label').

    Returns
    -------
    pd.DataFrame
        Comparison table with columns:
        title_topic_id, title_label, title_keywords,
        abstract_topic_id, abstract_label, abstract_keywords,
        similarity, alignment
    """
    logger.info("Comparing themes: %d title topics × %d abstract topics",
                len(title_topics), len(abstract_topics))

    def _keywords_set(kw_str: str) -> set:
        return set(kw.strip().lower() for kw in kw_str.split(",") if kw.strip())

    rows = []
    for _, t_row in title_topics.iterrows():
        t_kws = _keywords_set(t_row["keywords"])
        best_sim = 0.0
        best_match = None
        for _, a_row in abstract_topics.iterrows():
            a_kws = _keywords_set(a_row["keywords"])
            if not t_kws or not a_kws:
                continue
            # Jaccard similarity
            intersection = len(t_kws & a_kws)
            union = len(t_kws | a_kws)
            sim = intersection / union if union else 0.0
            if sim > best_sim:
                best_sim = sim
                best_match = a_row

        alignment = (
            "Strong" if best_sim >= 0.4
            else "Moderate" if best_sim >= 0.2
            else "Weak" if best_sim > 0
            else "No Match"
        )
        rows.append({
            "title_topic_id": t_row["topic_id"],
            "title_label": t_row.get("label", ""),
            "title_keywords": t_row["keywords"],
            "abstract_topic_id": best_match["topic_id"] if best_match is not None else None,
            "abstract_label": best_match.get("label", "") if best_match is not None else "",
            "abstract_keywords": best_match["keywords"] if best_match is not None else "",
            "similarity": round(best_sim, 4),
            "alignment": alignment,
        })

    comparison_df = pd.DataFrame(rows)
    logger.info("Theme comparison complete: %d rows", len(comparison_df))
    return comparison_df
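

# Worked example of the alignment bands above (illustrative keyword sets):
#
#     t_kws = {"machine learning", "healthcare", "prediction"}
#     a_kws = {"machine learning", "healthcare", "diagnosis", "prediction"}
#     # Jaccard = |intersection| / |union| = 3 / 4 = 0.75  ->  "Strong" (>= 0.4)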
# ===================================================================
# 6. create_taxonomy_map — Classify themes as MAPPED or NOVEL
# ===================================================================
def create_taxonomy_map(
    topics_df: pd.DataFrame,
    known_themes: Optional[List[str]] = None,
    threshold: float = 0.15,
) -> Dict[str, Any]:
    """
    Classify each topic as either MAPPED (similar to a well-known
    AI / business / IS research theme) or NOVEL (previously unseen).

    Heuristic:
    For each topic's keyword set, compute its best token-overlap
    ratio against the known themes list. If the ratio meets or
    exceeds the threshold, label it as MAPPED; otherwise NOVEL.

    Parameters
    ----------
    topics_df : pd.DataFrame
        Must contain 'topic_id', 'keywords', and 'label' columns.
    known_themes : list of str, optional
        Reference themes (defaults to the built-in KNOWN_THEMES).
    threshold : float
        Minimum overlap ratio to classify as MAPPED.

    Returns
    -------
    dict
        JSON-serialisable taxonomy map:
        {
            "metadata": { ... },
            "mapped": [ {topic_id, label, keywords, matched_theme, score}, ... ],
            "novel": [ {topic_id, label, keywords, score}, ... ],
        }
    """
    if known_themes is None:
        known_themes = KNOWN_THEMES

    logger.info("Building taxonomy map for %d topics (threshold=%.2f)", len(topics_df), threshold)
    mapped: List[Dict] = []
    novel: List[Dict] = []
    known_tokens_list = [set(theme.lower().split()) for theme in known_themes]

    for _, row in topics_df.iterrows():
        topic_tokens = set(
            kw.strip().lower()
            for kw in row["keywords"].split(",")
            if kw.strip()
        )
        # Also include individual words from multi-word keywords
        expanded_tokens = set()
        for token in topic_tokens:
            expanded_tokens.update(token.split())
        expanded_tokens.update(topic_tokens)

        best_score = 0.0
        best_theme = ""
        for theme_str, theme_tokens in zip(known_themes, known_tokens_list):
            if not expanded_tokens or not theme_tokens:
                continue
            intersection = len(expanded_tokens & theme_tokens)
            union_size = len(expanded_tokens | theme_tokens)
            score = intersection / union_size if union_size else 0.0
            if score > best_score:
                best_score = score
                best_theme = theme_str

        entry = {
            "topic_id": int(row["topic_id"]),
            "label": row.get("label", ""),
            "keywords": row["keywords"],
            "score": round(best_score, 4),
        }
        if best_score >= threshold:
            entry["matched_theme"] = best_theme
            entry["classification"] = "MAPPED"
            mapped.append(entry)
        else:
            entry["classification"] = "NOVEL"
            novel.append(entry)

    taxonomy = {
        "metadata": {
            "total_topics": len(topics_df),
            "mapped_count": len(mapped),
            "novel_count": len(novel),
            "threshold": threshold,
        },
        "mapped": mapped,
        "novel": novel,
    }
    logger.info("Taxonomy: %d MAPPED, %d NOVEL", len(mapped), len(novel))
    return taxonomy
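

# ---------------------------------------------------------------------------
# End-to-end sketch (illustrative only — "data/papers.csv" is a hypothetical
# path; the TopicAgent normally drives these calls, so this guard exists
# purely for ad-hoc local testing)
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    df = load_csv("data/papers.csv")
    title_docs = preprocess_text(df["Title"].tolist())
    abstract_docs = preprocess_text(df["Abstract"].tolist())

    title_topics, _ = run_topic_modeling(title_docs, source_label="Titles")
    abstract_topics, _ = run_topic_modeling(abstract_docs, source_label="Abstracts")

    title_topics = generate_labels(title_topics)
    abstract_topics = generate_labels(abstract_topics)

    comparison = compare_themes(title_topics, abstract_topics)
    taxonomy = create_taxonomy_map(abstract_topics)

    print(comparison.head())
    print(json.dumps(taxonomy["metadata"], indent=2))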