"""
Final Squeeze — Performance Push pipeline (full Toxic-BERT unfreeze, TTA
(test-time augmentation), micro-LR anchor).

Usage:
    uv run python -m src.pipeline.run_performance_push_pipeline
    uv run python -m src.pipeline.run_performance_push_pipeline --skip-augmentation
"""
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import sys |
| from datetime import datetime |
| from pathlib import Path |
|
|
| import numpy as np |
| import pandas as pd |
| import yaml |
| from datasets import Dataset |
| from sklearn.metrics import f1_score, roc_auc_score |
| from sklearn.model_selection import train_test_split |
|
|
# Repository root: two directories above this file's own directory
# (this module lives at <root>/src/pipeline/).
PROJECT_ROOT = Path(__file__).resolve().parents[2]
# Prepend the root to the import path so the `src.*` imports resolve even
# when this file is executed outside the package context.
sys.path.insert(0, str(PROJECT_ROOT))
|
|
| from src.data.dual_loader import load_dual_track_data |
| from src.evaluation.hybrid_clean_report import write_hybrid_clean_report |
| from src.evaluation.threshold_tuning import predict_with_threshold, search_best_threshold |
| from src.features.metadata_features import DEFAULT_METADATA_COLUMNS |
| from src.features.text_preprocessor import TextPreprocessor |
| from src.models.hybrid_ensemble import evaluate_ensemble, save_ensemble_meta, tune_ensemble_threshold |
| from src.models.metadata_lr import MetadataLRModel, fit_metadata_lr_with_gap_control |
| from src.models.transformer_trainer import train_transformer_stable |
| from src.pipeline.run_hybrid_clean_pipeline import ( |
| _branch_metrics, |
| _build_hf_dataset, |
| _json_safe, |
| _meta_frame, |
| augment_dual_track, |
| ) |
| from src.utils.logger import get_logger |
|
|
# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)
|
|
|
|
def run_performance_push_pipeline(
    *,
    config_path: Path | None = None,
    skip_augmentation: bool = False,
) -> dict:
    """Run the Performance Push hybrid pipeline end to end.

    Steps:
      1. Load the YAML config and the dual-track dataset.
      2. Split stratified into train / validation / test.
      3. Optionally augment the training split.
      4. Train the metadata + clean-text logistic-regression branch.
      5. Train the Toxic-BERT branch.
      6. Combine both with fixed soft-vote weights and, when enabled,
         tune the ensemble threshold on the validation split.
      7. Save the LR model and ensemble metadata, then write JSON and
         Markdown reports.

    Args:
        config_path: YAML config to load. Defaults to
            ``configs/performance_push_training.yaml`` under the project root.
        skip_augmentation: If True, skip dual-track augmentation even when
            the config enables it.

    Returns:
        The aggregated metrics dict, which is also written to the JSON report.

    Raises:
        KeyError: If a required config entry is missing (e.g.
            ``pipeline.random_state``, ``data.target_binary``, ``output.*``,
            ``logistic_regression``, ``ensemble``).
    """
    # Function-scope import (as in the original), done up front so a missing
    # symbol fails before any expensive training rather than after it.
    from src.models.hybrid_ensemble import soft_vote_probs

    cfg_path = config_path or (PROJECT_ROOT / "configs" / "performance_push_training.yaml")
    # Context manager closes the handle immediately; `safe_load(open(...))`
    # left it open until garbage collection.
    with open(cfg_path, encoding="utf-8") as cfg_file:
        cfg = yaml.safe_load(cfg_file)

    pipe_cfg = cfg["pipeline"]
    rand = int(pipe_cfg["random_state"])
    test_size = float(pipe_cfg["test_size"])
    val_size = float(pipe_cfg["val_size"])
    max_gap = float(pipe_cfg.get("max_train_test_gap", 0.045))
    target_f1 = float(pipe_cfg.get("target_f1_weighted", 0.80))
    target_col = cfg["data"]["target_binary"]
    text_col = cfg["data"].get("text_column", "Text")

    out_cfg = cfg["output"]
    reports_dir = PROJECT_ROOT / out_cfg["reports_dir"]
    bert_dir = PROJECT_ROOT / out_cfg["transformer_dir"]
    lr_path = PROJECT_ROOT / out_cfg["lr_path"]
    meta_path = PROJECT_ROOT / out_cfg["ensemble_meta_path"]
    reports_dir.mkdir(parents=True, exist_ok=True)

    run_id = datetime.now().strftime("%Y%m%d_%H%M%S")
    pipeline_name = pipe_cfg.get("name", "performance_push")
    tr_cfg = cfg.get("transformer", {})
    tr_label = tr_cfg.get("model_label", "Toxic-BERT-push")
    logger.info("=" * 60)
    logger.info(f"{pipeline_name.upper().replace('_', ' ')} — run={run_id}")
    logger.info("=" * 60)

    df = load_dual_track_data(
        PROJECT_ROOT / cfg["data"]["raw_path"],
        processed_preprocessed=cfg["data"]["processed_preprocessed"],
        processed_stats=cfg["data"]["processed_stats"],
        target=target_col,
        text_column=text_col,
        id_column=cfg["data"].get("id_column", "CommentId"),
        features_config=cfg["data"]["features_config"],
        project_root=PROJECT_ROOT,
        write_preprocessed_if_missing=False,
    )

    # Stratified split: hold out the test set first, then carve the
    # validation set out of the remaining train+val pool.
    y = df[target_col].astype(int)
    idx_trainval, idx_test = train_test_split(
        df.index, test_size=test_size, random_state=rand, stratify=y
    )
    idx_train, idx_val = train_test_split(
        idx_trainval,
        test_size=val_size,
        random_state=rand,
        stratify=y.loc[idx_trainval],
    )

    def _slice(index):
        # Bundle raw text, cleaned text, metadata features and labels for one split.
        return {
            "raw": df.loc[index, text_col],
            "clean": df.loc[index, "clean_text"],
            "meta": _meta_frame(df.loc[index]),
            "y": y.loc[index],
        }

    tr, va, te = _slice(idx_train), _slice(idx_val), _slice(idx_test)
    preprocessor = TextPreprocessor(
        config_path=str(PROJECT_ROOT / cfg["data"]["features_config"])
    )

    # `or {}` also covers an explicitly empty (`null`) section in the YAML.
    # The original indexed cfg["augmentation"] directly, which raised KeyError
    # when the section was absent even though augmentation defaults to enabled.
    aug_cfg = cfg.get("augmentation") or {}
    aug_info: dict = {"enabled": False}
    if not skip_augmentation and aug_cfg.get("enabled", True):
        logger.info("Dual-track augmentation")
        tr["raw"], tr["clean"], tr["meta"], tr["y"] = augment_dual_track(
            tr["raw"], tr["clean"], tr["meta"], tr["y"], cfg, preprocessor, seed=rand
        )
        aug_info = {
            "enabled": True,
            "pivot_lang": aug_cfg.get("pivot_lang", "de"),
            "train_size_after": len(tr["y"]),
        }

    y_test_arr = te["y"].astype(int).values
    y_val_arr = va["y"].astype(int).values
    max_gap_pp = max_gap * 100

    all_metrics: dict = {
        "run_id": run_id,
        "pipeline": pipeline_name,
        "config": str(cfg_path),
        "target_f1_weighted": target_f1,
        "max_train_test_gap_pp": max_gap_pp,
        "augmentation": aug_info,
    }

    # ---- Branch 1: metadata + clean-text logistic regression ----
    lr_cfg = cfg["logistic_regression"]
    tfidf_cfg = lr_cfg.get("tfidf", {})
    use_orig_gap = lr_cfg.get("gap_search", {}).get("use_original_train_for_gap", True)

    # Train-side reference for the LR gap: either the pre-augmentation training
    # split or the (possibly augmented) one, depending on config.
    if use_orig_gap:
        gap_clean = df.loc[idx_train, "clean_text"]
        gap_meta_frame = _meta_frame(df.loc[idx_train])
        gap_y = y.loc[idx_train]
    else:
        gap_clean, gap_meta_frame, gap_y = tr["clean"], tr["meta"], tr["y"]

    # NOTE(review): the message hard-codes max_features/C; the actual values
    # come from lr_cfg/tfidf_cfg and may differ.
    logger.info("Metadata LR — clean_text anchor, max_features=300, C≈0.01")
    # NOTE(review): the test split is passed to the gap-controlled fit; if it
    # is used for model selection, test metrics are no longer held-out — confirm.
    lr_model, lr_gap_meta = fit_metadata_lr_with_gap_control(
        tr["clean"],
        tr["meta"],
        tr["y"],
        te["clean"],
        te["meta"],
        te["y"],
        lr_cfg,
        tfidf_cfg,
        max_gap=max_gap,
        X_train_gap_clean=gap_clean,
        meta_train_gap=gap_meta_frame,
        y_train_gap=gap_y,
    )
    lr_model.save(lr_path)
    all_metrics["lr_gap_search"] = lr_gap_meta

    lr_val_probs = lr_model.predict_proba(va["clean"], va["meta"])[:, 1]
    lr_test_probs = lr_model.predict_proba(te["clean"], te["meta"])[:, 1]
    lr_train_probs = lr_model.predict_proba(gap_clean, gap_meta_frame)[:, 1]

    lr_metrics = _branch_metrics(
        gap_y,
        te["y"],
        va["y"],
        lr_train_probs,
        lr_val_probs,
        lr_test_probs,
        model_name="LR-clean+meta-push",
        gap_meta=lr_gap_meta,
    )
    all_metrics["logistic_regression"] = lr_metrics

    # ---- Branch 2: Toxic-BERT ----
    logger.info(
        f"Training Toxic-BERT — {tr_cfg.get('freeze_mode', 'full')} unfreeze, "
        f"lr={tr_cfg.get('learning_rate', 5e-6)}, gap≤{max_gap_pp:.1f}pp, TTA on test"
    )
    hf_train = _build_hf_dataset(tr["raw"], tr["y"])
    hf_val = _build_hf_dataset(va["raw"], va["y"])
    hf_test = _build_hf_dataset(te["raw"], te["y"])

    bert_result = train_transformer_stable(
        hf_train,
        hf_val,
        hf_test,
        y_test_arr,
        y_val_arr,
        cfg,
        bert_dir,
        seed=rand,
        model_label=tr_label,
    )
    bert_metrics = bert_result["metrics"]
    bert_val_probs = bert_result["val_probs"]
    bert_test_probs = bert_result["test_probs"]
    bert_train_probs = infer_train_probs(bert_result, tr["raw"])
    all_metrics["transformer"] = bert_metrics

    # ---- Fixed-weight soft-vote ensemble ----
    ens_cfg = cfg["ensemble"]
    bw = float(ens_cfg.get("bert_weight", 0.95))
    lw = float(ens_cfg.get("lr_weight", 0.05))
    logger.info(f"Fixed ensemble weights — BERT={bw} LR={lw}")

    th_cfg = ens_cfg.get("threshold_tuning", {})
    ens_threshold = 0.5
    val_f1_at_t = None
    if th_cfg.get("enabled", True):
        # Threshold is tuned on validation only; test is evaluated at that threshold.
        ens_threshold, val_f1_at_t = tune_ensemble_threshold(
            bert_val_probs,
            lr_val_probs,
            y_val_arr,
            bert_weight=bw,
            lr_weight=lw,
            metric=th_cfg.get("metric", "f1_weighted"),
            min_threshold=float(th_cfg.get("min_threshold", 0.30)),
            max_threshold=float(th_cfg.get("max_threshold", 0.70)),
            step=float(th_cfg.get("step", 0.01)),
        )
        logger.info(f"Ensemble threshold={ens_threshold:.2f} val_f1_weighted={val_f1_at_t:.4f}")

    ensemble_metrics = evaluate_ensemble(
        bert_test_probs,
        lr_test_probs,
        y_test_arr,
        bert_weight=bw,
        lr_weight=lw,
        model_name=f"{pipeline_name.replace('_', ' ').title()}-Hybrid",
        threshold=ens_threshold,
    )

    # Ensemble train F1 is measured on the (possibly augmented) training split.
    # NOTE(review): unlike the LR branch, this ignores use_original_train_for_gap.
    lr_train_probs_tr = lr_model.predict_proba(tr["clean"], tr["meta"])[:, 1]
    ens_train_probs = soft_vote_probs(bert_train_probs, lr_train_probs_tr, bw, lw)
    ens_train_preds = predict_with_threshold(ens_train_probs, ens_threshold)
    y_tr_arr = tr["y"].astype(int).values
    f1_train_ens = float(f1_score(y_tr_arr, ens_train_preds, average="weighted", zero_division=0))
    gap_ens = abs(f1_train_ens - ensemble_metrics["f1_weighted"])
    ensemble_metrics.update(
        {
            "f1_train": round(f1_train_ens, 4),
            "train_test_gap": round(gap_ens, 4),
            "train_test_gap_pp": round(gap_ens * 100, 2),
            "gap_ok": gap_ens <= max_gap,
            "bert_weight": bw,
            "lr_weight": lw,
            # `is not None` so a genuine 0.0 validation F1 is kept, not dropped.
            "val_f1_weighted_at_threshold": (
                round(val_f1_at_t, 4) if val_f1_at_t is not None else None
            ),
            "target_f1_hit": ensemble_metrics["f1_weighted"] >= target_f1,
        }
    )
    all_metrics["ensemble"] = ensemble_metrics

    save_ensemble_meta(
        meta_path,
        {
            "run_id": run_id,
            "bert_dir": str(bert_dir),
            "lr_path": str(lr_path),
            "weights": {"bert": bw, "lr": lw, "fixed": True},
            "thresholds": {
                "bert": bert_metrics.get("threshold"),
                "lr": lr_metrics.get("threshold"),
                "ensemble": ens_threshold,
            },
        },
    )

    report_path = reports_dir / f"{pipeline_name}_run_{run_id}.json"
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(_json_safe(all_metrics), f, indent=2)

    md_path = reports_dir / f"integrated_report_{run_id}.md"
    write_hybrid_clean_report(all_metrics, md_path)
    logger.info(f"Report: {md_path}")

    _print_summary(all_metrics, target_f1, max_gap_pp)
    return all_metrics
