Spaces:
Sleeping
Sleeping
| """ | |
| Классические методы классификации текстов: логистическая регрессия, SVM, | |
| случайный лес, градиентный бустинг, ансамбли и AutoML подходы. | |
| """ | |
| from __future__ import annotations | |
| import time | |
| from dataclasses import dataclass | |
| from typing import List, Dict, Any, Optional, Tuple, Union | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.svm import SVC | |
| from sklearn.ensemble import RandomForestClassifier, VotingClassifier, BaggingClassifier | |
| from sklearn.tree import DecisionTreeClassifier | |
| from sklearn.model_selection import cross_val_score, StratifiedKFold | |
| from sklearn.multioutput import MultiOutputClassifier | |
| from sklearn.multiclass import OneVsRestClassifier | |
| from sklearn.metrics import ( | |
| accuracy_score, precision_score, recall_score, f1_score, | |
| roc_auc_score, classification_report, confusion_matrix, | |
| precision_recall_curve, roc_curve | |
| ) | |
| try: | |
| import xgboost as xgb | |
| XGBOOST_AVAILABLE = True | |
| except ImportError: | |
| XGBOOST_AVAILABLE = False | |
| try: | |
| import lightgbm as lgb | |
| LIGHTGBM_AVAILABLE = True | |
| except ImportError: | |
| LIGHTGBM_AVAILABLE = False | |
| try: | |
| import catboost as cb | |
| CATBOOST_AVAILABLE = True | |
| except ImportError: | |
| CATBOOST_AVAILABLE = False | |
| try: | |
| import autosklearn.classification | |
| AUTOSKLEARN_AVAILABLE = True | |
| except ImportError: | |
| AUTOSKLEARN_AVAILABLE = False | |
| try: | |
| from tpot import TPOTClassifier | |
| TPOT_AVAILABLE = True | |
| except ImportError: | |
| TPOT_AVAILABLE = False | |
| try: | |
| import h2o | |
| from h2o.automl import H2OAutoML | |
| H2O_AVAILABLE = True | |
| except ImportError: | |
| H2O_AVAILABLE = False | |
@dataclass
class ClassifierConfig:
    """Configuration describing one classifier to build and evaluate.

    Attributes:
        name: Human-readable label used in result tables.
        model_type: Short model identifier, e.g. ``lr``, ``svm``, ``rf``,
            ``xgb``, ``lgb``, ``catboost``, ``ensemble``, ``bagging``,
            ``autosklearn`` or ``tpot``.
        params: Extra keyword arguments forwarded to the model constructor;
            ``None`` means "no extra arguments".
        use_class_weight: Whether to request balanced class weights from
            models that support the ``class_weight`` option.
        multilabel: Whether the model should be wrapped for multilabel
            (multi-output) targets.
    """

    name: str
    model_type: str
    params: Optional[Dict[str, Any]] = None
    use_class_weight: bool = True
    multilabel: bool = False
