# SignalMod / src/experiments/notebook_13_sprints.py
# Mirae Kang
# feat: implement new models and improve UI, #23
# 46cc63a
"""
Notebook 13 — Hyper-optimization sprints (5-fold CV, train/test F1 gap ≤ 5 pp, mean weighted test F1 ≥ 0.80).
uv run python -m src.experiments.notebook_13_sprints
"""
from __future__ import annotations
import json
import re
import sys
from dataclasses import dataclass
from pathlib import Path
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from datasets import Dataset
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score, roc_auc_score
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.preprocessing import StandardScaler
from transformers import (
AutoModelForSequenceClassification,
AutoTokenizer,
DataCollatorWithPadding,
Trainer,
TrainingArguments,
set_seed,
)
PROJECT_ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(PROJECT_ROOT))
from src.data.dual_loader import load_dual_track_data
from src.evaluation.threshold_tuning import predict_with_threshold, search_best_threshold
from src.features.augmentation import (
back_translate_texts,
deduplicate_by_cosine,
toxic_back_translation,
)
from src.features.metadata_features import extract_metadata_features
from src.models.transformer_trainer import (
compute_hf_metrics,
freeze_head_only,
logits_to_toxic_prob,
)
from src.pipeline.run_hybrid_clean_pipeline import _meta_frame
from src.utils.logger import get_logger
# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)
# Hugging Face checkpoint loaded by every experiment in this module.
MODEL_ID = "unitary/toxic-bert"
# Output locations: Trainer output dirs and the augmentation cache live under
# ARTIFACT_DIR; the JSON results and the Markdown comparison table under REPORT_DIR.
ARTIFACT_DIR = PROJECT_ROOT / "models" / "notebook_13"
REPORT_DIR = PROJECT_ROOT / "reports" / "notebook_13"
# Largest accepted |F1_train - F1_test| per fold, as a fraction (0.05 == 5 pp).
MAX_GAP = 0.05
# Mean weighted test F1 a sprint must reach to count as hitting the target.
TARGET_F1 = 0.80
# Number of outer StratifiedKFold splits.
N_FOLDS = 5
# Base seed; each fold uses RANDOM_STATE + fold for its inner split and training.
RANDOM_STATE = 42
# Pivot languages used when building the back-translation augmentation pool.
PIVOTS = ("de", "fr", "es")
# Blend weights applied to the per-view probabilities in test-time augmentation.
TTA_WEIGHTS = (0.50, 0.25, 0.25) # original, DE, FR
@dataclass
class FoldMetrics:
    """Scores recorded for a single outer cross-validation fold."""

    # Index of the outer fold these numbers belong to.
    fold: int
    # Weighted F1 on the training split at the selected threshold.
    f1_train: float
    # Weighted F1 on the held-out test split at the selected threshold.
    f1_test: float
    # Score reported by the threshold search on the validation split.
    f1_val: float
    # Absolute difference between train and test F1, as a fraction.
    gap: float
    # The same gap expressed in percentage points.
    gap_pp: float
    # Whether the gap is within the allowed maximum.
    gap_ok: bool
    # Decision threshold chosen on the validation split.
    threshold: float
    # ROC AUC on the test split (0.0 when it could not be computed).
    roc_auc: float
def _gap_ok(gap: float) -> bool:
    """Return True when ``gap`` is at most ``MAX_GAP`` (the boundary passes)."""
    return MAX_GAP >= gap
def _summarize_folds(folds: list[FoldMetrics]) -> dict:
    """Aggregate per-fold metrics into a JSON-friendly summary.

    Args:
        folds: One ``FoldMetrics`` per outer fold.

    Returns:
        A dict with mean/std/min/max of the test F1, mean/max of the gap in
        percentage points, the two pass flags, and a ``"folds"`` list holding
        the per-fold numbers.
    """
    test_scores = [m.f1_test for m in folds]
    gap_points = [m.gap_pp for m in folds]
    mean_test = float(np.mean(test_scores))

    per_fold = []
    for m in folds:
        per_fold.append(
            {
                "fold": m.fold,
                "f1_train": m.f1_train,
                "f1_test": m.f1_test,
                "f1_val": m.f1_val,
                "train_test_gap_pp": m.gap_pp,
                "gap_ok": m.gap_ok,
                "threshold": m.threshold,
                "roc_auc": m.roc_auc,
            }
        )

    summary = {
        "f1_test_mean": round(mean_test, 4),
        "f1_test_std": round(float(np.std(test_scores)), 4),
        "f1_test_min": round(float(np.min(test_scores)), 4),
        "f1_test_max": round(float(np.max(test_scores)), 4),
        "gap_pp_mean": round(float(np.mean(gap_points)), 2),
        "gap_pp_max": round(float(np.max(gap_points)), 2),
        # Every fold must individually satisfy the gap constraint.
        "all_gap_ok": all(m.gap_ok for m in folds),
        # The target is checked against the unrounded mean.
        "target_f1_hit": mean_test >= TARGET_F1,
        "folds": per_fold,
    }
    return summary