|
|
|
|
def infer_train_probs(
    bert_result: dict, X_raw: pd.Series, max_length: int = 128
) -> np.ndarray:
    """Score raw texts with the fine-tuned transformer and return P(class 1).

    Args:
        bert_result: Result dict from ``train_transformer_stable``. Only its
            ``"trainer"`` and ``"tokenizer"`` entries are read.
        X_raw: Raw (un-preprocessed) texts to score, one per row.
        max_length: Tokenizer truncation length. Defaults to 128, the value
            previously hard-coded here. NOTE(review): confirm it matches the
            length used during training.

    Returns:
        A 1-D array with one value per row of ``X_raw``. Each value is column 1
        of the softmax over the model's logits.
    """
    # Nothing to score: skip tokenizer/trainer setup entirely.
    if len(X_raw) == 0:
        return np.empty(0, dtype=np.float32)

    import torch

    trainer = bert_result["trainer"]
    tokenizer = bert_result["tokenizer"]
    # Placeholder labels (all 0). The returned probabilities depend only on
    # the model's predictions, not on these values.
    ds = Dataset.from_pandas(
        pd.DataFrame({"text": X_raw.values, "label": [0] * len(X_raw)})
    )

    def _tokenize(batch):
        return tokenizer(batch["text"], truncation=True, max_length=max_length)

    tokenized = ds.map(_tokenize, batched=True)
    # Drop everything except input_ids / attention_mask / label (e.g. the raw
    # "text" column and token_type_ids if the tokenizer emits them).
    extra_cols = [
        c for c in tokenized.column_names
        if c not in ("input_ids", "attention_mask", "label")
    ]
    if extra_cols:
        tokenized = tokenized.remove_columns(extra_cols)
    tokenized.set_format("torch")

    out = trainer.predict(tokenized)
    logits = out.predictions
    # Models that return several outputs yield a tuple; the logits come first.
    if isinstance(logits, tuple):
        logits = logits[0]
    return torch.softmax(torch.as_tensor(logits), dim=1)[:, 1].numpy()
