SignalMod / src /pipeline /run_expert_pipeline.py
Mirae Kang
feat: implement new models and improve UI, #23
46cc63a
"""
Phase 5 Expert pipeline: Toxic-BERT (head-only) + bottleneck LR + hybrid ensemble.
Run from repo root:
uv sync --extra hf --extra train
uv run python -m src.pipeline.run_expert_pipeline
"""
from __future__ import annotations
import argparse
import json
import sys
from datetime import datetime
from pathlib import Path
import numpy as np
import pandas as pd
import yaml
from datasets import Dataset
from sklearn.metrics import f1_score, roc_auc_score
from sklearn.model_selection import train_test_split
PROJECT_ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(PROJECT_ROOT))
from src.data.loader import load_raw_data
from src.evaluation.expert_report import write_expert_report
from src.evaluation.threshold_tuning import predict_with_threshold, search_best_threshold
from src.features.augmentation import augment_toxic_train
from src.models.hybrid_ensemble import (
StableLRModel,
evaluate_ensemble,
fit_lr_with_gap_control,
save_ensemble_meta,
soft_vote_probs,
tune_ensemble_threshold,
)
from src.models.transformer_trainer import train_transformer_stable
from src.utils.logger import get_logger
logger = get_logger(__name__)
def _build_hf_dataset(X: pd.Series, y: pd.Series) -> Dataset:
return Dataset.from_pandas(
pd.DataFrame({"text": X.values, "label": y.astype(int).values})
)
def _lr_metrics_with_threshold(
model: StableLRModel,
X_train,
y_train,
X_val,
y_val,
X_test,
y_test,
*,
gap_meta: dict,
) -> dict:
y_val_arr = np.asarray(y_val).astype(int)
y_test_arr = np.asarray(y_test).astype(int)
y_train_arr = np.asarray(y_train).astype(int)
val_probs = model.predict_proba(X_val)[:, 1]
threshold, _ = search_best_threshold(y_val_arr, val_probs, metric="f1_toxic")
train_probs = model.predict_proba(X_train)[:, 1]
test_probs = model.predict_proba(X_test)[:, 1]
train_preds = predict_with_threshold(train_probs, threshold)
test_preds = predict_with_threshold(test_probs, threshold)
f1_toxic_train = float(f1_score(y_train_arr, train_preds, pos_label=1, zero_division=0))
f1_toxic_test = float(f1_score(y_test_arr, test_preds, pos_label=1, zero_division=0))
gap_toxic = abs(f1_toxic_train - f1_toxic_test)
f1_weighted_train = float(
f1_score(y_train_arr, train_preds, average="weighted", zero_division=0)
)
f1_weighted_test = float(
f1_score(y_test_arr, test_preds, average="weighted", zero_division=0)
)
gap_weighted = abs(f1_weighted_train - f1_weighted_test)
return {
"model": "LR-TFIDF-expert",
"C": gap_meta["C"],
"max_features": int(gap_meta.get("max_features", 250)),
"min_df": int(gap_meta.get("min_df", 3)),
"threshold": round(threshold, 4),
"f1_weighted": round(f1_weighted_test, 4),
"f1_toxic": round(f1_toxic_test, 4),
"f1_toxic_train": round(f1_toxic_train, 4),
"train_test_gap_toxic": round(gap_toxic, 4),
"train_test_gap_toxic_pp": round(gap_toxic * 100, 2),
"gap_toxic_ok": gap_toxic < 0.05,
"f1_train": round(f1_weighted_train, 4),
"train_test_gap": round(gap_weighted, 4),
"train_test_gap_pp": round(gap_weighted * 100, 2),
"roc_auc": round(float(roc_auc_score(y_test_arr, test_probs)), 4),
"test_probs": test_probs,
"val_probs": val_probs,
}
def run_expert_pipeline(*, config_path: Path | None = None) -> dict:
cfg_path = config_path or (PROJECT_ROOT / "configs" / "expert_training.yaml")
cfg = yaml.safe_load(open(cfg_path))
rand = int(cfg["pipeline"]["random_state"])
test_size = float(cfg["pipeline"]["test_size"])
val_size = float(cfg["pipeline"]["val_size"])
target = cfg["data"]["target_binary"]
raw_path = PROJECT_ROOT / cfg["data"]["raw_path"]
out_cfg = cfg["output"]
reports_dir = PROJECT_ROOT / out_cfg["reports_dir"]
bert_dir = PROJECT_ROOT / out_cfg["transformer_dir"]
lr_path = PROJECT_ROOT / out_cfg["lr_path"]
meta_path = PROJECT_ROOT / out_cfg["ensemble_meta_path"]
reports_dir.mkdir(parents=True, exist_ok=True)
run_id = datetime.now().strftime("%Y%m%d_%H%M%S")
logger.info("=" * 60)
logger.info(f"EXPERT PIPELINE (Phase 5) — run={run_id}")
logger.info("=" * 60)
df = load_raw_data(raw_path)
X = df[cfg["data"].get("text_column", "Text")]
y = df[target].astype(int)
X_trainval, X_test, y_trainval, y_test = train_test_split(
X, y, test_size=test_size, random_state=rand, stratify=y
)
X_train, X_val, y_train, y_val = train_test_split(
X_trainval,
y_trainval,
test_size=val_size,
random_state=rand,
stratify=y_trainval,
)
cfg_aug = cfg
cfg_aug.setdefault("augmentation", {})["enabled"] = True
pivot = cfg_aug.get("augmentation", {}).get("pivot_lang", "de")
logger.info(f"Augmentation EN→{pivot.upper()}→EN (toxic only)")
X_train_aug, y_train_aug = augment_toxic_train(X_train, y_train, cfg_aug, seed=rand)
aug_info = {
"enabled": True,
"strategy": "back_translation",
"pivot_lang": pivot,
"train_size_before": len(X_train),
"train_size_after": len(X_train_aug),
"added_samples": len(X_train_aug) - len(X_train),
}
y_test_arr = y_test.astype(int).values
y_val_arr = y_val.astype(int).values
max_gap = float(cfg["pipeline"].get("max_train_test_gap", 0.05))
all_metrics: dict = {"run_id": run_id, "augmentation": aug_info, "config": str(cfg_path)}
lr_cfg = cfg["logistic_regression"]
tfidf_cfg = lr_cfg.get("tfidf", {})
logger.info("LR-TFIDF (max_features=250) + gap search")
use_orig_gap = lr_cfg.get("gap_search", {}).get("use_original_train_for_gap", True)
lr_model, lr_gap_meta = fit_lr_with_gap_control(
X_train_aug,
y_train_aug,
X_test,
y_test,
lr_cfg,
tfidf_cfg,
max_gap=max_gap,
X_train_gap=X_train if use_orig_gap else None,
y_train_gap=y_train if use_orig_gap else None,
)
lr_model.save(lr_path)
all_metrics["lr_gap_search"] = lr_gap_meta
lr_metrics = _lr_metrics_with_threshold(
lr_model,
X_train if use_orig_gap else X_train_aug,
y_train if use_orig_gap else y_train_aug,
X_val,
y_val,
X_test,
y_test,
gap_meta=lr_gap_meta,
)
lr_probs_test = lr_metrics.pop("test_probs")
lr_probs_val = lr_metrics.pop("val_probs")
all_metrics["logistic_regression"] = lr_metrics
logger.info("Toxic-BERT — head-only fine-tune + val threshold tuning")
hf_train = _build_hf_dataset(X_train_aug, y_train_aug)
hf_val = _build_hf_dataset(X_val, y_val)
hf_test = _build_hf_dataset(X_test, y_test)
bert_result = train_transformer_stable(
hf_train,
hf_val,
hf_test,
y_test_arr,
y_val_arr,
cfg,
bert_dir,
seed=rand,
model_label="Toxic-BERT-expert",
)
bert_metrics = bert_result["metrics"]
bert_probs_test = bert_result["test_probs"]
bert_probs_val = bert_result["val_probs"]
all_metrics["transformer"] = bert_metrics
ens_cfg = cfg["ensemble"]
bw = float(ens_cfg.get("bert_weight", 0.7))
lw = float(ens_cfg.get("lr_weight", 0.3))
ens_th_cfg = ens_cfg.get("threshold_tuning", {})
if ens_th_cfg.get("enabled", True):
ens_threshold, _ = tune_ensemble_threshold(
bert_probs_val,
lr_probs_val,
y_val_arr,
bert_weight=bw,
lr_weight=lw,
metric=ens_th_cfg.get("metric", "f1_toxic"),
)
else:
ens_threshold = 0.5
ensemble_metrics = evaluate_ensemble(
bert_probs_test,
lr_probs_test,
y_test_arr,
bert_weight=bw,
lr_weight=lw,
model_name="Hybrid-ToxicBERT+LR",
threshold=ens_threshold,
)
ens_probs_train_bert = _bert_probs_on_texts(bert_result, X_train_aug)
ens_probs_train_lr = lr_model.predict_proba(X_train_aug)[:, 1]
ens_train = evaluate_ensemble(
ens_probs_train_bert,
ens_probs_train_lr,
y_train_aug.astype(int).values,
bert_weight=bw,
lr_weight=lw,
model_name="Hybrid-train",
threshold=ens_threshold,
)
gap_toxic = abs(ens_train["f1_toxic"] - ensemble_metrics["f1_toxic"])
ensemble_metrics["f1_toxic_train"] = ens_train["f1_toxic"]
ensemble_metrics["train_test_gap_toxic"] = round(gap_toxic, 4)
ensemble_metrics["train_test_gap_toxic_pp"] = round(gap_toxic * 100, 2)
ensemble_metrics["gap_toxic_ok"] = gap_toxic < 0.05
ensemble_metrics["f1_train"] = ens_train["f1_weighted"]
ensemble_metrics["train_test_gap"] = round(
abs(ens_train["f1_weighted"] - ensemble_metrics["f1_weighted"]), 4
)
ensemble_metrics["train_test_gap_pp"] = round(
ensemble_metrics["train_test_gap"] * 100, 2
)
all_metrics["ensemble"] = ensemble_metrics
save_ensemble_meta(
meta_path,
{
"run_id": run_id,
"bert_dir": str(bert_dir),
"lr_path": str(lr_path),
"ensemble": ens_cfg,
"bert_threshold": bert_metrics.get("threshold"),
"lr_threshold": lr_metrics.get("threshold"),
"ensemble_threshold": ens_threshold,
},
)
summary_rows = [
{k: v for k, v in bert_metrics.items() if k != "gap_toxic_ok"},
lr_metrics,
ensemble_metrics,
]
pd.DataFrame(summary_rows).to_csv(reports_dir / f"expert_summary_{run_id}.csv", index=False)
report_path = reports_dir / f"expert_run_{run_id}.json"
with open(report_path, "w") as f:
json.dump(_json_safe(all_metrics), f, indent=2)
md_path = reports_dir / f"integrated_report_{run_id}.md"
write_expert_report(all_metrics, md_path)
logger.info(f"Expert report: {md_path}")
_print_expert_targets(all_metrics)
return all_metrics
def _bert_probs_on_texts(bert_result: dict, X: pd.Series) -> np.ndarray:
trainer = bert_result["trainer"]
tokenizer = bert_result["tokenizer"]
from datasets import Dataset
ds = Dataset.from_pandas(pd.DataFrame({"text": X.values, "label": [0] * len(X)}))
def _tok(batch):
return tokenizer(batch["text"], truncation=True, max_length=128)
tok = ds.map(_tok, batched=True)
drop_cols = [c for c in tok.column_names if c not in ("input_ids", "attention_mask", "label")]
if drop_cols:
tok = tok.remove_columns(drop_cols)
tok.set_format("torch")
import torch
out = trainer.predict(tok)
return torch.softmax(torch.tensor(out.predictions), dim=1)[:, 1].numpy()
def _json_safe(obj):
if isinstance(obj, dict):
return {
k: _json_safe(v)
for k, v in obj.items()
if k not in ("ensemble_probs", "ensemble_preds")
}
if isinstance(obj, (list, tuple)):
return [_json_safe(v) for v in obj]
if isinstance(obj, np.ndarray):
return obj.tolist()
if isinstance(obj, (np.floating, np.integer)):
return obj.item()
return obj
def _print_expert_targets(all_metrics: dict) -> None:
logger.info("=" * 60)
for key in ("transformer", "logistic_regression", "ensemble"):
m = all_metrics.get(key)
if not m:
continue
f1t = m.get("f1_toxic", 0)
gap = m.get("train_test_gap_toxic", 0.99)
ok_f1 = "✅" if f1t > 0.75 else "⚠️"
ok_gap = "✅" if m.get("gap_toxic_ok", gap < 0.05) else "⚠️"
logger.info(
f"{m.get('model', key)}: F1-toxic={f1t:.4f} {ok_f1} | "
f"toxic gap={gap:.4f} {ok_gap} | threshold={m.get('threshold', 0.5)}"
)
logger.info("=" * 60)
def main():
parser = argparse.ArgumentParser(description="Phase 5 expert Toxic-BERT + LR pipeline")
parser.add_argument("--config", type=str, default=None)
args = parser.parse_args()
run_expert_pipeline(
config_path=Path(args.config) if args.config else None,
)
if __name__ == "__main__":
main()