Spaces:
Build error
Build error
| from typing import Dict, Any, Union, Callable, Optional, Tuple, List | |
| import numpy as np | |
| import pandas as pd | |
| from collections import defaultdict | |
| import torch | |
| from sklearn.model_selection import ( | |
| StratifiedKFold, GroupKFold, TimeSeriesSplit, | |
| GridSearchCV, RandomizedSearchCV | |
| ) | |
| from sklearn.metrics import ( | |
| accuracy_score, precision_score, recall_score, f1_score, | |
| roc_auc_score, average_precision_score, log_loss, | |
| confusion_matrix, classification_report | |
| ) | |
| from sklearn.base import BaseEstimator | |
| import warnings | |
| warnings.filterwarnings("ignore") | |
| OPTUNA_AVAILABLE = False | |
| HYPEROPT_AVAILABLE = False | |
| try: | |
| import optuna | |
| from optuna.samplers import TPESampler | |
| OPTUNA_AVAILABLE = True | |
| except ImportError: | |
| pass | |
| try: | |
| from hyperopt import fmin, tpe, hp, Trials, STATUS_OK | |
| HYPEROPT_AVAILABLE = True | |
| except ImportError: | |
| pass | |
| WANDB_AVAILABLE = False | |
| try: | |
| import wandb | |
| WANDB_AVAILABLE = True | |
| except ImportError: | |
| pass | |
def get_cv_splitter(
    cv_type: str = "stratified",
    n_splits: int = 5,
    groups: Optional[np.ndarray] = None,
    random_state: int = 42
):
    """Build a scikit-learn cross-validation splitter.

    Args:
        cv_type: One of "stratified", "group", or "time".
        n_splits: Number of CV folds.
        groups: Group labels; required when cv_type == "group".
        random_state: Seed used only by the shuffled StratifiedKFold.

    Returns:
        A configured splitter instance.

    Raises:
        ValueError: For an unknown cv_type, or "group" without groups.
    """
    if cv_type == "group" and groups is None:
        raise ValueError("groups must be provided for GroupKFold")
    builders = {
        "stratified": lambda: StratifiedKFold(
            n_splits=n_splits, shuffle=True, random_state=random_state
        ),
        "group": lambda: GroupKFold(n_splits=n_splits),
        "time": lambda: TimeSeriesSplit(n_splits=n_splits),
    }
    if cv_type not in builders:
        raise ValueError("cv_type must be 'stratified', 'group', or 'time'")
    return builders[cv_type]()
def grid_search_cv(
    model: BaseEstimator,
    X: np.ndarray,
    y: np.ndarray,
    param_grid: Dict[str, List],
    cv_type: str = "stratified",
    n_splits: int = 5,
    scoring: str = "f1_macro",
    groups: Optional[np.ndarray] = None,
    verbose: int = 1
) -> GridSearchCV:
    """Exhaustive hyperparameter search with cross-validation.

    Args:
        model: Estimator to tune.
        X, y: Training features and labels.
        param_grid: Mapping of parameter name to list of candidate values.
        cv_type: Splitting strategy forwarded to get_cv_splitter.
        n_splits: Number of CV folds.
        scoring: sklearn scoring string.
        groups: Group labels; required when cv_type == "group".
        verbose: GridSearchCV verbosity level.

    Returns:
        The fitted GridSearchCV instance (best_estimator_, best_params_, ...).
    """
    cv = get_cv_splitter(cv_type, n_splits, groups)
    search = GridSearchCV(
        model, param_grid, cv=cv, scoring=scoring, verbose=verbose, n_jobs=-1
    )
    # Bug fix: GroupKFold.split() needs the group labels at fit time, so
    # `groups` must be forwarded to fit(); previously cv_type="group"
    # crashed here. groups=None is ignored by the other splitters.
    search.fit(X, y, groups=groups)
    return search