|
|
|
|
def _print_summary(metrics: dict, target: float, max_gap_pp: float) -> None:
    """Log a short end-of-run summary.

    Args:
        metrics: Aggregated run metrics. Reads the "transformer",
            "logistic_regression" and "ensemble" sub-dicts when present.
        target: Target weighted F1 that the ensemble is compared against.
        max_gap_pp: Maximum allowed train/test gap in percentage points. It is
            used only when a branch does not report its own ``gap_ok`` flag.
    """
    logger.info("=" * 60)
    ens = metrics.get("ensemble") or {}
    # `or 0.0` also covers an explicit None, which would break `>=` and `:.4f`.
    ens_f1 = ens.get("f1_weighted") or 0.0
    status = "✅ TARGET" if ens_f1 >= target else "⚠️ below"
    logger.info(f"INTEGRATED F1 weighted={ens_f1:.4f} {status} (target {target})")

    for key in ("transformer", "logistic_regression", "ensemble"):
        m = metrics.get(key) or {}
        if not m:
            continue
        ok = m.get("gap_ok")
        if ok is None:
            # Fall back to comparing the reported gap. A missing or None gap
            # counts as not OK (the original eager default raised TypeError on None).
            gap_pp = m.get("train_test_gap_pp")
            ok = gap_pp is not None and gap_pp <= max_gap_pp
        logger.info(
            f" {m.get('model', key)}: F1w={m.get('f1_weighted')} "
            f"gap_pp={m.get('train_test_gap_pp')} gap_ok={ok}"
        )
    logger.info("=" * 60)
|
|
|
|
| def main(): |
| parser = argparse.ArgumentParser(description="Performance Push pipeline") |
| parser.add_argument("--config", type=str, default=None) |
| parser.add_argument("--skip-augmentation", action="store_true") |
| args = parser.parse_args() |
| run_performance_push_pipeline( |
| config_path=Path(args.config) if args.config else None, |
| skip_augmentation=args.skip_augmentation, |
| ) |
|
|
|
|
# Script / `python -m` entry point: delegate to the CLI parser.
if __name__ == "__main__":
    main()
|
|