""" Hybrid ensemble: regularized DistilBERT probabilities + TF-IDF logistic regression. """ from __future__ import annotations import json from pathlib import Path import numpy as np from sklearn.linear_model import LogisticRegression from sklearn.metrics import f1_score, roc_auc_score from src.evaluation.threshold_tuning import predict_with_threshold, search_best_threshold from sklearn.pipeline import Pipeline from sklearn.feature_extraction.text import TfidfVectorizer from src.utils.logger import get_logger logger = get_logger(__name__) class StableLRModel: """Regularized LR on TF-IDF (stable_training.yaml).""" def __init__(self, lr_cfg: dict, tfidf_cfg: dict, *, C: float | None = None): ngram = tuple(tfidf_cfg.get("ngram_range", [1, 2])) self.pipeline = Pipeline( [ ( "tfidf", TfidfVectorizer( max_features=int(tfidf_cfg.get("max_features", 5000)), ngram_range=ngram, sublinear_tf=bool(tfidf_cfg.get("sublinear_tf", True)), min_df=int(tfidf_cfg.get("min_df", 3)), analyzer="word", strip_accents="unicode", ), ), ( "clf", LogisticRegression( C=float(C if C is not None else lr_cfg.get("C", 0.05)), max_iter=int(lr_cfg.get("max_iter", 2000)), class_weight=lr_cfg.get("class_weight", "balanced"), solver=lr_cfg.get("solver", "lbfgs"), random_state=42, ), ), ] ) self.is_fitted = False def fit(self, X_train, y_train): logger.info(f"Training stable LR — C={self.pipeline.named_steps['clf'].C}") self.pipeline.fit(X_train, y_train) self.is_fitted = True return self @property def C(self) -> float: return float(self.pipeline.named_steps["clf"].C) def set_C(self, c: float) -> None: self.pipeline.named_steps["clf"].C = float(c) def train_test_gap(self, X_train, y_train, X_test, y_test) -> tuple[float, float, float]: """Return (f1_train, f1_test, gap) using weighted F1.""" preds_train = self.predict(X_train) preds_test = self.predict(X_test) y_tr = np.asarray(y_train).astype(int) y_te = np.asarray(y_test).astype(int) f1_train = float(f1_score(y_tr, preds_train, average="weighted", zero_division=0)) f1_test = float(f1_score(y_te, preds_test, average="weighted", zero_division=0)) return f1_train, f1_test, abs(f1_train - f1_test) def predict(self, X): return self.pipeline.predict(X) def predict_proba(self, X) -> np.ndarray: return self.pipeline.predict_proba(X) def save(self, path: str | Path) -> None: import joblib path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) joblib.dump(self.pipeline, path) logger.info(f"Stable LR saved: {path}") @classmethod def load(cls, path: str | Path) -> "StableLRModel": import joblib inst = cls.__new__(cls) inst.pipeline = joblib.load(path) inst.is_fitted = True return inst def fit_lr_with_gap_control( X_train, y_train, X_test, y_test, lr_cfg: dict, tfidf_cfg: dict, *, max_gap: float = 0.05, X_train_gap=None, y_train_gap=None, ) -> tuple[StableLRModel, dict]: """ Fit LR on augmented train; tune regularization until |train F1 - test F1| < max_gap. """ gap_cfg = lr_cfg.get("gap_search", {}) X_gap = X_train_gap if X_train_gap is not None else X_train y_gap = y_train_gap if y_train_gap is not None else y_train if not gap_cfg.get("enabled", True): grid = [{"C": float(lr_cfg.get("C", 0.05)), **tfidf_cfg}] else: grid = gap_cfg.get("param_grid") or [ {"C": c, **tfidf_cfg} for c in gap_cfg.get("C_candidates", [lr_cfg.get("C", 0.05)]) ] best: StableLRModel | None = None best_meta: dict = {} best_gap = float("inf") for params in grid: merged_tfidf = {**tfidf_cfg, **{k: v for k, v in params.items() if k != "C"}} c = float(params.get("C", lr_cfg.get("C", 0.05))) model = StableLRModel(lr_cfg, merged_tfidf, C=c) model.fit(X_train, y_train) f1_train, f1_test, gap = model.train_test_gap(X_gap, y_gap, X_test, y_test) logger.info( f"LR gap search — C={c} max_features={merged_tfidf.get('max_features')} " f"min_df={merged_tfidf.get('min_df')} train_f1={f1_train:.4f} " f"test_f1={f1_test:.4f} gap={gap:.4f}" ) meta = { "C": c, "max_features": int(merged_tfidf.get("max_features", 800)), "min_df": int(merged_tfidf.get("min_df", 3)), "f1_train": round(f1_train, 4), "f1_test": round(f1_test, 4), "train_test_gap": round(gap, 4), "train_test_gap_pp": round(gap * 100, 2), "gap_ok": gap < max_gap, } if gap < best_gap: best, best_meta = model, meta best_gap = gap if gap < max_gap: logger.info(f"LR gap OK at C={c}") break if not best_meta.get("gap_ok"): logger.warning( f"LR gap still {best_meta['train_test_gap']:.4f} after grid search; " f"using best gap C={best_meta['C']}" ) return best, best_meta # type: ignore[return-value] def soft_vote_probs( prob_a: np.ndarray, prob_b: np.ndarray, weight_a: float = 0.5, weight_b: float = 0.5, ) -> np.ndarray: total = weight_a + weight_b return (weight_a * prob_a + weight_b * prob_b) / total def evaluate_ensemble( bert_probs: np.ndarray, lr_probs: np.ndarray, y_true: np.ndarray, *, bert_weight: float = 0.5, lr_weight: float = 0.5, model_name: str = "Hybrid-ensemble", threshold: float = 0.5, ) -> dict: """Combine probabilities and compute binary metrics.""" combined = soft_vote_probs(bert_probs, lr_probs, bert_weight, lr_weight) preds = predict_with_threshold(combined, threshold) y = np.asarray(y_true).astype(int) f1_test = float(f1_score(y, preds, average="weighted", zero_division=0)) f1_toxic = float(f1_score(y, preds, pos_label=1, zero_division=0)) return { "model": model_name, "threshold": round(threshold, 4), "f1_weighted": round(f1_test, 4), "f1_toxic": round(f1_toxic, 4), "roc_auc": round(float(roc_auc_score(y, combined)), 4), "fp": int(((y == 0) & (preds == 1)).sum()), "fn": int(((y == 1) & (preds == 0)).sum()), "ensemble_probs": combined, "ensemble_preds": preds, } def tune_ensemble_threshold( bert_probs: np.ndarray, lr_probs: np.ndarray, y_val: np.ndarray, *, bert_weight: float = 0.5, lr_weight: float = 0.5, metric: str = "f1_toxic", min_threshold: float = 0.05, max_threshold: float = 0.95, step: float = 0.01, ) -> tuple[float, float]: """Search ensemble threshold on validation soft-voted probabilities.""" combined = soft_vote_probs(bert_probs, lr_probs, bert_weight, lr_weight) return search_best_threshold( y_val, combined, metric=metric, min_threshold=min_threshold, max_threshold=max_threshold, step=step, ) def compute_performance_weights( bert_probs_val: np.ndarray, lr_probs_val: np.ndarray, y_val: np.ndarray, *, bert_threshold: float = 0.33, lr_threshold: float = 0.5, metric: str = "f1_weighted", min_lr_weight: float = 0.15, max_lr_weight: float = 0.45, ) -> tuple[float, float, dict]: """ Set soft-vote weights proportional to validation F1 (per branch threshold). """ y = np.asarray(y_val).astype(int) bert_preds = predict_with_threshold(bert_probs_val, bert_threshold) lr_preds = predict_with_threshold(lr_probs_val, lr_threshold) if metric == "f1_toxic": bert_score = float(f1_score(y, bert_preds, pos_label=1, zero_division=0)) lr_score = float(f1_score(y, lr_preds, pos_label=1, zero_division=0)) else: bert_score = float(f1_score(y, bert_preds, average="weighted", zero_division=0)) lr_score = float(f1_score(y, lr_preds, average="weighted", zero_division=0)) total = bert_score + lr_score if total <= 0: bw, lw = 0.7, 0.3 else: lw = lr_score / total lw = float(np.clip(lw, min_lr_weight, max_lr_weight)) bw = 1.0 - lw return bw, lw, { "bert_val_score": round(bert_score, 4), "lr_val_score": round(lr_score, 4), "bert_weight": round(bw, 4), "lr_weight": round(lw, 4), "weight_metric": metric, } def save_ensemble_meta(path: Path, meta: dict) -> None: path.parent.mkdir(parents=True, exist_ok=True) with open(path, "w") as f: json.dump(meta, f, indent=2)