"""Cross-validation, hyper-parameter search and evaluation helpers for classifiers."""

import warnings
from collections import defaultdict
from typing import Dict, Any, Union, Callable, Optional, Tuple, List

import numpy as np
import pandas as pd
import torch
from sklearn.base import BaseEstimator
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    roc_auc_score,
    average_precision_score,
    log_loss,
    confusion_matrix,
    classification_report,
)
from sklearn.model_selection import (
    StratifiedKFold,
    GroupKFold,
    TimeSeriesSplit,
    GridSearchCV,
    RandomizedSearchCV,
)

# NOTE: this suppresses every warning process-wide as soon as the module is
# imported; kept because importers may rely on it.
warnings.filterwarnings("ignore")

# Optional dependencies: each flag flips to True only if the import succeeds.
OPTUNA_AVAILABLE = False
HYPEROPT_AVAILABLE = False

try:
    import optuna
    from optuna.samplers import TPESampler
    OPTUNA_AVAILABLE = True
except ImportError:
    pass

try:
    from hyperopt import fmin, tpe, hp, Trials, STATUS_OK
    HYPEROPT_AVAILABLE = True
except ImportError:
    pass

WANDB_AVAILABLE = False
try:
    import wandb
    WANDB_AVAILABLE = True
except ImportError:
    pass


# Metrics supported by the hand-written CV loops used for Optuna and Hyperopt.
# Each scorer takes (fitted_model, X_val, y_val) and returns a score where
# higher is better.
_CUSTOM_LOOP_SCORERS = {
    "f1_macro": lambda model, X_val, y_val: f1_score(
        y_val, model.predict(X_val), average="macro"
    ),
    # Uses column 1 of predict_proba as the positive-class score.
    "roc_auc": lambda model, X_val, y_val: roc_auc_score(
        y_val, model.predict_proba(X_val)[:, 1]
    ),
}


def _get_custom_scorer(scoring: str, loop_name: str) -> Callable:
    """Return the fold scorer for *scoring* or raise ValueError if unsupported."""
    try:
        return _CUSTOM_LOOP_SCORERS[scoring]
    except KeyError:
        raise ValueError(
            f"Scoring {scoring} not implemented in custom {loop_name} loop"
        ) from None


def _mean_cv_score(
    model,
    X: np.ndarray,
    y: np.ndarray,
    cv,
    scorer: Callable,
    groups: Optional[np.ndarray] = None,
) -> float:
    """Fit *model* on every CV training fold and return the mean validation score."""
    fold_scores = []
    # groups is forwarded to split() so that GroupKFold works.
    for train_idx, val_idx in cv.split(X, y, groups):
        model.fit(X[train_idx], y[train_idx])
        fold_scores.append(scorer(model, X[val_idx], y[val_idx]))
    return float(np.mean(fold_scores))


def get_cv_splitter(
    cv_type: str = "stratified",
    n_splits: int = 5,
    groups: Optional[np.ndarray] = None,
    random_state: int = 42
):
    """Build a scikit-learn cross-validation splitter.

    Args:
        cv_type: ``"stratified"`` (shuffled StratifiedKFold), ``"group"``
            (GroupKFold) or ``"time"`` (TimeSeriesSplit).
        n_splits: Number of folds.
        groups: Only checked for presence when ``cv_type == "group"``; the
            splitter itself receives the groups later, at ``split`` time.
        random_state: Seed for the shuffled StratifiedKFold.

    Returns:
        The configured splitter instance.

    Raises:
        ValueError: If ``cv_type`` is unknown, or ``cv_type == "group"`` and
            ``groups`` is None.
    """
    if cv_type == "stratified":
        return StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    if cv_type == "group":
        if groups is None:
            raise ValueError("groups must be provided for GroupKFold")
        return GroupKFold(n_splits=n_splits)
    if cv_type == "time":
        return TimeSeriesSplit(n_splits=n_splits)
    raise ValueError("cv_type must be 'stratified', 'group', or 'time'")