def _score_split(
    y_train: np.ndarray,
    y_val: np.ndarray,
    y_test: np.ndarray,
    p_train: np.ndarray,
    p_val: np.ndarray,
    p_test: np.ndarray,
    *,
    fold: int,
    min_t: float = 0.05,
    max_t: float = 0.95,
    step: float = 0.01,
) -> FoldMetrics:
    """Pick a threshold on validation data and score the train/test splits with it.

    Args:
        y_train, y_val, y_test: True labels for each split.
        p_train, p_val, p_test: Positive-class probabilities for each split.
        fold: Fold index stored in the result.
        min_t, max_t, step: Search grid forwarded to ``search_best_threshold``.

    Returns:
        A ``FoldMetrics`` with values rounded to 4 decimals (2 for ``gap_pp``).
    """
    # The threshold is selected on the validation split only.
    best_t, f1_val = search_best_threshold(
        y_val,
        p_val,
        metric="f1_weighted",
        min_threshold=min_t,
        max_threshold=max_t,
        step=step,
    )

    def _weighted_f1(y_true: np.ndarray, probs: np.ndarray) -> float:
        hard = predict_with_threshold(probs, best_t)
        return float(f1_score(y_true, hard, average="weighted", zero_division=0))

    f1_train = _weighted_f1(y_train, p_train)
    f1_test = _weighted_f1(y_test, p_test)
    gap = abs(f1_train - f1_test)

    try:
        auc = float(roc_auc_score(y_test, p_test))
    except ValueError:
        # roc_auc_score rejected the inputs; report 0.0 instead of failing the fold.
        auc = 0.0

    return FoldMetrics(
        fold=fold,
        f1_train=round(f1_train, 4),
        f1_test=round(f1_test, 4),
        f1_val=round(f1_val, 4),
        gap=round(gap, 4),
        gap_pp=round(gap * 100, 2),
        # The pass flag is decided on the unrounded gap.
        gap_ok=_gap_ok(gap),
        threshold=round(best_t, 4),
        roc_auc=round(auc, 4),
    )
def _load_data() -> tuple[pd.DataFrame, pd.Series, np.ndarray]:
    """Load the comment dataset through ``load_dual_track_data``.

    Returns:
        ``(df, y, texts)``: the loaded frame, the ``IsToxic`` column cast to
        int, and the ``Text`` column cast to str as a NumPy array.
    """
    raw_csv = PROJECT_ROOT / "data/raw/youtoxic_english_1000.csv"
    frame = load_dual_track_data(
        raw_csv,
        processed_preprocessed="data/processed/v2/comments_preprocessed.csv",
        processed_stats="data/processed/v2/comments_with_stats.csv",
        target="IsToxic",
        text_column="Text",
        project_root=PROJECT_ROOT,
        write_preprocessed_if_missing=False,
    )
    labels = frame["IsToxic"].astype(int)
    comments = frame["Text"].astype(str).values
    return frame, labels, comments
def _extended_meta(df: pd.DataFrame) -> pd.DataFrame:
    """Return the base metadata features plus two style features, cast to float.

    Adds ``emoji_count`` and ``punctuation_density`` columns to a copy of the
    frame produced by ``extract_metadata_features``.
    """
    comments = df["Text"].fillna("").astype(str)
    features = extract_metadata_features(df, text_column="Text")

    # The trailing ``+`` makes each run of adjacent emoji a single match, so the
    # value stored below counts emoji runs rather than individual emoji.
    # NOTE(review): confirm that is the intended meaning of ``emoji_count``.
    emoji_runs = re.compile(
        "["
        "\U0001f300-\U0001f9ff"
        "\U0001f600-\U0001f64f"
        "]+",
        flags=re.UNICODE,
    )

    def _count_emoji_runs(comment: str) -> int:
        return len(emoji_runs.findall(comment))

    # Clip to 1 so empty comments do not divide by zero.
    char_len = comments.str.len().clip(lower=1)

    features = features.copy()
    features["emoji_count"] = comments.apply(_count_emoji_runs)
    features["punctuation_density"] = comments.str.count(r"[^\w\s]") / char_len
    return features.astype(float)
