SignalMod / src /pipeline /run_performance_push_pipeline.py
Mirae Kang
feat: implement new models and improve UI, #23
46cc63a
"""
Final Squeeze β€” Performance Push (full Toxic-BERT unfreeze, TTA, micro-LR anchor).
uv run python -m src.pipeline.run_performance_push_pipeline
uv run python -m src.pipeline.run_performance_push_pipeline --skip-augmentation
"""
from __future__ import annotations
import argparse
import json
import sys
from datetime import datetime
from pathlib import Path
import numpy as np
import pandas as pd
import yaml
from datasets import Dataset
from sklearn.metrics import f1_score, roc_auc_score
from sklearn.model_selection import train_test_split
PROJECT_ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(PROJECT_ROOT))
from src.data.dual_loader import load_dual_track_data
from src.evaluation.hybrid_clean_report import write_hybrid_clean_report
from src.evaluation.threshold_tuning import predict_with_threshold, search_best_threshold
from src.features.metadata_features import DEFAULT_METADATA_COLUMNS
from src.features.text_preprocessor import TextPreprocessor
from src.models.hybrid_ensemble import evaluate_ensemble, save_ensemble_meta, tune_ensemble_threshold
from src.models.metadata_lr import MetadataLRModel, fit_metadata_lr_with_gap_control
from src.models.transformer_trainer import train_transformer_stable
from src.pipeline.run_hybrid_clean_pipeline import (
_branch_metrics,
_build_hf_dataset,
_json_safe,
_meta_frame,
augment_dual_track,
)
from src.utils.logger import get_logger
logger = get_logger(__name__)
def run_performance_push_pipeline(
*,
config_path: Path | None = None,
skip_augmentation: bool = False,
) -> dict:
cfg_path = config_path or (PROJECT_ROOT / "configs" / "performance_push_training.yaml")
cfg = yaml.safe_load(open(cfg_path))
rand = int(cfg["pipeline"]["random_state"])
test_size = float(cfg["pipeline"]["test_size"])
val_size = float(cfg["pipeline"]["val_size"])
max_gap = float(cfg["pipeline"].get("max_train_test_gap", 0.045))
target_f1 = float(cfg["pipeline"].get("target_f1_weighted", 0.80))
target_col = cfg["data"]["target_binary"]
text_col = cfg["data"].get("text_column", "Text")
out_cfg = cfg["output"]
reports_dir = PROJECT_ROOT / out_cfg["reports_dir"]
bert_dir = PROJECT_ROOT / out_cfg["transformer_dir"]
lr_path = PROJECT_ROOT / out_cfg["lr_path"]
meta_path = PROJECT_ROOT / out_cfg["ensemble_meta_path"]
reports_dir.mkdir(parents=True, exist_ok=True)
run_id = datetime.now().strftime("%Y%m%d_%H%M%S")
pipeline_name = cfg.get("pipeline", {}).get("name", "performance_push")
tr_label = cfg.get("transformer", {}).get("model_label", "Toxic-BERT-push")
logger.info("=" * 60)
logger.info(f"{pipeline_name.upper().replace('_', ' ')} β€” run={run_id}")
logger.info("=" * 60)
df = load_dual_track_data(
PROJECT_ROOT / cfg["data"]["raw_path"],
processed_preprocessed=cfg["data"]["processed_preprocessed"],
processed_stats=cfg["data"]["processed_stats"],
target=target_col,
text_column=text_col,
id_column=cfg["data"].get("id_column", "CommentId"),
features_config=cfg["data"]["features_config"],
project_root=PROJECT_ROOT,
write_preprocessed_if_missing=False,
)
y = df[target_col].astype(int)
idx_trainval, idx_test = train_test_split(
df.index, test_size=test_size, random_state=rand, stratify=y
)
y_trainval = y.loc[idx_trainval]
idx_train, idx_val = train_test_split(
idx_trainval, test_size=val_size, random_state=rand, stratify=y_trainval
)
def _slice(index):
return {
"raw": df.loc[index, text_col],
"clean": df.loc[index, "clean_text"],
"meta": _meta_frame(df.loc[index]),
"y": y.loc[index],
}
tr, va, te = _slice(idx_train), _slice(idx_val), _slice(idx_test)
preprocessor = TextPreprocessor(
config_path=str(PROJECT_ROOT / cfg["data"]["features_config"])
)
aug_info = {"enabled": False}
if not skip_augmentation and cfg.get("augmentation", {}).get("enabled", True):
logger.info("Dual-track augmentation")
tr["raw"], tr["clean"], tr["meta"], tr["y"] = augment_dual_track(
tr["raw"], tr["clean"], tr["meta"], tr["y"], cfg, preprocessor, seed=rand
)
aug_info = {
"enabled": True,
"pivot_lang": cfg["augmentation"].get("pivot_lang", "de"),
"train_size_after": len(tr["y"]),
}
y_test_arr = te["y"].astype(int).values
y_val_arr = va["y"].astype(int).values
max_gap_pp = max_gap * 100
all_metrics: dict = {
"run_id": run_id,
"pipeline": pipeline_name,
"config": str(cfg_path),
"target_f1_weighted": target_f1,
"max_train_test_gap_pp": max_gap_pp,
"augmentation": aug_info,
}
# ── LR (clean_text + metadata, C=0.01, max_features=800) ─────────────────
lr_cfg = cfg["logistic_regression"]
tfidf_cfg = lr_cfg.get("tfidf", {})
use_orig_gap = lr_cfg.get("gap_search", {}).get("use_original_train_for_gap", True)
logger.info("Metadata LR β€” clean_text anchor, max_features=300, Cβ‰ˆ0.01")
lr_model, lr_gap_meta = fit_metadata_lr_with_gap_control(
tr["clean"],
tr["meta"],
tr["y"],
te["clean"],
te["meta"],
te["y"],
lr_cfg,
tfidf_cfg,
max_gap=max_gap,
X_train_gap_clean=df.loc[idx_train, "clean_text"] if use_orig_gap else tr["clean"],
meta_train_gap=_meta_frame(df.loc[idx_train]) if use_orig_gap else tr["meta"],
y_train_gap=y.loc[idx_train] if use_orig_gap else tr["y"],
)
lr_model.save(lr_path)
all_metrics["lr_gap_search"] = lr_gap_meta
lr_val_probs = lr_model.predict_proba(va["clean"], va["meta"])[:, 1]
lr_test_probs = lr_model.predict_proba(te["clean"], te["meta"])[:, 1]
lr_train_probs = lr_model.predict_proba(
df.loc[idx_train, "clean_text"] if use_orig_gap else tr["clean"],
_meta_frame(df.loc[idx_train]) if use_orig_gap else tr["meta"],
)[:, 1]
lr_metrics = _branch_metrics(
y.loc[idx_train] if use_orig_gap else tr["y"],
te["y"],
va["y"],
lr_train_probs,
lr_val_probs,
lr_test_probs,
model_name="LR-clean+meta-push",
gap_meta=lr_gap_meta,
)
all_metrics["logistic_regression"] = lr_metrics
# ── Toxic-BERT β€” full unfreeze, micro-LR, TTA on test ───────────────────
tr_cfg = cfg.get("transformer", {})
logger.info(
f"Training Toxic-BERT β€” {tr_cfg.get('freeze_mode', 'full')} unfreeze, "
f"lr={tr_cfg.get('learning_rate', 5e-6)}, gap≀{max_gap_pp:.1f}pp, TTA on test"
)
hf_train = _build_hf_dataset(tr["raw"], tr["y"])
hf_val = _build_hf_dataset(va["raw"], va["y"])
hf_test = _build_hf_dataset(te["raw"], te["y"])
bert_result = train_transformer_stable(
hf_train,
hf_val,
hf_test,
y_test_arr,
y_val_arr,
cfg,
bert_dir,
seed=rand,
model_label=tr_label,
)
bert_metrics = bert_result["metrics"]
bert_val_probs = bert_result["val_probs"]
bert_test_probs = bert_result["test_probs"]
bert_train_probs = infer_train_probs(bert_result, tr["raw"])
all_metrics["transformer"] = bert_metrics
# ── Fixed 0.95 / 0.05 ensemble + val threshold grid 0.30–0.70 ───────────
ens_cfg = cfg["ensemble"]
bw = float(ens_cfg.get("bert_weight", 0.95))
lw = float(ens_cfg.get("lr_weight", 0.05))
logger.info(f"Fixed ensemble weights β€” BERT={bw} LR={lw}")
th_cfg = ens_cfg.get("threshold_tuning", {})
if th_cfg.get("enabled", True):
ens_threshold, val_f1_at_t = tune_ensemble_threshold(
bert_val_probs,
lr_val_probs,
y_val_arr,
bert_weight=bw,
lr_weight=lw,
metric=th_cfg.get("metric", "f1_weighted"),
min_threshold=float(th_cfg.get("min_threshold", 0.30)),
max_threshold=float(th_cfg.get("max_threshold", 0.70)),
step=float(th_cfg.get("step", 0.01)),
)
logger.info(f"Ensemble threshold={ens_threshold:.2f} val_f1_weighted={val_f1_at_t:.4f}")
else:
ens_threshold = 0.5
val_f1_at_t = None
ensemble_metrics = evaluate_ensemble(
bert_test_probs,
lr_test_probs,
y_test_arr,
bert_weight=bw,
lr_weight=lw,
model_name=f"{pipeline_name.replace('_', ' ').title()}-Hybrid",
threshold=ens_threshold,
)
from src.models.hybrid_ensemble import soft_vote_probs
ens_train_probs = soft_vote_probs(bert_train_probs, lr_model.predict_proba(tr["clean"], tr["meta"])[:, 1], bw, lw)
ens_train_preds = predict_with_threshold(ens_train_probs, ens_threshold)
y_tr_arr = tr["y"].astype(int).values
f1_train_ens = float(f1_score(y_tr_arr, ens_train_preds, average="weighted", zero_division=0))
gap_ens = abs(f1_train_ens - ensemble_metrics["f1_weighted"])
ensemble_metrics["f1_train"] = round(f1_train_ens, 4)
ensemble_metrics["train_test_gap"] = round(gap_ens, 4)
ensemble_metrics["train_test_gap_pp"] = round(gap_ens * 100, 2)
ensemble_metrics["gap_ok"] = gap_ens <= max_gap
ensemble_metrics["bert_weight"] = bw
ensemble_metrics["lr_weight"] = lw
ensemble_metrics["val_f1_weighted_at_threshold"] = round(val_f1_at_t, 4) if val_f1_at_t else None
ensemble_metrics["target_f1_hit"] = ensemble_metrics["f1_weighted"] >= target_f1
all_metrics["ensemble"] = ensemble_metrics
save_ensemble_meta(
meta_path,
{
"run_id": run_id,
"bert_dir": str(bert_dir),
"lr_path": str(lr_path),
"weights": {"bert": bw, "lr": lw, "fixed": True},
"thresholds": {
"bert": bert_metrics.get("threshold"),
"lr": lr_metrics.get("threshold"),
"ensemble": ens_threshold,
},
},
)
report_path = reports_dir / f"{pipeline_name}_run_{run_id}.json"
with open(report_path, "w") as f:
json.dump(_json_safe(all_metrics), f, indent=2)
md_path = reports_dir / f"integrated_report_{run_id}.md"
write_hybrid_clean_report(all_metrics, md_path)
logger.info(f"Report: {md_path}")
_print_summary(all_metrics, target_f1, max_gap_pp)
return all_metrics
def infer_train_probs(bert_result: dict, X_raw: pd.Series) -> np.ndarray:
import torch
from datasets import Dataset
trainer = bert_result["trainer"]
tokenizer = bert_result["tokenizer"]
ds = Dataset.from_pandas(pd.DataFrame({"text": X_raw.values, "label": [0] * len(X_raw)}))
def _tok(batch):
return tokenizer(batch["text"], truncation=True, max_length=128)
tok = ds.map(_tok, batched=True)
drop = [c for c in tok.column_names if c not in ("input_ids", "attention_mask", "label")]
if drop:
tok = tok.remove_columns(drop)
tok.set_format("torch")
out = trainer.predict(tok)
return torch.softmax(torch.tensor(out.predictions), dim=1)[:, 1].numpy()
def _print_summary(metrics: dict, target: float, max_gap_pp: float) -> None:
logger.info("=" * 60)
ens = metrics.get("ensemble", {})
hit = ens.get("f1_weighted", 0) >= target
logger.info(
f"INTEGRATED F1 weighted={ens.get('f1_weighted', 0):.4f} "
f"{'βœ… TARGET' if hit else '⚠️ below'} (target {target})"
)
for key in ("transformer", "logistic_regression", "ensemble"):
m = metrics.get(key, {})
if m:
ok = m.get("gap_ok", m.get("train_test_gap_pp", 99) <= max_gap_pp)
logger.info(
f" {m.get('model', key)}: F1w={m.get('f1_weighted')} "
f"gap_pp={m.get('train_test_gap_pp')} gap_ok={ok}"
)
logger.info("=" * 60)
def main():
parser = argparse.ArgumentParser(description="Performance Push pipeline")
parser.add_argument("--config", type=str, default=None)
parser.add_argument("--skip-augmentation", action="store_true")
args = parser.parse_args()
run_performance_push_pipeline(
config_path=Path(args.config) if args.config else None,
skip_augmentation=args.skip_augmentation,
)
if __name__ == "__main__":
main()