NLP-intelligence / nlp_core /topic_modeler.py
Nomio4640's picture
tm
8d189d1
"""
topic_modeler.py — BERTopic wrapper with small-dataset fallback.
Problem with the original:
BERTopic uses HDBSCAN for clustering. HDBSCAN has a min_cluster_size
parameter that defaults to 10 in BERTopic — meaning it needs at least
10 documents just to form a single cluster. Below that, EVERYTHING gets
assigned topic -1 (outlier) and the result is completely empty.
Solution:
For larger datasets (MIN_BERTOPIC_DOCS = 50 or more documents): BERTopic
with KMeans clustering, so the number of topics stays controllable:
n_clusters = min(n_docs, max(min_topics, min(n_docs // 10, max_topics)))
For smaller datasets (MIN_TINY_DOCS = 3 up to 49 documents): BERTopic with
KMeans clustering plus nr_topics="auto". KMeans always assigns every point
to a cluster (no outliers), and it works with as few as 2-3 documents:
n_clusters = min(n_docs, max(2, min(n_docs // 3, max_topics), min_topics))
So with the default min_topics=5: 3 docs → 3 clusters, 6 docs → 5 clusters,
30 docs → 10 clusters.
Below MIN_TINY_DOCS (3): return empty — can't cluster 1-2 texts meaningfully.
"""
from typing import List, Dict, Tuple
from .models import TopicResult
# ---------------------------------------------------------------------------
# Mongolian suffix stripping for c-TF-IDF keyword extraction
# ---------------------------------------------------------------------------
# BERTopic uses CountVectorizer + c-TF-IDF to label each topic cluster.
# Without this, agglutinated forms fragment a single concept into many
# low-frequency tokens: монголын / монголд / монголаас → 3 keywords
# With this tokenizer they all reduce to монгол → 1 keyword, higher weight.
#
# Rules are ordered longest-first so a longer suffix is tried before a
# shorter one that is a suffix of it (e.g. "аас" before "ас").
# Root must be ≥ 3 characters after stripping to avoid destroying short words.
# Suffix rules grouped by grammatical case. Group order is the order in which
# the stemmer tries them, so the single-character dative endings come last and
# never pre-empt a longer suffix such as "ууд".
_MN_SUFFIX_GROUPS = (
    ("ablative", ("аас", "ээс", "оос", "өөс")),
    ("genitive", ("ийн", "ын", "ний")),
    ("comitative", ("тай", "тэй", "той")),
    ("directive", ("руу", "рүү")),
    ("plural", ("ууд", "үүд")),
    ("accusative", ("ийг", "ыг")),
    ("dative", ("д", "т")),
)

# Flat, ordered list consumed by the stemmer.
_MN_SUFFIXES = [sfx for _case, group in _MN_SUFFIX_GROUPS for sfx in group]

