"""
Stable training pipeline: toxic-only augmentation, regularized DistilBERT,
high-C LR on TF-IDF, and hybrid soft-voting ensemble.

Run from repo root:
    uv sync --extra hf --extra train
    uv run python -m src.pipeline.run_stable_pipeline
    uv run python -m src.pipeline.run_stable_pipeline --skip-augmentation
    uv run python -m src.pipeline.run_stable_pipeline --bert-only

Note: ``--skip-augmentation`` is accepted but ignored; a warning is logged and
augmentation is always applied.
"""
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import sys |
| from datetime import datetime |
| from pathlib import Path |
|
|
| import numpy as np |
| import pandas as pd |
| import yaml |
| from datasets import Dataset |
| from sklearn.metrics import f1_score, roc_auc_score |
| from sklearn.model_selection import train_test_split |
|
|
| PROJECT_ROOT = Path(__file__).resolve().parents[2] |
| sys.path.insert(0, str(PROJECT_ROOT)) |
|
|
| from src.data.loader import load_raw_data |
| from src.evaluation.stable_cv import evaluate_lr_fold, stratified_kfold_cv |
| from src.evaluation.stable_report import write_integrated_report |
| from src.features.augmentation import augment_toxic_train |
| from src.models.hybrid_ensemble import ( |
| StableLRModel, |
| evaluate_ensemble, |
| fit_lr_with_gap_control, |
| save_ensemble_meta, |
| ) |
| from src.models.transformer_trainer import train_distilbert_stable |
| from src.utils.logger import get_logger |
|
|
| logger = get_logger(__name__) |
|
|
|
|
def _build_hf_dataset(X: pd.Series, y: pd.Series) -> Dataset:
    """Pack texts and integer-cast labels into a two-column ``Dataset``.

    Args:
        X: Series whose values become the ``"text"`` column.
        y: Series whose values are cast to ``int`` for the ``"label"`` column.

    Returns:
        A ``Dataset`` built from a fresh DataFrame with ``text`` and ``label``.
    """
    texts = X.values
    labels = y.astype(int).values
    # Going through .values discards the original Series indexes, so the two
    # columns are aligned purely by position.
    frame = pd.DataFrame({"text": texts, "label": labels})
    return Dataset.from_pandas(frame)