def _load_frozen_model(device: torch.device):
    """Load ``MODEL_ID`` with gradients disabled, in eval mode, on ``device``.

    Returns:
        ``(model, tokenizer)``.
    """
    tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
    frozen = AutoModelForSequenceClassification.from_pretrained(MODEL_ID)
    # Inference only: no parameter should accumulate gradients.
    for weight in frozen.parameters():
        weight.requires_grad = False
    frozen.eval()
    frozen.to(device)
    return frozen, tokenizer
def _predict_probs(
    model,
    tokenizer,
    texts: list[str],
    *,
    max_length: int = 128,
    batch_size: int = 16,
) -> np.ndarray:
    """Run ``model`` over ``texts`` in batches and return toxic probabilities.

    Args:
        model: Sequence-classification model; batches are moved to its device.
        tokenizer: Tokenizer matching ``model``.
        texts: Input strings.
        max_length: Truncation length passed to the tokenizer.
        batch_size: Number of texts per forward pass.

    Returns:
        A float array built from ``logits_to_toxic_prob`` applied batch by batch.
    """
    target_device = next(model.parameters()).device
    collected: list[float] = []
    model.eval()
    with torch.no_grad():
        for start in range(0, len(texts), batch_size):
            chunk = texts[start : start + batch_size]
            encoded = tokenizer(
                chunk,
                truncation=True,
                max_length=max_length,
                padding=True,
                return_tensors="pt",
            )
            on_device = {name: tensor.to(target_device) for name, tensor in encoded.items()}
            batch_logits = model(**on_device).logits
            collected.extend(logits_to_toxic_prob(batch_logits).tolist())
    return np.array(collected, dtype=float)
def _extract_cls_features(
    model,
    tokenizer,
    texts: list[str],
    *,
    max_length: int = 128,
    batch_size: int = 16,
) -> np.ndarray:
    """Return the first-token hidden state of ``model.bert`` for every text.

    Args:
        model: Model exposing a ``bert`` encoder attribute.
        tokenizer: Tokenizer matching ``model``.
        texts: Input strings.
        max_length: Truncation length passed to the tokenizer.
        batch_size: Number of texts per forward pass.

    Returns:
        Per-batch feature matrices stacked along the first axis.
    """
    target_device = next(model.parameters()).device
    chunks: list[np.ndarray] = []
    model.eval()
    with torch.no_grad():
        for start in range(0, len(texts), batch_size):
            encoded = tokenizer(
                texts[start : start + batch_size],
                truncation=True,
                max_length=max_length,
                padding=True,
                return_tensors="pt",
            )
            on_device = {name: tensor.to(target_device) for name, tensor in encoded.items()}
            encoder_out = model.bert(**on_device).last_hidden_state
            # Position 0 of each sequence is taken as the sentence representation.
            first_token = encoder_out[:, 0, :]
            chunks.append(first_token.cpu().numpy())
    return np.concatenate(chunks, axis=0)
def _ensure_global_augment_cache(texts: np.ndarray, y: np.ndarray, cache_path: Path) -> None:
    """Build and cache back-translated copies of the toxic rows of ``texts``.

    Does nothing when ``cache_path`` already exists. Otherwise every row with
    label 1 is back-translated through each language in ``PIVOTS``, the
    synthetic samples are de-duplicated against the original toxic texts, and
    the result is written to ``cache_path`` as JSON with the keys ``texts``,
    ``labels`` and ``reference_size``.

    Args:
        texts: Array of comment strings.
        y: Label array aligned with ``texts``.
        cache_path: JSON file to create; parent directories are created too.
    """
    if cache_path.exists():
        return

    toxic_idx = np.where(y == 1)[0]
    ref_texts = texts[toxic_idx].tolist()
    ref_labels = y[toxic_idx].tolist()

    syn_texts: list[str] = []
    syn_labels: list[int] = []
    for pivot in PIVOTS:
        logger.info(f"Global augment — pivot={pivot} ({len(ref_texts)} toxic)")
        new_texts, new_labels = toxic_back_translation(
            ref_texts,
            ref_labels,
            pivot_lang=pivot,
            rate_limit_every=40,
            rate_limit_sleep_sec=0.5,
            seed=RANDOM_STATE,
        )
        # strict=False is kept from the original: a length mismatch is
        # truncated to the shorter sequence rather than raised.
        for text, label in zip(new_texts, new_labels, strict=False):
            syn_texts.append(text)
            syn_labels.append(int(label))

    if syn_texts:
        syn_texts, syn_labels = deduplicate_by_cosine(
            syn_texts,
            syn_labels,
            ref_texts,
            threshold=0.95,
        )

    cache_path.parent.mkdir(parents=True, exist_ok=True)
    payload = {"texts": syn_texts, "labels": syn_labels, "reference_size": len(ref_texts)}
    cache_path.write_text(json.dumps(payload), encoding="utf-8")
