NLP_Homework_1 / src /clustering.py
Kolesnikov Dmitry
feat: Попытка навайбкодить 3 и 4 лабораторные
68545bc
"""
Модуль для кластеризации текстовых данных.
Реализует все основные классические методы кластеризации:
- Центроидные: k-Means, Mini-Batch k-Means, Spherical k-Means
- Плотностные: DBSCAN, HDBSCAN
- Иерархические: агломеративная кластеризация
- Вероятностные: Gaussian Mixture Models, LDA
- Графовые: спектральная кластеризация
"""
from __future__ import annotations
import time
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Tuple
import numpy as np
import pandas as pd
from sklearn.cluster import (
KMeans, MiniBatchKMeans, DBSCAN, AgglomerativeClustering,
SpectralClustering
)
from sklearn.mixture import GaussianMixture
from sklearn.metrics import (
silhouette_score, calinski_harabasz_score, davies_bouldin_score,
adjusted_rand_score, normalized_mutual_info_score, v_measure_score
)
try:
import hdbscan
HDBSCAN_AVAILABLE = True
except ImportError:
HDBSCAN_AVAILABLE = False
print("⚠️ hdbscan не установлен. HDBSCAN недоступен. Установите: pip install hdbscan")
try:
from gensim.models import LdaModel
from gensim.corpora import Dictionary
GENSIM_AVAILABLE = True
except ImportError:
GENSIM_AVAILABLE = False
print("⚠️ gensim не установлен. LDA недоступен.")
@dataclass
class ClusteringConfig:
"""Конфигурация алгоритма кластеризации."""
method: str # kmeans, minibatch_kmeans, spherical_kmeans, dbscan, hdbscan,
# agglomerative, gmm, lda, spectral
n_clusters: Optional[int] = None # Для методов, требующих число кластеров
random_state: int = 42
# Специфичные параметры
eps: float = 0.5 # Для DBSCAN
min_samples: int = 5 # Для DBSCAN/HDBSCAN
linkage: str = "ward" # Для Agglomerative
metric: str = "euclidean" # Для Agglomerative
n_components: int = 10 # Для LDA
n_neighbors: int = 10 # Для Spectral
class ClusteringAlgorithms:
"""Класс для работы с алгоритмами кластеризации."""
def __init__(self, config: ClusteringConfig):
self.config = config
self.model = self._create_model()
self.labels_ = None
self.fit_time = 0.0
self.predict_time = 0.0
def _create_model(self):
"""Создает модель кластеризации."""
method = self.config.method.lower()
if method == "kmeans":
if self.config.n_clusters is None:
raise ValueError("Для k-Means требуется n_clusters")
return KMeans(
n_clusters=self.config.n_clusters,
random_state=self.config.random_state,
n_init=10
)
elif method == "minibatch_kmeans":
if self.config.n_clusters is None:
raise ValueError("Для Mini-Batch k-Means требуется n_clusters")
return MiniBatchKMeans(
n_clusters=self.config.n_clusters,
random_state=self.config.random_state,
n_init=3,
batch_size=256
)
elif method == "spherical_kmeans":
# Spherical k-Means через k-Means с нормализацией
if self.config.n_clusters is None:
raise ValueError("Для Spherical k-Means требуется n_clusters")
return KMeans(
n_clusters=self.config.n_clusters,
random_state=self.config.random_state,
n_init=10
)
elif method == "dbscan":
return DBSCAN(
eps=self.config.eps,
min_samples=self.config.min_samples,
metric='cosine' # Для текстов обычно используется cosine
)
elif method == "hdbscan":
if not HDBSCAN_AVAILABLE:
raise ImportError("hdbscan не установлен. Установите: pip install hdbscan")
return hdbscan.HDBSCAN(
min_cluster_size=self.config.min_samples,
metric='euclidean',
cluster_selection_method='eom'
)
elif method == "agglomerative":
if self.config.n_clusters is None:
raise ValueError("Для Agglomerative требуется n_clusters")
return AgglomerativeClustering(
n_clusters=self.config.n_clusters,
linkage=self.config.linkage,
metric=self.config.metric
)
elif method == "gmm":
if self.config.n_clusters is None:
raise ValueError("Для GMM требуется n_clusters")
return GaussianMixture(
n_components=self.config.n_clusters,
random_state=self.config.random_state,
max_iter=100
)
elif method == "spectral":
if self.config.n_clusters is None:
raise ValueError("Для Spectral требуется n_clusters")
return SpectralClustering(
n_clusters=self.config.n_clusters,
random_state=self.config.random_state,
affinity='nearest_neighbors',
n_neighbors=self.config.n_neighbors
)
elif method == "lda":
# LDA обрабатывается отдельно, так как это тематическая модель
return None
else:
raise ValueError(f"Неизвестный метод кластеризации: {method}")
def fit(self, X: np.ndarray):
"""Обучение модели кластеризации."""
start = time.time()
# Spherical k-Means требует нормализации
if self.config.method.lower() == "spherical_kmeans":
from sklearn.preprocessing import normalize
X = normalize(X, norm='l2')
# Для DBSCAN/HDBSCAN с cosine метрикой также нормализуем
if self.config.method.lower() in ["dbscan", "hdbscan"] and self.config.metric == "cosine":
from sklearn.preprocessing import normalize
X = normalize(X, norm='l2')
if self.config.method.lower() == "lda":
# LDA обрабатывается отдельно
raise NotImplementedError("LDA используйте метод fit_lda")
self.model.fit(X)
if hasattr(self.model, 'labels_'):
self.labels_ = self.model.labels_
elif hasattr(self.model, 'predict'):
self.labels_ = self.model.predict(X)
else:
raise ValueError("Модель не вернула метки кластеров")
self.fit_time = time.time() - start
return self
def fit_lda(self, texts: List[str], dictionary: Optional[Any] = None):
"""
Обучение LDA модели для кластеризации по темам.
Args:
texts: Список текстов (уже токенизированных)
dictionary: Gensim Dictionary (опционально)
"""
if not GENSIM_AVAILABLE:
raise ImportError("gensim не установлен. Установите: pip install gensim")
if self.config.n_clusters is None:
raise ValueError("Для LDA требуется n_clusters (число тем)")
from gensim.utils import simple_preprocess
# Токенизация, если нужно
tokenized_texts = []
for text in texts:
if isinstance(text, str):
tokens = simple_preprocess(text, deacc=False, min_len=1)
else:
tokens = text
tokenized_texts.append(tokens)
# Создаем словарь
if dictionary is None:
dictionary = Dictionary(tokenized_texts)
dictionary.filter_extremes(no_below=2, no_above=0.5)
# Создаем корпус
corpus = [dictionary.doc2bow(text) for text in tokenized_texts]
# Обучаем LDA
start = time.time()
lda_model = LdaModel(
corpus=corpus,
num_topics=self.config.n_clusters,
id2word=dictionary,
random_state=self.config.random_state,
passes=10,
alpha='auto',
per_word_topics=True
)
self.fit_time = time.time() - start
# Получаем метки кластеров (темы) для каждого документа
self.labels_ = []
for doc in corpus:
topic_dist = lda_model.get_document_topics(doc, minimum_probability=0.0)
# Берем тему с максимальной вероятностью
best_topic = max(topic_dist, key=lambda x: x[1])[0]
self.labels_.append(best_topic)
self.labels_ = np.array(self.labels_)
self.model = lda_model
self.dictionary = dictionary
return self
def predict(self, X: np.ndarray):
"""Предсказание кластеров для новых данных."""
start = time.time()
if self.config.method.lower() == "lda":
raise NotImplementedError("LDA predict требует отдельной реализации")
if hasattr(self.model, 'predict'):
predictions = self.model.predict(X)
else:
# Для DBSCAN и некоторых других методов
predictions = self.model.fit_predict(X)
self.predict_time = time.time() - start
return predictions
def evaluate_clustering(X: np.ndarray, labels: np.ndarray,
y_true: Optional[np.ndarray] = None) -> Dict[str, float]:
"""
Оценка качества кластеризации.
Args:
X: Признаки
labels: Предсказанные метки кластеров
y_true: Истинные метки (опционально, для внешних метрик)
Returns:
Словарь с метриками
"""
metrics = {}
# Внутренние метрики
# Удаляем шумовые точки (-1) для метрик
valid_mask = labels != -1
if valid_mask.sum() > 1:
X_valid = X[valid_mask]
labels_valid = labels[valid_mask]
if len(np.unique(labels_valid)) > 1:
metrics["silhouette"] = silhouette_score(X_valid, labels_valid)
metrics["calinski_harabasz"] = calinski_harabasz_score(X_valid, labels_valid)
metrics["davies_bouldin"] = davies_bouldin_score(X_valid, labels_valid)
else:
metrics["silhouette"] = -1.0
metrics["calinski_harabasz"] = 0.0
metrics["davies_bouldin"] = np.inf
# Внешние метрики (если есть истинные метки)
if y_true is not None:
metrics["adjusted_rand_index"] = adjusted_rand_score(y_true, labels)
metrics["normalized_mutual_info"] = normalized_mutual_info_score(y_true, labels)
metrics["v_measure"] = v_measure_score(y_true, labels)
# Статистика кластеров
unique_labels, counts = np.unique(labels, return_counts=True)
metrics["n_clusters"] = len(unique_labels[unique_labels != -1]) # Исключаем шум
metrics["n_noise"] = (labels == -1).sum() if -1 in labels else 0
metrics["avg_cluster_size"] = counts[unique_labels != -1].mean() if len(counts[unique_labels != -1]) > 0 else 0
return metrics
def compare_clustering_methods(X: np.ndarray,
configs: List[ClusteringConfig],
y_true: Optional[np.ndarray] = None) -> pd.DataFrame:
"""
Сравнение нескольких методов кластеризации.
Args:
X: Признаки
configs: Список конфигураций
y_true: Истинные метки (опционально)
Returns:
DataFrame с результатами сравнения
"""
results = []
for cfg in configs:
try:
clusterer = ClusteringAlgorithms(cfg)
if cfg.method.lower() == "lda":
# LDA требует тексты, пропускаем в этой функции
continue
clusterer.fit(X)
metrics = evaluate_clustering(X, clusterer.labels_, y_true)
result = {
"Метод": cfg.method,
"Число кластеров": metrics.get("n_clusters", cfg.n_clusters),
"Шумовые точки": metrics.get("n_noise", 0),
"Silhouette": round(metrics.get("silhouette", -1), 4),
"Calinski-Harabasz": round(metrics.get("calinski_harabasz", 0), 4),
"Davies-Bouldin": round(metrics.get("davies_bouldin", np.inf), 4),
"Время обучения (с)": round(clusterer.fit_time, 2),
}
if y_true is not None:
result["ARI"] = round(metrics.get("adjusted_rand_index", 0), 4)
result["NMI"] = round(metrics.get("normalized_mutual_info", 0), 4)
result["V-measure"] = round(metrics.get("v_measure", 0), 4)
results.append(result)
except Exception as e:
print(f"Ошибка при кластеризации методом {cfg.method}: {e}")
results.append({
"Метод": cfg.method,
"Ошибка": str(e)
})
return pd.DataFrame(results)
if __name__ == "__main__":
# Тестирование
from sklearn.datasets import make_blobs
X, y = make_blobs(n_samples=300, centers=4, random_state=42)
configs = [
ClusteringConfig(method="kmeans", n_clusters=4),
ClusteringConfig(method="dbscan", eps=0.5, min_samples=5),
ClusteringConfig(method="agglomerative", n_clusters=4, linkage="ward"),
]
if HDBSCAN_AVAILABLE:
configs.append(ClusteringConfig(method="hdbscan", min_samples=5))
results_df = compare_clustering_methods(X, configs, y_true=y)
print(results_df)