""" Модуль для кластеризации текстовых данных. Реализует все основные классические методы кластеризации: - Центроидные: k-Means, Mini-Batch k-Means, Spherical k-Means - Плотностные: DBSCAN, HDBSCAN - Иерархические: агломеративная кластеризация - Вероятностные: Gaussian Mixture Models, LDA - Графовые: спектральная кластеризация """ from __future__ import annotations import time from dataclasses import dataclass from typing import List, Dict, Any, Optional, Tuple import numpy as np import pandas as pd from sklearn.cluster import ( KMeans, MiniBatchKMeans, DBSCAN, AgglomerativeClustering, SpectralClustering ) from sklearn.mixture import GaussianMixture from sklearn.metrics import ( silhouette_score, calinski_harabasz_score, davies_bouldin_score, adjusted_rand_score, normalized_mutual_info_score, v_measure_score ) try: import hdbscan HDBSCAN_AVAILABLE = True except ImportError: HDBSCAN_AVAILABLE = False print("⚠️ hdbscan не установлен. HDBSCAN недоступен. Установите: pip install hdbscan") try: from gensim.models import LdaModel from gensim.corpora import Dictionary GENSIM_AVAILABLE = True except ImportError: GENSIM_AVAILABLE = False print("⚠️ gensim не установлен. LDA недоступен.") @dataclass class ClusteringConfig: """Конфигурация алгоритма кластеризации.""" method: str # kmeans, minibatch_kmeans, spherical_kmeans, dbscan, hdbscan, # agglomerative, gmm, lda, spectral n_clusters: Optional[int] = None # Для методов, требующих число кластеров random_state: int = 42 # Специфичные параметры eps: float = 0.5 # Для DBSCAN min_samples: int = 5 # Для DBSCAN/HDBSCAN linkage: str = "ward" # Для Agglomerative metric: str = "euclidean" # Для Agglomerative n_components: int = 10 # Для LDA n_neighbors: int = 10 # Для Spectral class ClusteringAlgorithms: """Класс для работы с алгоритмами кластеризации.""" def __init__(self, config: ClusteringConfig): self.config = config self.model = self._create_model() self.labels_ = None self.fit_time = 0.0 self.predict_time = 0.0 def _create_model(self): """Создает модель кластеризации.""" method = self.config.method.lower() if method == "kmeans": if self.config.n_clusters is None: raise ValueError("Для k-Means требуется n_clusters") return KMeans( n_clusters=self.config.n_clusters, random_state=self.config.random_state, n_init=10 ) elif method == "minibatch_kmeans": if self.config.n_clusters is None: raise ValueError("Для Mini-Batch k-Means требуется n_clusters") return MiniBatchKMeans( n_clusters=self.config.n_clusters, random_state=self.config.random_state, n_init=3, batch_size=256 ) elif method == "spherical_kmeans": # Spherical k-Means через k-Means с нормализацией if self.config.n_clusters is None: raise ValueError("Для Spherical k-Means требуется n_clusters") return KMeans( n_clusters=self.config.n_clusters, random_state=self.config.random_state, n_init=10 ) elif method == "dbscan": return DBSCAN( eps=self.config.eps, min_samples=self.config.min_samples, metric='cosine' # Для текстов обычно используется cosine ) elif method == "hdbscan": if not HDBSCAN_AVAILABLE: raise ImportError("hdbscan не установлен. Установите: pip install hdbscan") return hdbscan.HDBSCAN( min_cluster_size=self.config.min_samples, metric='euclidean', cluster_selection_method='eom' ) elif method == "agglomerative": if self.config.n_clusters is None: raise ValueError("Для Agglomerative требуется n_clusters") return AgglomerativeClustering( n_clusters=self.config.n_clusters, linkage=self.config.linkage, metric=self.config.metric ) elif method == "gmm": if self.config.n_clusters is None: raise ValueError("Для GMM требуется n_clusters") return GaussianMixture( n_components=self.config.n_clusters, random_state=self.config.random_state, max_iter=100 ) elif method == "spectral": if self.config.n_clusters is None: raise ValueError("Для Spectral требуется n_clusters") return SpectralClustering( n_clusters=self.config.n_clusters, random_state=self.config.random_state, affinity='nearest_neighbors', n_neighbors=self.config.n_neighbors ) elif method == "lda": # LDA обрабатывается отдельно, так как это тематическая модель return None else: raise ValueError(f"Неизвестный метод кластеризации: {method}") def fit(self, X: np.ndarray): """Обучение модели кластеризации.""" start = time.time() # Spherical k-Means требует нормализации if self.config.method.lower() == "spherical_kmeans": from sklearn.preprocessing import normalize X = normalize(X, norm='l2') # Для DBSCAN/HDBSCAN с cosine метрикой также нормализуем if self.config.method.lower() in ["dbscan", "hdbscan"] and self.config.metric == "cosine": from sklearn.preprocessing import normalize X = normalize(X, norm='l2') if self.config.method.lower() == "lda": # LDA обрабатывается отдельно raise NotImplementedError("LDA используйте метод fit_lda") self.model.fit(X) if hasattr(self.model, 'labels_'): self.labels_ = self.model.labels_ elif hasattr(self.model, 'predict'): self.labels_ = self.model.predict(X) else: raise ValueError("Модель не вернула метки кластеров") self.fit_time = time.time() - start return self def fit_lda(self, texts: List[str], dictionary: Optional[Any] = None): """ Обучение LDA модели для кластеризации по темам. Args: texts: Список текстов (уже токенизированных) dictionary: Gensim Dictionary (опционально) """ if not GENSIM_AVAILABLE: raise ImportError("gensim не установлен. Установите: pip install gensim") if self.config.n_clusters is None: raise ValueError("Для LDA требуется n_clusters (число тем)") from gensim.utils import simple_preprocess # Токенизация, если нужно tokenized_texts = [] for text in texts: if isinstance(text, str): tokens = simple_preprocess(text, deacc=False, min_len=1) else: tokens = text tokenized_texts.append(tokens) # Создаем словарь if dictionary is None: dictionary = Dictionary(tokenized_texts) dictionary.filter_extremes(no_below=2, no_above=0.5) # Создаем корпус corpus = [dictionary.doc2bow(text) for text in tokenized_texts] # Обучаем LDA start = time.time() lda_model = LdaModel( corpus=corpus, num_topics=self.config.n_clusters, id2word=dictionary, random_state=self.config.random_state, passes=10, alpha='auto', per_word_topics=True ) self.fit_time = time.time() - start # Получаем метки кластеров (темы) для каждого документа self.labels_ = [] for doc in corpus: topic_dist = lda_model.get_document_topics(doc, minimum_probability=0.0) # Берем тему с максимальной вероятностью best_topic = max(topic_dist, key=lambda x: x[1])[0] self.labels_.append(best_topic) self.labels_ = np.array(self.labels_) self.model = lda_model self.dictionary = dictionary return self def predict(self, X: np.ndarray): """Предсказание кластеров для новых данных.""" start = time.time() if self.config.method.lower() == "lda": raise NotImplementedError("LDA predict требует отдельной реализации") if hasattr(self.model, 'predict'): predictions = self.model.predict(X) else: # Для DBSCAN и некоторых других методов predictions = self.model.fit_predict(X) self.predict_time = time.time() - start return predictions def evaluate_clustering(X: np.ndarray, labels: np.ndarray, y_true: Optional[np.ndarray] = None) -> Dict[str, float]: """ Оценка качества кластеризации. Args: X: Признаки labels: Предсказанные метки кластеров y_true: Истинные метки (опционально, для внешних метрик) Returns: Словарь с метриками """ metrics = {} # Внутренние метрики # Удаляем шумовые точки (-1) для метрик valid_mask = labels != -1 if valid_mask.sum() > 1: X_valid = X[valid_mask] labels_valid = labels[valid_mask] if len(np.unique(labels_valid)) > 1: metrics["silhouette"] = silhouette_score(X_valid, labels_valid) metrics["calinski_harabasz"] = calinski_harabasz_score(X_valid, labels_valid) metrics["davies_bouldin"] = davies_bouldin_score(X_valid, labels_valid) else: metrics["silhouette"] = -1.0 metrics["calinski_harabasz"] = 0.0 metrics["davies_bouldin"] = np.inf # Внешние метрики (если есть истинные метки) if y_true is not None: metrics["adjusted_rand_index"] = adjusted_rand_score(y_true, labels) metrics["normalized_mutual_info"] = normalized_mutual_info_score(y_true, labels) metrics["v_measure"] = v_measure_score(y_true, labels) # Статистика кластеров unique_labels, counts = np.unique(labels, return_counts=True) metrics["n_clusters"] = len(unique_labels[unique_labels != -1]) # Исключаем шум metrics["n_noise"] = (labels == -1).sum() if -1 in labels else 0 metrics["avg_cluster_size"] = counts[unique_labels != -1].mean() if len(counts[unique_labels != -1]) > 0 else 0 return metrics def compare_clustering_methods(X: np.ndarray, configs: List[ClusteringConfig], y_true: Optional[np.ndarray] = None) -> pd.DataFrame: """ Сравнение нескольких методов кластеризации. Args: X: Признаки configs: Список конфигураций y_true: Истинные метки (опционально) Returns: DataFrame с результатами сравнения """ results = [] for cfg in configs: try: clusterer = ClusteringAlgorithms(cfg) if cfg.method.lower() == "lda": # LDA требует тексты, пропускаем в этой функции continue clusterer.fit(X) metrics = evaluate_clustering(X, clusterer.labels_, y_true) result = { "Метод": cfg.method, "Число кластеров": metrics.get("n_clusters", cfg.n_clusters), "Шумовые точки": metrics.get("n_noise", 0), "Silhouette": round(metrics.get("silhouette", -1), 4), "Calinski-Harabasz": round(metrics.get("calinski_harabasz", 0), 4), "Davies-Bouldin": round(metrics.get("davies_bouldin", np.inf), 4), "Время обучения (с)": round(clusterer.fit_time, 2), } if y_true is not None: result["ARI"] = round(metrics.get("adjusted_rand_index", 0), 4) result["NMI"] = round(metrics.get("normalized_mutual_info", 0), 4) result["V-measure"] = round(metrics.get("v_measure", 0), 4) results.append(result) except Exception as e: print(f"Ошибка при кластеризации методом {cfg.method}: {e}") results.append({ "Метод": cfg.method, "Ошибка": str(e) }) return pd.DataFrame(results) if __name__ == "__main__": # Тестирование from sklearn.datasets import make_blobs X, y = make_blobs(n_samples=300, centers=4, random_state=42) configs = [ ClusteringConfig(method="kmeans", n_clusters=4), ClusteringConfig(method="dbscan", eps=0.5, min_samples=5), ClusteringConfig(method="agglomerative", n_clusters=4, linkage="ward"), ] if HDBSCAN_AVAILABLE: configs.append(ClusteringConfig(method="hdbscan", min_samples=5)) results_df = compare_clustering_methods(X, configs, y_true=y) print(results_df)