_MIN_ROOT = 3  # don't strip if remaining root would be shorter than this
# ---------------------------------------------------------------------------
# Mongolian stopwords for topic modeling c-TF-IDF
# ---------------------------------------------------------------------------
# These words appear in nearly every document and add no topic-discriminating
# value. Filtering them lets BERTopic surface meaningful content keywords.
# Words filtered out before c-TF-IDF keyword extraction. Lookups are done on
# lowercase tokens. Several entries (e.g. "ашиглалта", "түвш") appear to be the
# truncated forms left behind after suffix stripping rather than dictionary
# words — keep them unless the stemmer rules change.
_MN_STOPWORDS = {
    # Copulas / auxiliary verbs
    "байна", "байгаа", "байсан", "байх", "байдаг", "болно", "болох", "болсон",
    "болж", "бол", "бна", "бсан", "бгаа", "бхаа", "бн", "бдаг", "бхоо", "бх",
    # Common verbs (too generic for topics)
    "хийх", "хийж", "хийсэн", "авах", "авч", "авсан", "өгөх", "өгч", "өгсөн",
    "ирэх", "ирж", "ирсэн", "очих", "очсон", "гарах", "гарч", "гарсан",
    "орох", "орж", "орсон", "үзүүлж", "явагдаж", "ажиллаж", "эхэлж", "эхэллээ",
    # Conjunctions / particles
    "ба", "бас", "болон", "мөн", "эсвэл", "гэхдээ", "харин", "бөгөөд",
    "гэж", "гэх", "гэсэн", "гэжээ", "гэв", "гэвч", "гээд", "гэнэ", "гээ",
    # Pronouns / demonstratives
    "энэ", "тэр", "эдгээр", "тэдгээр", "үүн", "түүн", "бид", "тэд",
    "би", "чи", "та", "миний", "чиний", "таны", "өөр", "өөрийн",
    # Postpositions / spatial
    "дээр", "доор", "дотор", "гадна", "хойно", "өмнө", "дунд",
    # Intensifiers / degree
    "их", "бага", "маш", "тун", "нэлээд", "шиг", "хамгийн",
    # Short particles and suffix-like fragments (one to three characters)
    "л", "ч", "нь", "аа", "ээ", "оо", "өө", "юм", "биш",
    "уу", "үү", "юу", "вэ", "бэ",
    # Question words
    "яаж", "яагаад", "хаана", "хэзээ", "хэн", "ямар",
    # Informal / social media
    "шд", "шдэ", "шдээ", "шт", "штэ", "штээ", "дээ", "даа",
    "бз", "биз", "хаха", "кк",
    # Generic high-frequency nouns (appear in every news article)
    "монгол", "улс", "улсын", "хот", "хотын", "аймаг", "аймагт",
    "шинэ", "онд", "жил", "жилд", "хувь", "хувиар", "тэрбум",
    # "байна." keeps its full stop on purpose: it covers input where the
    # sentence-final period is still attached to the word.
    "байна.", "гаруй", "дахин", "хэд", "хэдэн", "өнгөрсөн",
    # Numbers written as words
    "нэг", "хоёр", "гурав", "дөрөв", "тав", "зургаа", "долоо", "найм",
    # Common news/media filler words
    "ноцтой", "ноц", "томоохон", "чухал", "асуудал", "асуудлыг",
    "нөлөө", "нөлөөл", "байгааг", "байгаад", "салбар", "салбарт",
    "ажиллагаа", "ашиглалта", "ашиглалтад", "нэмэгдсэн", "нэмэгд",
    "бууруул", "буурсан", "сайжруул", "хангах", "хангаж", "хүрч",
    "хүрсэн", "хүрэлцэх", "шийдвэрлэх", "шаардлагатай", "шаардаж",
    "түвшин", "түвш", "хэрэгжүүлж", "хэмжээ", "нийтлэл",
    "алхам", "ахиц", "үр", "дүн", "олон", "бүх", "иргэд", "иргэн",
    "засгийн", "газар", "засаг", "өмнөх",
    # Other function words
    "тийм", "ийм", "чинь", "минь", "билээ", "шүү",
    "надад", "танд", "бусад", "зарим", "ийнхүү", "тухай",
    "дамжуулан", "хүртэл", "ороос", "хооронд",
}
def _mn_stem(word: str) -> str:
    """Strip at most one Mongolian case/plural suffix from *word*.

    The entries of ``_MN_SUFFIXES`` are tried in list order and the first
    applicable one wins. A suffix only applies when *word* ends with it and at
    least ``_MIN_ROOT`` characters would remain. Matching is case-sensitive.

    Returns:
        The word without the matched suffix, or *word* unchanged when no
        suffix applies.
    """
    stripped_candidates = (
        word[: -len(suffix)]
        for suffix in _MN_SUFFIXES
        if len(word) - len(suffix) >= _MIN_ROOT and word.endswith(suffix)
    )
    # The generator is lazy, so only the first applicable suffix is evaluated.
    return next(stripped_candidates, word)
# Characters trimmed from both ends of a whitespace-separated token, so that
# "засаг," and "засаг" count as the same term and "2024." is seen as a number.
_MN_EDGE_PUNCT = "!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~«»“”‘’„…–—"


def _mongolian_tokenizer(text: str) -> List[str]:
    """Tokenize, stem, and filter Mongolian text for BERTopic's c-TF-IDF.

    Steps applied to every whitespace-separated token:

    1. Trim leading/trailing punctuation and lowercase (the suffix rules and
       the stopword set are lowercase).
    2. Drop tokens shorter than two characters and pure numbers (years,
       percentages, amounts).
    3. Drop stopwords in their surface form *before* stemming. Checking only
       the stem lets stopwords that happen to end in a strippable suffix slip
       through: "дунд" would be stemmed to "дун", which is not a stopword.
    4. Stem, then drop the token if the stem is a stopword.

    Args:
        text: Raw document text.

    Returns:
        Lowercase stems in document order; duplicates are kept so term
        frequencies stay meaningful.
    """
    tokens: List[str] = []
    for raw in text.split():
        word = raw.strip(_MN_EDGE_PUNCT).lower()
        if len(word) < 2 or word.isdigit():
            continue
        if word in _MN_STOPWORDS:
            continue
        stem = _mn_stem(word)
        if len(stem) >= 2 and stem not in _MN_STOPWORDS:
            tokens.append(stem)
    return tokens