def random_search_cv(
    model: BaseEstimator,
    X: np.ndarray,
    y: np.ndarray,
    param_distributions: Dict[str, Any],
    n_iter: int = 20,
    cv_type: str = "stratified",
    n_splits: int = 5,
    scoring: str = "f1_macro",
    groups: Optional[np.ndarray] = None,
    verbose: int = 1
) -> RandomizedSearchCV:
    """Randomized hyperparameter search with cross-validation.

    Args:
        model: Estimator to tune.
        X, y: Training features and labels.
        param_distributions: Mapping of parameter name to distribution or list.
        n_iter: Number of sampled parameter settings.
        cv_type: Splitting strategy forwarded to get_cv_splitter.
        n_splits: Number of CV folds.
        scoring: sklearn scoring string.
        groups: Group labels; required when cv_type == "group".
        verbose: RandomizedSearchCV verbosity level.

    Returns:
        The fitted RandomizedSearchCV instance.
    """
    cv = get_cv_splitter(cv_type, n_splits, groups)
    search = RandomizedSearchCV(
        model, param_distributions, n_iter=n_iter, cv=cv,
        scoring=scoring, verbose=verbose, n_jobs=-1, random_state=42
    )
    # Bug fix: forward `groups` to fit() so GroupKFold can split; previously
    # cv_type="group" crashed because the labels never reached the splitter.
    search.fit(X, y, groups=groups)
    return search
def _optuna_objective(
    trial,
    model_fn: Callable,
    X: np.ndarray,
    y: np.ndarray,
    cv,
    scoring: str = "f1_macro"
) -> float:
    """Optuna objective: mean cross-validated score for one trial.

    The search space is chosen from model_fn's name for the two built-in
    cases (logistic / random forest); any other factory receives the trial
    object and is expected to do its own parameter suggesting.
    """
    factory_name = model_fn.__name__.lower()
    if "logistic" in factory_name:
        C = trial.suggest_float("C", 1e-4, 1e2, log=True)
        penalty = trial.suggest_categorical("penalty", ["l1", "l2"])
        # liblinear is the only bundled solver supporting an l1 penalty here.
        solver = "liblinear" if penalty == "l1" else "lbfgs"
        model = model_fn(C=C, penalty=penalty, solver=solver)
    elif "random_forest" in factory_name:
        # Keyword arguments evaluate left-to-right, preserving suggest order.
        model = model_fn(
            n_estimators=trial.suggest_int("n_estimators", 50, 300),
            max_depth=trial.suggest_int("max_depth", 3, 20),
        )
    else:
        model = model_fn(trial)

    fold_scores = []
    for train_idx, val_idx in cv.split(X, y):
        model.fit(X[train_idx], y[train_idx])
        if scoring == "f1_macro":
            fold_scores.append(
                f1_score(y[val_idx], model.predict(X[val_idx]), average="macro")
            )
        elif scoring == "roc_auc":
            fold_scores.append(
                roc_auc_score(y[val_idx], model.predict_proba(X[val_idx])[:, 1])
            )
        else:
            raise ValueError(f"Scoring {scoring} not implemented in custom Optuna loop")
    return np.mean(fold_scores)
def optuna_tuning(
    model_fn: Callable,
    X: np.ndarray,
    y: np.ndarray,
    n_trials: int = 50,
    cv_type: str = "stratified",
    n_splits: int = 5,
    scoring: str = "f1_macro",
    groups: Optional[np.ndarray] = None,
    direction: str = "maximize"
) -> "optuna.Study":
    """Run an Optuna TPE study over model_fn's search space.

    Bug fix: the return annotation is now a string.  The old bare
    `-> optuna.Study` was evaluated when the function was defined, so the
    whole module raised NameError at import time whenever optuna was not
    installed — defeating the optional-import guard at the top of the file.

    Args:
        model_fn: Model factory understood by _optuna_objective.
        X, y: Training features and labels.
        n_trials: Number of Optuna trials.
        cv_type / n_splits / groups: Forwarded to get_cv_splitter.
        scoring: "f1_macro" or "roc_auc" (see _optuna_objective).
        direction: "maximize" or "minimize".

    Returns:
        The completed optuna.Study.

    Raises:
        ImportError: If optuna is not installed.
    """
    if not OPTUNA_AVAILABLE:
        raise ImportError("optuna is required for optuna_tuning (pip install optuna)")
    cv = get_cv_splitter(cv_type, n_splits, groups)
    study = optuna.create_study(direction=direction, sampler=TPESampler(seed=42))
    study.optimize(
        lambda trial: _optuna_objective(trial, model_fn, X, y, cv, scoring),
        n_trials=n_trials
    )
    return study