|
|
|
|
def run_stable_pipeline(
    *,
    skip_augmentation: bool = False,
    bert_only: bool = False,
    config_path: Path | None = None,
) -> dict:
    """Run the stable training pipeline end to end and return its metrics.

    Steps, in order: load the YAML config and the raw data, make a stratified
    train/val/test split, augment the training split, fit and cross-validate
    the TF-IDF logistic regression with gap control (skipped when
    ``bert_only``), fine-tune DistilBERT, evaluate the weighted ensemble
    (skipped when ``bert_only``), and write CSV / JSON / Markdown reports into
    the configured reports directory.

    Args:
        skip_augmentation: Accepted for CLI compatibility only. When true a
            warning is logged; augmentation is still forced on.
        bert_only: Skip the logistic-regression and ensemble stages.
        config_path: YAML config to load. Defaults to
            ``PROJECT_ROOT / "configs" / "stable_training.yaml"``.

    Returns:
        The collected metrics dict. It always holds ``run_id``,
        ``augmentation`` and ``distilbert``; unless ``bert_only`` it also
        holds ``lr_gap_search``, ``logistic_regression``,
        ``cv_logistic_regression`` and ``ensemble``.

    Raises:
        KeyError: If a required config entry (for example
            ``pipeline.random_state`` or ``output.reports_dir``) is missing.
    """
    cfg_path = config_path or (PROJECT_ROOT / "configs" / "stable_training.yaml")
    # Close the config file promptly instead of leaving the handle to the GC,
    # and do not depend on the platform's default text encoding.
    with open(cfg_path, encoding="utf-8") as cfg_file:
        cfg = yaml.safe_load(cfg_file)

    rand = int(cfg["pipeline"]["random_state"])
    test_size = float(cfg["pipeline"]["test_size"])
    val_size = float(cfg["pipeline"]["val_size"])
    target = cfg["data"]["target_binary"]
    raw_path = PROJECT_ROOT / cfg["data"]["raw_path"]

    out_cfg = cfg["output"]
    reports_dir = PROJECT_ROOT / out_cfg["reports_dir"]
    bert_dir = PROJECT_ROOT / out_cfg["distilbert_dir"]
    lr_path = PROJECT_ROOT / out_cfg["lr_path"]
    meta_path = PROJECT_ROOT / out_cfg["ensemble_meta_path"]
    reports_dir.mkdir(parents=True, exist_ok=True)

    run_id = datetime.now().strftime("%Y%m%d_%H%M%S")
    logger.info("=" * 60)
    logger.info(f"STABLE PIPELINE — run={run_id}")
    logger.info("=" * 60)

    # ------------------------------------------------------------------
    # Data loading and stratified train / val / test split
    # ------------------------------------------------------------------
    df = load_raw_data(raw_path)
    X = df[cfg["data"].get("text_column", "Text")]
    y = df[target].astype(int)

    X_trainval, X_test, y_trainval, y_test = train_test_split(
        X, y, test_size=test_size, random_state=rand, stratify=y
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_trainval,
        y_trainval,
        test_size=val_size,
        random_state=rand,
        stratify=y_trainval,
    )
    logger.info(f"Split — train={len(X_train)} val={len(X_val)} test={len(X_test)}")

    # ------------------------------------------------------------------
    # Augmentation (always on)
    # ------------------------------------------------------------------
    if skip_augmentation:
        logger.warning("--skip-augmentation ignored for production; augmentation is required.")
    # cfg_aug is an alias, not a copy: forcing "enabled" here is also visible
    # through `cfg`, which is later handed to the DistilBERT trainer.
    cfg_aug = cfg
    # An `augmentation:` key left empty in the YAML parses to None; normalise
    # it to a dict so the assignment below cannot fail.
    aug_cfg = cfg_aug.get("augmentation") or {}
    aug_cfg["enabled"] = True
    cfg_aug["augmentation"] = aug_cfg

    X_train_aug, y_train_aug = augment_toxic_train(X_train, y_train, cfg_aug, seed=rand)
    aug_info = {
        "enabled": True,
        "strategy": aug_cfg.get("strategy", "back_translation"),
        "train_size_before": len(X_train),
        "train_size_after": len(X_train_aug),
        "added_samples": len(X_train_aug) - len(X_train),
    }

    y_test_arr = y_test.astype(int).values
    max_gap = float(cfg["pipeline"].get("max_train_test_gap", 0.05))
    cv_folds = int(cfg["pipeline"].get("cv_folds", 5))

    all_metrics: dict = {"run_id": run_id, "augmentation": aug_info}

    lr_metrics = None
    lr_probs_test = None
    lr_model = None
    lr_gap_meta: dict = {}
    ensemble_metrics = None

    # ------------------------------------------------------------------
    # Logistic regression on TF-IDF: gap-controlled fit, then k-fold CV
    # ------------------------------------------------------------------
    if not bert_only:
        lr_cfg = cfg["logistic_regression"]
        tfidf_cfg = lr_cfg.get("tfidf", {})

        logger.info("=" * 60)
        # Report the configured gap target rather than a hard-coded "5 pp".
        logger.info(f"LR-TFIDF — gap control FIRST (target < {max_gap * 100:g} pp)")
        logger.info("=" * 60)
        use_orig_gap = lr_cfg.get("gap_search", {}).get("use_original_train_for_gap", True)
        # NOTE(review): the gap search is given the held-out test split. If it
        # selects hyperparameters from that score, the test metrics reported
        # below are optimistically biased — confirm, and consider passing
        # X_val / y_val here instead.
        lr_model, lr_gap_meta = fit_lr_with_gap_control(
            X_train_aug,
            y_train_aug,
            X_test,
            y_test,
            lr_cfg,
            tfidf_cfg,
            max_gap=max_gap,
            X_train_gap=X_train if use_orig_gap else None,
            y_train_gap=y_train if use_orig_gap else None,
        )
        # Ensure the target directory exists; a no-op if it already does.
        lr_path.parent.mkdir(parents=True, exist_ok=True)
        lr_model.save(lr_path)
        all_metrics["lr_gap_search"] = lr_gap_meta

        lr_probs_test = lr_model.predict_proba(X_test)[:, 1]
        lr_preds_test = lr_model.predict(X_test)
        f1_lr_test = float(
            f1_score(y_test_arr, lr_preds_test, average="weighted", zero_division=0)
        )

        # TF-IDF settings chosen by the gap search, falling back to the config
        # values and then to the built-in defaults.
        chosen_max_features = lr_gap_meta.get(
            "max_features", tfidf_cfg.get("max_features", 800)
        )
        chosen_min_df = lr_gap_meta.get("min_df", tfidf_cfg.get("min_df", 3))

        lr_metrics = {
            "model": "LR-TFIDF-stable",
            "C": lr_gap_meta["C"],
            "max_features": int(chosen_max_features),
            "min_df": int(chosen_min_df),
            "f1_weighted": round(f1_lr_test, 4),
            "f1_train": lr_gap_meta["f1_train"],
            "train_test_gap": lr_gap_meta["train_test_gap"],
            "train_test_gap_pp": lr_gap_meta["train_test_gap_pp"],
            "gap_ok": lr_gap_meta["gap_ok"],
            "roc_auc": round(float(roc_auc_score(y_test_arr, lr_probs_test)), 4),
        }
        all_metrics["logistic_regression"] = lr_metrics

        tfidf_cv = {
            **tfidf_cfg,
            "max_features": chosen_max_features,
            "min_df": chosen_min_df,
        }

        def _lr_factory():
            """Build a fresh, unfitted LR model with the selected settings."""
            return StableLRModel(lr_cfg, tfidf_cv, C=lr_gap_meta["C"])

        logger.info("=" * 60)
        # Report the configured fold count rather than a hard-coded "5-fold".
        logger.info(f"LR {cv_folds}-fold CV on train+val pool")
        logger.info("=" * 60)
        cv_lr = stratified_kfold_cv(
            X_trainval,
            y_trainval,
            n_splits=cv_folds,
            random_state=rand,
            fit_predict_fn=lambda xt, yt, xv, yv: evaluate_lr_fold(
                _lr_factory,
                xt,
                yt,
                xv,
                yv,
                augment_fn=augment_toxic_train,
                cfg=cfg_aug,
                seed=rand,
            ),
        )
        all_metrics["cv_logistic_regression"] = cv_lr

    # ------------------------------------------------------------------
    # DistilBERT fine-tuning
    # ------------------------------------------------------------------
    logger.info("=" * 60)
    logger.info("DistilBERT fine-tune (15 epochs, early stop on val F1)")
    logger.info("=" * 60)
    hf_train = _build_hf_dataset(X_train_aug, y_train_aug)
    hf_val = _build_hf_dataset(X_val, y_val)
    hf_test = _build_hf_dataset(X_test, y_test)

    bert_result = train_distilbert_stable(
        hf_train,
        hf_val,
        hf_test,
        y_test_arr,
        cfg,
        bert_dir,
        seed=rand,
    )
    bert_metrics = bert_result["metrics"]
    bert_probs_test = bert_result["test_probs"]
    all_metrics["distilbert"] = bert_metrics

    # ------------------------------------------------------------------
    # Weighted ensemble of DistilBERT and LR probabilities
    # ------------------------------------------------------------------
    if not bert_only and lr_model is not None:
        ens_cfg = cfg["ensemble"]
        bert_weight = float(ens_cfg.get("bert_weight", 0.5))
        lr_weight = float(ens_cfg.get("lr_weight", 0.5))

        ensemble_metrics = evaluate_ensemble(
            bert_probs_test,
            lr_probs_test,
            y_test_arr,
            bert_weight=bert_weight,
            lr_weight=lr_weight,
            model_name="Hybrid-BERT+LR",
        )

        # Score the same ensemble on the augmented training set so that a
        # train/test gap can be reported for it as well.
        ens_probs_train_bert = _bert_probs_on_texts(bert_result, X_train_aug)
        ens_probs_train_lr = lr_model.predict_proba(X_train_aug)[:, 1]
        ens_train = evaluate_ensemble(
            ens_probs_train_bert,
            ens_probs_train_lr,
            y_train_aug.astype(int).values,
            bert_weight=bert_weight,
            lr_weight=lr_weight,
            model_name="Hybrid-train",
        )
        gap_ens = abs(ens_train["f1_weighted"] - ensemble_metrics["f1_weighted"])
        ensemble_metrics["f1_train"] = ens_train["f1_weighted"]
        ensemble_metrics["train_test_gap"] = round(gap_ens, 4)
        ensemble_metrics["train_test_gap_pp"] = round(gap_ens * 100, 2)
        all_metrics["ensemble"] = ensemble_metrics

        # Ensure the target directory exists; a no-op if it already does.
        meta_path.parent.mkdir(parents=True, exist_ok=True)
        save_ensemble_meta(
            meta_path,
            {
                "run_id": run_id,
                "bert_dir": str(bert_dir),
                "lr_path": str(lr_path),
                "ensemble": ens_cfg,
            },
        )

    # ------------------------------------------------------------------
    # Reports
    # ------------------------------------------------------------------
    # The JSON report drops these per-sample entries; keep them out of the
    # one-row-per-model summary table too (a no-op when they are absent).
    bulky_keys = ("ensemble_probs", "ensemble_preds")
    included = [bert_metrics]
    if lr_metrics:
        included.append(lr_metrics)
    if ensemble_metrics:
        included.append(ensemble_metrics)
    summary_rows = [
        {k: v for k, v in row.items() if k not in bulky_keys} for row in included
    ]
    pd.DataFrame(summary_rows).to_csv(
        reports_dir / f"stable_summary_{run_id}.csv", index=False
    )

    report_path = reports_dir / f"stable_run_{run_id}.json"
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(_json_safe(all_metrics), f, indent=2)

    md_path = reports_dir / f"integrated_report_{run_id}.md"
    write_integrated_report(all_metrics, md_path)
    logger.info(f"Integrated report: {md_path}")

    _print_targets(all_metrics)

    return all_metrics
