# text_classificators/src/model_evaluation.py
from typing import Dict, Any, Union, Callable, Optional, Tuple, List
import numpy as np
import pandas as pd
from collections import defaultdict
import torch
from sklearn.model_selection import (
StratifiedKFold, GroupKFold, TimeSeriesSplit,
GridSearchCV, RandomizedSearchCV
)
from sklearn.metrics import (
accuracy_score, precision_score, recall_score, f1_score,
roc_auc_score, average_precision_score, log_loss,
confusion_matrix, classification_report
)
from sklearn.base import BaseEstimator
import warnings
warnings.filterwarnings("ignore")  # blanket suppression keeps long CV sweeps readable; relax when debugging
OPTUNA_AVAILABLE = False
HYPEROPT_AVAILABLE = False
try:
import optuna
from optuna.samplers import TPESampler
OPTUNA_AVAILABLE = True
except ImportError:
pass
try:
from hyperopt import fmin, tpe, hp, Trials, STATUS_OK
HYPEROPT_AVAILABLE = True
except ImportError:
pass
WANDB_AVAILABLE = False
try:
import wandb
WANDB_AVAILABLE = True
except ImportError:
pass
def get_cv_splitter(
cv_type: str = "stratified",
n_splits: int = 5,
groups: Optional[np.ndarray] = None,
random_state: int = 42
):
    """Build a sklearn CV splitter: 'stratified' preserves class balance per
    fold, 'group' keeps each group in a single fold, and 'time' produces
    forward-chaining splits for temporal data."""
    if cv_type == "stratified":
return StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)
elif cv_type == "group":
if groups is None:
raise ValueError("groups must be provided for GroupKFold")
return GroupKFold(n_splits=n_splits)
elif cv_type == "time":
return TimeSeriesSplit(n_splits=n_splits)
else:
raise ValueError("cv_type must be 'stratified', 'group', or 'time'")
def grid_search_cv(
model: BaseEstimator,
X: np.ndarray,
y: np.ndarray,
param_grid: Dict[str, List],
cv_type: str = "stratified",
n_splits: int = 5,
scoring: str = "f1_macro",
groups: Optional[np.ndarray] = None,
verbose: int = 1
) -> GridSearchCV:
cv = get_cv_splitter(cv_type, n_splits, groups)
search = GridSearchCV(
model, param_grid, cv=cv, scoring=scoring, verbose=verbose, n_jobs=-1
)
    # GroupKFold needs the group labels at split time, so forward them to fit().
    search.fit(X, y, groups=groups)
return search
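# Usage sketch (the LogisticRegression estimator and grid are illustrative):
#
#     from sklearn.linear_model import LogisticRegression
#     search = grid_search_cv(
#         LogisticRegression(max_iter=1000), X, y,
#         param_grid={"C": [0.01, 0.1, 1.0, 10.0]},
#     )
#     print(search.best_params_, search.best_score_)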
def random_search_cv(
model: BaseEstimator,
X: np.ndarray,
y: np.ndarray,
param_distributions: Dict[str, Any],
n_iter: int = 20,
cv_type: str = "stratified",
n_splits: int = 5,
scoring: str = "f1_macro",
groups: Optional[np.ndarray] = None,
verbose: int = 1
) -> RandomizedSearchCV:
cv = get_cv_splitter(cv_type, n_splits, groups)
search = RandomizedSearchCV(
model, param_distributions, n_iter=n_iter, cv=cv,
scoring=scoring, verbose=verbose, n_jobs=-1, random_state=42
)
    # As above, forward group labels so GroupKFold can split correctly.
    search.fit(X, y, groups=groups)
return search
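# Usage sketch with a continuous distribution (scipy's loguniform is an
# extra assumption beyond this module's imports):
#
#     from scipy.stats import loguniform
#     from sklearn.svm import LinearSVC
#     search = random_search_cv(
#         LinearSVC(), X, y,
#         param_distributions={"C": loguniform(1e-3, 1e2)},
#         n_iter=30,
#     )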
def _optuna_objective(
trial,
model_fn: Callable,
X: np.ndarray,
y: np.ndarray,
cv,
    scoring: str = "f1_macro",
    groups: Optional[np.ndarray] = None
) -> float:
    # Dispatch on the factory's name for two common model types; any other
    # factory is expected to build its model directly from the Optuna trial.
    if "logistic" in model_fn.__name__.lower():
C = trial.suggest_float("C", 1e-4, 1e2, log=True)
penalty = trial.suggest_categorical("penalty", ["l1", "l2"])
solver = "liblinear" if penalty == "l1" else "lbfgs"
model = model_fn(C=C, penalty=penalty, solver=solver)
elif "random_forest" in model_fn.__name__.lower():
n_estimators = trial.suggest_int("n_estimators", 50, 300)
max_depth = trial.suggest_int("max_depth", 3, 20)
model = model_fn(n_estimators=n_estimators, max_depth=max_depth)
else:
model = model_fn(trial)
scores = []
    # All sklearn splitters accept `groups` in split(); StratifiedKFold and
    # TimeSeriesSplit ignore it, while GroupKFold requires it.
    for train_idx, val_idx in cv.split(X, y, groups):
X_train, X_val = X[train_idx], X[val_idx]
y_train, y_val = y[train_idx], y[val_idx]
model.fit(X_train, y_train)
y_pred = model.predict(X_val)
if scoring == "f1_macro":
score = f1_score(y_val, y_pred, average="macro")
elif scoring == "roc_auc":
y_proba = model.predict_proba(X_val)[:, 1]
score = roc_auc_score(y_val, y_proba)
else:
raise ValueError(f"Scoring {scoring} not implemented in custom Optuna loop")
scores.append(score)
return np.mean(scores)
def optuna_tuning(
model_fn: Callable,
X: np.ndarray,
y: np.ndarray,
n_trials: int = 50,
cv_type: str = "stratified",
n_splits: int = 5,
scoring: str = "f1_macro",
    groups: Optional[np.ndarray] = None,
    direction: str = "maximize"
) -> "optuna.Study":
    # String annotation plus runtime guard keep this module importable when
    # optuna is not installed.
    if not OPTUNA_AVAILABLE:
        raise ImportError("optuna is required for optuna_tuning; pip install optuna")
    cv = get_cv_splitter(cv_type, n_splits, groups)
    study = optuna.create_study(direction=direction, sampler=TPESampler(seed=42))
    study.optimize(
        lambda trial: _optuna_objective(trial, model_fn, X, y, cv, scoring, groups),
        n_trials=n_trials
    )
return study
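# Usage sketch; `make_logistic` is a hypothetical factory whose name matches
# the "logistic" branch of _optuna_objective:
#
#     from sklearn.linear_model import LogisticRegression
#     def make_logistic(**kwargs):
#         return LogisticRegression(max_iter=1000, **kwargs)
#     study = optuna_tuning(make_logistic, X, y, n_trials=25)
#     print(study.best_params, study.best_value)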
def hyperopt_tuning(
model_fn: Callable,
X: np.ndarray,
y: np.ndarray,
space: Dict,
max_evals: int = 50,
cv_type: str = "stratified",
n_splits: int = 5,
scoring: str = "f1_macro",
groups: Optional[np.ndarray] = None
):
    if not HYPEROPT_AVAILABLE:
        raise ImportError("hyperopt is required for hyperopt_tuning; pip install hyperopt")
    cv = get_cv_splitter(cv_type, n_splits, groups)
    def objective(params):
        model = model_fn(**params)
        scores = []
        for train_idx, val_idx in cv.split(X, y, groups):
            X_train, X_val = X[train_idx], X[val_idx]
            y_train, y_val = y[train_idx], y[val_idx]
            model.fit(X_train, y_train)
            y_pred = model.predict(X_val)
            if scoring == "f1_macro":
                score = f1_score(y_val, y_pred, average="macro")
            elif scoring == "roc_auc":
                y_proba = model.predict_proba(X_val)[:, 1]
                score = roc_auc_score(y_val, y_proba)
            else:
                raise ValueError(f"Scoring {scoring} not implemented in hyperopt loop")
            scores.append(score)
        # hyperopt minimizes the objective, so negate the mean score exactly once
        return {'loss': -np.mean(scores), 'status': STATUS_OK}
trials = Trials()
best = fmin(fn=objective, space=space, algo=tpe.suggest, max_evals=max_evals, trials=trials)
return best, trials
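# Usage sketch; the space uses hyperopt's hp primitives, and the factory and
# parameter names are hypothetical (quniform yields floats, hence the casts):
#
#     from sklearn.ensemble import RandomForestClassifier
#     def make_rf(n_estimators, max_depth):
#         return RandomForestClassifier(n_estimators=int(n_estimators),
#                                       max_depth=int(max_depth))
#     space = {
#         "n_estimators": hp.quniform("n_estimators", 50, 300, 25),
#         "max_depth": hp.quniform("max_depth", 3, 20, 1),
#     }
#     best, trials = hyperopt_tuning(make_rf, X, y, space, max_evals=30)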
def compute_classification_metrics(
y_true: np.ndarray,
y_pred: np.ndarray,
y_proba: Optional[np.ndarray] = None,
average: str = "macro"
) -> Dict[str, float]:
metrics = {
"accuracy": accuracy_score(y_true, y_pred),
"precision": precision_score(y_true, y_pred, average=average, zero_division=0),
"recall": recall_score(y_true, y_pred, average=average, zero_division=0),
"f1": f1_score(y_true, y_pred, average=average, zero_division=0),
}
if y_proba is not None:
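        # y_proba is expected as the (n_samples, n_classes) array returned by
        # sklearn's predict_proba; column 1 holds the positive class.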
if len(np.unique(y_true)) == 2:
metrics["roc_auc"] = roc_auc_score(y_true, y_proba[:, 1])
metrics["pr_auc"] = average_precision_score(y_true, y_proba[:, 1])
metrics["log_loss"] = log_loss(y_true, y_proba)
        else:
            try:
                metrics["roc_auc"] = roc_auc_score(y_true, y_proba, multi_class="ovr", average=average)
                metrics["pr_auc"] = average_precision_score(y_true, y_proba, average=average)
                metrics["log_loss"] = log_loss(y_true, y_proba)
            except ValueError:
                # e.g. a class absent from y_true, or proba columns that do
                # not match the label set
                metrics["roc_auc"] = np.nan
                metrics["pr_auc"] = np.nan
                metrics["log_loss"] = np.nan
return metrics
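# Usage sketch with hard labels only (probability metrics are skipped when
# y_proba is None):
#
#     metrics = compute_classification_metrics(
#         y_true=np.array([0, 1, 1, 0]),
#         y_pred=np.array([0, 1, 0, 0]),
#     )
#     # {"accuracy": 0.75, "precision": ..., "recall": ..., "f1": ...}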
def evaluate_model(
model: BaseEstimator,
X_test: np.ndarray,
y_test: np.ndarray,
average: str = "macro",
return_pred: bool = False
) -> Union[Dict[str, float], Tuple[Dict[str, float], np.ndarray, Optional[np.ndarray]]]:
y_pred = model.predict(X_test)
y_proba = None
if hasattr(model, "predict_proba"):
y_proba = model.predict_proba(X_test)
metrics = compute_classification_metrics(y_test, y_pred, y_proba, average=average)
if return_pred:
return metrics, y_pred, y_proba
return metrics
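# Usage sketch (assumes a fitted classifier `clf` and a held-out test split):
#
#     metrics, y_pred, y_proba = evaluate_model(clf, X_test, y_test, return_pred=True)
#     log_metrics_to_wandb(metrics)  # no-op unless a wandb run is active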
def get_early_stopping(
monitor: str = "val_loss",
patience: int = 5,
mode: str = "min",
framework: str = "keras"
):
if framework == "keras":
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
es = EarlyStopping(monitor=monitor, patience=patience, restore_best_weights=True, mode=mode)
reduce_lr = ReduceLROnPlateau(monitor=monitor, factor=0.5, patience=3, min_lr=1e-7, mode=mode)
return [es, reduce_lr]
elif framework == "pytorch":
raise NotImplementedError("PyTorch callbacks require custom training loop")
else:
raise ValueError("framework must be 'keras' or 'pytorch'")
def init_wandb(
project_name: str = "text-classification",
run_name: Optional[str] = None,
config: Optional[Dict] = None
):
if not WANDB_AVAILABLE:
return None
wandb.init(project=project_name, name=run_name, config=config)
return wandb
def log_metrics_to_wandb(metrics: Dict[str, float]):
if WANDB_AVAILABLE and wandb.run:
wandb.log(metrics)
def suggest_transformer_hparams(trial) -> Dict[str, Any]:
return {
"learning_rate": trial.suggest_float("learning_rate", 1e-6, 1e-4, log=True),
"per_device_train_batch_size": trial.suggest_categorical("batch_size", [8, 16, 32]),
"num_train_epochs": trial.suggest_int("num_train_epochs", 2, 6),
"weight_decay": trial.suggest_float("weight_decay", 0.0, 0.3),
"warmup_ratio": trial.suggest_float("warmup_ratio", 0.0, 0.2),
}
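# Usage sketch pairing this search space with Hugging Face's
# Trainer.hyperparameter_search (the Trainer setup is assumed to exist):
#
#     best_run = trainer.hyperparameter_search(
#         hp_space=suggest_transformer_hparams,
#         direction="maximize",
#         backend="optuna",
#         n_trials=10,
#     )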
def evaluate_transformer_outputs(
y_true: List[int],
y_pred: List[int],
y_logits: Optional[np.ndarray] = None
) -> Dict[str, float]:
y_true = np.array(y_true)
y_pred = np.array(y_pred)
if y_logits is not None:
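        # Softmax over the class dimension turns raw logits into the
        # probability array expected by compute_classification_metrics.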
y_proba = torch.softmax(torch.tensor(y_logits), dim=-1).numpy()
else:
y_proba = None
return compute_classification_metrics(y_true, y_pred, y_proba, average="macro")
def confusion_matrix_df(y_true: np.ndarray, y_pred: np.ndarray, labels: Optional[List] = None) -> pd.DataFrame:
    if labels is None:
        # Derive labels from both arrays so a class that appears only in the
        # predictions still gets a row and column.
        labels = sorted(np.unique(np.concatenate([np.asarray(y_true), np.asarray(y_pred)])))
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    return pd.DataFrame(cm, index=[f"True_{l}" for l in labels], columns=[f"Pred_{l}" for l in labels])
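# Usage sketch; string labels work because rows and columns follow the same
# label order passed to confusion_matrix:
#
#     df = confusion_matrix_df(
#         np.array(["spam", "ham", "spam"]),
#         np.array(["spam", "spam", "spam"]),
#     )
#     print(df)  # True_ham/True_spam rows vs Pred_ham/Pred_spam columns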