def hyperopt_tuning(
    model_fn: Callable,
    X: np.ndarray,
    y: np.ndarray,
    space: Dict,
    max_evals: int = 50,
    cv_type: str = "stratified",
    n_splits: int = 5,
    scoring: str = "f1_macro",
    groups: Optional[np.ndarray] = None
):
    """Hyperopt TPE search that maximizes the mean cross-validated score.

    Bug fix: the old implementation appended ``-score`` per fold AND negated
    the mean again when building the loss, so the loss equaled ``+mean(score)``
    and hyperopt (which minimizes loss) was driving the score DOWN.  Scores
    are now accumulated as-is and negated exactly once.

    Args:
        model_fn: Factory called as model_fn(**params) for each candidate.
        X, y: Training features and labels.
        space: Hyperopt search space (hp.* expressions).
        max_evals: Number of evaluations for fmin.
        cv_type / n_splits / groups: Forwarded to get_cv_splitter.
        scoring: "f1_macro" or "roc_auc"; anything else scores -1.

    Returns:
        (best, trials): best parameter dict from fmin and the Trials log.

    Raises:
        ImportError: If hyperopt is not installed.
    """
    if not HYPEROPT_AVAILABLE:
        raise ImportError("hyperopt is required for hyperopt_tuning (pip install hyperopt)")
    cv = get_cv_splitter(cv_type, n_splits, groups)

    def objective(params):
        model = model_fn(**params)
        scores = []
        for train_idx, val_idx in cv.split(X, y):
            X_train, X_val = X[train_idx], X[val_idx]
            y_train, y_val = y[train_idx], y[val_idx]
            model.fit(X_train, y_train)
            y_pred = model.predict(X_val)
            if scoring == "f1_macro":
                score = f1_score(y_val, y_pred, average="macro")
            elif scoring == "roc_auc":
                y_proba = model.predict_proba(X_val)[:, 1]
                score = roc_auc_score(y_val, y_proba)
            else:
                score = -1  # unknown metric: worst score, keeps the search alive
            scores.append(score)
        # hyperopt minimizes the loss, so negate the mean score exactly once.
        return {'loss': -np.mean(scores), 'status': STATUS_OK}

    trials = Trials()
    best = fmin(fn=objective, space=space, algo=tpe.suggest, max_evals=max_evals, trials=trials)
    return best, trials
def compute_classification_metrics(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    y_proba: Optional[np.ndarray] = None,
    average: str = "macro"
) -> Dict[str, float]:
    """Compute standard classification metrics.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        y_proba: Class-probability matrix of shape (n_samples, n_classes);
            probabilistic metrics are skipped entirely when None.
        average: Averaging mode for precision/recall/F1 and multiclass AUCs.

    Returns:
        Dict with accuracy/precision/recall/f1 and, when y_proba is given,
        roc_auc/pr_auc/log_loss (NaN for any that cannot be computed).
    """
    metrics = {
        "accuracy": accuracy_score(y_true, y_pred),
        "precision": precision_score(y_true, y_pred, average=average, zero_division=0),
        "recall": recall_score(y_true, y_pred, average=average, zero_division=0),
        "f1": f1_score(y_true, y_pred, average=average, zero_division=0),
    }
    if y_proba is not None:
        if len(np.unique(y_true)) == 2:
            # Binary case: score against the positive-class column.
            metrics["roc_auc"] = roc_auc_score(y_true, y_proba[:, 1])
            metrics["pr_auc"] = average_precision_score(y_true, y_proba[:, 1])
            metrics["log_loss"] = log_loss(y_true, y_proba)
        else:
            # Bug fix: each probabilistic metric now fails independently.
            # Previously one try wrapped all three, so a roc_auc failure also
            # discarded pr_auc, and "log_loss" was silently absent from the
            # result dict on error instead of being NaN.
            probabilistic = (
                ("roc_auc", lambda: roc_auc_score(y_true, y_proba, multi_class="ovr", average=average)),
                ("pr_auc", lambda: average_precision_score(y_true, y_proba, average=average)),
                ("log_loss", lambda: log_loss(y_true, y_proba)),
            )
            for name, compute in probabilistic:
                try:
                    metrics[name] = compute()
                except ValueError:
                    metrics[name] = np.nan
    return metrics
def evaluate_model(
    model: BaseEstimator,
    X_test: np.ndarray,
    y_test: np.ndarray,
    average: str = "macro",
    return_pred: bool = False
) -> Union[Dict[str, float], Tuple[Dict[str, float], np.ndarray, Optional[np.ndarray]]]:
    """Score a fitted model on a held-out test set.

    Probabilities are included in the metrics only when the estimator
    exposes predict_proba.  With return_pred=True the predictions and
    probabilities (or None) are returned alongside the metrics dict.
    """
    predictions = model.predict(X_test)
    probabilities = (
        model.predict_proba(X_test) if hasattr(model, "predict_proba") else None
    )
    metrics = compute_classification_metrics(
        y_test, predictions, probabilities, average=average
    )
    if not return_pred:
        return metrics
    return metrics, predictions, probabilities