# Thresholds (compared against the number of non-empty documents)
MIN_TINY_DOCS = 3 # below this, fit_transform returns an info message instead of topics
MIN_BERTOPIC_DOCS = 50 # _make_bertopic switches its KMeans settings at this size (both sides use KMeans)
MAX_TINY_TOPICS = 10 # NOTE(review): not referenced elsewhere in the visible code — confirm it is still needed
class TopicModeler:
    """Topic modeling service built on BERTopic with KMeans clustering.

    Documents are embedded with a sentence-transformers model, clustered with
    KMeans (so every document receives a topic), and each topic is labelled
    with c-TF-IDF keywords produced by the Mongolian tokenizer.

    Args:
        embedding_model: sentence-transformers model name; loaded lazily.
        language: Passed through to BERTopic's ``language`` argument.
        min_topics: Lower bound for the requested number of clusters.
        max_topics: Upper bound for the size-derived number of clusters.
    """

    def __init__(
        self,
        embedding_model: str = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2",
        language: str = "multilingual",
        min_topics: int = 5,
        max_topics: int = 15,
    ):
        self.embedding_model_name = embedding_model
        self.language = language
        self.min_topics = min_topics
        self.max_topics = max_topics
        self._embedding_model = None  # SentenceTransformer, loaded on first use
        self._model = None  # last fitted BERTopic model

    def _load_embedding_model(self):
        """Return the embedding model, loading and caching it on first use."""
        if self._embedding_model is None:
            from sentence_transformers import SentenceTransformer

            self._embedding_model = SentenceTransformer(self.embedding_model_name)
        return self._embedding_model

    def _pick_n_clusters(self, n_docs: int, large: bool) -> int:
        """Return the number of KMeans clusters to request for *n_docs* docs.

        Args:
            n_docs: Number of documents that will be clustered.
            large: True for the ``n_docs >= MIN_BERTOPIC_DOCS`` regime
                (roughly one topic per 10 documents); False for the small
                regime (roughly one per 3 documents, and at least 2).

        Returns:
            A count that honours ``min_topics`` / ``max_topics`` but is never
            larger than *n_docs*, since KMeans cannot form more clusters than
            it has samples.
        """
        if large:
            wanted = max(self.min_topics, min(n_docs // 10, self.max_topics))
        else:
            wanted = max(
                min(2, n_docs),
                min(n_docs // 3, self.max_topics),
                self.min_topics,
            )
        return min(wanted, n_docs)

    def _make_bertopic(self, n_docs: int):
        """Build a BERTopic instance sized for *n_docs* documents.

        Both size regimes cluster with KMeans (passed through BERTopic's
        ``hdbscan_model`` slot), so no document is assigned the outlier
        topic -1. They differ in the cluster-count formula (see
        ``_pick_n_clusters``) and in the BERTopic options:

        * ``n_docs >= MIN_BERTOPIC_DOCS``: ``min_topic_size=2``.
        * ``n_docs <  MIN_BERTOPIC_DOCS``: ``min_topic_size=1`` and
          ``nr_topics="auto"``.
        """
        from bertopic import BERTopic
        from bertopic.representation import MaximalMarginalRelevance
        from sklearn.cluster import KMeans
        from sklearn.feature_extraction.text import CountVectorizer

        vectorizer = CountVectorizer(
            tokenizer=_mongolian_tokenizer,
            # token_pattern is unused when a tokenizer is supplied; passing
            # None avoids scikit-learn's "will not be used" warning.
            token_pattern=None,
            min_df=1,
            max_df=0.80,  # ignore terms appearing in >80% of docs
        )
        # MMR picks diverse keywords instead of redundant near-synonyms
        mmr = MaximalMarginalRelevance(diversity=0.5)

        # KMeans rather than HDBSCAN: it gives a controllable number of topics
        # and always assigns every document to one of them.
        large = n_docs >= MIN_BERTOPIC_DOCS
        cluster_model = KMeans(
            n_clusters=self._pick_n_clusters(n_docs, large),
            random_state=42,
            n_init="auto",
        )

        if large:
            size_options = {"min_topic_size": 2}
        else:
            # NOTE(review): nr_topics="auto" presumably lets BERTopic merge
            # similar clusters afterwards, so the final topic count may end
            # up below min_topics — confirm this is intended.
            size_options = {"min_topic_size": 1, "nr_topics": "auto"}

        return BERTopic(
            language=self.language,
            embedding_model=self._load_embedding_model(),
            hdbscan_model=cluster_model,
            vectorizer_model=vectorizer,
            representation_model=mmr,
            **size_options,
        )

    @staticmethod
    def _describe_topic(model, topic_info, topic_id) -> Tuple[str, List[str]]:
        """Return ``(label, top-5 keywords)`` for one topic of a fitted model."""
        try:
            topic_words = model.get_topic(topic_id)
            keywords = [w for w, _ in (topic_words or [])[:5]]
        except Exception:
            # Deliberately best-effort: a topic without a usable keyword list
            # still gets a result, just with no keywords.
            keywords = []

        topic_row = topic_info[topic_info["Topic"] == topic_id]
        if not topic_row.empty and "Name" in topic_row.columns:
            label = str(topic_row.iloc[0]["Name"])
        else:
            label = f"Topic {topic_id}" if topic_id != -1 else "Outlier"
        return label, keywords

    @staticmethod
    def _summarize(topic_info) -> List[Dict]:
        """Convert a BERTopic topic-info table into ``{topic_id, name, count}`` dicts."""
        summary = []
        for _, row in topic_info.iterrows():
            tid = int(row["Topic"])
            summary.append({
                "topic_id": tid,
                "name": str(row.get("Name", f"Topic {tid}")),
                "count": int(row["Count"]),
            })
        return summary

    def fit_transform(self, texts: List[str]) -> Tuple[List["TopicResult"], List[Dict]]:
        """Fit a topic model on *texts* and return per-document assignments.

        Entries that are not strings or contain only whitespace are left out
        of the fit. Thresholds on the remaining document count:

            < MIN_TINY_DOCS (3):        nothing is fitted
            3 .. MIN_BERTOPIC_DOCS-1:   KMeans-backed BERTopic, small settings
            >= MIN_BERTOPIC_DOCS (50):  KMeans-backed BERTopic, large settings

        Args:
            texts: Input documents.

        Returns:
            ``(topic_results, topic_summary)``.

            With enough documents, ``topic_results`` has one TopicResult per
            entry of *texts* (skipped entries get ``topic_id=-1`` and the
            label ``"Empty"``) and ``topic_summary`` is a list of
            ``{topic_id, name, count}`` dicts, one per row of the model's
            topic table.

            With too few documents, ``topic_results`` is empty and
            ``topic_summary`` is a one-element list holding an ``"info"``
            message.
        """
        # Filter empty strings — they confuse the embedding model. Non-string
        # entries (e.g. None) are treated the same way instead of crashing.
        non_empty = [
            (i, t) for i, t in enumerate(texts) if isinstance(t, str) and t.strip()
        ]
        if len(non_empty) < MIN_TINY_DOCS:
            return [], [{
                "info": (
                    f"Topic modeling needs at least {MIN_TINY_DOCS} non-empty documents. "
                    f"Got {len(non_empty)}."
                )
            }]

        indices, valid_texts = zip(*non_empty)
        docs = list(valid_texts)

        emb_model = self._load_embedding_model()
        embeddings = emb_model.encode(docs, show_progress_bar=False)

        model = self._make_bertopic(len(docs))
        topics, probs = model.fit_transform(docs, embeddings)
        self._model = model
        topic_info = model.get_topic_info()

        # Label and keywords depend only on the topic, so look them up once
        # per topic instead of once per document.
        topic_cache: Dict[int, Tuple[str, List[str]]] = {}

        # Per-document results, keyed by the position in the original `texts`.
        result_map = {}
        for pos, (orig_idx, topic_id) in enumerate(zip(indices, topics)):
            try:
                prob = float(probs[pos]) if probs is not None else 0.0
            except (TypeError, IndexError, ValueError):
                # probs may be missing, too short, or not a scalar per document.
                prob = 0.0

            if topic_id not in topic_cache:
                topic_cache[topic_id] = self._describe_topic(
                    model, topic_info, topic_id
                )
            label, keywords = topic_cache[topic_id]

            result_map[orig_idx] = TopicResult(
                topic_id=int(topic_id),
                topic_label=label,
                probability=prob,
                keywords=list(keywords),  # copy: results must not share a list
            )

        # Align results with the original `texts`; skipped entries get a
        # placeholder with topic_id=-1.
        results = [
            result_map[i]
            if i in result_map
            else TopicResult(
                topic_id=-1, topic_label="Empty", probability=0.0, keywords=[]
            )
            for i in range(len(texts))
        ]

        # The summary lists every row of the topic table (nothing is filtered).
        return results, self._summarize(topic_info)

    def get_topic_info(self) -> List[Dict]:
        """Return the topic summary of the last fitted model ([] if none)."""
        if self._model is None:
            return []
        return self._summarize(self._model.get_topic_info())