""" Phase 5 Expert pipeline: Toxic-BERT (head-only) + bottleneck LR + hybrid ensemble. Run from repo root: uv sync --extra hf --extra train uv run python -m src.pipeline.run_expert_pipeline """ from __future__ import annotations import argparse import json import sys from datetime import datetime from pathlib import Path import numpy as np import pandas as pd import yaml from datasets import Dataset from sklearn.metrics import f1_score, roc_auc_score from sklearn.model_selection import train_test_split PROJECT_ROOT = Path(__file__).resolve().parents[2] sys.path.insert(0, str(PROJECT_ROOT)) from src.data.loader import load_raw_data from src.evaluation.expert_report import write_expert_report from src.evaluation.threshold_tuning import predict_with_threshold, search_best_threshold from src.features.augmentation import augment_toxic_train from src.models.hybrid_ensemble import ( StableLRModel, evaluate_ensemble, fit_lr_with_gap_control, save_ensemble_meta, soft_vote_probs, tune_ensemble_threshold, ) from src.models.transformer_trainer import train_transformer_stable from src.utils.logger import get_logger logger = get_logger(__name__) def _build_hf_dataset(X: pd.Series, y: pd.Series) -> Dataset: return Dataset.from_pandas( pd.DataFrame({"text": X.values, "label": y.astype(int).values}) ) def _lr_metrics_with_threshold( model: StableLRModel, X_train, y_train, X_val, y_val, X_test, y_test, *, gap_meta: dict, ) -> dict: y_val_arr = np.asarray(y_val).astype(int) y_test_arr = np.asarray(y_test).astype(int) y_train_arr = np.asarray(y_train).astype(int) val_probs = model.predict_proba(X_val)[:, 1] threshold, _ = search_best_threshold(y_val_arr, val_probs, metric="f1_toxic") train_probs = model.predict_proba(X_train)[:, 1] test_probs = model.predict_proba(X_test)[:, 1] train_preds = predict_with_threshold(train_probs, threshold) test_preds = predict_with_threshold(test_probs, threshold) f1_toxic_train = float(f1_score(y_train_arr, train_preds, pos_label=1, zero_division=0)) f1_toxic_test = float(f1_score(y_test_arr, test_preds, pos_label=1, zero_division=0)) gap_toxic = abs(f1_toxic_train - f1_toxic_test) f1_weighted_train = float( f1_score(y_train_arr, train_preds, average="weighted", zero_division=0) ) f1_weighted_test = float( f1_score(y_test_arr, test_preds, average="weighted", zero_division=0) ) gap_weighted = abs(f1_weighted_train - f1_weighted_test) return { "model": "LR-TFIDF-expert", "C": gap_meta["C"], "max_features": int(gap_meta.get("max_features", 250)), "min_df": int(gap_meta.get("min_df", 3)), "threshold": round(threshold, 4), "f1_weighted": round(f1_weighted_test, 4), "f1_toxic": round(f1_toxic_test, 4), "f1_toxic_train": round(f1_toxic_train, 4), "train_test_gap_toxic": round(gap_toxic, 4), "train_test_gap_toxic_pp": round(gap_toxic * 100, 2), "gap_toxic_ok": gap_toxic < 0.05, "f1_train": round(f1_weighted_train, 4), "train_test_gap": round(gap_weighted, 4), "train_test_gap_pp": round(gap_weighted * 100, 2), "roc_auc": round(float(roc_auc_score(y_test_arr, test_probs)), 4), "test_probs": test_probs, "val_probs": val_probs, } def run_expert_pipeline(*, config_path: Path | None = None) -> dict: cfg_path = config_path or (PROJECT_ROOT / "configs" / "expert_training.yaml") cfg = yaml.safe_load(open(cfg_path)) rand = int(cfg["pipeline"]["random_state"]) test_size = float(cfg["pipeline"]["test_size"]) val_size = float(cfg["pipeline"]["val_size"]) target = cfg["data"]["target_binary"] raw_path = PROJECT_ROOT / cfg["data"]["raw_path"] out_cfg = cfg["output"] reports_dir = PROJECT_ROOT / out_cfg["reports_dir"] bert_dir = PROJECT_ROOT / out_cfg["transformer_dir"] lr_path = PROJECT_ROOT / out_cfg["lr_path"] meta_path = PROJECT_ROOT / out_cfg["ensemble_meta_path"] reports_dir.mkdir(parents=True, exist_ok=True) run_id = datetime.now().strftime("%Y%m%d_%H%M%S") logger.info("=" * 60) logger.info(f"EXPERT PIPELINE (Phase 5) — run={run_id}") logger.info("=" * 60) df = load_raw_data(raw_path) X = df[cfg["data"].get("text_column", "Text")] y = df[target].astype(int) X_trainval, X_test, y_trainval, y_test = train_test_split( X, y, test_size=test_size, random_state=rand, stratify=y ) X_train, X_val, y_train, y_val = train_test_split( X_trainval, y_trainval, test_size=val_size, random_state=rand, stratify=y_trainval, ) cfg_aug = cfg cfg_aug.setdefault("augmentation", {})["enabled"] = True pivot = cfg_aug.get("augmentation", {}).get("pivot_lang", "de") logger.info(f"Augmentation EN→{pivot.upper()}→EN (toxic only)") X_train_aug, y_train_aug = augment_toxic_train(X_train, y_train, cfg_aug, seed=rand) aug_info = { "enabled": True, "strategy": "back_translation", "pivot_lang": pivot, "train_size_before": len(X_train), "train_size_after": len(X_train_aug), "added_samples": len(X_train_aug) - len(X_train), } y_test_arr = y_test.astype(int).values y_val_arr = y_val.astype(int).values max_gap = float(cfg["pipeline"].get("max_train_test_gap", 0.05)) all_metrics: dict = {"run_id": run_id, "augmentation": aug_info, "config": str(cfg_path)} lr_cfg = cfg["logistic_regression"] tfidf_cfg = lr_cfg.get("tfidf", {}) logger.info("LR-TFIDF (max_features=250) + gap search") use_orig_gap = lr_cfg.get("gap_search", {}).get("use_original_train_for_gap", True) lr_model, lr_gap_meta = fit_lr_with_gap_control( X_train_aug, y_train_aug, X_test, y_test, lr_cfg, tfidf_cfg, max_gap=max_gap, X_train_gap=X_train if use_orig_gap else None, y_train_gap=y_train if use_orig_gap else None, ) lr_model.save(lr_path) all_metrics["lr_gap_search"] = lr_gap_meta lr_metrics = _lr_metrics_with_threshold( lr_model, X_train if use_orig_gap else X_train_aug, y_train if use_orig_gap else y_train_aug, X_val, y_val, X_test, y_test, gap_meta=lr_gap_meta, ) lr_probs_test = lr_metrics.pop("test_probs") lr_probs_val = lr_metrics.pop("val_probs") all_metrics["logistic_regression"] = lr_metrics logger.info("Toxic-BERT — head-only fine-tune + val threshold tuning") hf_train = _build_hf_dataset(X_train_aug, y_train_aug) hf_val = _build_hf_dataset(X_val, y_val) hf_test = _build_hf_dataset(X_test, y_test) bert_result = train_transformer_stable( hf_train, hf_val, hf_test, y_test_arr, y_val_arr, cfg, bert_dir, seed=rand, model_label="Toxic-BERT-expert", ) bert_metrics = bert_result["metrics"] bert_probs_test = bert_result["test_probs"] bert_probs_val = bert_result["val_probs"] all_metrics["transformer"] = bert_metrics ens_cfg = cfg["ensemble"] bw = float(ens_cfg.get("bert_weight", 0.7)) lw = float(ens_cfg.get("lr_weight", 0.3)) ens_th_cfg = ens_cfg.get("threshold_tuning", {}) if ens_th_cfg.get("enabled", True): ens_threshold, _ = tune_ensemble_threshold( bert_probs_val, lr_probs_val, y_val_arr, bert_weight=bw, lr_weight=lw, metric=ens_th_cfg.get("metric", "f1_toxic"), ) else: ens_threshold = 0.5 ensemble_metrics = evaluate_ensemble( bert_probs_test, lr_probs_test, y_test_arr, bert_weight=bw, lr_weight=lw, model_name="Hybrid-ToxicBERT+LR", threshold=ens_threshold, ) ens_probs_train_bert = _bert_probs_on_texts(bert_result, X_train_aug) ens_probs_train_lr = lr_model.predict_proba(X_train_aug)[:, 1] ens_train = evaluate_ensemble( ens_probs_train_bert, ens_probs_train_lr, y_train_aug.astype(int).values, bert_weight=bw, lr_weight=lw, model_name="Hybrid-train", threshold=ens_threshold, ) gap_toxic = abs(ens_train["f1_toxic"] - ensemble_metrics["f1_toxic"]) ensemble_metrics["f1_toxic_train"] = ens_train["f1_toxic"] ensemble_metrics["train_test_gap_toxic"] = round(gap_toxic, 4) ensemble_metrics["train_test_gap_toxic_pp"] = round(gap_toxic * 100, 2) ensemble_metrics["gap_toxic_ok"] = gap_toxic < 0.05 ensemble_metrics["f1_train"] = ens_train["f1_weighted"] ensemble_metrics["train_test_gap"] = round( abs(ens_train["f1_weighted"] - ensemble_metrics["f1_weighted"]), 4 ) ensemble_metrics["train_test_gap_pp"] = round( ensemble_metrics["train_test_gap"] * 100, 2 ) all_metrics["ensemble"] = ensemble_metrics save_ensemble_meta( meta_path, { "run_id": run_id, "bert_dir": str(bert_dir), "lr_path": str(lr_path), "ensemble": ens_cfg, "bert_threshold": bert_metrics.get("threshold"), "lr_threshold": lr_metrics.get("threshold"), "ensemble_threshold": ens_threshold, }, ) summary_rows = [ {k: v for k, v in bert_metrics.items() if k != "gap_toxic_ok"}, lr_metrics, ensemble_metrics, ] pd.DataFrame(summary_rows).to_csv(reports_dir / f"expert_summary_{run_id}.csv", index=False) report_path = reports_dir / f"expert_run_{run_id}.json" with open(report_path, "w") as f: json.dump(_json_safe(all_metrics), f, indent=2) md_path = reports_dir / f"integrated_report_{run_id}.md" write_expert_report(all_metrics, md_path) logger.info(f"Expert report: {md_path}") _print_expert_targets(all_metrics) return all_metrics def _bert_probs_on_texts(bert_result: dict, X: pd.Series) -> np.ndarray: trainer = bert_result["trainer"] tokenizer = bert_result["tokenizer"] from datasets import Dataset ds = Dataset.from_pandas(pd.DataFrame({"text": X.values, "label": [0] * len(X)})) def _tok(batch): return tokenizer(batch["text"], truncation=True, max_length=128) tok = ds.map(_tok, batched=True) drop_cols = [c for c in tok.column_names if c not in ("input_ids", "attention_mask", "label")] if drop_cols: tok = tok.remove_columns(drop_cols) tok.set_format("torch") import torch out = trainer.predict(tok) return torch.softmax(torch.tensor(out.predictions), dim=1)[:, 1].numpy() def _json_safe(obj): if isinstance(obj, dict): return { k: _json_safe(v) for k, v in obj.items() if k not in ("ensemble_probs", "ensemble_preds") } if isinstance(obj, (list, tuple)): return [_json_safe(v) for v in obj] if isinstance(obj, np.ndarray): return obj.tolist() if isinstance(obj, (np.floating, np.integer)): return obj.item() return obj def _print_expert_targets(all_metrics: dict) -> None: logger.info("=" * 60) for key in ("transformer", "logistic_regression", "ensemble"): m = all_metrics.get(key) if not m: continue f1t = m.get("f1_toxic", 0) gap = m.get("train_test_gap_toxic", 0.99) ok_f1 = "✅" if f1t > 0.75 else "⚠️" ok_gap = "✅" if m.get("gap_toxic_ok", gap < 0.05) else "⚠️" logger.info( f"{m.get('model', key)}: F1-toxic={f1t:.4f} {ok_f1} | " f"toxic gap={gap:.4f} {ok_gap} | threshold={m.get('threshold', 0.5)}" ) logger.info("=" * 60) def main(): parser = argparse.ArgumentParser(description="Phase 5 expert Toxic-BERT + LR pipeline") parser.add_argument("--config", type=str, default=None) args = parser.parse_args() run_expert_pipeline( config_path=Path(args.config) if args.config else None, ) if __name__ == "__main__": main()