def _augmented_train_set(
    texts: np.ndarray,
    y: np.ndarray,
    train_idx: np.ndarray,
    cache_path: Path,
) -> tuple[list[str], list[int]]:
    """Return the training fold plus synthetic toxic samples built from that fold.

    The synthetic pool is generated from the ``train_idx`` rows only. Building
    it from the whole dataset would put back-translated paraphrases of
    validation and test rows into the training set and inflate the
    cross-validated scores.

    Args:
        texts: All comment strings.
        y: Labels aligned with ``texts``.
        train_idx: Row indices of the training fold.
        cache_path: Base cache file; a per-fold file is derived from it by
            appending a hash of ``train_idx`` to the stem, so the same fold
            reuses its cache across runs and experiments.

    Returns:
        ``(texts, labels)`` for the original fold followed by the synthetic samples.
    """
    import hashlib  # local import: only this function needs it

    fold_rows = np.asarray(train_idx)
    digest = hashlib.sha1(np.sort(fold_rows).astype(np.int64).tobytes()).hexdigest()[:12]
    fold_cache = cache_path.with_name(f"{cache_path.stem}_{digest}{cache_path.suffix}")

    # Only training-fold rows are handed to the augmentation builder.
    _ensure_global_augment_cache(texts[fold_rows], y[fold_rows], fold_cache)
    cached = json.loads(fold_cache.read_text(encoding="utf-8"))

    tr_texts = texts[train_idx].tolist()
    tr_labels = y[train_idx].tolist()
    syn_texts = cached.get("texts", [])
    syn_labels = cached.get("labels", [])
    return tr_texts + syn_texts, tr_labels + [int(v) for v in syn_labels]
def _train_head_only_fold(
    train_texts: list[str],
    train_labels: list[int],
    val_texts: list[str],
    val_labels: list[int],
    output_dir: Path,
    *,
    seed: int,
    max_epochs: int = 4,
) -> tuple:
    """Fine-tune ``MODEL_ID`` on one fold after applying ``freeze_head_only``.

    Args:
        train_texts, train_labels: Training examples.
        val_texts, val_labels: Examples evaluated after every epoch.
        output_dir: Directory handed to ``TrainingArguments``.
        seed: Seed for ``set_seed`` and the trainer.
        max_epochs: Number of training epochs.

    Returns:
        ``(model, tokenizer)`` after training.
    """
    set_seed(seed)
    tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
    # A 2-label head is requested; size mismatches with the checkpoint are allowed.
    model = AutoModelForSequenceClassification.from_pretrained(
        MODEL_ID, num_labels=2, ignore_mismatched_sizes=True
    )
    model.config.problem_type = "single_label_classification"
    # Presumably leaves only the classification head trainable; see the helper.
    freeze_head_only(model)

    keep_columns = ("input_ids", "attention_mask", "label")

    def _encode(batch):
        return tokenizer(batch["text"], truncation=True, max_length=128)

    def _to_dataset(texts, labels):
        encoded = Dataset.from_dict({"text": texts, "label": labels}).map(_encode, batched=True)
        extra = [name for name in encoded.column_names if name not in keep_columns]
        if extra:
            encoded = encoded.remove_columns(extra)
        encoded.set_format("torch")
        return encoded

    train_ds = _to_dataset(train_texts, train_labels)
    val_ds = _to_dataset(val_texts, val_labels)

    training_args = TrainingArguments(
        output_dir=str(output_dir),
        learning_rate=2e-5,
        num_train_epochs=max_epochs,
        per_device_train_batch_size=8,
        per_device_eval_batch_size=8,
        eval_strategy="epoch",
        save_strategy="no",
        logging_steps=50,
        report_to="none",
        seed=seed,
    )
    fold_trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_ds,
        eval_dataset=val_ds,
        data_collator=DataCollatorWithPadding(tokenizer),
        compute_metrics=compute_hf_metrics,
    )
    fold_trainer.train()
    return model, tokenizer
