""" Классические методы классификации текстов: логистическая регрессия, SVM, случайный лес, градиентный бустинг, ансамбли и AutoML подходы. """ from __future__ import annotations import time from dataclasses import dataclass from typing import List, Dict, Any, Optional, Tuple, Union import numpy as np import pandas as pd from sklearn.linear_model import LogisticRegression from sklearn.svm import SVC from sklearn.ensemble import RandomForestClassifier, VotingClassifier, BaggingClassifier from sklearn.tree import DecisionTreeClassifier from sklearn.model_selection import cross_val_score, StratifiedKFold from sklearn.multioutput import MultiOutputClassifier from sklearn.multiclass import OneVsRestClassifier from sklearn.metrics import ( accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, classification_report, confusion_matrix, precision_recall_curve, roc_curve ) try: import xgboost as xgb XGBOOST_AVAILABLE = True except ImportError: XGBOOST_AVAILABLE = False try: import lightgbm as lgb LIGHTGBM_AVAILABLE = True except ImportError: LIGHTGBM_AVAILABLE = False try: import catboost as cb CATBOOST_AVAILABLE = True except ImportError: CATBOOST_AVAILABLE = False try: import autosklearn.classification AUTOSKLEARN_AVAILABLE = True except ImportError: AUTOSKLEARN_AVAILABLE = False try: from tpot import TPOTClassifier TPOT_AVAILABLE = True except ImportError: TPOT_AVAILABLE = False try: import h2o from h2o.automl import H2OAutoML H2O_AVAILABLE = True except ImportError: H2O_AVAILABLE = False @dataclass class ClassifierConfig: """Конфигурация классификатора.""" name: str model_type: str # lr, svm, rf, xgb, lgb, catboost, ensemble, autosklearn, tpot, h2o params: Dict[str, Any] = None use_class_weight: bool = True multilabel: bool = False # Использовать MultiOutputClassifier для multilabel class ClassicalClassifiers: """Класс для работы с классическими методами классификации.""" def __init__(self, config: ClassifierConfig): self.config = config self.model = self._create_model() self.train_time = 0.0 self.predict_time = 0.0 def _create_model(self): """Создает модель на основе конфигурации.""" model_type = self.config.model_type.lower() params = self.config.params or {} base_model = None if model_type == "lr": base_model = LogisticRegression( max_iter=1000, random_state=42, class_weight="balanced" if self.config.use_class_weight else None, **params ) elif model_type == "svm": base_model = SVC( probability=True, random_state=42, class_weight="balanced" if self.config.use_class_weight else None, **params ) elif model_type == "rf": base_model = RandomForestClassifier( n_estimators=100, random_state=42, class_weight="balanced" if self.config.use_class_weight else None, **params ) # Обертываем в MultiOutputClassifier для multilabel if self.config.multilabel and base_model is not None: return MultiOutputClassifier(base_model) if base_model is not None: return base_model if model_type == "xgb" and XGBOOST_AVAILABLE: model = xgb.XGBClassifier( random_state=42, eval_metric='mlogloss', **params ) return MultiOutputClassifier(model) if self.config.multilabel else model if model_type == "lgb" and LIGHTGBM_AVAILABLE: model = lgb.LGBMClassifier( random_state=42, verbose=-1, **params ) return MultiOutputClassifier(model) if self.config.multilabel else model if model_type == "catboost" and CATBOOST_AVAILABLE: model = cb.CatBoostClassifier( random_state=42, verbose=False, **params ) return MultiOutputClassifier(model) if self.config.multilabel else model if model_type == "ensemble": # Voting Classifier estimators = [ ('lr', LogisticRegression(max_iter=1000, random_state=42)), ('svm', SVC(probability=True, random_state=42)), ('rf', RandomForestClassifier(n_estimators=50, random_state=42)) ] model = VotingClassifier(estimators=estimators, voting='soft') return MultiOutputClassifier(model) if self.config.multilabel else model if model_type == "bagging": base = DecisionTreeClassifier(random_state=42) model = BaggingClassifier( base_estimator=base, n_estimators=10, random_state=42, **params ) return MultiOutputClassifier(model) if self.config.multilabel else model if model_type == "autosklearn" and AUTOSKLEARN_AVAILABLE: model = autosklearn.classification.AutoSklearnClassifier( time_left_for_this_task=300, # 5 минут memory_limit=4096, **params ) # AutoSklearn может не поддерживать multilabel напрямую return model if model_type == "tpot" and TPOT_AVAILABLE: model = TPOTClassifier( generations=5, population_size=20, verbosity=2, random_state=42, **params ) # TPOT может не поддерживать multilabel напрямую return model raise ValueError(f"Неизвестный тип модели: {model_type} или библиотека недоступна") def fit(self, X, y): """Обучение модели.""" start = time.time() self.model.fit(X, y) self.train_time = time.time() - start return self def predict(self, X): """Предсказание классов.""" start = time.time() predictions = self.model.predict(X) self.predict_time = time.time() - start return predictions def predict_proba(self, X): """Предсказание вероятностей.""" if hasattr(self.model, 'predict_proba'): return self.model.predict_proba(X) return None def get_feature_importance(self): """Получение важности признаков (если доступно).""" if hasattr(self.model, 'feature_importances_'): return self.model.feature_importances_ elif hasattr(self.model, 'coef_'): return np.abs(self.model.coef_[0]) if len(self.model.coef_.shape) > 1 else np.abs(self.model.coef_) return None def evaluate_classifier(y_true, y_pred, y_proba=None, task_type: str = "multiclass") -> Dict[str, Any]: """ Оценка качества классификатора. Args: y_true: Истинные метки y_pred: Предсказанные метки y_proba: Вероятности классов (опционально) task_type: Тип задачи (binary, multiclass, multilabel) Returns: Словарь с метриками """ metrics = { "accuracy": accuracy_score(y_true, y_pred), "precision_macro": precision_score(y_true, y_pred, average='macro', zero_division=0), "recall_macro": recall_score(y_true, y_pred, average='macro', zero_division=0), "f1_macro": f1_score(y_true, y_pred, average='macro', zero_division=0), "precision_micro": precision_score(y_true, y_pred, average='micro', zero_division=0), "recall_micro": recall_score(y_true, y_pred, average='micro', zero_division=0), "f1_micro": f1_score(y_true, y_pred, average='micro', zero_division=0), } # ROC-AUC для бинарной классификации if task_type == "binary" and y_proba is not None and y_proba.shape[1] == 2: try: metrics["roc_auc"] = roc_auc_score(y_true, y_proba[:, 1]) except: metrics["roc_auc"] = np.nan # ROC-AUC для многоклассовой (macro) elif task_type == "multiclass" and y_proba is not None: try: metrics["roc_auc_macro"] = roc_auc_score(y_true, y_proba, average='macro', multi_class='ovr') except: metrics["roc_auc_macro"] = np.nan # Метрики для многометочной классификации elif task_type == "multilabel": # Для multilabel используем специальные метрики from sklearn.metrics import hamming_loss, jaccard_score try: metrics["hamming_loss"] = hamming_loss(y_true, y_pred) metrics["jaccard_score"] = jaccard_score(y_true, y_pred, average='macro', zero_division=0) # ROC-AUC для multilabel (каждый класс отдельно, затем усреднение) if y_proba is not None: try: metrics["roc_auc_macro"] = roc_auc_score(y_true, y_proba, average='macro') except: metrics["roc_auc_macro"] = np.nan except Exception as e: print(f"Ошибка при вычислении метрик multilabel: {e}") return metrics def cross_validate_classifier(model, X, y, cv=5, scoring='f1_macro'): """Кросс-валидация классификатора.""" cv_scores = cross_val_score(model, X, y, cv=StratifiedKFold(n_splits=cv, shuffle=True, random_state=42), scoring=scoring) return { "mean": cv_scores.mean(), "std": cv_scores.std(), "scores": cv_scores.tolist() } def compare_classifiers(X_train, y_train, X_test, y_test, configs: List[ClassifierConfig], task_type: str = "multiclass", cv: Optional[int] = None) -> pd.DataFrame: """ Сравнение нескольких классификаторов. Args: X_train: Обучающие признаки y_train: Обучающие метки X_test: Тестовые признаки y_test: Тестовые метки configs: Список конфигураций классификаторов task_type: Тип задачи (binary, multiclass, multilabel) cv: Количество фолдов для кросс-валидации (опционально) Returns: DataFrame с результатами сравнения """ # Определяем, является ли задача multilabel is_multilabel = task_type == "multilabel" if is_multilabel: # Обновляем конфигурации для multilabel for cfg in configs: cfg.multilabel = True results = [] for cfg in configs: try: classifier = ClassicalClassifiers(cfg) # Обучение classifier.fit(X_train, y_train) # Предсказания y_pred = classifier.predict(X_test) y_proba = classifier.predict_proba(X_test) # Для multilabel y_pred может быть 2D, нужно преобразовать if is_multilabel and len(y_pred.shape) == 2: # y_pred уже в правильном формате для multilabel pass elif is_multilabel: # Если модель вернула 1D, преобразуем y_pred = y_pred.reshape(-1, 1) if len(y_pred.shape) == 1 else y_pred # Метрики metrics = evaluate_classifier(y_test, y_pred, y_proba, task_type) # Кросс-валидация (если запрошена) cv_results = None if cv: cv_results = cross_validate_classifier(classifier.model, X_train, y_train, cv=cv) result = { "Модель": cfg.name, "Тип": cfg.model_type, "Точность": round(metrics["accuracy"], 4), "Precision (macro)": round(metrics["precision_macro"], 4), "Recall (macro)": round(metrics["recall_macro"], 4), "F1 (macro)": round(metrics["f1_macro"], 4), "F1 (micro)": round(metrics["f1_micro"], 4), "Время обучения (с)": round(classifier.train_time, 2), "Время предсказания (с)": round(classifier.predict_time, 4), } if "roc_auc" in metrics: result["ROC-AUC"] = round(metrics["roc_auc"], 4) elif "roc_auc_macro" in metrics: result["ROC-AUC (macro)"] = round(metrics["roc_auc_macro"], 4) # Дополнительные метрики для multilabel if task_type == "multilabel": if "hamming_loss" in metrics: result["Hamming Loss"] = round(metrics["hamming_loss"], 4) if "jaccard_score" in metrics: result["Jaccard Score"] = round(metrics["jaccard_score"], 4) if cv_results: result["CV F1 (mean)"] = round(cv_results["mean"], 4) result["CV F1 (std)"] = round(cv_results["std"], 4) results.append(result) except Exception as e: print(f"Ошибка при обучении {cfg.name}: {e}") results.append({ "Модель": cfg.name, "Тип": cfg.model_type, "Ошибка": str(e) }) return pd.DataFrame(results) if __name__ == "__main__": # Тестирование from sklearn.datasets import make_classification from sklearn.model_selection import train_test_split X, y = make_classification(n_samples=1000, n_features=20, n_classes=3, random_state=42) X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) configs = [ ClassifierConfig(name="Logistic Regression", model_type="lr"), ClassifierConfig(name="SVM", model_type="svm", params={"kernel": "linear"}), ClassifierConfig(name="Random Forest", model_type="rf"), ] if XGBOOST_AVAILABLE: configs.append(ClassifierConfig(name="XGBoost", model_type="xgb")) results_df = compare_classifiers(X_train, y_train, X_test, y_test, configs) print(results_df)