def grid_search_cv(
    model: BaseEstimator,
    X: np.ndarray,
    y: np.ndarray,
    param_grid: Dict[str, List],
    cv_type: str = "stratified",
    n_splits: int = 5,
    scoring: str = "f1_macro",
    groups: Optional[np.ndarray] = None,
    verbose: int = 1
) -> GridSearchCV:
    """Run an exhaustive grid search and return the fitted ``GridSearchCV``.

    Args:
        model: Estimator to tune.
        X: Feature matrix.
        y: Target labels.
        param_grid: Parameter grid passed to ``GridSearchCV``.
        cv_type: Splitter kind, see ``get_cv_splitter``.
        n_splits: Number of CV folds.
        scoring: scikit-learn scoring name.
        groups: Group labels; required when ``cv_type == "group"``.
        verbose: Verbosity forwarded to ``GridSearchCV``.

    Returns:
        The fitted search object. The search runs with ``n_jobs=-1``.
    """
    cv = get_cv_splitter(cv_type, n_splits, groups)
    search = GridSearchCV(
        model,
        param_grid,
        cv=cv,
        scoring=scoring,
        verbose=verbose,
        n_jobs=-1,
    )
    # groups must reach the splitter, otherwise GroupKFold.split() fails.
    search.fit(X, y, groups=groups)
    return search


def random_search_cv(
    model: BaseEstimator,
    X: np.ndarray,
    y: np.ndarray,
    param_distributions: Dict[str, Any],
    n_iter: int = 20,
    cv_type: str = "stratified",
    n_splits: int = 5,
    scoring: str = "f1_macro",
    groups: Optional[np.ndarray] = None,
    verbose: int = 1
) -> RandomizedSearchCV:
    """Run a randomized search and return the fitted ``RandomizedSearchCV``.

    Args:
        model: Estimator to tune.
        X: Feature matrix.
        y: Target labels.
        param_distributions: Distributions/lists passed to ``RandomizedSearchCV``.
        n_iter: Number of sampled parameter settings.
        cv_type: Splitter kind, see ``get_cv_splitter``.
        n_splits: Number of CV folds.
        scoring: scikit-learn scoring name.
        groups: Group labels; required when ``cv_type == "group"``.
        verbose: Verbosity forwarded to ``RandomizedSearchCV``.

    Returns:
        The fitted search object. Sampling uses the fixed seed 42 and the
        search runs with ``n_jobs=-1``.
    """
    cv = get_cv_splitter(cv_type, n_splits, groups)
    search = RandomizedSearchCV(
        model,
        param_distributions,
        n_iter=n_iter,
        cv=cv,
        scoring=scoring,
        verbose=verbose,
        n_jobs=-1,
        random_state=42,
    )
    # groups must reach the splitter, otherwise GroupKFold.split() fails.
    search.fit(X, y, groups=groups)
    return search


def _optuna_objective(
    trial,
    model_fn: Callable,
    X: np.ndarray,
    y: np.ndarray,
    cv,
    scoring: str = "f1_macro",
    groups: Optional[np.ndarray] = None
) -> float:
    """Optuna objective: sample hyper-parameters, cross-validate, return the mean score.

    The search space is chosen from ``model_fn.__name__``:

    * contains ``"logistic"``: samples ``C`` and ``penalty`` and picks a
      solver to match the penalty;
    * contains ``"random_forest"``: samples ``n_estimators`` and ``max_depth``;
    * otherwise ``model_fn(trial)`` is called and must do its own sampling.

    Args:
        trial: Optuna trial used for sampling.
        model_fn: Factory returning an unfitted model.
        X: Feature matrix, indexed positionally with the fold indices.
        y: Target labels, indexed positionally with the fold indices.
        cv: Splitter exposing ``split(X, y, groups)``.
        scoring: ``"f1_macro"`` or ``"roc_auc"``.
        groups: Optional group labels forwarded to ``cv.split``.

    Returns:
        Mean validation score over the folds.

    Raises:
        ValueError: If ``scoring`` is not supported by this loop.
    """
    # Fail before sampling or fitting anything when the metric is unsupported.
    scorer = _get_custom_scorer(scoring, "Optuna")

    # Callables such as functools.partial have no __name__; treat them as "other".
    factory_name = getattr(model_fn, "__name__", "").lower()
    if "logistic" in factory_name:
        C = trial.suggest_float("C", 1e-4, 1e2, log=True)
        penalty = trial.suggest_categorical("penalty", ["l1", "l2"])
        solver = "liblinear" if penalty == "l1" else "lbfgs"
        model = model_fn(C=C, penalty=penalty, solver=solver)
    elif "random_forest" in factory_name:
        n_estimators = trial.suggest_int("n_estimators", 50, 300)
        max_depth = trial.suggest_int("max_depth", 3, 20)
        model = model_fn(n_estimators=n_estimators, max_depth=max_depth)
    else:
        model = model_fn(trial)

    return _mean_cv_score(model, X, y, cv, scorer, groups)