|
|
|
|
def _bert_probs_on_texts(
    bert_result: dict,
    X: pd.Series,
    *,
    max_length: int = 128,
) -> np.ndarray:
    """Score raw texts with the fitted trainer and return class-1 probabilities.

    Used to compute the ensemble's train/test gap.

    Args:
        bert_result: Mapping that provides the fitted ``"trainer"`` and its
            ``"tokenizer"``.
        X: Raw texts to score.
        max_length: Truncation length passed to the tokenizer. The default of
            128 is the value that used to be hard-coded here.
            NOTE(review): presumably this should match the length used during
            fine-tuning — confirm against the trainer configuration.

    Returns:
        1-D array with one probability per input text, taken from column 1 of
        the softmax over the model's predictions. Empty when ``X`` is empty.
    """
    trainer = bert_result["trainer"]
    tokenizer = bert_result["tokenizer"]

    # Nothing to score: skip the heavy imports and the predict call entirely.
    if len(X) == 0:
        return np.empty(0, dtype=float)

    # Imported lazily so the module can be loaded without these packages
    # until BERT scoring is actually needed.
    import torch
    from datasets import Dataset

    # The label column is a placeholder of zeros; only the predictions are
    # read from the output below.
    ds = Dataset.from_pandas(
        pd.DataFrame({"text": X.values, "label": [0] * len(X)})
    )

    def _tok(batch):
        return tokenizer(batch["text"], truncation=True, max_length=max_length)

    tok = ds.map(_tok, batched=True)
    # Keep only the model inputs and the label; drop everything else
    # (for example the raw text column).
    keep = ("input_ids", "attention_mask", "label")
    drop_cols = [c for c in tok.column_names if c not in keep]
    if drop_cols:
        tok = tok.remove_columns(drop_cols)
    tok.set_format("torch")

    out = trainer.predict(tok)
    probs = torch.softmax(torch.tensor(out.predictions), dim=1)
    return probs[:, 1].numpy()