def get_early_stopping(
    monitor: str = "val_loss",
    patience: int = 5,
    mode: str = "min",
    framework: str = "keras"
):
    """Build early-stopping callbacks for the given framework.

    For "keras", returns [EarlyStopping, ReduceLROnPlateau] watching the
    same monitored quantity.  "pytorch" is explicitly unsupported; any
    other value is rejected.
    """
    if framework == "pytorch":
        raise NotImplementedError("PyTorch callbacks require custom training loop")
    if framework != "keras":
        raise ValueError("framework must be 'keras' or 'pytorch'")
    # Imported lazily so the module does not require tensorflow at import time.
    from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
    callbacks = [
        EarlyStopping(monitor=monitor, patience=patience, restore_best_weights=True, mode=mode),
        ReduceLROnPlateau(monitor=monitor, factor=0.5, patience=3, min_lr=1e-7, mode=mode),
    ]
    return callbacks
def init_wandb(
    project_name: str = "text-classification",
    run_name: Optional[str] = None,
    config: Optional[Dict] = None
):
    """Start a Weights & Biases run.

    Returns the wandb module so callers can keep logging, or None when
    wandb is not installed (making tracking a silent no-op).
    """
    if WANDB_AVAILABLE:
        wandb.init(project=project_name, name=run_name, config=config)
        return wandb
    return None
def log_metrics_to_wandb(metrics: Dict[str, float]):
    """Forward metrics to the active wandb run; no-op without wandb or a run."""
    if not WANDB_AVAILABLE:
        return
    if wandb.run:
        wandb.log(metrics)
def suggest_transformer_hparams(trial) -> Dict[str, Any]:
    """Sample a transformer fine-tuning configuration from an Optuna trial.

    Note the key rename: the trial parameter is "batch_size" but the
    returned dict uses the HuggingFace TrainingArguments name
    "per_device_train_batch_size".
    """
    learning_rate = trial.suggest_float("learning_rate", 1e-6, 1e-4, log=True)
    batch_size = trial.suggest_categorical("batch_size", [8, 16, 32])
    num_train_epochs = trial.suggest_int("num_train_epochs", 2, 6)
    weight_decay = trial.suggest_float("weight_decay", 0.0, 0.3)
    warmup_ratio = trial.suggest_float("warmup_ratio", 0.0, 0.2)
    return {
        "learning_rate": learning_rate,
        "per_device_train_batch_size": batch_size,
        "num_train_epochs": num_train_epochs,
        "weight_decay": weight_decay,
        "warmup_ratio": warmup_ratio,
    }
def evaluate_transformer_outputs(
    y_true: List[int],
    y_pred: List[int],
    y_logits: Optional[np.ndarray] = None
) -> Dict[str, float]:
    """Score transformer predictions with the shared metric helper.

    Raw logits, when given, are converted to class probabilities with a
    softmax over the last dimension so probabilistic metrics can be computed.
    """
    true_arr = np.array(y_true)
    pred_arr = np.array(y_pred)
    proba = None
    if y_logits is not None:
        proba = torch.softmax(torch.tensor(y_logits), dim=-1).numpy()
    return compute_classification_metrics(true_arr, pred_arr, proba, average="macro")
def confusion_matrix_df(y_true: np.ndarray, y_pred: np.ndarray, labels: Optional[List] = None) -> pd.DataFrame:
    """Return the confusion matrix as a labeled DataFrame.

    Bug fix: when labels is None, sklearn's confusion_matrix orders its
    rows/columns by the sorted union of labels in y_true AND y_pred, but
    the old code labeled the axes with np.unique(y_true) only — a shape
    mismatch (or mislabeled axes) whenever y_pred contained a class absent
    from y_true.  The labels are now resolved from the union first and
    passed explicitly to confusion_matrix.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        labels: Explicit label order; inferred from both arrays when None.

    Returns:
        DataFrame with "True_<label>" rows and "Pred_<label>" columns.
    """
    if labels is None:
        combined = np.concatenate([np.asarray(y_true).ravel(), np.asarray(y_pred).ravel()])
        labels = sorted(np.unique(combined))
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    return pd.DataFrame(
        cm,
        index=[f"True_{lbl}" for lbl in labels],
        columns=[f"Pred_{lbl}" for lbl in labels],
    )