Spaces:
Sleeping
Sleeping
| """ | |
| Модуль для кластеризации текстовых данных. | |
| Реализует все основные классические методы кластеризации: | |
| - Центроидные: k-Means, Mini-Batch k-Means, Spherical k-Means | |
| - Плотностные: DBSCAN, HDBSCAN | |
| - Иерархические: агломеративная кластеризация | |
| - Вероятностные: Gaussian Mixture Models, LDA | |
| - Графовые: спектральная кластеризация | |
| """ | |
| from __future__ import annotations | |
| import time | |
| from dataclasses import dataclass | |
| from typing import List, Dict, Any, Optional, Tuple | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.cluster import ( | |
| KMeans, MiniBatchKMeans, DBSCAN, AgglomerativeClustering, | |
| SpectralClustering | |
| ) | |
| from sklearn.mixture import GaussianMixture | |
| from sklearn.metrics import ( | |
| silhouette_score, calinski_harabasz_score, davies_bouldin_score, | |
| adjusted_rand_score, normalized_mutual_info_score, v_measure_score | |
| ) | |
| try: | |
| import hdbscan | |
| HDBSCAN_AVAILABLE = True | |
| except ImportError: | |
| HDBSCAN_AVAILABLE = False | |
| print("⚠️ hdbscan не установлен. HDBSCAN недоступен. Установите: pip install hdbscan") | |
| try: | |
| from gensim.models import LdaModel | |
| from gensim.corpora import Dictionary | |
| GENSIM_AVAILABLE = True | |
| except ImportError: | |
| GENSIM_AVAILABLE = False | |
| print("⚠️ gensim не установлен. LDA недоступен.") | |
| class ClusteringConfig: | |
| """Конфигурация алгоритма кластеризации.""" | |
| method: str # kmeans, minibatch_kmeans, spherical_kmeans, dbscan, hdbscan, | |
| # agglomerative, gmm, lda, spectral | |
| n_clusters: Optional[int] = None # Для методов, требующих число кластеров | |
| random_state: int = 42 | |
| # Специфичные параметры | |
| eps: float = 0.5 # Для DBSCAN | |
| min_samples: int = 5 # Для DBSCAN/HDBSCAN | |
| linkage: str = "ward" # Для Agglomerative | |
| metric: str = "euclidean" # Для Agglomerative | |
| n_components: int = 10 # Для LDA | |
| n_neighbors: int = 10 # Для Spectral | |
| class ClusteringAlgorithms: | |
| """Класс для работы с алгоритмами кластеризации.""" | |
| def __init__(self, config: ClusteringConfig): | |
| self.config = config | |
| self.model = self._create_model() | |
| self.labels_ = None | |
| self.fit_time = 0.0 | |
| self.predict_time = 0.0 | |
| def _create_model(self): | |
| """Создает модель кластеризации.""" | |
| method = self.config.method.lower() | |
| if method == "kmeans": | |
| if self.config.n_clusters is None: | |
| raise ValueError("Для k-Means требуется n_clusters") | |
| return KMeans( | |
| n_clusters=self.config.n_clusters, | |
| random_state=self.config.random_state, | |
| n_init=10 | |
| ) | |
| elif method == "minibatch_kmeans": | |
| if self.config.n_clusters is None: | |
| raise ValueError("Для Mini-Batch k-Means требуется n_clusters") | |
| return MiniBatchKMeans( | |
| n_clusters=self.config.n_clusters, | |
| random_state=self.config.random_state, | |
| n_init=3, | |
| batch_size=256 | |
| ) | |
| elif method == "spherical_kmeans": | |
| # Spherical k-Means через k-Means с нормализацией | |
| if self.config.n_clusters is None: | |
| raise ValueError("Для Spherical k-Means требуется n_clusters") | |
| return KMeans( | |
| n_clusters=self.config.n_clusters, | |
| random_state=self.config.random_state, | |
| n_init=10 | |
| ) | |
| elif method == "dbscan": | |
| return DBSCAN( | |
| eps=self.config.eps, | |
| min_samples=self.config.min_samples, | |
| metric='cosine' # Для текстов обычно используется cosine | |
| ) | |
| elif method == "hdbscan": | |
| if not HDBSCAN_AVAILABLE: | |
| raise ImportError("hdbscan не установлен. Установите: pip install hdbscan") | |
| return hdbscan.HDBSCAN( | |
| min_cluster_size=self.config.min_samples, | |
| metric='euclidean', | |
| cluster_selection_method='eom' | |
| ) | |
| elif method == "agglomerative": | |
| if self.config.n_clusters is None: | |
| raise ValueError("Для Agglomerative требуется n_clusters") | |
| return AgglomerativeClustering( | |
| n_clusters=self.config.n_clusters, | |
| linkage=self.config.linkage, | |
| metric=self.config.metric | |
| ) | |
| elif method == "gmm": | |
| if self.config.n_clusters is None: | |
| raise ValueError("Для GMM требуется n_clusters") | |
| return GaussianMixture( | |
| n_components=self.config.n_clusters, | |
| random_state=self.config.random_state, | |
| max_iter=100 | |
| ) | |
| elif method == "spectral": | |
| if self.config.n_clusters is None: | |
| raise ValueError("Для Spectral требуется n_clusters") | |
| return SpectralClustering( | |
| n_clusters=self.config.n_clusters, | |
| random_state=self.config.random_state, | |
| affinity='nearest_neighbors', | |
| n_neighbors=self.config.n_neighbors | |
| ) | |
| elif method == "lda": | |
| # LDA обрабатывается отдельно, так как это тематическая модель | |
| return None | |
| else: | |
| raise ValueError(f"Неизвестный метод кластеризации: {method}") | |
| def fit(self, X: np.ndarray): | |
| """Обучение модели кластеризации.""" | |
| start = time.time() | |
| # Spherical k-Means требует нормализации | |
| if self.config.method.lower() == "spherical_kmeans": | |
| from sklearn.preprocessing import normalize | |
| X = normalize(X, norm='l2') | |
| # Для DBSCAN/HDBSCAN с cosine метрикой также нормализуем | |
| if self.config.method.lower() in ["dbscan", "hdbscan"] and self.config.metric == "cosine": | |
| from sklearn.preprocessing import normalize | |
| X = normalize(X, norm='l2') | |
| if self.config.method.lower() == "lda": | |
| # LDA обрабатывается отдельно | |
| raise NotImplementedError("LDA используйте метод fit_lda") | |
| self.model.fit(X) | |
| if hasattr(self.model, 'labels_'): | |
| self.labels_ = self.model.labels_ | |
| elif hasattr(self.model, 'predict'): | |
| self.labels_ = self.model.predict(X) | |
| else: | |
| raise ValueError("Модель не вернула метки кластеров") | |
| self.fit_time = time.time() - start | |
| return self | |
| def fit_lda(self, texts: List[str], dictionary: Optional[Any] = None): | |
| """ | |
| Обучение LDA модели для кластеризации по темам. | |
| Args: | |
| texts: Список текстов (уже токенизированных) | |
| dictionary: Gensim Dictionary (опционально) | |
| """ | |
| if not GENSIM_AVAILABLE: | |
| raise ImportError("gensim не установлен. Установите: pip install gensim") | |
| if self.config.n_clusters is None: | |
| raise ValueError("Для LDA требуется n_clusters (число тем)") | |
| from gensim.utils import simple_preprocess | |
| # Токенизация, если нужно | |
| tokenized_texts = [] | |
| for text in texts: | |
| if isinstance(text, str): | |
| tokens = simple_preprocess(text, deacc=False, min_len=1) | |
| else: | |
| tokens = text | |
| tokenized_texts.append(tokens) | |
| # Создаем словарь | |
| if dictionary is None: | |
| dictionary = Dictionary(tokenized_texts) | |
| dictionary.filter_extremes(no_below=2, no_above=0.5) | |
| # Создаем корпус | |
| corpus = [dictionary.doc2bow(text) for text in tokenized_texts] | |
| # Обучаем LDA | |
| start = time.time() | |
| lda_model = LdaModel( | |
| corpus=corpus, | |
| num_topics=self.config.n_clusters, | |
| id2word=dictionary, | |
| random_state=self.config.random_state, | |
| passes=10, | |
| alpha='auto', | |
| per_word_topics=True | |
| ) | |
| self.fit_time = time.time() - start | |
| # Получаем метки кластеров (темы) для каждого документа | |
| self.labels_ = [] | |
| for doc in corpus: | |
| topic_dist = lda_model.get_document_topics(doc, minimum_probability=0.0) | |
| # Берем тему с максимальной вероятностью | |
| best_topic = max(topic_dist, key=lambda x: x[1])[0] | |
| self.labels_.append(best_topic) | |
| self.labels_ = np.array(self.labels_) | |
| self.model = lda_model | |
| self.dictionary = dictionary | |
| return self | |
| def predict(self, X: np.ndarray): | |
| """Предсказание кластеров для новых данных.""" | |
| start = time.time() | |
| if self.config.method.lower() == "lda": | |
| raise NotImplementedError("LDA predict требует отдельной реализации") | |
| if hasattr(self.model, 'predict'): | |
| predictions = self.model.predict(X) | |
| else: | |
| # Для DBSCAN и некоторых других методов | |
| predictions = self.model.fit_predict(X) | |
| self.predict_time = time.time() - start | |
| return predictions | |
| def evaluate_clustering(X: np.ndarray, labels: np.ndarray, | |
| y_true: Optional[np.ndarray] = None) -> Dict[str, float]: | |
| """ | |
| Оценка качества кластеризации. | |
| Args: | |
| X: Признаки | |
| labels: Предсказанные метки кластеров | |
| y_true: Истинные метки (опционально, для внешних метрик) | |
| Returns: | |
| Словарь с метриками | |
| """ | |
| metrics = {} | |
| # Внутренние метрики | |
| # Удаляем шумовые точки (-1) для метрик | |
| valid_mask = labels != -1 | |
| if valid_mask.sum() > 1: | |
| X_valid = X[valid_mask] | |
| labels_valid = labels[valid_mask] | |
| if len(np.unique(labels_valid)) > 1: | |
| metrics["silhouette"] = silhouette_score(X_valid, labels_valid) | |
| metrics["calinski_harabasz"] = calinski_harabasz_score(X_valid, labels_valid) | |
| metrics["davies_bouldin"] = davies_bouldin_score(X_valid, labels_valid) | |
| else: | |
| metrics["silhouette"] = -1.0 | |
| metrics["calinski_harabasz"] = 0.0 | |
| metrics["davies_bouldin"] = np.inf | |
| # Внешние метрики (если есть истинные метки) | |
| if y_true is not None: | |
| metrics["adjusted_rand_index"] = adjusted_rand_score(y_true, labels) | |
| metrics["normalized_mutual_info"] = normalized_mutual_info_score(y_true, labels) | |
| metrics["v_measure"] = v_measure_score(y_true, labels) | |
| # Статистика кластеров | |
| unique_labels, counts = np.unique(labels, return_counts=True) | |
| metrics["n_clusters"] = len(unique_labels[unique_labels != -1]) # Исключаем шум | |
| metrics["n_noise"] = (labels == -1).sum() if -1 in labels else 0 | |
| metrics["avg_cluster_size"] = counts[unique_labels != -1].mean() if len(counts[unique_labels != -1]) > 0 else 0 | |
| return metrics | |
| def compare_clustering_methods(X: np.ndarray, | |
| configs: List[ClusteringConfig], | |
| y_true: Optional[np.ndarray] = None) -> pd.DataFrame: | |
| """ | |
| Сравнение нескольких методов кластеризации. | |
| Args: | |
| X: Признаки | |
| configs: Список конфигураций | |
| y_true: Истинные метки (опционально) | |
| Returns: | |
| DataFrame с результатами сравнения | |
| """ | |
| results = [] | |
| for cfg in configs: | |
| try: | |
| clusterer = ClusteringAlgorithms(cfg) | |
| if cfg.method.lower() == "lda": | |
| # LDA требует тексты, пропускаем в этой функции | |
| continue | |
| clusterer.fit(X) | |
| metrics = evaluate_clustering(X, clusterer.labels_, y_true) | |
| result = { | |
| "Метод": cfg.method, | |
| "Число кластеров": metrics.get("n_clusters", cfg.n_clusters), | |
| "Шумовые точки": metrics.get("n_noise", 0), | |
| "Silhouette": round(metrics.get("silhouette", -1), 4), | |
| "Calinski-Harabasz": round(metrics.get("calinski_harabasz", 0), 4), | |
| "Davies-Bouldin": round(metrics.get("davies_bouldin", np.inf), 4), | |
| "Время обучения (с)": round(clusterer.fit_time, 2), | |
| } | |
| if y_true is not None: | |
| result["ARI"] = round(metrics.get("adjusted_rand_index", 0), 4) | |
| result["NMI"] = round(metrics.get("normalized_mutual_info", 0), 4) | |
| result["V-measure"] = round(metrics.get("v_measure", 0), 4) | |
| results.append(result) | |
| except Exception as e: | |
| print(f"Ошибка при кластеризации методом {cfg.method}: {e}") | |
| results.append({ | |
| "Метод": cfg.method, | |
| "Ошибка": str(e) | |
| }) | |
| return pd.DataFrame(results) | |
| if __name__ == "__main__": | |
| # Тестирование | |
| from sklearn.datasets import make_blobs | |
| X, y = make_blobs(n_samples=300, centers=4, random_state=42) | |
| configs = [ | |
| ClusteringConfig(method="kmeans", n_clusters=4), | |
| ClusteringConfig(method="dbscan", eps=0.5, min_samples=5), | |
| ClusteringConfig(method="agglomerative", n_clusters=4, linkage="ward"), | |
| ] | |
| if HDBSCAN_AVAILABLE: | |
| configs.append(ClusteringConfig(method="hdbscan", min_samples=5)) | |
| results_df = compare_clustering_methods(X, configs, y_true=y) | |
| print(results_df) | |