class ClassicalClassifiers:
    """Build a classical classifier from a config and time its fit/predict.

    The underlying estimator is stored in ``self.model``. ``train_time`` and
    ``predict_time`` hold the wall-clock duration (seconds) of the most
    recent ``fit`` and ``predict`` calls.
    """

    def __init__(self, config: ClassifierConfig):
        """Create the estimator described by *config* and reset the timers."""
        self.config = config
        self.model = self._create_model()
        self.train_time = 0.0
        self.predict_time = 0.0

    def _wrap_multilabel(self, model):
        """Wrap *model* in ``MultiOutputClassifier`` for multilabel configs."""
        return MultiOutputClassifier(model) if self.config.multilabel else model

    def _create_model(self):
        """Instantiate the estimator selected by ``config.model_type``.

        Built-in defaults (iteration counts, ``random_state=42`` and so on)
        are merged with ``config.params``; values from ``config.params`` win,
        so callers may override any default without a duplicate-keyword
        ``TypeError``.

        Returns:
            The estimator, wrapped in ``MultiOutputClassifier`` when the
            config is multilabel (the auto-sklearn and TPOT models are
            returned unwrapped).

        Raises:
            ValueError: If the model type is unknown or its optional
                library is not installed.
        """
        model_type = self.config.model_type.lower()
        params = self.config.params or {}
        class_weight = "balanced" if self.config.use_class_weight else None

        def options(**defaults):
            # User-supplied parameters take precedence over the defaults.
            return {**defaults, **params}

        if model_type == "lr":
            return self._wrap_multilabel(LogisticRegression(
                **options(max_iter=1000, random_state=42, class_weight=class_weight)
            ))

        if model_type == "svm":
            return self._wrap_multilabel(SVC(
                **options(probability=True, random_state=42, class_weight=class_weight)
            ))

        if model_type == "rf":
            return self._wrap_multilabel(RandomForestClassifier(
                **options(n_estimators=100, random_state=42, class_weight=class_weight)
            ))

        if model_type == "xgb" and XGBOOST_AVAILABLE:
            return self._wrap_multilabel(xgb.XGBClassifier(
                **options(random_state=42, eval_metric='mlogloss')
            ))

        if model_type == "lgb" and LIGHTGBM_AVAILABLE:
            return self._wrap_multilabel(lgb.LGBMClassifier(
                **options(random_state=42, verbose=-1)
            ))

        if model_type == "catboost" and CATBOOST_AVAILABLE:
            return self._wrap_multilabel(cb.CatBoostClassifier(
                **options(random_state=42, verbose=False)
            ))

        if model_type == "ensemble":
            # Soft-voting ensemble over fixed base models; config.params is
            # not applied to this model type.
            estimators = [
                ('lr', LogisticRegression(max_iter=1000, random_state=42)),
                ('svm', SVC(probability=True, random_state=42)),
                ('rf', RandomForestClassifier(n_estimators=50, random_state=42)),
            ]
            return self._wrap_multilabel(
                VotingClassifier(estimators=estimators, voting='soft')
            )

        if model_type == "bagging":
            bagging_options = options(n_estimators=10, random_state=42)
            if "estimator" in bagging_options or "base_estimator" in bagging_options:
                # The caller supplied the base estimator explicitly.
                model = BaggingClassifier(**bagging_options)
            else:
                # Passed positionally: the keyword was renamed from
                # ``base_estimator`` to ``estimator`` across scikit-learn
                # releases, but it is the first parameter in both.
                model = BaggingClassifier(
                    DecisionTreeClassifier(random_state=42), **bagging_options
                )
            return self._wrap_multilabel(model)

        if model_type == "autosklearn" and AUTOSKLEARN_AVAILABLE:
            # time_left_for_this_task=300 is a five-minute search budget.
            # Returned unwrapped: auto-sklearn may not support multilabel
            # through MultiOutputClassifier.
            return autosklearn.classification.AutoSklearnClassifier(
                **options(time_left_for_this_task=300, memory_limit=4096)
            )

        if model_type == "tpot" and TPOT_AVAILABLE:
            # Returned unwrapped: TPOT may not support multilabel directly.
            return TPOTClassifier(
                **options(generations=5, population_size=20, verbosity=2, random_state=42)
            )

        raise ValueError(f"Неизвестный тип модели: {model_type} или библиотека недоступна")

    def fit(self, X, y):
        """Fit the model on ``(X, y)``, record ``train_time``, return ``self``."""
        start = time.time()
        self.model.fit(X, y)
        self.train_time = time.time() - start
        return self

    def predict(self, X):
        """Predict labels for *X* and record ``predict_time``."""
        start = time.time()
        predictions = self.model.predict(X)
        self.predict_time = time.time() - start
        return predictions

    def predict_proba(self, X):
        """Return class probabilities for *X*, or ``None`` if unsupported."""
        if hasattr(self.model, 'predict_proba'):
            return self.model.predict_proba(X)
        return None

    def get_feature_importance(self):
        """Return feature importances when the model exposes them.

        Uses ``feature_importances_`` if present. Otherwise falls back to the
        absolute value of ``coef_``; for a 2-D ``coef_`` only the first row
        is used. Returns ``None`` when neither attribute exists.
        """
        if hasattr(self.model, 'feature_importances_'):
            return self.model.feature_importances_
        if hasattr(self.model, 'coef_'):
            coef = self.model.coef_
            return np.abs(coef[0]) if len(coef.shape) > 1 else np.abs(coef)
        return None
def _multilabel_score_matrix(y_proba):
    """Convert multilabel probabilities to an (n_samples, n_labels) matrix.

    A list/tuple is treated as one ``(n_samples, n_classes)`` array per label
    (the format returned by ``MultiOutputClassifier.predict_proba``); column 1
    of each array, the second class, is kept. Anything else is returned as a
    NumPy array unchanged.
    """
    if isinstance(y_proba, (list, tuple)):
        return np.column_stack([np.asarray(p)[:, 1] for p in y_proba])
    return np.asarray(y_proba)