def run_experiment_1(df: pd.DataFrame, texts: np.ndarray, y: np.ndarray) -> dict:
    """Experiment 1: head-only training on folds augmented by back-translation.

    Args:
        df: Loaded dataset frame (not used by this experiment).
        texts: All comment strings.
        y: Labels aligned with ``texts``.

    Returns:
        The fold summary extended with ``experiment`` and ``status`` keys.
    """
    banner = "=" * 60
    logger.info(banner)
    logger.info("Experiment 1 — Multi-pivot back-translation + head-only")
    logger.info(banner)

    aug_cache = ARTIFACT_DIR / "aug_multi_pivot_global.json"
    splitter = StratifiedKFold(n_splits=N_FOLDS, shuffle=True, random_state=RANDOM_STATE)
    fold_results: list[FoldMetrics] = []

    for fold, (train_idx, test_idx) in enumerate(splitter.split(texts, y)):
        # Hold out 15% of the outer-train rows for threshold selection.
        inner_idx, val_idx = train_test_split(
            train_idx,
            test_size=0.15,
            random_state=RANDOM_STATE + fold,
            stratify=y[train_idx],
        )
        fit_texts, fit_labels = _augmented_train_set(texts, y, inner_idx, aug_cache)
        val_texts = texts[val_idx].tolist()
        test_texts = texts[test_idx].tolist()

        fold_dir = ARTIFACT_DIR / f"exp1_head_fold{fold}"
        model, tokenizer = _train_head_only_fold(
            fit_texts,
            fit_labels,
            val_texts,
            y[val_idx].tolist(),
            fold_dir,
            seed=RANDOM_STATE + fold,
        )

        # Train-side scores are computed on the augmented set (originals + synthetic).
        p_train = _predict_probs(model, tokenizer, fit_texts)
        p_val = _predict_probs(model, tokenizer, val_texts)
        p_test = _predict_probs(model, tokenizer, test_texts)

        metrics = _score_split(
            np.asarray(fit_labels),
            y[val_idx],
            y[test_idx],
            p_train,
            p_val,
            p_test,
            fold=fold,
        )
        fold_results.append(metrics)
        logger.info(f" Fold {fold}: F1_test={metrics.f1_test} gap_pp={metrics.gap_pp}")

    summary = _summarize_folds(fold_results)
    summary["experiment"] = "exp1_multi_pivot_head"
    passed = summary["all_gap_ok"] and summary["target_f1_hit"]
    summary["status"] = "PASS" if passed else "PARTIAL"
    return summary
def run_experiment_2(texts: np.ndarray, y: np.ndarray) -> dict:
    """Experiment 2: test-time augmentation with the frozen baseline model.

    Each split is scored on the original text and on its DE and FR
    back-translations; the three probability vectors are blended with
    ``TTA_WEIGHTS``.

    Args:
        texts: All comment strings.
        y: Labels aligned with ``texts``.

    Returns:
        The fold summary extended with ``experiment``, ``tta_weights`` and ``status``.
    """
    banner = "=" * 60
    logger.info(banner)
    logger.info("Experiment 2 — Advanced TTA (frozen golden baseline)")
    logger.info(banner)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model, tokenizer = _load_frozen_model(device)
    splitter = StratifiedKFold(n_splits=N_FOLDS, shuffle=True, random_state=RANDOM_STATE)
    fold_results: list[FoldMetrics] = []
    w_orig, w_de, w_fr = TTA_WEIGHTS

    for fold, (train_idx, test_idx) in enumerate(splitter.split(texts, y)):
        inner_idx, val_idx = train_test_split(
            train_idx,
            test_size=0.15,
            random_state=RANDOM_STATE + fold,
            stratify=y[train_idx],
        )
        # Order is (train, val, test) throughout.
        splits = (
            texts[inner_idx].tolist(),
            texts[val_idx].tolist(),
            texts[test_idx].tolist(),
        )

        blended = [w_orig * _predict_probs(model, tokenizer, part) for part in splits]
        for pivot, weight in (("de", w_de), ("fr", w_fr)):
            # Translate all three splits first, then score them.
            translated = [
                back_translate_texts(
                    part, pivot_lang=pivot, rate_limit_every=40, rate_limit_sleep_sec=0.3
                )
                for part in splits
            ]
            for pos, part in enumerate(translated):
                blended[pos] = blended[pos] + weight * _predict_probs(model, tokenizer, part)
        p_train, p_val, p_test = blended

        metrics = _score_split(
            y[inner_idx], y[val_idx], y[test_idx], p_train, p_val, p_test, fold=fold
        )
        fold_results.append(metrics)
        logger.info(f" Fold {fold}: F1_test={metrics.f1_test} gap_pp={metrics.gap_pp}")

    summary = _summarize_folds(fold_results)
    summary["experiment"] = "exp2_advanced_tta"
    summary["tta_weights"] = list(TTA_WEIGHTS)
    passed = summary["all_gap_ok"] and summary["target_f1_hit"]
    summary["status"] = "PASS" if passed else "PARTIAL"
    return summary
