| """ |
| Topic Modeling Engine |
| ───────────────────────────────────────────────────────────────────────────── |
| Problem: Product teams were reading thousands of reviews manually to find |
| recurring themes. They missed emerging issues and couldn't prioritize roadmap |
| decisions based on customer frequency. |
| |
| Solution: Automated topic discovery using NMF (Non-negative Matrix |
| Factorization) — fast, interpretable, and more coherent than LDA for short |
| texts like reviews and tweets. |
| |
| Output: Named topic clusters with example posts, keyword weights, and |
| sentiment distribution per cluster. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import re |
| import logging |
| from typing import List, Dict, Tuple, Optional |
| from collections import Counter |
|
|
| import numpy as np |
| from sklearn.feature_extraction.text import TfidfVectorizer |
| from sklearn.decomposition import NMF, LatentDirichletAllocation |
| from sklearn.preprocessing import normalize |
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
| |
# Stop-word list handed to TfidfVectorizer(stop_words=...) in TopicModeler.fit.
# Order is preserved; entries are only grouped by rough grammatical role.
CUSTOM_STOP_WORDS = [
    # articles, conjunctions, prepositions
    "the", "a", "an",
    "and", "or", "but",
    "in", "on", "at", "to", "for", "of", "with", "by", "from",
    # forms of "be", "have", "do"
    "is", "was", "are", "were", "be", "been",
    "have", "has", "had",
    "do", "does", "did",
    # modal verbs
    "will", "would", "could", "should", "may", "might", "shall", "can",
    # demonstratives
    "this", "that", "these", "those",
    # personal and possessive pronouns
    "i", "we", "you", "they", "he", "she", "it",
    "my", "our", "your", "their", "its",
    "me", "us", "them", "him", "her",
    # intensifiers and filler adverbs
    "very", "really", "just", "also", "even", "still",
    # question words
    "when", "where", "how", "what", "which", "who", "why",
    # miscellaneous particles
    "so", "as", "if", "up", "out", "about",
]
|
|
| |
# Maps a vocabulary of indicative words to the human-readable topic name that
# _infer_topic_name assigns when a topic's top keywords overlap that vocabulary.
# Insertion order matters: on equal overlap the earlier entry wins.
TOPIC_NAME_MAP = {
    frozenset(
        {"performance", "speed", "slow", "load", "latency", "fast", "crash"}
    ): "Performance & Speed",
    frozenset(
        {"price", "billing", "cost", "expensive", "subscription", "fee", "refund"}
    ): "Pricing & Billing",
    frozenset(
        {"support", "team", "response", "customer", "service", "help", "ticket"}
    ): "Customer Support",
    frozenset(
        {"ui", "interface", "design", "dashboard", "navigation", "layout", "ux"}
    ): "UI & Design",
    frozenset(
        {"feature", "api", "integration", "export", "report", "function", "capability"}
    ): "Features & Integrations",
    frozenset(
        {"setup", "onboard", "doc", "documentation", "guide", "install", "config"}
    ): "Onboarding & Docs",
    frozenset(
        {"data", "accuracy", "model", "analysis", "insight", "quality", "reliable"}
    ): "Data Quality & Accuracy",
    frozenset(
        {"security", "privacy", "breach", "auth", "compliance", "sso", "gdpr"}
    ): "Security & Compliance",
}
|
|
|
|
| def _clean_text(text: str) -> str: |
| """Normalize text for vectorization.""" |
| text = text.lower() |
| text = re.sub(r"http\S+|www\S+|@\w+|#\w+", " ", text) |
| text = re.sub(r"[^a-z\s]", " ", text) |
| text = re.sub(r"\s+", " ", text).strip() |
| return text |
|
|
|
|
def _infer_topic_name(keywords: List[str]) -> str:
    """Heuristically name a topic from its top keywords.

    The first eight keywords are compared against every vocabulary in
    ``TOPIC_NAME_MAP``; the name whose vocabulary shares the most words wins
    (the earliest entry wins ties). If nothing overlaps, the name is derived
    from the first keyword, e.g. ``"Checkout Issues"``.

    Args:
        keywords: Topic keywords ordered from most to least important. They
            may be multi-word n-grams (the vectorizer in ``TopicModeler.fit``
            uses ``ngram_range=(1, 2)``).

    Returns:
        A display name for the topic; ``"General Feedback"`` when *keywords*
        is empty.
    """
    if not keywords:
        return "General Feedback"

    # Split n-gram keywords such as "customer support" into their words;
    # the vocabularies hold single words, so an unsplit bigram could never
    # match. Single-word keywords are unaffected by the split.
    tokens = {word for keyword in keywords[:8] for word in keyword.split()}

    best_name: Optional[str] = None
    best_overlap = 0
    for vocabulary, name in TOPIC_NAME_MAP.items():
        overlap = len(tokens & vocabulary)
        if overlap > best_overlap:
            best_overlap = overlap
            best_name = name

    if best_name is not None:
        return best_name

    # No known vocabulary matched: fall back to the strongest keyword.
    return keywords[0].replace("_", " ").title() + " Issues"
|
|
|
|
class TopicModeler:
    """
    NMF-based topic modeling optimized for short product review texts.

    Why NMF over LDA?
    - LDA assumes bag-of-words with Dirichlet priors — good for long documents.
    - NMF with TF-IDF produces more coherent, interpretable topics for short texts.
    - Faster training, better topic separation for review-length inputs.

    Fallback mode: when fewer than 10 non-empty documents are supplied, the
    TF-IDF vocabulary is smaller than ``n_topics``, or fitting raises, the
    modeler switches to five fixed topics scored by keyword matching. In that
    mode ``n_topics`` is set to 5 until the next call to ``fit``.
    """

    # Names of the fixed fallback topics; position == topic id.
    _FALLBACK_TOPIC_NAMES = (
        "Performance & Speed",
        "Customer Support",
        "Pricing & Billing",
        "Features & UI",
        "General Feedback",
    )

    # Keywords reported for each fallback topic (display only).
    _FALLBACK_DISPLAY_KEYWORDS = {
        0: ['slow', 'fast', 'speed', 'performance', 'loading', 'lag', 'crash'],
        1: ['support', 'help', 'response', 'team', 'customer', 'service'],
        2: ['price', 'pricing', 'cost', 'expensive', 'billing', 'subscription'],
        3: ['feature', 'ui', 'interface', 'design', 'dashboard', 'ux'],
        4: ['good', 'better', 'platform', 'recommend', 'experience', 'overall'],
    }

    # Keywords used to score texts in fallback mode. Topic 4 has none on
    # purpose: it is the catch-all that receives texts matching nothing else.
    _FALLBACK_MATCH_KEYWORDS = {
        0: ['slow', 'speed', 'performance', 'loading', 'fast', 'lag'],
        1: ['support', 'help', 'response', 'team', 'customer'],
        2: ['price', 'pricing', 'cost', 'expensive', 'billing'],
        3: ['feature', 'ui', 'interface', 'design', 'dashboard'],
        4: [],
    }

    # One compiled pattern per keyword, anchored at the start of a word so
    # that "ui" no longer matches inside "quite"/"build" and "lag" no longer
    # matches "flag", while inflections such as "slower" still count.
    _FALLBACK_MATCH_PATTERNS = {
        topic_id: [re.compile(r"\b" + re.escape(word)) for word in words]
        for topic_id, words in _FALLBACK_MATCH_KEYWORDS.items()
    }

    # Class-level default so the flag is readable on every instance.
    _fallback_mode = False

    def __init__(self, n_topics: int = 8, max_features: int = 3000):
        """Create an unfitted modeler.

        Args:
            n_topics: Number of NMF components (topics) to extract.
            max_features: Maximum TF-IDF vocabulary size.
        """
        self.n_topics = n_topics
        self.max_features = max_features
        self.vectorizer: Optional[TfidfVectorizer] = None
        self.model: Optional[NMF] = None
        self.feature_names: List[str] = []
        self.topic_names: List[str] = []
        self.is_fitted = False
        self._fallback_mode = False
        self._fallback_texts: List[str] = []
        # n_topics as it was before fallback mode overwrote it with 5.
        self._pre_fallback_n_topics = n_topics

    def fit(self, texts: List[str]) -> "TopicModeler":
        """Fit the topic model on a corpus of texts.

        Never raises for modelling failures: any problem while vectorizing or
        factorizing is logged and the modeler drops into fallback mode.

        Args:
            texts: Raw documents.

        Returns:
            ``self``, always in a fitted state (NMF or fallback).
        """
        # A previous fit may have ended in fallback mode; undo its n_topics
        # override and clear the flag so this fit starts from a clean slate.
        if self._fallback_mode:
            self.n_topics = self._pre_fallback_n_topics
        self._fallback_mode = False
        self.is_fitted = False

        # _clean_text already strips, so an empty string means "no content".
        cleaned = [doc for doc in (_clean_text(t) for t in texts) if doc]
        if len(cleaned) < 10:
            logger.warning(
                "Too few valid documents (%d). Using simple clustering.",
                len(cleaned),
            )
            self._create_fallback_topics(texts)
            return self

        self.vectorizer = TfidfVectorizer(
            max_features=self.max_features,
            stop_words=CUSTOM_STOP_WORDS,
            ngram_range=(1, 2),
            min_df=1,
            max_df=0.95,
            sublinear_tf=True,
        )

        try:
            tfidf_matrix = self.vectorizer.fit_transform(cleaned)
            self.feature_names = self.vectorizer.get_feature_names_out().tolist()

            if tfidf_matrix.nnz == 0 or len(self.feature_names) < self.n_topics:
                logger.warning("TF-IDF matrix is too sparse. Using fallback topics.")
                self._create_fallback_topics(texts)
                return self

            self.model = NMF(
                n_components=self.n_topics,
                init="nndsvd",
                random_state=42,
                max_iter=300,
                alpha_W=0.0,
                alpha_H=0.0,
                l1_ratio=0.0,
            )
            self.model.fit(tfidf_matrix)

            self.topic_names = [
                _infer_topic_name(self._get_topic_keywords(i, top_n=10))
                for i in range(self.n_topics)
            ]
            self.is_fitted = True
            logger.info("Topic model fitted. Topics: %s", self.topic_names)

        except Exception as e:
            # Deliberate best-effort: keep the pipeline alive with fallback
            # topics, but record the traceback for diagnosis.
            logger.error(
                "Topic model fitting failed: %s. Using fallback.", e, exc_info=True
            )
            self._create_fallback_topics(texts)

        return self

    def _create_fallback_topics(self, texts: List[str]) -> None:
        """Switch to the fixed five-topic keyword model.

        Args:
            texts: The corpus passed to ``fit``; its first 100 entries are
                kept in ``_fallback_texts``.
        """
        logger.warning("Creating fallback topic model with keyword-based clustering")
        if not self._fallback_mode:
            # Remember the configured value so a later fit can restore it.
            self._pre_fallback_n_topics = self.n_topics

        # Drop any stale or half-fitted NMF state so no method can combine
        # NMF components with fallback topic ids.
        self.vectorizer = None
        self.model = None
        self.feature_names = []

        self.topic_names = list(self._FALLBACK_TOPIC_NAMES)
        self.n_topics = len(self.topic_names)
        self.is_fitted = True
        self._fallback_mode = True
        self._fallback_texts = texts[:100]

    def _top_feature_indices(self, topic_idx: int, top_n: int) -> List[int]:
        """Return vocabulary indices of the *top_n* heaviest terms of an NMF topic."""
        topic_vector = self.model.components_[topic_idx]
        return topic_vector.argsort()[::-1][:top_n].tolist()

    def _get_topic_keywords(self, topic_idx: int, top_n: int = 12) -> List[str]:
        """Return top keywords for a topic.

        Args:
            topic_idx: Topic id.
            top_n: Maximum number of keywords to return.

        Returns:
            Keywords ordered from heaviest to lightest. Without an NMF model
            the fixed fallback keywords are returned, or
            ``['general', 'feedback']`` for an unknown topic id.
        """
        if self._fallback_mode or self.model is None:
            words = self._FALLBACK_DISPLAY_KEYWORDS.get(topic_idx, ['general', 'feedback'])
            return list(words[:top_n])

        return [self.feature_names[i] for i in self._top_feature_indices(topic_idx, top_n)]

    def _describe_topic(self, topic_idx: int, top_n: int) -> Tuple[List[str], Dict[str, float]]:
        """Return ``(keywords, keyword -> weight)`` for one topic.

        NMF weights are the component values rounded to 4 decimals. In
        fallback mode the weights are synthetic: 1.0 for the first keyword,
        decreasing by 0.1 per rank.
        """
        if self._fallback_mode or self.model is None:
            keywords = self._get_topic_keywords(topic_idx, top_n=top_n)
            weights = {
                kw: float(round(1.0 - (rank * 0.1), 2))
                for rank, kw in enumerate(keywords)
            }
            return keywords, weights

        # Read keywords and weights from the same index list instead of
        # searching feature_names once per keyword.
        topic_vector = self.model.components_[topic_idx]
        top_indices = self._top_feature_indices(topic_idx, top_n)
        keywords = [self.feature_names[i] for i in top_indices]
        weights = {
            self.feature_names[i]: float(round(topic_vector[i], 4))
            for i in top_indices
        }
        return keywords, weights

    def _fallback_transform(self, texts: List[str]) -> np.ndarray:
        """Score texts against the fallback keyword patterns.

        Each row holds, per topic, the share of matched keywords. A text that
        matches nothing gets weight 1.0 on the last topic.
        """
        distributions = np.zeros((len(texts), self.n_topics))

        for row, text in enumerate(texts):
            lowered = text.lower()
            scores = np.zeros(self.n_topics)

            for topic_id, patterns in self._FALLBACK_MATCH_PATTERNS.items():
                scores[topic_id] = sum(1 for p in patterns if p.search(lowered))

            total = scores.sum()
            if total > 0:
                scores = scores / total
            else:
                scores[-1] = 1.0

            distributions[row] = scores

        return distributions

    def transform(self, texts: List[str]) -> np.ndarray:
        """Assign topic distributions to texts.

        Args:
            texts: Raw documents.

        Returns:
            Array of shape ``(len(texts), n_topics)``.

        Raises:
            RuntimeError: If called before ``fit``.
        """
        if self._fallback_mode:
            return self._fallback_transform(texts)

        if self.vectorizer is None or self.model is None:
            raise RuntimeError("Model must be fitted before calling transform.")

        # scikit-learn rejects zero-sample input, so answer empty input here.
        if len(texts) == 0:
            return np.zeros((0, self.n_topics))

        cleaned = [_clean_text(t) for t in texts]
        tfidf = self.vectorizer.transform(cleaned)
        return self.model.transform(tfidf)

    def get_document_topics(self, texts: List[str]) -> List[int]:
        """Return the dominant topic index for each text."""
        distributions = self.transform(texts)
        return distributions.argmax(axis=1).tolist()

    def get_topics_summary(
        self,
        texts: List[str],
        sentiments: Optional[List[str]] = None,
        top_n_keywords: int = 10,
    ) -> List[Dict]:
        """
        Full topic summary with keywords, example posts, sentiment breakdown,
        and cluster size — ready for frontend visualization.

        Args:
            texts: Documents to assign to topics.
            sentiments: Optional sentiment label per text, aligned with
                *texts*. Texts without a label are counted as "neutral".
            top_n_keywords: Maximum number of keywords per topic.

        Returns:
            One dict per non-empty topic, sorted by ``post_count``
            descending. Empty when *texts* is empty.

        Raises:
            RuntimeError: If the model has not been fitted.
        """
        if not self.is_fitted:
            raise RuntimeError("Model must be fitted before calling get_topics_summary.")

        topic_assignments = self.get_document_topics(texts)

        # Group document indices by their dominant topic.
        topic_buckets: Dict[int, List[int]] = {i: [] for i in range(self.n_topics)}
        for doc_idx, topic in enumerate(topic_assignments):
            topic_buckets[topic].append(doc_idx)

        summary = []
        for topic_idx in range(self.n_topics):
            indices = topic_buckets[topic_idx]
            if not indices:
                continue

            keywords, kw_weights = self._describe_topic(topic_idx, top_n_keywords)
            examples = [texts[i] for i in indices[:3]]

            sentiment_dist = {"positive": 0, "negative": 0, "neutral": 0, "crisis": 0}
            if sentiments:
                for i in indices:
                    lbl = sentiments[i] if i < len(sentiments) else "neutral"
                    sentiment_dist[lbl] = sentiment_dist.get(lbl, 0) + 1

            total = len(indices)
            if sentiments:
                dominant_sentiment = max(sentiment_dist, key=sentiment_dist.get)
            else:
                dominant_sentiment = "neutral"

            summary.append({
                "id": topic_idx,
                "name": self.topic_names[topic_idx],
                "keywords": keywords,
                "keyword_weights": kw_weights,
                "post_count": total,
                "percentage": round(100 * total / max(len(texts), 1), 1),
                "dominant_sentiment": dominant_sentiment,
                "sentiment_distribution": sentiment_dist,
                "examples": examples,
            })

        return sorted(summary, key=lambda x: x["post_count"], reverse=True)
|
|
|
|
| |
# Process-wide TopicModeler instance, created on first use by get_modeler().
_modeler: Optional[TopicModeler] = None


def get_modeler(n_topics: int = 8) -> TopicModeler:
    """Return the shared TopicModeler, creating it on the first call.

    Args:
        n_topics: Topic count used when the shared instance is created. It is
            ignored on later calls, which return the existing instance as is.

    Returns:
        The module-level TopicModeler instance.
    """
    global _modeler
    if _modeler is not None:
        return _modeler
    _modeler = TopicModeler(n_topics=n_topics)
    return _modeler
|
|