def evaluate_classifier(y_true, y_pred, y_proba=None,
                        task_type: str = "multiclass") -> Dict[str, Any]:
    """
    Compute quality metrics for a classifier's predictions.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        y_proba: Predicted class probabilities (optional).
        task_type: Task kind: ``binary``, ``multiclass`` or ``multilabel``.

    Returns:
        Dictionary with accuracy and macro/micro precision, recall and F1.
        Depending on ``task_type`` it may also contain ``roc_auc`` (binary),
        ``roc_auc_macro`` (multiclass / multilabel), ``hamming_loss`` and
        ``jaccard_score`` (multilabel). A ROC-AUC value that cannot be
        computed is stored as NaN.
    """
    metrics = {
        "accuracy": accuracy_score(y_true, y_pred),
        "precision_macro": precision_score(y_true, y_pred, average='macro', zero_division=0),
        "recall_macro": recall_score(y_true, y_pred, average='macro', zero_division=0),
        "f1_macro": f1_score(y_true, y_pred, average='macro', zero_division=0),
        "precision_micro": precision_score(y_true, y_pred, average='micro', zero_division=0),
        "recall_micro": recall_score(y_true, y_pred, average='micro', zero_division=0),
        "f1_micro": f1_score(y_true, y_pred, average='micro', zero_division=0),
    }

    if task_type == "binary":
        # ROC-AUC from the second-class column of a two-column matrix.
        if y_proba is not None:
            proba = np.asarray(y_proba)
            if proba.ndim == 2 and proba.shape[1] == 2:
                try:
                    metrics["roc_auc"] = roc_auc_score(y_true, proba[:, 1])
                except Exception:
                    # Best effort: e.g. only one class present in y_true.
                    metrics["roc_auc"] = np.nan

    elif task_type == "multiclass":
        # One-vs-rest ROC-AUC, macro-averaged over classes.
        if y_proba is not None:
            try:
                metrics["roc_auc_macro"] = roc_auc_score(
                    y_true, y_proba, average='macro', multi_class='ovr'
                )
            except Exception:
                metrics["roc_auc_macro"] = np.nan

    elif task_type == "multilabel":
        # Multilabel-specific metrics.
        from sklearn.metrics import hamming_loss, jaccard_score
        try:
            metrics["hamming_loss"] = hamming_loss(y_true, y_pred)
            metrics["jaccard_score"] = jaccard_score(
                y_true, y_pred, average='macro', zero_division=0
            )
            # ROC-AUC per label, then macro-averaged.
            if y_proba is not None:
                try:
                    scores = _multilabel_score_matrix(y_proba)
                    metrics["roc_auc_macro"] = roc_auc_score(y_true, scores, average='macro')
                except Exception:
                    metrics["roc_auc_macro"] = np.nan
        except Exception as e:
            print(f"Ошибка при вычислении метрик multilabel: {e}")

    return metrics
def cross_validate_classifier(model, X, y, cv=5, scoring='f1_macro'):
    """Cross-validate *model* and summarise the per-fold scores.

    Folds are shuffled with ``random_state=42``. Stratified folds are used
    for 1-D targets. For 2-D targets with more than one column (multilabel
    indicator matrices), plain ``KFold`` is used instead, because
    ``StratifiedKFold`` does not accept such targets.

    Args:
        model: Estimator passed to ``cross_val_score``.
        X: Feature matrix.
        y: Target labels.
        cv: Number of folds.
        scoring: Scoring name passed to ``cross_val_score``.

    Returns:
        Dict with ``mean`` and ``std`` of the fold scores and the raw
        ``scores`` as a list.
    """
    targets = np.asarray(y)
    if targets.ndim == 2 and targets.shape[1] > 1:
        from sklearn.model_selection import KFold
        splitter = KFold(n_splits=cv, shuffle=True, random_state=42)
    else:
        splitter = StratifiedKFold(n_splits=cv, shuffle=True, random_state=42)

    cv_scores = cross_val_score(model, X, y, cv=splitter, scoring=scoring)
    return {
        "mean": cv_scores.mean(),
        "std": cv_scores.std(),
        "scores": cv_scores.tolist(),
    }