def run_experiment_3(df: pd.DataFrame, texts: np.ndarray, y: np.ndarray) -> dict:
    """Experiment 3: logistic regression on CLS features plus style metadata.

    The per-fold feature building, scaling and classifier fitting is delegated
    to ``_stacking_probs`` so this experiment and the threshold sprint share a
    single implementation of the stacker.

    Args:
        df: Loaded dataset frame, used to compute the metadata features.
        texts: All comment strings.
        y: Labels aligned with ``texts``.

    Returns:
        The fold summary extended with ``experiment``, ``lr_C`` and ``status``.
    """
    logger.info("=" * 60)
    logger.info("Experiment 3 — Meta-feature stacking (CLS + style meta)")
    logger.info("=" * 60)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model, tokenizer = _load_frozen_model(device)
    meta_all = _extended_meta(df).values
    skf = StratifiedKFold(n_splits=N_FOLDS, shuffle=True, random_state=RANDOM_STATE)
    folds: list[FoldMetrics] = []

    for fold, (train_idx, test_idx) in enumerate(skf.split(texts, y)):
        # Hold out 15% of the outer-train rows for threshold selection.
        inner_idx, val_idx = train_test_split(
            train_idx,
            test_size=0.15,
            random_state=RANDOM_STATE + fold,
            stratify=y[train_idx],
        )
        p_train, p_val, p_test = _stacking_probs(
            model, tokenizer, meta_all, y, inner_idx, val_idx, test_idx, texts
        )
        folds.append(
            _score_split(
                y[inner_idx], y[val_idx], y[test_idx], p_train, p_val, p_test, fold=fold
            )
        )
        logger.info(f" Fold {fold}: F1_test={folds[-1].f1_test} gap_pp={folds[-1].gap_pp}")

    summary = _summarize_folds(folds)
    summary["experiment"] = "exp3_meta_stacking"
    # Regularisation strength used inside _stacking_probs.
    summary["lr_C"] = 0.01
    summary["status"] = "PASS" if summary["all_gap_ok"] and summary["target_f1_hit"] else "PARTIAL"
    return summary
def _tta_probs(model, tokenizer, tr_list, va_list, te_list) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Blend predictions on original, DE and FR back-translated text.

    Args:
        model, tokenizer: Model and tokenizer used for every prediction.
        tr_list, va_list, te_list: Train, validation and test texts.

    Returns:
        ``(p_train, p_val, p_test)`` weighted with ``TTA_WEIGHTS``.
    """
    w_orig, w_de, w_fr = TTA_WEIGHTS
    splits = (tr_list, va_list, te_list)

    blended = [w_orig * _predict_probs(model, tokenizer, part) for part in splits]
    for pivot, weight in (("de", w_de), ("fr", w_fr)):
        for pos, part in enumerate(splits):
            # Each split is translated and scored before moving to the next one.
            round_trip = back_translate_texts(
                part, pivot_lang=pivot, rate_limit_every=40, rate_limit_sleep_sec=0.2
            )
            blended[pos] = blended[pos] + weight * _predict_probs(model, tokenizer, round_trip)

    p_train, p_val, p_test = blended
    return p_train, p_val, p_test
def _stacking_probs(
    model,
    tokenizer,
    meta_all: np.ndarray,
    y: np.ndarray,
    inner_idx: np.ndarray,
    val_idx: np.ndarray,
    test_idx: np.ndarray,
    texts: np.ndarray,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Fit the CLS + metadata logistic-regression stacker for one fold.

    Args:
        model, tokenizer: Model and tokenizer used to extract CLS features.
        meta_all: Metadata feature matrix, row-aligned with ``texts``.
        y: Labels aligned with ``texts``.
        inner_idx, val_idx, test_idx: Row indices of the three splits.
        texts: All comment strings.

    Returns:
        Positive-class probabilities for the train, validation and test splits.
    """

    def _design_matrix(rows: np.ndarray) -> np.ndarray:
        cls_part = _extract_cls_features(model, tokenizer, texts[rows].tolist())
        return np.hstack([cls_part, meta_all[rows]])

    scaler = StandardScaler()
    train_matrix = _design_matrix(inner_idx)
    val_matrix = _design_matrix(val_idx)
    test_matrix = _design_matrix(test_idx)

    # The scaler is fitted on the training split only.
    train_scaled = scaler.fit_transform(train_matrix)
    stacker = LogisticRegression(C=0.01, max_iter=3000, class_weight="balanced", solver="lbfgs")
    stacker.fit(train_scaled, y[inner_idx])

    p_train = stacker.predict_proba(train_scaled)[:, 1]
    p_val = stacker.predict_proba(scaler.transform(val_matrix))[:, 1]
    p_test = stacker.predict_proba(scaler.transform(test_matrix))[:, 1]
    return p_train, p_val, p_test
