miracl-search / text_mining.py
thoshiths's picture
Upload text_mining.py with huggingface_hub
3a8d6b7 verified
# text_mining.py
"""
Text Mining Component for the MIRACL multilingual IR system.
Techniques:
1. Document Clustering β€” KMeans (k=12) on TF-IDF + LSA (TruncatedSVD)
2. Topic Modelling β€” Latent Dirichlet Allocation (LDA, sklearn)
3. Query Expansion β€” cluster top-terms injected into query
4. Keyphrase Extraction β€” per-document TF-IDF top-N keyphrases + query expansion
All classes expose a common ``expand_query(query, top_n)`` interface for use
by the search pipeline.
Author : Thoshith S
"""
import numpy as np
import pandas as pd
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.cluster import KMeans
from sklearn.decomposition import TruncatedSVD, PCA, LatentDirichletAllocation
from sklearn.preprocessing import normalize
from sklearn.metrics import silhouette_score
# ══════════════════════════════════════════════════════════════════════════════
# 1 Β· DOCUMENT CLUSTERING (KMeans + LSA)
# ══════════════════════════════════════════════════════════════════════════════
class DocumentClusterer:
"""
Cluster English documents using KMeans on a TF-IDF + LSA (TruncatedSVD)
representation. Cluster term profiles are used to expand queries with
semantically related vocabulary, improving recall in downstream retrieval.
"""
def __init__(self, corpus: list, n_clusters: int = 12, random_state: int = 42):
self.corpus = corpus
self.n_clusters = n_clusters
self.random_state = random_state
self.vectorizer = None
self.svd = None
self.km = None
self.lsa_matrix = None # L2-normalised LSA vectors (N_en, n_components)
self.cluster_labels = None # (N_en,) int array
self.en_doc_ids = []
self.en_docs = []
self.cluster_descriptions = {} # cluster_id -> list[str] top terms
self.silhouette = None
# ------------------------------------------------------------------
def fit(self):
"""TF-IDF β†’ LSA β†’ KMeans pipeline on English documents only."""
self.en_docs = [d for d in self.corpus if d.get('language') == 'en']
self.en_doc_ids = [d['doc_id'] for d in self.en_docs]
if not self.en_docs:
raise ValueError("No English documents found in corpus.")
texts = [f"{d['title']}. {d['text']}" for d in self.en_docs]
self.vectorizer = TfidfVectorizer(
max_features=5000, ngram_range=(1, 2),
sublinear_tf=True, stop_words='english', min_df=2,
)
tfidf_matrix = self.vectorizer.fit_transform(texts)
n_components = min(100, len(self.en_docs) - 1)
self.svd = TruncatedSVD(n_components=n_components, random_state=self.random_state)
lsa_raw = self.svd.fit_transform(tfidf_matrix)
self.lsa_matrix = normalize(lsa_raw, norm='l2')
self.km = KMeans(
n_clusters=self.n_clusters, n_init=10,
random_state=self.random_state, init='k-means++',
)
self.cluster_labels = self.km.fit_predict(self.lsa_matrix)
feature_names = np.array(self.vectorizer.get_feature_names_out())
self.cluster_descriptions = {}
for cid in range(self.n_clusters):
mask = self.cluster_labels == cid
if not mask.any():
self.cluster_descriptions[cid] = []
continue
mean_tfidf = np.asarray(tfidf_matrix[mask].mean(axis=0)).flatten()
top_idx = mean_tfidf.argsort()[-5:][::-1]
self.cluster_descriptions[cid] = list(feature_names[top_idx])
sample = min(1000, len(self.lsa_matrix))
try:
self.silhouette = silhouette_score(
self.lsa_matrix, self.cluster_labels,
metric='cosine', sample_size=sample, random_state=self.random_state,
)
except Exception:
self.silhouette = float('nan')
self._print_cluster_summary()
# ------------------------------------------------------------------
def _print_cluster_summary(self):
print(f"\nDocument Clustering Summary")
print(f" English docs : {len(self.en_docs)}")
print(f" Clusters (k) : {self.n_clusters}")
print(f" Silhouette : {self.silhouette:.4f}")
print(f" {'Cluster':>7} {'Size':>5} Top Terms")
print(f" {'-'*65}")
for cid in range(self.n_clusters):
size = int((self.cluster_labels == cid).sum())
terms = ', '.join(self.cluster_descriptions.get(cid, []))
print(f" {cid:>7} {size:>5} {terms}")
# ------------------------------------------------------------------
def get_cluster_for_query(self, query: str) -> int:
if self.vectorizer is None:
raise RuntimeError("Call fit() first.")
tfidf_q = self.vectorizer.transform([query])
lsa_q = normalize(self.svd.transform(tfidf_q), norm='l2')
centroids_normed = normalize(self.km.cluster_centers_, norm='l2')
sims = (lsa_q @ centroids_normed.T).flatten()
return int(np.argmax(sims))
def expand_query(self, query: str, top_n: int = 5) -> list:
"""Return cluster-derived expansion terms not in the query."""
cid = self.get_cluster_for_query(query)
qtoks = set(query.lower().split())
return [t for t in self.cluster_descriptions.get(cid, []) if t not in qtoks][:top_n]
def get_cluster_documents(self, cluster_id: int) -> list:
if self.cluster_labels is None:
raise RuntimeError("Call fit() first.")
return [self.en_docs[i] for i, lbl in enumerate(self.cluster_labels) if lbl == cluster_id]
def get_cluster_summary(self) -> pd.DataFrame:
if self.cluster_labels is None:
raise RuntimeError("Call fit() first.")
rows = []
for cid in range(self.n_clusters):
mask = self.cluster_labels == cid
docs = [self.en_docs[i] for i in np.where(mask)[0]]
rows.append({
'cluster_id': cid,
'size': int(mask.sum()),
'top_terms': ', '.join(self.cluster_descriptions.get(cid, [])),
'sample_titles': ', '.join(d['title'] for d in docs[:3]),
})
return pd.DataFrame(rows)
# ------------------------------------------------------------------ Plots
def plot_clusters(self, save_path: str = None):
if self.lsa_matrix is None:
raise RuntimeError("Call fit() first.")
pca = PCA(n_components=2, random_state=self.random_state)
coords = pca.fit_transform(self.lsa_matrix)
fig, ax = plt.subplots(figsize=(12, 8))
sc = ax.scatter(coords[:, 0], coords[:, 1],
c=self.cluster_labels, cmap='tab20', s=6, alpha=0.7)
plt.colorbar(sc, ax=ax, label='Cluster')
ax.set_title("Document Clusters β€” MIRACL EN Corpus")
ax.set_xlabel("PC 1"); ax.set_ylabel("PC 2")
plt.tight_layout()
if save_path:
fig.savefig(save_path, dpi=150); print(f"Saved to {save_path}")
else:
plt.show()
plt.close(fig)
def plot_cluster_sizes(self, save_path: str = None):
if self.cluster_labels is None:
raise RuntimeError("Call fit() first.")
sizes = [int((self.cluster_labels == c).sum()) for c in range(self.n_clusters)]
labels = [f"C{c}" for c in range(self.n_clusters)]
fig, ax = plt.subplots(figsize=(10, 6))
ax.barh(np.arange(len(labels)), sizes, align='center')
ax.set_yticks(np.arange(len(labels))); ax.set_yticklabels(labels)
ax.invert_yaxis(); ax.set_xlabel("Documents"); ax.set_title("Cluster Size Distribution")
plt.tight_layout()
if save_path:
fig.savefig(save_path, dpi=150)
else:
plt.show()
plt.close(fig)
# ══════════════════════════════════════════════════════════════════════════════
# 2 Β· TOPIC MODELLING (LDA)
# ══════════════════════════════════════════════════════════════════════════════
class TopicModeller:
"""
Latent Dirichlet Allocation (LDA) topic modelling on English documents.
Discovers latent topics as probability distributions over vocabulary terms.
Each query is projected into the topic space; the dominant topic's top words
are appended to the query for semantic expansion β€” capturing thematic context
that exact-match TF-IDF misses.
"""
def __init__(self, corpus: list, n_topics: int = 10, random_state: int = 42):
self.corpus = corpus
self.n_topics = n_topics
self.random_state = random_state
self.vectorizer = None # CountVectorizer (raw TF)
self.lda = None # LatentDirichletAllocation
self.doc_topic_matrix = None # (N_en, n_topics)
self.topic_words = {} # topic_id β†’ list[str]
self.en_doc_ids = []
self.en_docs = []
self._is_fitted = False
# ------------------------------------------------------------------
def fit(self):
"""Fit LDA on English documents using raw term counts."""
self.en_docs = [d for d in self.corpus if d.get('language') == 'en']
self.en_doc_ids = [d['doc_id'] for d in self.en_docs]
if not self.en_docs:
raise ValueError("No English documents found.")
texts = [f"{d['title']}. {d['text']}" for d in self.en_docs]
# CountVectorizer β€” LDA assumes multinomial (count) input
self.vectorizer = CountVectorizer(
max_features=3000,
stop_words='english',
min_df=2,
ngram_range=(1, 1),
)
count_matrix = self.vectorizer.fit_transform(texts)
self.lda = LatentDirichletAllocation(
n_components=self.n_topics,
random_state=self.random_state,
max_iter=30,
learning_method='online',
learning_offset=10.0,
)
self.doc_topic_matrix = self.lda.fit_transform(count_matrix) # (N, n_topics)
feature_names = np.array(self.vectorizer.get_feature_names_out())
for tid, topic_dist in enumerate(self.lda.components_):
top_idx = topic_dist.argsort()[-12:][::-1]
self.topic_words[tid] = list(feature_names[top_idx])
self._is_fitted = True
self._print_topics()
# ------------------------------------------------------------------
def _print_topics(self):
print(f"\nLDA Topic Modelling Summary")
print(f" English docs : {len(self.en_docs)}")
print(f" Topics (k) : {self.n_topics}")
for tid, words in self.topic_words.items():
print(f" Topic {tid:2d}: {', '.join(words[:6])}")
# ------------------------------------------------------------------
def get_dominant_topic(self, query: str) -> int:
"""Return the dominant topic index for a query."""
if not self._is_fitted:
raise RuntimeError("Call fit() first.")
q_vec = self.vectorizer.transform([query])
topic_dist = self.lda.transform(q_vec)[0]
return int(np.argmax(topic_dist))
def get_topic_distribution(self, query: str) -> np.ndarray:
"""Return full topic probability vector for a query (sums to 1)."""
if not self._is_fitted:
raise RuntimeError("Call fit() first.")
return self.lda.transform(self.vectorizer.transform([query]))[0]
def expand_query(self, query: str, top_n: int = 5) -> list:
"""Return LDA topic words not already present in the query."""
tid = self.get_dominant_topic(query)
qtoks = set(query.lower().split())
return [w for w in self.topic_words.get(tid, []) if w not in qtoks][:top_n]
# ------------------------------------------------------------------
def get_topic_summary(self) -> pd.DataFrame:
"""DataFrame: topic_id, perplexity-weighted top words, representative docs."""
if not self._is_fitted:
raise RuntimeError("Call fit() first.")
rows = []
for tid, words in self.topic_words.items():
top_doc_idx = np.argsort(self.doc_topic_matrix[:, tid])[-3:][::-1]
sample_docs = ', '.join(self.en_docs[i]['title'] for i in top_doc_idx)
rows.append({
'topic_id': tid,
'top_words': ', '.join(words[:8]),
'sample_docs': sample_docs,
})
return pd.DataFrame(rows)
def plot_topic_heatmap(self, save_path: str = None):
"""Heatmap of per-document dominant topic assignments."""
if not self._is_fitted:
raise RuntimeError("Call fit() first.")
dominant = np.argmax(self.doc_topic_matrix, axis=1)
counts = np.bincount(dominant, minlength=self.n_topics)
fig, ax = plt.subplots(figsize=(10, 4))
im = ax.imshow(
self.doc_topic_matrix.T,
aspect='auto', cmap='YlOrRd', interpolation='nearest',
)
plt.colorbar(im, ax=ax, label='Topic probability')
ax.set_xlabel("Document index"); ax.set_ylabel("Topic ID")
ax.set_title("LDA Document-Topic Matrix (English corpus)")
plt.tight_layout()
if save_path:
fig.savefig(save_path, dpi=150)
else:
plt.show()
plt.close(fig)
return counts
# ══════════════════════════════════════════════════════════════════════════════
# 3 Β· KEYPHRASE EXTRACTION (TF-IDF per document)
# ══════════════════════════════════════════════════════════════════════════════
class KeyphraseExtractor:
"""
TF-IDF based keyphrase extraction.
Builds a corpus-wide TF-IDF model. For each document the highest-weighted
terms are its keyphrases. For query expansion, the query is projected into
the same TF-IDF space and its top terms (not already in the query) are
returned β€” focusing retrieval on the most discriminative vocabulary.
"""
def __init__(self, corpus: list):
self.corpus = corpus
self.vectorizer = None # TfidfVectorizer (whole corpus)
self.tfidf_matrix = None # sparse (N_docs, vocab)
self.doc_ids = []
self.doc_id_to_idx = {}
self._is_fitted = False
# ------------------------------------------------------------------
def fit(self):
"""Fit TF-IDF vectorizer on the full multilingual corpus."""
texts = [f"{d['title']}. {d['text']}" for d in self.corpus]
self.doc_ids = [d['doc_id'] for d in self.corpus]
self.doc_id_to_idx = {did: i for i, did in enumerate(self.doc_ids)}
self.vectorizer = TfidfVectorizer(
max_features=8000,
ngram_range=(1, 2),
sublinear_tf=True,
min_df=2,
)
self.tfidf_matrix = self.vectorizer.fit_transform(texts)
self._is_fitted = True
print(f"KeyphraseExtractor fitted on {len(self.corpus)} documents "
f"(vocab {self.tfidf_matrix.shape[1]:,})")
# ------------------------------------------------------------------
def extract_keyphrases(self, doc_id: str, n: int = 5) -> list:
"""Return top-n keyphrases for a single document."""
if not self._is_fitted:
raise RuntimeError("Call fit() first.")
idx = self.doc_id_to_idx.get(doc_id)
if idx is None:
return []
feature_names = np.array(self.vectorizer.get_feature_names_out())
row = np.asarray(self.tfidf_matrix[idx].todense()).flatten()
top_idx = row.argsort()[-n:][::-1]
return [feature_names[i] for i in top_idx if row[i] > 0]
def expand_query(self, query: str, top_n: int = 5) -> list:
"""
Transform the query string through the corpus TF-IDF model and return
top-n high-IDF terms not already in the query as expansion tokens.
"""
if not self._is_fitted:
raise RuntimeError("Call fit() first.")
q_vec = self.vectorizer.transform([query])
feature_names = np.array(self.vectorizer.get_feature_names_out())
row = np.asarray(q_vec.todense()).flatten()
top_idx = row.argsort()[-top_n * 3:][::-1]
qtoks = set(query.lower().split())
result = []
for i in top_idx:
if row[i] > 0:
term = feature_names[i]
if term not in qtoks:
result.append(term)
if len(result) >= top_n:
break
return result
# ------------------------------------------------------------------
def get_keyphrases_batch(self, n: int = 5) -> pd.DataFrame:
"""Return a DataFrame of doc_id, title, language, keyphrases for all docs."""
if not self._is_fitted:
raise RuntimeError("Call fit() first.")
rows = []
for doc in self.corpus:
kp = self.extract_keyphrases(doc['doc_id'], n=n)
rows.append({
'doc_id': doc['doc_id'],
'title': doc['title'],
'language': doc.get('language', ''),
'keyphrases': ', '.join(kp),
})
return pd.DataFrame(rows)
# ══════════════════════════════════════════════════════════════════════════════
# 4 Β· TEXT MINING COMPARATOR (benchmark all expansion strategies)
# ══════════════════════════════════════════════════════════════════════════════
def compare_text_mining_methods(
engine,
clusterer: DocumentClusterer,
topic_modeller: TopicModeller,
kp_extractor: KeyphraseExtractor,
queries: list,
k: int = 10,
) -> pd.DataFrame:
"""
Benchmark four text mining strategies against the official MIRACL qrels.
For each strategy the engine is asked to retrieve top-k documents for every
evaluation query. Average Precision (AP) and nDCG@k are computed, then
averaged across queries β†’ MAP and mean nDCG.
Parameters
----------
engine : fitted SearchEngine instance
clusterer : fitted DocumentClusterer
topic_modeller : fitted TopicModeller
kp_extractor : fitted KeyphraseExtractor
queries : list of eval query dicts (with 'relevant_corpus_ids')
k : rank cutoff
Returns
-------
DataFrame with columns: Method, MAP, nDCG@k, Ξ”_MAP (vs baseline)
"""
methods = {
'Baseline (no expansion)': None,
'Clustering (KMeans+LSA)': clusterer,
'Topic Modelling (LDA)': topic_modeller,
'Keyphrase Extraction (TF-IDF)': kp_extractor,
}
summary = {}
for method_name, expander in methods.items():
ap_list = []
ndcg_list = []
for q in queries:
relevant = set(q.get('relevant_corpus_ids', []))
if not relevant:
continue
lang = q.get('language', 'en')
query_text = q['query']
exp_terms = []
if expander is not None:
try:
exp_terms = expander.expand_query(query_text, top_n=5)
except Exception:
exp_terms = []
res_list = engine.search(
query_text,
language=lang,
top_k=k,
expanded_terms=exp_terms or None,
)
retrieved = [r['doc_id'] for r in res_list]
# AP
hits = 0
prec_sum = 0.0
for rank, did in enumerate(retrieved, 1):
if did in relevant:
hits += 1
prec_sum += hits / rank
ap = prec_sum / len(relevant)
ap_list.append(ap)
# nDCG@k
dcg = sum(1.0 / np.log2(r + 1)
for r, did in enumerate(retrieved, 1) if did in relevant)
ideal_dcg = sum(1.0 / np.log2(i + 2) for i in range(min(len(relevant), k)))
ndcg = dcg / ideal_dcg if ideal_dcg > 0 else 0.0
ndcg_list.append(ndcg)
summary[method_name] = {
'MAP': float(np.mean(ap_list)) if ap_list else 0.0,
f'nDCG@{k}': float(np.mean(ndcg_list)) if ndcg_list else 0.0,
}
# Build DataFrame with Ξ” MAP relative to baseline
baseline_map = summary['Baseline (no expansion)']['MAP']
rows = []
for method, metrics in summary.items():
delta = metrics['MAP'] - baseline_map
rows.append({
'Method': method,
'MAP': round(metrics['MAP'], 4),
f'nDCG@{k}': round(metrics[f'nDCG@{k}'], 4),
'Ξ” MAP': round(delta, 4),
})
return pd.DataFrame(rows)
# ══════════════════════════════════════════════════════════════════════════════
# Legacy helper (kept for backwards compat)
# ══════════════════════════════════════════════════════════════════════════════
def analyze_query_expansion(search_engine, clusterer, test_queries: list) -> pd.DataFrame:
"""
Compare mean top-5 retrieval score with / without cluster-based expansion.
Returns DataFrame: query, original_score, expanded_score, expansion_terms, improvement_%
"""
rows = []
for query in test_queries:
orig = search_engine.search(query, top_k=5)
o_sc = float(np.mean([r['score'] for r in orig])) if orig else 0.0
exp_terms = clusterer.expand_query(query, top_n=5)
exp_query = query + ' ' + ' '.join(exp_terms) if exp_terms else query
expanded = search_engine.search(exp_query, top_k=5)
e_sc = float(np.mean([r['score'] for r in expanded])) if expanded else 0.0
impr = 100.0 * (e_sc - o_sc) / o_sc if o_sc > 0 else 0.0
rows.append({
'query': query,
'original_score': round(o_sc, 4),
'expanded_score': round(e_sc, 4),
'expansion_terms': ', '.join(exp_terms),
'improvement_%': round(impr, 2),
})
return pd.DataFrame(rows)