def optuna_tuning(
    model_fn: Callable,
    X: np.ndarray,
    y: np.ndarray,
    n_trials: int = 50,
    cv_type: str = "stratified",
    n_splits: int = 5,
    scoring: str = "f1_macro",
    groups: Optional[np.ndarray] = None,
    direction: str = "maximize"
) -> "optuna.Study":
    """Tune ``model_fn`` with Optuna's TPE sampler (seed 42) and return the study.

    Args:
        model_fn: Model factory, see ``_optuna_objective`` for how it is called.
        X: Feature matrix.
        y: Target labels.
        n_trials: Number of Optuna trials.
        cv_type: Splitter kind, see ``get_cv_splitter``.
        n_splits: Number of CV folds.
        scoring: ``"f1_macro"`` or ``"roc_auc"``.
        groups: Group labels; required when ``cv_type == "group"``.
        direction: Optimisation direction passed to ``optuna.create_study``.

    Returns:
        The optimised ``optuna.Study``.

    Raises:
        ImportError: If optuna is not installed.
        ValueError: If ``cv_type`` or ``scoring`` is unsupported.
    """
    # The return annotation is a string so the module still imports without optuna.
    if not OPTUNA_AVAILABLE:
        raise ImportError("optuna is required for optuna_tuning")

    cv = get_cv_splitter(cv_type, n_splits, groups)
    study = optuna.create_study(direction=direction, sampler=TPESampler(seed=42))
    study.optimize(
        lambda trial: _optuna_objective(trial, model_fn, X, y, cv, scoring, groups),
        n_trials=n_trials,
    )
    return study


def hyperopt_tuning(
    model_fn: Callable,
    X: np.ndarray,
    y: np.ndarray,
    space: Dict,
    max_evals: int = 50,
    cv_type: str = "stratified",
    n_splits: int = 5,
    scoring: str = "f1_macro",
    groups: Optional[np.ndarray] = None
):
    """Tune ``model_fn`` with Hyperopt's TPE algorithm.

    Args:
        model_fn: Factory called as ``model_fn(**params)`` for each evaluation.
        X: Feature matrix, indexed positionally with the fold indices.
        y: Target labels, indexed positionally with the fold indices.
        space: Hyperopt search space.
        max_evals: Maximum number of evaluations.
        cv_type: Splitter kind, see ``get_cv_splitter``.
        n_splits: Number of CV folds.
        scoring: ``"f1_macro"`` or ``"roc_auc"``.
        groups: Group labels; required when ``cv_type == "group"``.

    Returns:
        Tuple ``(best, trials)``: the value returned by ``fmin`` and the
        ``Trials`` object that recorded every evaluation.

    Raises:
        ImportError: If hyperopt is not installed.
        ValueError: If ``cv_type`` or ``scoring`` is unsupported.
    """
    if not HYPEROPT_AVAILABLE:
        raise ImportError("hyperopt is required for hyperopt_tuning")

    cv = get_cv_splitter(cv_type, n_splits, groups)
    # Reject unsupported metrics up front instead of optimising a constant.
    scorer = _get_custom_scorer(scoring, "Hyperopt")

    def objective(params):
        model = model_fn(**params)
        mean_score = _mean_cv_score(model, X, y, cv, scorer, groups)
        # Hyperopt minimises the loss, so negate the higher-is-better score once.
        return {'loss': -mean_score, 'status': STATUS_OK}

    trials = Trials()
    best = fmin(
        fn=objective,
        space=space,
        algo=tpe.suggest,
        max_evals=max_evals,
        trials=trials,
    )
    return best, trials