def _build_result_row(cfg, classifier, metrics, cv_results, task_type):
    """Format one classifier's metrics and timings as a result-table row."""
    row = {
        "Модель": cfg.name,
        "Тип": cfg.model_type,
        "Точность": round(metrics["accuracy"], 4),
        "Precision (macro)": round(metrics["precision_macro"], 4),
        "Recall (macro)": round(metrics["recall_macro"], 4),
        "F1 (macro)": round(metrics["f1_macro"], 4),
        "F1 (micro)": round(metrics["f1_micro"], 4),
        "Время обучения (с)": round(classifier.train_time, 2),
        "Время предсказания (с)": round(classifier.predict_time, 4),
    }

    if "roc_auc" in metrics:
        row["ROC-AUC"] = round(metrics["roc_auc"], 4)
    elif "roc_auc_macro" in metrics:
        row["ROC-AUC (macro)"] = round(metrics["roc_auc_macro"], 4)

    # Extra columns that only exist for multilabel tasks.
    if task_type == "multilabel":
        if "hamming_loss" in metrics:
            row["Hamming Loss"] = round(metrics["hamming_loss"], 4)
        if "jaccard_score" in metrics:
            row["Jaccard Score"] = round(metrics["jaccard_score"], 4)

    if cv_results:
        row["CV F1 (mean)"] = round(cv_results["mean"], 4)
        row["CV F1 (std)"] = round(cv_results["std"], 4)

    return row


def compare_classifiers(X_train, y_train, X_test, y_test,
                        configs: List[ClassifierConfig],
                        task_type: str = "multiclass",
                        cv: Optional[int] = None) -> pd.DataFrame:
    """
    Train and evaluate several classifiers and tabulate the results.

    Args:
        X_train: Training features.
        y_train: Training labels.
        X_test: Test features.
        y_test: Test labels.
        configs: Classifier configurations to compare. The objects passed in
            are not modified; for multilabel tasks shallow copies with
            ``multilabel = True`` are used internally.
        task_type: Task kind: ``binary``, ``multiclass`` or ``multilabel``.
        cv: Number of cross-validation folds (optional; skipped when falsy).

    Returns:
        DataFrame with one row per configuration. A configuration that fails
        produces a row holding only the model name, the model type and the
        error text.
    """
    import copy

    is_multilabel = task_type == "multilabel"

    if is_multilabel:
        # Force multilabel mode on copies so the caller's configs stay intact.
        effective_configs = []
        for original_cfg in configs:
            cfg_copy = copy.copy(original_cfg)
            cfg_copy.multilabel = True
            effective_configs.append(cfg_copy)
    else:
        effective_configs = list(configs)

    results = []
    for cfg in effective_configs:
        try:
            classifier = ClassicalClassifiers(cfg)
            classifier.fit(X_train, y_train)

            y_pred = classifier.predict(X_test)
            y_proba = classifier.predict_proba(X_test)

            # Multilabel metrics need 2-D predictions; lift a 1-D result
            # to a single-column matrix.
            if is_multilabel and len(y_pred.shape) == 1:
                y_pred = y_pred.reshape(-1, 1)

            metrics = evaluate_classifier(y_test, y_pred, y_proba, task_type)

            cv_results = None
            if cv:
                cv_results = cross_validate_classifier(
                    classifier.model, X_train, y_train, cv=cv
                )

            results.append(
                _build_result_row(cfg, classifier, metrics, cv_results, task_type)
            )
        except Exception as e:
            # One failing model must not abort the whole comparison; the
            # failure is reported and recorded as an error row.
            print(f"Ошибка при обучении {cfg.name}: {e}")
            results.append({
                "Модель": cfg.name,
                "Тип": cfg.model_type,
                "Ошибка": str(e)
            })

    return pd.DataFrame(results)
if __name__ == "__main__":
    # Smoke test on a synthetic three-class dataset.
    from sklearn.datasets import make_classification
    from sklearn.model_selection import train_test_split

    # n_informative must satisfy
    # n_classes * n_clusters_per_class <= 2 ** n_informative; the default of
    # 2 informative features is too small for 3 classes with the default
    # 2 clusters per class, so it is raised explicitly.
    X, y = make_classification(
        n_samples=1000,
        n_features=20,
        n_informative=5,
        n_classes=3,
        random_state=42,
    )
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    configs = [
        ClassifierConfig(name="Logistic Regression", model_type="lr"),
        ClassifierConfig(name="SVM", model_type="svm", params={"kernel": "linear"}),
        ClassifierConfig(name="Random Forest", model_type="rf"),
    ]
    if XGBOOST_AVAILABLE:
        configs.append(ClassifierConfig(name="XGBoost", model_type="xgb"))

    results_df = compare_classifiers(X_train, y_train, X_test, y_test, configs)
    print(results_df)