SignalMod / src /pipeline /run_stable_pipeline.py
Mirae Kang
feat: implement new models and improve UI, #23
46cc63a
"""
Stable training pipeline: toxic-only augmentation, regularized DistilBERT,
high-C LR on TF-IDF, and hybrid soft-voting ensemble.
Run from repo root:
uv sync --extra hf --extra train
uv run python -m src.pipeline.run_stable_pipeline
uv run python -m src.pipeline.run_stable_pipeline --skip-augmentation
uv run python -m src.pipeline.run_stable_pipeline --bert-only
"""
from __future__ import annotations
import argparse
import json
import sys
from datetime import datetime
from pathlib import Path
import numpy as np
import pandas as pd
import yaml
from datasets import Dataset
from sklearn.metrics import f1_score, roc_auc_score
from sklearn.model_selection import train_test_split
PROJECT_ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(PROJECT_ROOT))
from src.data.loader import load_raw_data
from src.evaluation.stable_cv import evaluate_lr_fold, stratified_kfold_cv
from src.evaluation.stable_report import write_integrated_report
from src.features.augmentation import augment_toxic_train
from src.models.hybrid_ensemble import (
StableLRModel,
evaluate_ensemble,
fit_lr_with_gap_control,
save_ensemble_meta,
)
from src.models.transformer_trainer import train_distilbert_stable
from src.utils.logger import get_logger
logger = get_logger(__name__)
def _build_hf_dataset(X: pd.Series, y: pd.Series) -> Dataset:
return Dataset.from_pandas(
pd.DataFrame({"text": X.values, "label": y.astype(int).values})
)
def run_stable_pipeline(
*,
skip_augmentation: bool = False,
bert_only: bool = False,
config_path: Path | None = None,
) -> dict:
cfg_path = config_path or (PROJECT_ROOT / "configs" / "stable_training.yaml")
cfg = yaml.safe_load(open(cfg_path))
rand = int(cfg["pipeline"]["random_state"])
test_size = float(cfg["pipeline"]["test_size"])
val_size = float(cfg["pipeline"]["val_size"])
target = cfg["data"]["target_binary"]
raw_path = PROJECT_ROOT / cfg["data"]["raw_path"]
out_cfg = cfg["output"]
reports_dir = PROJECT_ROOT / out_cfg["reports_dir"]
bert_dir = PROJECT_ROOT / out_cfg["distilbert_dir"]
lr_path = PROJECT_ROOT / out_cfg["lr_path"]
meta_path = PROJECT_ROOT / out_cfg["ensemble_meta_path"]
reports_dir.mkdir(parents=True, exist_ok=True)
run_id = datetime.now().strftime("%Y%m%d_%H%M%S")
logger.info("=" * 60)
logger.info(f"STABLE PIPELINE — run={run_id}")
logger.info("=" * 60)
df = load_raw_data(raw_path)
X = df[cfg["data"].get("text_column", "Text")]
y = df[target].astype(int)
X_trainval, X_test, y_trainval, y_test = train_test_split(
X, y, test_size=test_size, random_state=rand, stratify=y
)
X_train, X_val, y_train, y_val = train_test_split(
X_trainval,
y_trainval,
test_size=val_size,
random_state=rand,
stratify=y_trainval,
)
logger.info(f"Split — train={len(X_train)} val={len(X_val)} test={len(X_test)}")
if skip_augmentation:
logger.warning("--skip-augmentation ignored for production; augmentation is required.")
cfg_aug = cfg
cfg_aug.setdefault("augmentation", {})["enabled"] = True
X_train_aug, y_train_aug = augment_toxic_train(X_train, y_train, cfg_aug, seed=rand)
aug_info = {
"enabled": True,
"strategy": cfg_aug.get("augmentation", {}).get("strategy", "back_translation"),
"train_size_before": len(X_train),
"train_size_after": len(X_train_aug),
"added_samples": len(X_train_aug) - len(X_train),
}
y_test_arr = y_test.astype(int).values
max_gap = float(cfg["pipeline"].get("max_train_test_gap", 0.05))
cv_folds = int(cfg["pipeline"].get("cv_folds", 5))
all_metrics: dict = {"run_id": run_id, "augmentation": aug_info}
lr_metrics = None
lr_probs_test = None
lr_model = None
lr_gap_meta: dict = {}
ensemble_metrics = None
if not bert_only:
lr_cfg = cfg["logistic_regression"]
tfidf_cfg = lr_cfg.get("tfidf", {})
logger.info("=" * 60)
logger.info("LR-TFIDF — gap control FIRST (target < 5 pp)")
logger.info("=" * 60)
use_orig_gap = lr_cfg.get("gap_search", {}).get("use_original_train_for_gap", True)
lr_model, lr_gap_meta = fit_lr_with_gap_control(
X_train_aug,
y_train_aug,
X_test,
y_test,
lr_cfg,
tfidf_cfg,
max_gap=max_gap,
X_train_gap=X_train if use_orig_gap else None,
y_train_gap=y_train if use_orig_gap else None,
)
lr_model.save(lr_path)
all_metrics["lr_gap_search"] = lr_gap_meta
lr_probs_test = lr_model.predict_proba(X_test)[:, 1]
lr_preds_test = lr_model.predict(X_test)
f1_lr_test = float(
f1_score(y_test_arr, lr_preds_test, average="weighted", zero_division=0)
)
f1_lr_train = lr_gap_meta["f1_train"]
gap_lr = lr_gap_meta["train_test_gap"]
lr_metrics = {
"model": "LR-TFIDF-stable",
"C": lr_gap_meta["C"],
"max_features": int(lr_gap_meta.get("max_features", tfidf_cfg.get("max_features", 800))),
"min_df": int(lr_gap_meta.get("min_df", tfidf_cfg.get("min_df", 3))),
"f1_weighted": round(f1_lr_test, 4),
"f1_train": f1_lr_train,
"train_test_gap": gap_lr,
"train_test_gap_pp": lr_gap_meta["train_test_gap_pp"],
"gap_ok": lr_gap_meta["gap_ok"],
"roc_auc": round(float(roc_auc_score(y_test_arr, lr_probs_test)), 4),
}
all_metrics["logistic_regression"] = lr_metrics
tfidf_cv = {
**tfidf_cfg,
"max_features": lr_gap_meta.get("max_features", tfidf_cfg.get("max_features", 800)),
"min_df": lr_gap_meta.get("min_df", tfidf_cfg.get("min_df", 3)),
}
def _lr_factory():
return StableLRModel(lr_cfg, tfidf_cv, C=lr_gap_meta["C"])
logger.info("=" * 60)
logger.info(f"LR 5-fold CV (n={cv_folds}) on train+val pool")
logger.info("=" * 60)
cv_lr = stratified_kfold_cv(
X_trainval,
y_trainval,
n_splits=cv_folds,
random_state=rand,
fit_predict_fn=lambda xt, yt, xv, yv: evaluate_lr_fold(
_lr_factory,
xt,
yt,
xv,
yv,
augment_fn=augment_toxic_train,
cfg=cfg_aug,
seed=rand,
),
)
all_metrics["cv_logistic_regression"] = cv_lr
logger.info("=" * 60)
logger.info("DistilBERT fine-tune (15 epochs, early stop on val F1)")
logger.info("=" * 60)
hf_train = _build_hf_dataset(X_train_aug, y_train_aug)
hf_val = _build_hf_dataset(X_val, y_val)
hf_test = _build_hf_dataset(X_test, y_test)
bert_result = train_distilbert_stable(
hf_train,
hf_val,
hf_test,
y_test_arr,
cfg,
bert_dir,
seed=rand,
)
bert_metrics = bert_result["metrics"]
bert_probs_test = bert_result["test_probs"]
all_metrics["distilbert"] = bert_metrics
if not bert_only and lr_model is not None:
ens_cfg = cfg["ensemble"]
ensemble_metrics = evaluate_ensemble(
bert_probs_test,
lr_probs_test,
y_test_arr,
bert_weight=float(ens_cfg.get("bert_weight", 0.5)),
lr_weight=float(ens_cfg.get("lr_weight", 0.5)),
model_name="Hybrid-BERT+LR",
)
# Train gap for ensemble (in-sample)
ens_probs_train_bert = _bert_probs_on_texts(bert_result, X_train_aug)
ens_probs_train_lr = lr_model.predict_proba(X_train_aug)[:, 1]
ens_train = evaluate_ensemble(
ens_probs_train_bert,
ens_probs_train_lr,
y_train_aug.astype(int).values,
bert_weight=float(ens_cfg.get("bert_weight", 0.5)),
lr_weight=float(ens_cfg.get("lr_weight", 0.5)),
model_name="Hybrid-train",
)
gap_ens = abs(ens_train["f1_weighted"] - ensemble_metrics["f1_weighted"])
ensemble_metrics["f1_train"] = ens_train["f1_weighted"]
ensemble_metrics["train_test_gap"] = round(gap_ens, 4)
ensemble_metrics["train_test_gap_pp"] = round(gap_ens * 100, 2)
all_metrics["ensemble"] = ensemble_metrics
save_ensemble_meta(
meta_path,
{
"run_id": run_id,
"bert_dir": str(bert_dir),
"lr_path": str(lr_path),
"ensemble": ens_cfg,
},
)
# Reports (CSV summary without matplotlib dependency)
summary_rows = [bert_metrics]
if lr_metrics:
summary_rows.append(lr_metrics)
if ensemble_metrics:
summary_rows.append(ensemble_metrics)
pd.DataFrame(summary_rows).to_csv(
reports_dir / f"stable_summary_{run_id}.csv", index=False
)
report_path = reports_dir / f"stable_run_{run_id}.json"
with open(report_path, "w") as f:
json.dump(_json_safe(all_metrics), f, indent=2)
md_path = reports_dir / f"integrated_report_{run_id}.md"
write_integrated_report(all_metrics, md_path)
logger.info(f"Integrated report: {md_path}")
_print_targets(all_metrics)
return all_metrics
def _bert_probs_on_texts(bert_result: dict, X: pd.Series) -> np.ndarray:
"""Run saved trainer on raw texts for train-gap on ensemble."""
trainer = bert_result["trainer"]
tokenizer = bert_result["tokenizer"]
from datasets import Dataset
ds = Dataset.from_pandas(pd.DataFrame({"text": X.values, "label": [0] * len(X)}))
def _tok(batch):
return tokenizer(batch["text"], truncation=True, max_length=128)
tok = ds.map(_tok, batched=True)
drop_cols = [c for c in tok.column_names if c not in ("input_ids", "attention_mask", "label")]
if drop_cols:
tok = tok.remove_columns(drop_cols)
tok.set_format("torch")
import torch
out = trainer.predict(tok)
return torch.softmax(torch.tensor(out.predictions), dim=1)[:, 1].numpy()
def _json_safe(obj):
"""Convert numpy scalars/arrays for JSON reporting."""
if isinstance(obj, dict):
return {k: _json_safe(v) for k, v in obj.items() if k not in ("ensemble_probs", "ensemble_preds")}
if isinstance(obj, (list, tuple)):
return [_json_safe(v) for v in obj]
if isinstance(obj, np.ndarray):
return obj.tolist()
if isinstance(obj, (np.floating, np.integer)):
return obj.item()
return obj
def _print_targets(all_metrics: dict) -> None:
logger.info("=" * 60)
for key in ("distilbert", "logistic_regression", "ensemble"):
m = all_metrics.get(key)
if not m:
continue
f1 = m.get("f1_weighted", 0)
gap = m.get("train_test_gap", m.get("train_test_gap_pp", 999) / 100)
if "train_test_gap_pp" in m:
gap = m["train_test_gap_pp"] / 100
ok_f1 = "✅" if f1 > 0.80 else "⚠️"
ok_gap = "✅" if gap < 0.05 else "⚠️"
logger.info(
f"{m.get('model', key)}: F1={f1:.4f} {ok_f1} | gap={gap:.4f} {ok_gap}"
)
logger.info("=" * 60)
def main():
parser = argparse.ArgumentParser(description="Stable DistilBERT + LR ensemble pipeline")
parser.add_argument(
"--skip-augmentation",
action="store_true",
help="Ignored in production — augmentation is always applied",
)
parser.add_argument("--bert-only", action="store_true")
parser.add_argument("--config", type=str, default=None)
args = parser.parse_args()
run_stable_pipeline(
skip_augmentation=args.skip_augmentation,
bert_only=args.bert_only,
config_path=Path(args.config) if args.config else None,
)
if __name__ == "__main__":
main()