Spaces:
Running
Running
File size: 8,305 Bytes
c78c2c1 affe2db c78c2c1 affe2db c78c2c1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 | """
PhilVerify — LDA Topic Analysis + LDA Feature Classifier (Layer 1)
Two responsibilities:
1. run_topic_analysis(samples, n_topics)
Fits LDA on training texts, prints top-N words per topic and the dominant
topic distribution per class (Credible / Unverified / Likely Fake).
Call directly to explore what topics the model discovers.
2. LDAFeatureClassifier
Concatenates LDA topic distribution features with TF-IDF features and feeds
the combined vector into LogisticRegression. Same predict() interface as
TFIDFClassifier — slots directly into eval.py.
Usage:
python -m ml.lda_analysis # standalone topic analysis
python -m ml.eval # compare LDAFeatureClassifier against others
"""
import logging
import numpy as np
import scipy.sparse as sp
from ml.dataset import LABEL_NAMES, get_split
from ml.naive_bayes_classifier import _lemmatize_tokens
from ml.tfidf_classifier import Layer1Result
logger = logging.getLogger(__name__)

# Verdict text for each integer class id the classifier can predict.
_LABELS = dict(enumerate(("Credible", "Unverified", "Likely Fake")))

# Display names for the LDA topics, keyed by 1-based topic number.
# They were chosen by hand after reading the run_topic_analysis() report
# for the 100-sample PH dataset.
TOPIC_LABELS: dict[int, str] = dict(
    enumerate(
        (
            "Health & Conspiracy",
            "Breaking News",
            "Crime & Law",
            "Politics & Government",
            "Filipino Current Events",
        ),
        start=1,
    )
)
# ── Standalone topic analysis ──────────────────────────────────────────────────
def run_topic_analysis(
    samples,
    n_topics: int = 5,
    n_top_words: int = 10,
) -> None:
    """
    Fit an LDA model on ``samples`` and print a short exploratory report.

    The report contains:
      - the ``n_top_words`` highest-weighted vocabulary terms of every topic
      - for each class in ``LABEL_NAMES`` that has samples, the two topics
        with the largest mean share across that class's documents

    Args:
        samples: sized collection of objects exposing ``.text`` and ``.label``.
        n_topics: number of LDA components to fit.
        n_top_words: how many terms to list per topic.

    Returns:
        None. All output goes to stdout.
    """
    from sklearn.decomposition import LatentDirichletAllocation
    from sklearn.feature_extraction.text import CountVectorizer

    corpus = []
    class_ids = []
    for sample in samples:
        corpus.append(sample.text.lower())
        class_ids.append(sample.label)

    # LDA is fitted on raw term counts, not TF-IDF weights.
    counter = CountVectorizer(max_features=500, stop_words="english")
    term_counts = counter.fit_transform(corpus)
    terms = counter.get_feature_names_out()

    model = LatentDirichletAllocation(
        n_components=n_topics,
        random_state=42,
        max_iter=30,
        learning_method="batch",
    )
    # One row per sample, one column per topic.
    doc_topics = model.fit_transform(term_counts)

    rule = "=" * 62
    print(f"\n{rule}")
    print(f" LDA Topic Analysis ({n_topics} topics, {len(samples)} samples)")
    print(f"{rule}")

    for topic_no, weights in enumerate(model.components_, start=1):
        # argsort is ascending, so take the tail and walk it backwards to
        # list the heaviest terms first.
        heaviest_first = reversed(weights.argsort()[-n_top_words:])
        headline = ", ".join(terms[j] for j in heaviest_first)
        print(f"\n Topic {topic_no}: {headline}")

    print(f"\n Per-class dominant topics:")
    for label_id, label_name in sorted(LABEL_NAMES.items()):
        members = [row for row, lab in enumerate(class_ids) if lab == label_id]
        if members:
            mean_dist = doc_topics[members].mean(axis=0)
            leaders = mean_dist.argsort()[-2:][::-1]
            parts = [f"T{t + 1}:{mean_dist[t]:.2f}" for t in leaders]
            summary = " ".join(parts)
            print(f" {label_name:<14} {summary}")