|
|
|
|
def _json_safe(obj):
    """Recursively convert NumPy values into plain Python for JSON reporting.

    - dicts are rebuilt without the ``"ensemble_probs"`` / ``"ensemble_preds"``
      entries (at every nesting level); NumPy-scalar keys become Python
      scalars;
    - lists and tuples become lists;
    - arrays become nested lists via ``tolist()``;
    - any NumPy scalar (including ``np.bool_`` and ``np.str_``, not just
      floats and ints) becomes its Python equivalent via ``item()``;
    - everything else is returned unchanged.

    Args:
        obj: Arbitrary, possibly nested, metrics structure.

    Returns:
        A structure of the same shape with the conversions above applied.
    """
    if isinstance(obj, dict):
        return {
            # json.dump rejects NumPy scalars as keys, so unwrap those too.
            (k.item() if isinstance(k, np.generic) else k): _json_safe(v)
            for k, v in obj.items()
            if k not in ("ensemble_probs", "ensemble_preds")
        }
    if isinstance(obj, (list, tuple)):
        return [_json_safe(v) for v in obj]
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    # np.generic is the base class of every NumPy scalar type; checking only
    # floating/integer would leave np.bool_ behind and break json.dump.
    if isinstance(obj, np.generic):
        return obj.item()
    return obj
