| """ | |
| Модуль для оценки качества моделей классификации и настройки гиперпараметров. | |
| Включает кросс-валидацию, подбор гиперпараметров и комплексные метрики. | |
| """ | |
from __future__ import annotations
import time
from typing import List, Dict, Any, Optional, Tuple, Union
from dataclasses import dataclass
import numpy as np
import pandas as pd
from sklearn.model_selection import (
    GridSearchCV, RandomizedSearchCV, StratifiedKFold,
    cross_val_score, cross_validate as sk_cross_validate, train_test_split
)
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, classification_report, confusion_matrix,
    precision_recall_curve, roc_curve, average_precision_score
)
try:
    import optuna
    OPTUNA_AVAILABLE = True
except ImportError:
    OPTUNA_AVAILABLE = False
    print("⚠️ Optuna is not installed. Bayesian optimization is unavailable.")
try:
    from hyperopt import fmin, tpe, hp, Trials, STATUS_OK
    HYPEROPT_AVAILABLE = True
except ImportError:
    HYPEROPT_AVAILABLE = False
    print("⚠️ Hyperopt is not installed. Hyperopt-based optimization is unavailable.")
@dataclass
class EvaluationMetrics:
    """Container for evaluation metrics."""
    accuracy: float
    precision_macro: float
    recall_macro: float
    f1_macro: float
    precision_micro: float
    recall_micro: float
    f1_micro: float
    roc_auc: Optional[float] = None
    pr_auc: Optional[float] = None
    train_time: float = 0.0
    predict_time: float = 0.0
def evaluate_classifier(y_true: np.ndarray,
                        y_pred: np.ndarray,
                        y_proba: Optional[np.ndarray] = None,
                        task_type: str = "multiclass") -> EvaluationMetrics:
    """
    Comprehensive evaluation of a classifier.
    Args:
        y_true: True labels
        y_pred: Predicted labels
        y_proba: Class probabilities
        task_type: Task type (binary, multiclass, multilabel)
    Returns:
        EvaluationMetrics object
    """
    metrics = EvaluationMetrics(
        accuracy=accuracy_score(y_true, y_pred),
        precision_macro=precision_score(y_true, y_pred, average='macro', zero_division=0),
        recall_macro=recall_score(y_true, y_pred, average='macro', zero_division=0),
        f1_macro=f1_score(y_true, y_pred, average='macro', zero_division=0),
        precision_micro=precision_score(y_true, y_pred, average='micro', zero_division=0),
        recall_micro=recall_score(y_true, y_pred, average='micro', zero_division=0),
        f1_micro=f1_score(y_true, y_pred, average='micro', zero_division=0),
    )
    # ROC-AUC / PR-AUC for binary classification
    if task_type == "binary" and y_proba is not None:
        # Accept both (n_samples, 2) probability matrices and 1-D positive-class scores
        if y_proba.ndim == 2 and y_proba.shape[1] == 2:
            proba_pos = y_proba[:, 1]
        else:
            proba_pos = y_proba.ravel()
        try:
            metrics.roc_auc = roc_auc_score(y_true, proba_pos)
            metrics.pr_auc = average_precision_score(y_true, proba_pos)
        except ValueError:
            pass
    # ROC-AUC for multiclass (macro, one-vs-rest)
    elif task_type == "multiclass" and y_proba is not None:
        try:
            metrics.roc_auc = roc_auc_score(y_true, y_proba, average='macro', multi_class='ovr')
        except ValueError:
            pass
    return metrics
def cross_validate(model, X: np.ndarray, y: np.ndarray,
                   cv: int = 5,
                   scoring: str = 'f1_macro',
                   return_train_score: bool = False) -> Dict[str, Any]:
    """
    Cross-validate a model.
    Args:
        model: Model with an sklearn-compatible interface
        X: Features
        y: Labels
        cv: Number of folds
        scoring: Scoring metric
        return_train_score: Whether to also return training-fold scores
    Returns:
        Dictionary with cross-validation results
    """
    skf = StratifiedKFold(n_splits=cv, shuffle=True, random_state=42)
    if return_train_score:
        # cross_val_score does not report train scores, so use sklearn's cross_validate
        cv_output = sk_cross_validate(model, X, y, cv=skf, scoring=scoring,
                                      return_train_score=True)
        test_scores = cv_output["test_score"]
        return {
            "mean": float(test_scores.mean()),
            "std": float(test_scores.std()),
            "scores": test_scores.tolist(),
            "train_mean": float(cv_output["train_score"].mean()),
            "train_std": float(cv_output["train_score"].std()),
        }
    cv_scores = cross_val_score(model, X, y, cv=skf, scoring=scoring)
    return {
        "mean": float(cv_scores.mean()),
        "std": float(cv_scores.std()),
        "scores": cv_scores.tolist()
    }
def grid_search(model, X: np.ndarray, y: np.ndarray,
                param_grid: Dict[str, List[Any]],
                cv: int = 5,
                scoring: str = 'f1_macro',
                n_jobs: int = -1) -> Dict[str, Any]:
    """
    Hyperparameter tuning via Grid Search.
    Args:
        model: Model with an sklearn-compatible interface
        X: Features
        y: Labels
        param_grid: Parameter grid
        cv: Number of folds
        scoring: Scoring metric
        n_jobs: Number of parallel jobs
    Returns:
        Dictionary with the best parameters and results
    """
    search = GridSearchCV(
        model,
        param_grid,
        cv=StratifiedKFold(n_splits=cv, shuffle=True, random_state=42),
        scoring=scoring,
        n_jobs=n_jobs,
        verbose=1
    )
    start = time.time()
    search.fit(X, y)
    search_time = time.time() - start
    return {
        "best_params": search.best_params_,
        "best_score": float(search.best_score_),
        "best_model": search.best_estimator_,
        "search_time": search_time,
        "cv_results": search.cv_results_
    }
def random_search(model, X: np.ndarray, y: np.ndarray,
                  param_distributions: Dict[str, Any],
                  n_iter: int = 50,
                  cv: int = 5,
                  scoring: str = 'f1_macro',
                  n_jobs: int = -1) -> Dict[str, Any]:
    """
    Hyperparameter tuning via Random Search.
    Args:
        model: Model with an sklearn-compatible interface
        X: Features
        y: Labels
        param_distributions: Parameter distributions (value lists or scipy.stats distributions)
        n_iter: Number of sampled parameter settings
        cv: Number of folds
        scoring: Scoring metric
        n_jobs: Number of parallel jobs
    Returns:
        Dictionary with the best parameters and results
    """
    search = RandomizedSearchCV(
        model,
        param_distributions,
        n_iter=n_iter,
        cv=StratifiedKFold(n_splits=cv, shuffle=True, random_state=42),
        scoring=scoring,
        n_jobs=n_jobs,
        random_state=42,
        verbose=1
    )
    start = time.time()
    search.fit(X, y)
    search_time = time.time() - start
    return {
        "best_params": search.best_params_,
        "best_score": float(search.best_score_),
        "best_model": search.best_estimator_,
        "search_time": search_time,
        "cv_results": search.cv_results_
    }
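# Illustrative sketch (not part of the original module): param_distributions may mix
# explicit value lists with scipy.stats distributions exposing an rvs() method,
# e.g. for LogisticRegression (assumes scipy is installed):
#   from scipy.stats import loguniform
#   param_distributions = {
#       "C": loguniform(1e-3, 1e2),          # sample C on a log scale
#       "solver": ["liblinear", "lbfgs"],    # categorical choice
#   }
#   results = random_search(LogisticRegression(max_iter=1000), X, y,
#                           param_distributions, n_iter=20, cv=3)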
def optuna_optimize(model_class, X: np.ndarray, y: np.ndarray,
                    param_space: Dict[str, Any],
                    n_trials: int = 50,
                    cv: int = 5,
                    scoring: str = 'f1_macro') -> Dict[str, Any]:
    """
    Hyperparameter tuning via Bayesian optimization (Optuna).
    Args:
        model_class: Model class
        X: Features
        y: Labels
        param_space: Parameter space (callables that sample from an Optuna trial)
        n_trials: Number of trials
        cv: Number of folds
        scoring: Scoring metric
    Returns:
        Dictionary with the best parameters and results
    """
    if not OPTUNA_AVAILABLE:
        raise ImportError("Optuna is not installed. Install it with: pip install optuna")
    def objective(trial):
        params = {}
        for param_name, param_func in param_space.items():
            params[param_name] = param_func(trial)
        model = model_class(**params)
        scores = cross_val_score(
            model, X, y,
            cv=StratifiedKFold(n_splits=cv, shuffle=True, random_state=42),
            scoring=scoring
        )
        return scores.mean()
    study = optuna.create_study(direction='maximize', study_name='classifier_optimization')
    start = time.time()
    study.optimize(objective, n_trials=n_trials, show_progress_bar=True)
    search_time = time.time() - start
    # Refit the best model on the full data
    best_model = model_class(**study.best_params)
    best_model.fit(X, y)
    return {
        "best_params": study.best_params,
        "best_score": float(study.best_value),
        "best_model": best_model,
        "search_time": search_time,
        "study": study
    }
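# Illustrative sketch (not part of the original module): param_space maps each parameter
# name to a callable that receives an Optuna trial and returns a sampled value,
# e.g. for LogisticRegression:
#   param_space = {
#       "C": lambda trial: trial.suggest_float("C", 1e-3, 1e2, log=True),
#       "max_iter": lambda trial: trial.suggest_int("max_iter", 200, 2000),
#   }
#   results = optuna_optimize(LogisticRegression, X, y, param_space, n_trials=30)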
def create_confusion_matrix_plot(y_true: np.ndarray, y_pred: np.ndarray,
                                 class_names: Optional[List[str]] = None) -> pd.DataFrame:
    """
    Builds a confusion matrix.
    Args:
        y_true: True labels
        y_pred: Predicted labels
        class_names: Class names
    Returns:
        DataFrame with the confusion matrix
    """
    cm = confusion_matrix(y_true, y_pred)
    if class_names is None:
        class_names = [f"Class {i}" for i in range(len(cm))]
    df = pd.DataFrame(cm, index=class_names, columns=class_names)
    return df
def create_classification_report_df(y_true: np.ndarray, y_pred: np.ndarray,
                                    class_names: Optional[List[str]] = None) -> pd.DataFrame:
    """
    Builds a classification report.
    Args:
        y_true: True labels
        y_pred: Predicted labels
        class_names: Class names
    Returns:
        DataFrame with the report
    """
    report = classification_report(y_true, y_pred, target_names=class_names, output_dict=True)
    df = pd.DataFrame(report).transpose()
    return df
if __name__ == "__main__":
    # Quick smoke test
    from sklearn.datasets import make_classification
    from sklearn.linear_model import LogisticRegression
    X, y = make_classification(n_samples=1000, n_features=20, n_informative=10,
                               n_classes=3, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    # Train a model
    model = LogisticRegression(max_iter=1000, random_state=42)
    model.fit(X_train, y_train)
    # Evaluate
    y_pred = model.predict(X_test)
    y_proba = model.predict_proba(X_test)
    metrics = evaluate_classifier(y_test, y_pred, y_proba, task_type="multiclass")
    print("Metrics:")
    print(f"Accuracy: {metrics.accuracy:.4f}")
    print(f"F1 (macro): {metrics.f1_macro:.4f}")
    print(f"ROC-AUC: {metrics.roc_auc:.4f}" if metrics.roc_auc is not None else "ROC-AUC: N/A")
    # Cross-validation
    cv_results = cross_validate(model, X_train, y_train, cv=5)
    print(f"\nCross-validation F1: {cv_results['mean']:.4f} ± {cv_results['std']:.4f}")
    # Grid Search
    param_grid = {
        'C': [0.1, 1, 10],
        'penalty': ['l1', 'l2'],
        'solver': ['liblinear']  # liblinear supports both l1 and l2 penalties
    }
    # grid_results = grid_search(model, X_train, y_train, param_grid, cv=3)
    # print(f"\nBest parameters (Grid Search): {grid_results['best_params']}")
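    # Illustrative sketch (assumes scipy is available): the same search could be run
    # with random_search by replacing the C list with a continuous distribution.
    # from scipy.stats import loguniform
    # rs_results = random_search(LogisticRegression(max_iter=1000, solver='liblinear'),
    #                            X_train, y_train,
    #                            {'C': loguniform(1e-3, 1e2), 'penalty': ['l1', 'l2']},
    #                            n_iter=20, cv=3)
    # print(f"\nBest parameters (Random Search): {rs_results['best_params']}")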