def run_experiment_4(
    best_key: str,
    texts: np.ndarray,
    y: np.ndarray,
    df: pd.DataFrame,
) -> dict:
    """Experiment 4: re-run the best sprint with a fine threshold grid.

    The threshold is searched on [0.05, 0.30] in steps of 0.001.

    Args:
        best_key: ``"exp2_advanced_tta"`` or ``"exp3_meta_stacking"``; any
            other value runs the head-only training pipeline of experiment 1.
        texts: All comment strings.
        y: Labels aligned with ``texts``.
        df: Loaded dataset frame, used for metadata features when stacking.

    Returns:
        The fold summary extended with ``experiment``, ``base_experiment``,
        ``threshold_range`` and ``status``.
    """
    logger.info("=" * 60)
    logger.info(f"Experiment 4 — Ultra-fine threshold on best sprint: {best_key}")
    logger.info("=" * 60)

    t_min, t_max, t_step = 0.05, 0.30, 0.001
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    cache = ARTIFACT_DIR / "aug_multi_pivot_global.json"

    # The frozen model and the metadata matrix do not depend on the fold, so
    # they are built once here instead of on every iteration.
    use_tta = best_key == "exp2_advanced_tta"
    use_stacking = best_key == "exp3_meta_stacking"
    frozen_model = frozen_tokenizer = None
    if use_tta or use_stacking:
        frozen_model, frozen_tokenizer = _load_frozen_model(device)
    meta_all = _extended_meta(df).values if use_stacking else None

    skf = StratifiedKFold(n_splits=N_FOLDS, shuffle=True, random_state=RANDOM_STATE)
    folds: list[FoldMetrics] = []
    for fold, (train_idx, test_idx) in enumerate(skf.split(texts, y)):
        inner_idx, val_idx = train_test_split(
            train_idx,
            test_size=0.15,
            random_state=RANDOM_STATE + fold,
            stratify=y[train_idx],
        )
        tr_list = texts[inner_idx].tolist()
        va_list = texts[val_idx].tolist()
        te_list = texts[test_idx].tolist()

        if use_tta:
            p_train, p_val, p_test = _tta_probs(
                frozen_model, frozen_tokenizer, tr_list, va_list, te_list
            )
            y_train_arr = y[inner_idx]
        elif use_stacking:
            p_train, p_val, p_test = _stacking_probs(
                frozen_model, frozen_tokenizer, meta_all, y, inner_idx, val_idx, test_idx, texts
            )
            y_train_arr = y[inner_idx]
        else:
            # Head-only training needs a freshly trained model per fold.
            tr_texts, tr_labels = _augmented_train_set(texts, y, inner_idx, cache)
            out = ARTIFACT_DIR / f"exp4_head_fold{fold}"
            model, tokenizer = _train_head_only_fold(
                tr_texts,
                tr_labels,
                va_list,
                y[val_idx].tolist(),
                out,
                seed=RANDOM_STATE + fold,
            )
            p_train = _predict_probs(model, tokenizer, tr_texts)
            p_val = _predict_probs(model, tokenizer, va_list)
            p_test = _predict_probs(model, tokenizer, te_list)
            y_train_arr = np.asarray(tr_labels)

        folds.append(
            _score_split(
                y_train_arr,
                y[val_idx],
                y[test_idx],
                p_train,
                p_val,
                p_test,
                fold=fold,
                min_t=t_min,
                max_t=t_max,
                step=t_step,
            )
        )
        logger.info(
            f" Fold {fold}: F1_test={folds[-1].f1_test} t={folds[-1].threshold} gap_pp={folds[-1].gap_pp}"
        )

    summary = _summarize_folds(folds)
    summary["experiment"] = "exp4_ultra_fine_threshold"
    summary["base_experiment"] = best_key
    summary["threshold_range"] = [t_min, t_max, t_step]
    summary["status"] = "PASS" if summary["all_gap_ok"] and summary["target_f1_hit"] else "PARTIAL"
    return summary