def compute_classification_metrics(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    y_proba: Optional[np.ndarray] = None,
    average: str = "macro"
) -> Dict[str, float]:
    """Compute label-based and, optionally, probability-based classification metrics.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        y_proba: Optional class probabilities. When given, ``roc_auc``,
            ``pr_auc`` and ``log_loss`` are added to the result.
        average: Averaging mode for precision/recall/F1 and for the
            multiclass ROC-AUC / PR-AUC.

    Returns:
        Dict with ``accuracy``, ``precision``, ``recall``, ``f1`` and, if
        ``y_proba`` is given, ``roc_auc``, ``pr_auc`` and ``log_loss``.
        Outside the binary case, a probability metric that raises
        ``ValueError`` is reported as NaN.
    """
    metrics = {
        "accuracy": accuracy_score(y_true, y_pred),
        "precision": precision_score(y_true, y_pred, average=average, zero_division=0),
        "recall": recall_score(y_true, y_pred, average=average, zero_division=0),
        "f1": f1_score(y_true, y_pred, average=average, zero_division=0),
    }
    if y_proba is None:
        return metrics

    y_proba = np.asarray(y_proba)
    # Only take column 1 as "the positive class" when there really are two
    # probability columns AND two observed classes.
    has_two_columns = y_proba.ndim == 2 and y_proba.shape[1] == 2
    if has_two_columns and len(np.unique(y_true)) == 2:
        positive_scores = y_proba[:, 1]
        metrics["roc_auc"] = roc_auc_score(y_true, positive_scores)
        metrics["pr_auc"] = average_precision_score(y_true, positive_scores)
        metrics["log_loss"] = log_loss(y_true, y_proba)
        return metrics

    probability_metrics = {
        "roc_auc": lambda: roc_auc_score(
            y_true, y_proba, multi_class="ovr", average=average
        ),
        "pr_auc": lambda: average_precision_score(y_true, y_proba, average=average),
        "log_loss": lambda: log_loss(y_true, y_proba),
    }
    # Evaluate each metric on its own so one failure neither drops a key nor
    # discards metrics that could be computed.
    for metric_name, compute in probability_metrics.items():
        try:
            metrics[metric_name] = compute()
        except ValueError:
            metrics[metric_name] = np.nan
    return metrics


def evaluate_model(
    model: BaseEstimator,
    X_test: np.ndarray,
    y_test: np.ndarray,
    average: str = "macro",
    return_pred: bool = False
) -> Union[Dict[str, float], Tuple[Dict[str, float], np.ndarray, Optional[np.ndarray]]]:
    """Evaluate a fitted model on a test set.

    Args:
        model: Fitted estimator exposing ``predict`` and optionally
            ``predict_proba``.
        X_test: Test features.
        y_test: Test labels.
        average: Averaging mode forwarded to ``compute_classification_metrics``.
        return_pred: If True, also return the predictions and probabilities.

    Returns:
        The metrics dict, or ``(metrics, y_pred, y_proba)`` when
        ``return_pred`` is True. ``y_proba`` is None when the model has no
        ``predict_proba``.
    """
    y_pred = model.predict(X_test)
    y_proba = model.predict_proba(X_test) if hasattr(model, "predict_proba") else None
    metrics = compute_classification_metrics(y_test, y_pred, y_proba, average=average)
    if return_pred:
        return metrics, y_pred, y_proba
    return metrics