# ── LDA Feature Classifier ─────────────────────────────────────────────────────
class LDAFeatureClassifier:
    """
    LogisticRegression over TF-IDF features plus the LDA topic distribution.

    Every text becomes a single feature row: its TF-IDF vector with the LDA
    topic proportions appended as extra columns
    (``sparse_hstack([tfidf_features, lda_topic_distribution])``).

    Args:
        train_samples: samples exposing ``.text`` and ``.label``. When None,
            whatever ``ml.dataset.get_dataset()`` returns is used.
        n_topics: number of LDA topics (default 5).
        lemmatize: run tokens through ``_lemmatize_tokens`` before
            vectorization.
    """

    def __init__(self, train_samples=None, n_topics: int = 5, lemmatize: bool = False):
        """Fit the TF-IDF vectorizer, the count vectorizer + LDA, and the classifier."""
        from sklearn.decomposition import LatentDirichletAllocation
        from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
        from sklearn.linear_model import LogisticRegression

        self._lemmatize = lemmatize
        self._n_topics = n_topics

        if train_samples is None:
            from ml.dataset import get_dataset

            train_samples = get_dataset()

        corpus = [self._preprocess(sample.text) for sample in train_samples]
        targets = [sample.label for sample in train_samples]

        # Lexical half of the feature row: unigram + bigram TF-IDF.
        self._tfidf = TfidfVectorizer(
            ngram_range=(1, 2),
            max_features=1000,
            sublinear_tf=True,
        )
        tfidf_matrix = self._tfidf.fit_transform(corpus)

        # Topical half: LDA is fitted on raw term counts.
        # NOTE(review): run_topic_analysis() builds its CountVectorizer with
        # stop_words="english" but this one does not, so the topics learned
        # here may not line up with the hand-written TOPIC_LABELS — confirm.
        self._count_vec = CountVectorizer(max_features=500)
        count_matrix = self._count_vec.fit_transform(corpus)
        self._lda = LatentDirichletAllocation(
            n_components=n_topics,
            random_state=42,
            max_iter=30,
            learning_method="batch",
        )
        topic_matrix = self._lda.fit_transform(count_matrix)

        # The LDA output is dense; wrap it as sparse so it can be stacked
        # column-wise next to the sparse TF-IDF matrix.
        features = sp.hstack([tfidf_matrix, sp.csr_matrix(topic_matrix)])

        self._clf = LogisticRegression(max_iter=500, C=1.0, random_state=42)
        self._clf.fit(features, targets)

        logger.info(
            "LDAFeatureClassifier trained on %d samples (n_topics=%d, lemmatize=%s)",
            len(corpus),
            n_topics,
            lemmatize,
        )

    def _preprocess(self, text: str) -> str:
        """Lower-case ``text``; when lemmatization is on, lemmatize its whitespace-split tokens."""
        lowered = text.lower()
        if not self._lemmatize:
            return lowered
        return " ".join(_lemmatize_tokens(lowered.split()))

    def _topic_distribution(self, processed: str):
        """Return the LDA topic distribution (one row) for already-preprocessed text."""
        counts = self._count_vec.transform([processed])
        return self._lda.transform(counts)

    def predict(self, text: str) -> Layer1Result:
        """
        Classify ``text`` and report which features stood out.

        Returns:
            Layer1Result with the verdict string for the predicted class, the
            highest class probability as a percentage rounded to one decimal,
            and up to five triggered features: ``lda_topic_<n>`` for the
            dominant topic followed by at most four non-zero TF-IDF terms,
            strongest first.
        """
        cleaned = self._preprocess(text)
        tfidf_row = self._tfidf.transform([cleaned])
        topic_row = self._topic_distribution(cleaned)
        features = sp.hstack([tfidf_row, sp.csr_matrix(topic_row)])

        label_id = int(self._clf.predict(features)[0])
        class_probs = self._clf.predict_proba(features)[0]
        confidence = round(float(max(class_probs)) * 100, 1)

        vocabulary = self._tfidf.get_feature_names_out()
        weights = tfidf_row.toarray()[0]

        # The dominant topic tag always leads the list.
        triggered = [f"lda_topic_{int(topic_row[0].argmax()) + 1}"]
        # argsort is ascending: the last four indices are the strongest
        # terms; terms absent from the text have weight 0 and are skipped.
        for col in reversed(weights.argsort()[-4:]):
            if weights[col] > 0:
                triggered.append(vocabulary[col])

        return Layer1Result(
            verdict=_LABELS[label_id],
            confidence=confidence,
            triggered_features=triggered[:5],
        )

    def get_topic_info(self, text: str) -> dict:
        """
        Infer the dominant LDA topic for a new text.

        Returns:
            dict with ``label`` (the TOPIC_LABELS entry for the topic, or
            ``"Topic <n>"`` when none exists), ``top_words`` (the six
            highest-weighted terms of that topic) and ``confidence`` (the
            topic's share of the distribution, as a 0–100 percentage rounded
            to one decimal).
        """
        mix = self._topic_distribution(self._preprocess(text))[0]
        winner = int(mix.argmax())
        share = round(float(mix[winner]) * 100, 1)

        vocab = self._count_vec.get_feature_names_out()
        ranked = self._lda.components_[winner].argsort()[-6:][::-1]

        return {
            "label": TOPIC_LABELS.get(winner + 1, f"Topic {winner + 1}"),
            "top_words": [vocab[i] for i in ranked],
            "confidence": share,
        }
# ── Direct run ─────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Command-line entry point: fit LDA on one split of the dataset and print
    # the topic report.
    import argparse

    cli = argparse.ArgumentParser(description="LDA topic analysis on PhilVerify dataset")
    # All three options are plain integers with a default.
    for flag, fallback in (("--n-topics", 5), ("--n-top-words", 10), ("--seed", 42)):
        cli.add_argument(flag, type=int, default=fallback)
    opts = cli.parse_args()

    # get_split() yields a pair; only its first element is analysed here.
    training, _ = get_split(seed=opts.seed)
    run_topic_analysis(training, n_topics=opts.n_topics, n_top_words=opts.n_top_words)
|