def run_golden_baseline_cv(texts: np.ndarray, y: np.ndarray) -> dict:
    """Score the frozen model on the same CV splits as the experiments.

    Args:
        texts: All comment strings.
        y: Labels aligned with ``texts``.

    Returns:
        The fold summary with ``experiment`` set to ``"golden_baseline_cv"``.
    """
    logger.info("Golden Baseline reference (5-fold CV, frozen)")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model, tokenizer = _load_frozen_model(device)
    splitter = StratifiedKFold(n_splits=N_FOLDS, shuffle=True, random_state=RANDOM_STATE)

    fold_results: list[FoldMetrics] = []
    for fold, (train_idx, test_idx) in enumerate(splitter.split(texts, y)):
        inner_idx, val_idx = train_test_split(
            train_idx,
            test_size=0.15,
            random_state=RANDOM_STATE + fold,
            stratify=y[train_idx],
        )
        # Order is (train, val, test).
        p_train, p_val, p_test = (
            _predict_probs(model, tokenizer, texts[rows].tolist())
            for rows in (inner_idx, val_idx, test_idx)
        )
        fold_results.append(
            _score_split(y[inner_idx], y[val_idx], y[test_idx], p_train, p_val, p_test, fold=fold)
        )

    summary = _summarize_folds(fold_results)
    summary["experiment"] = "golden_baseline_cv"
    return summary
def main() -> None:
    """Run the baseline and all sprints, then write the JSON and Markdown reports.

    Experiments 2, 3 and 1 run in that order. Experiment 4 then re-runs
    whichever of them had the highest mean test F1. Results are written to
    ``sprint_results.json`` and ``comparison_table.md`` under ``REPORT_DIR``.
    """
    ARTIFACT_DIR.mkdir(parents=True, exist_ok=True)
    REPORT_DIR.mkdir(parents=True, exist_ok=True)
    set_seed(RANDOM_STATE)
    df, y_series, texts = _load_data()
    y = y_series.values

    results = {
        "target_f1_weighted": TARGET_F1,
        "max_gap_pp": MAX_GAP * 100,
        "n_folds": N_FOLDS,
        "golden_baseline_cv": run_golden_baseline_cv(texts, y),
    }
    results["exp2"] = run_experiment_2(texts, y)
    results["exp3"] = run_experiment_3(df, texts, y)
    results["exp1"] = run_experiment_1(df, texts, y)

    # NOTE(review): the winner is picked on mean *test* F1 and experiment 4
    # re-tunes it on the same folds, so its score is optimistically biased.
    candidates = {
        "exp1_multi_pivot_head": results["exp1"]["f1_test_mean"],
        "exp2_advanced_tta": results["exp2"]["f1_test_mean"],
        "exp3_meta_stacking": results["exp3"]["f1_test_mean"],
    }
    best_key = max(candidates, key=candidates.get)
    results["best_experiment"] = best_key
    results["exp4"] = run_experiment_4(best_key, texts, y, df)

    comparison = []
    for label, key in [
        ("Golden Baseline (CV)", "golden_baseline_cv"),
        ("Exp1 Multi-Pivot + Head", "exp1"),
        ("Exp2 Advanced TTA", "exp2"),
        ("Exp3 Meta Stacking", "exp3"),
        ("Exp4 Ultra-Fine Thresh", "exp4"),
    ]:
        block = results[key]
        comparison.append(
            {
                "sprint": label,
                "f1_test_mean": block["f1_test_mean"],
                "f1_test_std": block["f1_test_std"],
                "gap_pp_mean": block["gap_pp_mean"],
                "gap_pp_max": block["gap_pp_max"],
                "all_gap_ok": block["all_gap_ok"],
                "f1_target_hit": block["target_f1_hit"],
                # A gap failure takes precedence over an F1 failure.
                "status": "PASS" if block["all_gap_ok"] and block["target_f1_hit"] else (
                    "FAIL_GAP" if not block["all_gap_ok"] else "FAIL_F1"
                ),
            }
        )
    results["comparison_table"] = comparison

    out_json = REPORT_DIR / "sprint_results.json"
    out_json.write_text(json.dumps(results, indent=2), encoding="utf-8")
    logger.info(f"Saved {out_json}")

    lines = [
        "# Notebook 13 — Sprint Comparison",
        "",
        "| Sprint | Mean F1 (test) | Gap pp (mean) | Gap OK | F1 ≥ 0.80 | Status |",
        "|--------|----------------|---------------|--------|-----------|--------|",
    ]
    for row in comparison:
        lines.append(
            f"| {row['sprint']} | {row['f1_test_mean']:.4f} ± {row['f1_test_std']:.4f} | "
            f"{row['gap_pp_mean']:.2f} | {'✅' if row['all_gap_ok'] else '❌'} | "
            f"{'✅' if row['f1_target_hit'] else '❌'} | {row['status']} |"
        )
    # The table contains non-ASCII symbols, so the encoding is set explicitly
    # instead of relying on the platform's locale default.
    (REPORT_DIR / "comparison_table.md").write_text("\n".join(lines), encoding="utf-8")
    logger.info("Done.")


if __name__ == "__main__":
    main()