def get_early_stopping(
    monitor: str = "val_loss",
    patience: int = 5,
    mode: str = "min",
    framework: str = "keras"
):
    """Return early-stopping callbacks for the requested framework.

    Args:
        monitor: Quantity monitored by both callbacks.
        patience: Patience of the ``EarlyStopping`` callback.
        mode: ``mode`` argument passed to both callbacks.
        framework: ``"keras"`` or ``"pytorch"``.

    Returns:
        For Keras, the list ``[EarlyStopping, ReduceLROnPlateau]``.

    Raises:
        NotImplementedError: If ``framework == "pytorch"``.
        ValueError: If ``framework`` is anything else.
    """
    if framework == "keras":
        # Imported lazily so TensorFlow is only required on this code path.
        from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

        early_stop = EarlyStopping(
            monitor=monitor, patience=patience, restore_best_weights=True, mode=mode
        )
        # The LR scheduler uses its own fixed patience (3), not `patience`.
        reduce_lr = ReduceLROnPlateau(
            monitor=monitor, factor=0.5, patience=3, min_lr=1e-7, mode=mode
        )
        return [early_stop, reduce_lr]
    if framework == "pytorch":
        raise NotImplementedError("PyTorch callbacks require custom training loop")
    raise ValueError("framework must be 'keras' or 'pytorch'")


def init_wandb(
    project_name: str = "text-classification",
    run_name: Optional[str] = None,
    config: Optional[Dict] = None
):
    """Start a Weights & Biases run.

    Args:
        project_name: W&B project name.
        run_name: Optional run name.
        config: Optional run configuration.

    Returns:
        The ``wandb`` module after ``wandb.init`` was called, or None when
        wandb is not installed.
    """
    if not WANDB_AVAILABLE:
        return None
    wandb.init(project=project_name, name=run_name, config=config)
    return wandb


def log_metrics_to_wandb(metrics: Dict[str, float]):
    """Log *metrics* to the active W&B run; do nothing if wandb is missing or no run is active."""
    if WANDB_AVAILABLE and wandb.run:
        wandb.log(metrics)


def suggest_transformer_hparams(trial) -> Dict[str, Any]:
    """Sample transformer fine-tuning hyper-parameters from an Optuna trial.

    Args:
        trial: Optuna trial used for sampling.

    Returns:
        Dict with ``learning_rate``, ``per_device_train_batch_size``,
        ``num_train_epochs``, ``weight_decay`` and ``warmup_ratio``.
    """
    return {
        "learning_rate": trial.suggest_float("learning_rate", 1e-6, 1e-4, log=True),
        # NOTE(review): registered in the trial as "batch_size" but returned as
        # "per_device_train_batch_size"; left as-is because callers may read
        # the trial/study parameter under the name "batch_size".
        "per_device_train_batch_size": trial.suggest_categorical("batch_size", [8, 16, 32]),
        "num_train_epochs": trial.suggest_int("num_train_epochs", 2, 6),
        "weight_decay": trial.suggest_float("weight_decay", 0.0, 0.3),
        "warmup_ratio": trial.suggest_float("warmup_ratio", 0.0, 0.2),
    }


def evaluate_transformer_outputs(
    y_true: List[int],
    y_pred: List[int],
    y_logits: Optional[np.ndarray] = None
) -> Dict[str, float]:
    """Compute macro-averaged metrics from transformer predictions.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        y_logits: Optional logits; softmax over the last axis turns them
            into the probabilities used for the probability-based metrics.

    Returns:
        The dict produced by ``compute_classification_metrics``.
    """
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)

    y_proba = None
    if y_logits is not None:
        logits = torch.tensor(y_logits)
        # torch.softmax is not implemented for integer tensors.
        if not logits.is_floating_point():
            logits = logits.float()
        y_proba = torch.softmax(logits, dim=-1).numpy()

    return compute_classification_metrics(y_true, y_pred, y_proba, average="macro")


def confusion_matrix_df(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    labels: Optional[List] = None
) -> pd.DataFrame:
    """Return the confusion matrix as a labelled DataFrame.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        labels: Optional label order passed to ``confusion_matrix``. When
            omitted, the sorted union of the labels in ``y_true`` and
            ``y_pred`` names the axes.

    Returns:
        DataFrame with rows named ``True_<label>`` and columns named
        ``Pred_<label>``.
    """
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    if labels is None:
        # confusion_matrix builds its axes from the labels seen in y_true OR
        # y_pred, so the names must use the same union or the shapes can differ.
        labels = list(np.union1d(y_true, y_pred))
    return pd.DataFrame(
        cm,
        index=[f"True_{label}" for label in labels],
        columns=[f"Pred_{label}" for label in labels],
    )