|
|
|
|
def _print_targets(
    all_metrics: dict,
    *,
    f1_target: float = 0.80,
    max_gap: float = 0.05,
) -> None:
    """Log each model's weighted F1 and train/test gap against the targets.

    Looks at the ``"distilbert"``, ``"logistic_regression"`` and ``"ensemble"``
    entries of ``all_metrics``; missing or empty entries are skipped.

    Args:
        all_metrics: Metrics dict keyed by model section.
        f1_target: F1 must be strictly above this to earn a check mark.
        max_gap: The gap (as a fraction, not percentage points) must be
            strictly below this to earn a check mark.
    """
    logger.info("=" * 60)
    for key in ("distilbert", "logistic_regression", "ensemble"):
        m = all_metrics.get(key)
        if not m:
            continue

        # A missing or None F1 is shown as 0 and flagged.
        f1 = m.get("f1_weighted") or 0

        # The percentage-point value takes precedence when it is available;
        # otherwise fall back to the fractional gap.
        gap_pp = m.get("train_test_gap_pp")
        gap = gap_pp / 100 if gap_pp is not None else m.get("train_test_gap")

        ok_f1 = "✅" if f1 > f1_target else "⚠️"
        if gap is None:
            # No gap recorded: flag it instead of printing a made-up number.
            gap_text = "n/a"
            ok_gap = "⚠️"
        else:
            gap_text = f"{gap:.4f}"
            ok_gap = "✅" if gap < max_gap else "⚠️"

        logger.info(
            f"{m.get('model', key)}: F1={f1:.4f} {ok_f1} | gap={gap_text} {ok_gap}"
        )
    logger.info("=" * 60)
|
|
|
|
def main():
    """Parse the command-line flags and launch the stable pipeline."""
    cli = argparse.ArgumentParser(description="Stable DistilBERT + LR ensemble pipeline")
    cli.add_argument(
        "--skip-augmentation",
        action="store_true",
        help="Ignored in production — augmentation is always applied",
    )
    cli.add_argument("--bert-only", action="store_true")
    cli.add_argument("--config", type=str, default=None)
    opts = cli.parse_args()

    # An absent (or empty) --config lets the pipeline use its default file.
    config_path = None
    if opts.config:
        config_path = Path(opts.config)

    run_stable_pipeline(
        skip_augmentation=opts.skip_augmentation,
        bert_only=opts.bert_only,
        config_path=config_path,
    )


if __name__ == "__main__":
    main()
|
|