# v121rc_exp1 / plot_results_chatgptpro.py
# Linksome's picture
# Add files using upload-large-folder tool
# ac94d57 verified
#!/usr/bin/env python3
"""
plot_v121rc_exp1.py
End-to-end plotting for the v121rc_exp1 experiment.
It scans /workspace/v121rc_exp1/{A..I} for *_results.json files produced by runX.py,
computes accuracy / validity / format metrics per checkpoint step, and generates
a comprehensive set of publication-ready figures to answer:
1) Training prompt information entropy (HNO1 vs HNO2 vs HNO3)
2) Evaluation set hardness (P1..P5, R1..R3, A1..A4)
3) Training label context abundance/clarity (0-shot vs CoT vs Fake CoT)
4) Training duration/optimization steps (learning curves + scaling-law-style fits)
Usage:
python plot_v121rc_exp1.py \
--root /workspace/v121rc_exp1 \
--outdir /workspace/v121rc_exp1/FIGURES \
--export_csv
Important:
- If you have run cross-evaluations (i.e., a config directory contains results for multiple
eval_source_tag / eval_hno), the script *by default* filters to the "train-aligned"
evaluation sets:
eval_hno == train_hno and eval_source_tag matches the config's training variant.
This matches your RUNME.sh pattern (evaluate each model under its own matching eval set).
If you want to include all eval files, pass --use_all_eval_files.
Notes:
- The script is robust to missing/incomplete files; it will skip what it can't parse.
- It never assumes a fixed set of checkpoints; it infers step_* keys in each file.
"""
from __future__ import annotations
import argparse
import json
import os
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
import numpy as np
import pandas as pd
# Headless-safe plotting
import matplotlib
matplotlib.use("Agg") # noqa: E402
import matplotlib.pyplot as plt # noqa: E402
# ----------------------------
# Experiment metadata (edit if you add more configs)
# ----------------------------
# Maps each config directory letter (A..I) to its training metadata:
#   hno     — training-prompt entropy variant (HNO1/HNO2/HNO3)
#   variant — training label-context style (0-shot / CoT / Fake CoT)
CONFIG_META: Dict[str, Dict[str, str]] = {
    "A": {"hno": "HNO3", "variant": "0-shot"},
    "B": {"hno": "HNO3", "variant": "CoT"},
    "C": {"hno": "HNO3", "variant": "Fake CoT"},
    "D": {"hno": "HNO2", "variant": "0-shot"},
    "E": {"hno": "HNO2", "variant": "CoT"},
    "F": {"hno": "HNO2", "variant": "Fake CoT"},
    "G": {"hno": "HNO1", "variant": "0-shot"},
    "H": {"hno": "HNO1", "variant": "CoT"},
    "I": {"hno": "HNO1", "variant": "Fake CoT"},
}
# Canonical ordering of eval families used for plots and categorical sorting.
FAMILY_ORDER = ["base", "paraphrase", "reverse", "aggregate"]
# Human-readable family names for figure titles/legends.
FAMILY_TO_NICE = {
    "base": "ID (train-like)",
    "paraphrase": "Paraphrase (P avg)",
    "reverse": "Reverse (R avg)",
    "aggregate": "Aggregate (A avg)",
}
# Eval template parsing: filenames end in ..._P1, ..._R2, ..._A4
TEMPLATE_SUFFIX_RE = re.compile(r"_(?P<kind>[PRA])(?P<idx>\d+)$")
# Matches HNO1/HNO2/HNO3 anywhere in a stem (case-insensitive).
HNO_RE = re.compile(r"(HNO[123])", re.IGNORECASE)
# ----------------------------
# Variant↔tag mapping (for "train-aligned" filtering)
# ----------------------------
def expected_source_tag(train_variant: str) -> str:
    """
    Map a training-variant name to the eval_source_tag used in eval file names.

    Returns one of "wo_reasoning", "fake_reasoning", "reasoning", or
    "unknown" when the variant is not recognized. Matching is
    case/whitespace-insensitive and tolerant of None.
    """
    normalized = (train_variant or "").strip().lower()
    tag_by_variant = {
        "0-shot": "wo_reasoning",
        "0shot": "wo_reasoning",
        "wo_reasoning": "wo_reasoning",
        "fake cot": "fake_reasoning",
        "fake_cot": "fake_reasoning",
        "fake": "fake_reasoning",
        "cot": "reasoning",
        "reasoning": "reasoning",
    }
    return tag_by_variant.get(normalized, "unknown")
def filter_train_aligned(df: pd.DataFrame) -> pd.DataFrame:
    """
    Restrict rows to "train-aligned" evaluations.

    A row is kept when its eval HNO equals its training HNO and its
    eval_source_tag equals the tag expected for its training variant
    (see expected_source_tag). Empty frames pass through unchanged;
    otherwise a copy of the filtered frame is returned.
    """
    if df.empty:
        return df
    wanted_tag = df["train_variant"].astype(str).map(expected_source_tag)
    same_hno = df["eval_hno"].astype(str) == df["train_hno"].astype(str)
    same_tag = df["eval_source_tag"].astype(str) == wanted_tag
    return df[same_hno & same_tag].copy()
# ----------------------------
# Plot styling (clean & paper-friendly)
# ----------------------------
def set_matplotlib_style() -> None:
    """Install compact, publication-friendly matplotlib defaults globally."""
    rc = {
        # Resolution: screen vs saved output.
        "figure.dpi": 150,
        "savefig.dpi": 300,
        # Typography.
        "font.size": 11,
        "axes.titlesize": 13,
        "axes.labelsize": 11,
        "legend.fontsize": 10,
        "xtick.labelsize": 10,
        "ytick.labelsize": 10,
        # Light grid, no top/right spines (cleaner paper figures).
        "axes.grid": True,
        "grid.alpha": 0.25,
        "axes.spines.top": False,
        "axes.spines.right": False,
        # Slightly translucent framed legends.
        "legend.frameon": True,
        "legend.framealpha": 0.9,
    }
    plt.rcParams.update(rc)
def ensure_dir(p: Path) -> None:
    """Create directory *p* (including missing parents); no-op if it exists."""
    if not p.is_dir():
        p.mkdir(parents=True, exist_ok=True)
def savefig(fig: plt.Figure, path: Path) -> None:
    """
    Tight-layout, save *fig* to *path* (creating parent dirs), and close it.

    The close is in a ``finally`` so that a failing tight_layout/savefig does
    not leak the figure: pyplot keeps open figures alive in a global registry,
    and this script creates dozens of figures in loops.
    """
    ensure_dir(path.parent)
    try:
        fig.tight_layout()
        fig.savefig(path, bbox_inches="tight")
    finally:
        plt.close(fig)
# ----------------------------
# Data extraction
# ----------------------------
def iter_result_files(root: Path, configs: Optional[List[str]] = None) -> Iterable[Tuple[str, Path]]:
    """
    Yield (config_letter, path_to_results_json) pairs.

    When *configs* is None, every single-letter alphabetic subdirectory of
    *root* is scanned in sorted order. Within each config directory, results
    are discovered recursively under "PandaEval12_2_results" and yielded in
    sorted path order. Missing config directories are skipped silently.
    """
    if configs is None:
        configs = sorted(
            d.name
            for d in root.iterdir()
            if d.is_dir() and len(d.name) == 1 and d.name.isalpha()
        )
    for cfg in configs:
        cfg_dir = root / cfg
        if not cfg_dir.exists():
            continue
        results_root = cfg_dir / "PandaEval12_2_results"
        for json_path in sorted(results_root.rglob("*_results.json")):
            yield cfg, json_path
@dataclass(frozen=True)
class EvalFileInfo:
    """Parsed identity of one evaluation-results file (built by parse_eval_file_info)."""
    eval_stem: str  # filename without extension, e.g. HNO3_eval_wo_reasoning_P1
    base_stem: str  # stem without _P#/_R#/_A#, e.g. HNO3_eval_wo_reasoning
    family: str  # base/paraphrase/reverse/aggregate
    template: str  # BASE or P1 etc
    hno: str  # HNO1/HNO2/HNO3
    source_tag: str  # wo_reasoning / fake_reasoning / reasoning (inferred)
def parse_eval_file_info(results_path: Path) -> EvalFileInfo:
    """
    Decode the eval-file naming convention into an EvalFileInfo.

    Example: "HNO3_eval_wo_reasoning_P2_results.json" yields template "P2",
    family "paraphrase", hno "HNO3", source_tag "wo_reasoning".

    Raises ValueError when the filename does not end with "_results.json".
    """
    fname = results_path.name
    suffix = "_results.json"
    if not fname.endswith(suffix):
        raise ValueError(f"Not a results file: {fname}")
    eval_stem = fname[: -len(suffix)]

    # A trailing _P#/_R#/_A# decides the family; its absence means BASE.
    match = TEMPLATE_SUFFIX_RE.search(eval_stem)
    if match is None:
        template, family, base_stem = "BASE", "base", eval_stem
    else:
        kind = match.group("kind")
        template = f"{kind}{int(match.group('idx'))}"
        family = {"P": "paraphrase", "R": "reverse", "A": "aggregate"}[kind]
        base_stem = eval_stem[: match.start()]

    # Entropy level embedded in the stem, if any.
    hno_match = HNO_RE.search(base_stem)
    hno = hno_match.group(1).upper() if hno_match else "UNKNOWN"

    # Source tag: explicit markers win; otherwise this is the CoT-flavored
    # "train.json" -> "eval.json" set.
    lowered = base_stem.lower()
    if "wo_reasoning" in lowered:
        source_tag = "wo_reasoning"
    elif "fake_reasoning" in lowered:
        source_tag = "fake_reasoning"
    else:
        source_tag = "reasoning"

    return EvalFileInfo(
        eval_stem=eval_stem,
        base_stem=base_stem,
        family=family,
        template=template,
        hno=hno,
        source_tag=source_tag,
    )
def safe_json_load(path: Path) -> Optional[Any]:
    """
    Load JSON from *path*, returning None on expected failures.

    Swallows only I/O and decode errors — OSError (missing/unreadable file),
    UnicodeDecodeError, and ValueError (json.JSONDecodeError subclasses it).
    Unlike a blanket ``except Exception``, genuine programming errors such as
    TypeError now propagate instead of being silently turned into None.
    """
    try:
        with path.open("r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, UnicodeDecodeError, ValueError):
        return None
def infer_steps(entries: List[Dict[str, Any]]) -> List[int]:
    """
    Collect the sorted checkpoint step numbers present in *entries*.

    A step is any key of the form "step_<int>"; "step_" keys whose suffix is
    not an integer are ignored.
    """
    found: set = set()
    for entry in entries:
        for key in entry:
            if not key.startswith("step_"):
                continue
            suffix = key.split("_", 1)[1]
            try:
                found.add(int(suffix))
            except ValueError:
                continue
    return sorted(found)
def compute_metrics_for_step(entries: List[Dict[str, Any]], step: int) -> Dict[str, Any]:
    """
    Aggregate accuracy/validity/format statistics for one checkpoint step.

    For each entry the record under "step_<step>" is inspected (missing or
    falsy records are treated as empty dicts). Returns a dict with:
      n               — number of entries
      accuracy        — mean of per-entry "accuracy" (unparseable -> 0.0)
      accuracy_valid  — mean accuracy over entries whose label is Yes/No,
                        NaN when no entry is valid
      valid_rate / invalid_rate
      strict_binary_rate   — output is exactly "Yes"/"No" after strip()
      has_reasoning_rate   — output contains the literal "Reasoning"
      has_answer_tag_rate  — output contains the literal "Answer"
      mean_output_chars    — mean output length in characters
    Empty input yields n=0 and NaN for every metric.
    """
    step_key = f"step_{step}"
    if not entries:
        return {
            "n": 0,
            "accuracy": np.nan,
            "accuracy_valid": np.nan,
            "valid_rate": np.nan,
            "invalid_rate": np.nan,
            "strict_binary_rate": np.nan,
            "has_reasoning_rate": np.nan,
            "has_answer_tag_rate": np.nan,
            "mean_output_chars": np.nan,
        }

    accs: List[float] = []
    valids: List[bool] = []
    stricts: List[bool] = []
    reasonings: List[bool] = []
    answers: List[bool] = []
    lengths: List[int] = []
    for entry in entries:
        record = entry.get(step_key) or {}
        label = record.get("label", "")
        raw_out = record.get("output", "")
        # Defensive normalization: None -> "", non-str -> str(...).
        if raw_out is None:
            text = ""
        elif isinstance(raw_out, str):
            text = raw_out
        else:
            text = str(raw_out)
        valids.append(label in ("Yes", "No"))
        stricts.append(text.strip() in ("Yes", "No"))
        reasonings.append("Reasoning" in text)
        answers.append("Answer" in text)
        try:
            accs.append(float(record.get("accuracy", 0)))
        except Exception:
            accs.append(0.0)  # unparseable accuracy counts as wrong
        lengths.append(len(text))

    acc_arr = np.asarray(accs, dtype=float)
    valid_arr = np.asarray(valids, dtype=bool)
    valid_rate = float(valid_arr.mean())
    if valid_arr.any():
        accuracy_valid = float(acc_arr[valid_arr].mean())
    else:
        accuracy_valid = float("nan")
    return {
        "n": len(entries),
        "accuracy": float(acc_arr.mean()),
        "accuracy_valid": accuracy_valid,
        "valid_rate": valid_rate,
        "invalid_rate": float(1.0 - valid_rate),
        "strict_binary_rate": float(np.mean(stricts)),
        "has_reasoning_rate": float(np.mean(reasonings)),
        "has_answer_tag_rate": float(np.mean(answers)),
        "mean_output_chars": float(np.mean(lengths)),
    }
def load_all_metrics(root: Path, configs: Optional[List[str]] = None) -> pd.DataFrame:
    """
    Build a tidy DataFrame with one row per:
    (config, eval_file, family, template, step)
    Columns include accuracy, validity rates, format/compliance metrics.
    Unreadable/ill-formed result files are skipped; an empty DataFrame is
    returned when nothing could be parsed.
    """
    rows: List[Dict[str, Any]] = []
    for cfg, res_path in iter_result_files(root, configs=configs):
        info = parse_eval_file_info(res_path)
        data = safe_json_load(res_path)
        # Skip files that failed to load or whose top level is not a list.
        if not isinstance(data, list):
            continue
        steps = infer_steps(data)
        if not steps:
            continue
        # Training metadata from CONFIG_META; unknown config letters fall
        # back to the HNO implied by the eval filename.
        meta = CONFIG_META.get(cfg, {})
        train_hno = meta.get("hno", info.hno)
        variant = meta.get("variant", "UNKNOWN")
        for step in steps:
            m = compute_metrics_for_step(data, step)
            rows.append(
                {
                    "config": cfg,
                    "train_hno": train_hno,
                    "train_variant": variant,
                    "eval_hno": info.hno,
                    "eval_source_tag": info.source_tag,
                    "eval_stem": info.eval_stem,
                    "eval_base_stem": info.base_stem,
                    "eval_family": info.family,
                    "eval_template": info.template,
                    "step": int(step),
                    **m,
                }
            )
    if not rows:
        return pd.DataFrame()
    df = pd.DataFrame(rows)
    # Useful sort keys: ordered categoricals make groupby/sort output follow
    # the experiment's natural ordering instead of alphabetical order.
    df["train_hno"] = pd.Categorical(df["train_hno"], categories=["HNO1", "HNO2", "HNO3"], ordered=True)
    df["train_variant"] = pd.Categorical(df["train_variant"], categories=["0-shot", "CoT", "Fake CoT"], ordered=True)
    df["eval_family"] = pd.Categorical(df["eval_family"], categories=FAMILY_ORDER, ordered=True)

    # Template order: BASE first, then P1..P5, R1..R3, A1..A4
    def template_sort_key(t: str) -> int:
        # BASE sorts first; unrecognized templates sink to the bottom.
        if t == "BASE":
            return 0
        m = re.match(r"([PRA])(\d+)$", t)
        if not m:
            return 10_000
        kind, idx = m.group(1), int(m.group(2))
        base = {"P": 100, "R": 200, "A": 300}.get(kind, 1000)
        return base + idx

    df["template_sort"] = df["eval_template"].map(template_sort_key)
    df = df.sort_values(["config", "eval_stem", "step"]).reset_index(drop=True)
    return df
# ----------------------------
# Derived / aggregated views
# ----------------------------
def compute_family_averages(df: pd.DataFrame) -> pd.DataFrame:
    """
    Average per-template rows into one row per eval family.

    P1..P5 collapse into "paraphrase", R1..R3 into "reverse", A1..A4 into
    "aggregate"; "base" stays as-is. Metric columns are averaged (unweighted
    across templates) while sample counts are summed. The family column is
    renamed to eval_family_agg and made an ordered categorical so downstream
    plots keep the canonical family order.
    """
    if df.empty:
        return df
    keys = ["config", "train_hno", "train_variant", "eval_hno", "eval_source_tag", "step", "eval_family"]
    metrics = [
        "accuracy",
        "accuracy_valid",
        "valid_rate",
        "invalid_rate",
        "strict_binary_rate",
        "has_reasoning_rate",
        "has_answer_tag_rate",
        "mean_output_chars",
    ]
    how = {col: "mean" for col in metrics}
    how["n"] = "sum"
    out = (
        df.groupby(keys, dropna=False)[metrics + ["n"]]
        .agg(how)
        .reset_index()
        .rename(columns={"eval_family": "eval_family_agg"})
    )
    out["eval_family_agg"] = pd.Categorical(out["eval_family_agg"], categories=FAMILY_ORDER, ordered=True)
    return out
def select_final_step(df: pd.DataFrame, by: List[str]) -> pd.DataFrame:
    """
    Reduce *df* to one row per group in *by*: the row holding the largest step.

    Returns a copied frame with a fresh RangeIndex; empty input passes
    through unchanged.
    """
    if df.empty:
        return df
    last_row_idx = df.groupby(by)["step"].idxmax()
    return df.loc[last_row_idx].copy().reset_index(drop=True)
def add_generalization_gaps(fam_df: pd.DataFrame) -> pd.DataFrame:
    """
    Pivot family-averaged accuracies wide and attach gap columns.

    For every (config, ..., step) key each family's accuracy becomes a
    column, then gap_<family> = base - <family> measures how much harder
    the paraphrase/reverse/aggregate evals are than the train-like (ID)
    eval. Families absent from the input become NaN columns so the gap
    columns are always defined. Empty input passes through unchanged.
    """
    if fam_df.empty:
        return fam_df
    keys = ["config", "train_hno", "train_variant", "eval_hno", "eval_source_tag", "step"]
    wide = fam_df.pivot_table(index=keys, columns="eval_family_agg", values="accuracy", aggfunc="mean")
    wide = wide.reset_index()
    for fam in FAMILY_ORDER:
        if fam not in wide.columns:
            wide[fam] = np.nan
    for fam in ("paraphrase", "reverse", "aggregate"):
        wide[f"gap_{fam}"] = wide["base"] - wide[fam]
    return wide
# ----------------------------
# Plotting helpers
# ----------------------------
def lineplot(ax: plt.Axes, x: np.ndarray, y: np.ndarray, label: str, marker: str = "o") -> None:
    # Thin wrapper to keep line/marker styling consistent across all figures.
    ax.plot(x, y, marker=marker, linewidth=2, markersize=4, label=label)
def plot_learning_curves_by_hno(fam_df: pd.DataFrame, outdir: Path) -> None:
    """
    For each entropy (HNO1/2/3): accuracy vs step for variants (0-shot, CoT, Fake CoT).
    Now does this for ALL eval families: base/paraphrase/reverse/aggregate.
    Produces both accuracy and invalid_rate curves (one figure each per
    (HNO, family) pair). Empty inputs/subsets are skipped silently.
    """
    ensure_dir(outdir)
    if fam_df.empty:
        return
    for fam in FAMILY_ORDER:  # base, paraphrase, reverse, aggregate
        df_fam = fam_df[fam_df["eval_family_agg"] == fam].copy()
        if df_fam.empty:
            continue
        for hno in ["HNO1", "HNO2", "HNO3"]:
            sub = df_fam[df_fam["train_hno"] == hno]
            if sub.empty:
                continue
            # -----------------
            # Accuracy curves
            # -----------------
            fig, ax = plt.subplots(figsize=(8.0, 4.6))
            for variant in ["0-shot", "CoT", "Fake CoT"]:
                s2 = sub[sub["train_variant"] == variant].sort_values("step")
                if s2.empty:
                    continue
                # Average duplicate rows sharing a step (e.g. several eval files).
                s3 = s2.groupby("step")["accuracy"].mean().reset_index()
                lineplot(ax, s3["step"].to_numpy(), s3["accuracy"].to_numpy(), label=variant)
            ax.set_title(f"{hno}: {FAMILY_TO_NICE.get(fam, fam)} accuracy vs optimization steps")
            ax.set_xlabel("Optimization step (checkpoint)")
            ax.set_ylabel("Accuracy")
            ax.set_ylim(0.0, 1.02)
            ax.legend(loc="lower right")
            savefig(fig, outdir / f"learning_curve_{hno}_{fam}_accuracy.png")
            # -----------------
            # Invalid rate curves
            # -----------------
            fig, ax = plt.subplots(figsize=(8.0, 4.6))
            for variant in ["0-shot", "CoT", "Fake CoT"]:
                s2 = sub[sub["train_variant"] == variant].sort_values("step")
                if s2.empty:
                    continue
                s3 = s2.groupby("step")["invalid_rate"].mean().reset_index()
                lineplot(ax, s3["step"].to_numpy(), s3["invalid_rate"].to_numpy(), label=variant)
            ax.set_title(f"{hno}: {FAMILY_TO_NICE.get(fam, fam)} invalid-output rate vs optimization steps")
            ax.set_xlabel("Optimization step (checkpoint)")
            ax.set_ylabel("Invalid rate (label not uniquely extracted)")
            ax.set_ylim(0.0, 1.02)
            ax.legend(loc="upper right")
            savefig(fig, outdir / f"learning_curve_{hno}_{fam}_invalid_rate.png")
def short_cfg_label(cfg: str) -> str:
    """
    Compact tick label for a config, e.g. "A(H3,0)".

    "H<d>" abbreviates the training HNO level; the variant collapses to
    "0" (0-shot), "F" (Fake CoT) or "C" (CoT/reasoning). Unrecognized
    metadata falls through unabbreviated.
    """
    meta = CONFIG_META.get(cfg, {})
    hno = str(meta.get("hno", "?"))
    variant = str(meta.get("variant", "?")).lower()
    if hno.startswith("HNO") and hno[-1].isdigit():
        hno_short = f"H{hno[-1]}"
    else:
        hno_short = hno
    if "0-shot" in variant or "0shot" in variant:
        var_short = "0"
    elif "fake" in variant:
        var_short = "F"
    elif "cot" in variant or "reason" in variant:
        var_short = "C"
    else:
        var_short = meta.get("variant", "?")
    return f"{cfg}({hno_short},{var_short})"
def plot_entropy_comparison_by_variant(fam_df: pd.DataFrame, outdir: Path) -> None:
    """
    Q1: For each training variant, compare HNO1/2/3 accuracy vs step.
    Now does this for ALL eval families: base/paraphrase/reverse/aggregate.
    One figure per (variant, family) pair; empty subsets are skipped.
    """
    ensure_dir(outdir)
    if fam_df.empty:
        return
    for variant in ["0-shot", "CoT", "Fake CoT"]:
        vdf = fam_df[fam_df["train_variant"] == variant].copy()
        if vdf.empty:
            continue
        for fam in FAMILY_ORDER:  # base, paraphrase, reverse, aggregate
            sub = vdf[vdf["eval_family_agg"] == fam].copy()
            if sub.empty:
                continue
            fig, ax = plt.subplots(figsize=(8.0, 4.6))
            for hno in ["HNO1", "HNO2", "HNO3"]:
                s2 = sub[sub["train_hno"] == hno].sort_values("step")
                if s2.empty:
                    continue
                # Average duplicate rows per step (e.g. several eval files).
                s3 = s2.groupby("step")["accuracy"].mean().reset_index()
                lineplot(ax, s3["step"].to_numpy(), s3["accuracy"].to_numpy(), label=hno)
            ax.set_title(f"{variant}: Entropy comparison on {FAMILY_TO_NICE.get(fam, fam)} accuracy")
            ax.set_xlabel("Optimization step (checkpoint)")
            ax.set_ylabel("Accuracy")
            ax.set_ylim(0.0, 1.02)
            ax.legend(loc="lower right")
            savefig(
                fig,
                outdir / f"entropy_comparison_{variant.replace(' ', '_')}_{fam}_accuracy.png"
            )
def plot_family_curves_per_config(fam_df: pd.DataFrame, outdir: Path) -> None:
    """
    For each config A..I: accuracy vs step for each family (base/paraphrase/reverse/aggregate).
    Produces one figure per config.
    """
    ensure_dir(outdir)
    if fam_df.empty:
        return
    for cfg, sub in fam_df.groupby("config"):
        fig, ax = plt.subplots(figsize=(8.2, 4.8))
        sub = sub.sort_values(["eval_family_agg", "step"])
        for fam in FAMILY_ORDER:
            s2 = sub[sub["eval_family_agg"] == fam].sort_values("step")
            if s2.empty:
                continue
            # Average duplicate rows per step before drawing the curve.
            s3 = s2.groupby("step")["accuracy"].mean().reset_index()
            lineplot(ax, s3["step"].to_numpy(), s3["accuracy"].to_numpy(), label=FAMILY_TO_NICE.get(fam, fam))
        meta = CONFIG_META.get(cfg, {})
        ax.set_title(f"Config {cfg} ({meta.get('hno','?')}, {meta.get('variant','?')}): Accuracy by eval family")
        ax.set_xlabel("Optimization step (checkpoint)")
        ax.set_ylabel("Accuracy")
        ax.set_ylim(0.0, 1.02)
        ax.legend(loc="lower right")
        savefig(fig, outdir / f"config_{cfg}_family_curves_accuracy.png")
def plot_eval_hardness_heatmaps(df: pd.DataFrame, outdir: Path) -> None:
    """
    Heatmaps of final-step accuracy and invalid rate:
    rows = eval_template (BASE, P1..P5, R1..R3, A1..A4)
    cols = configs (A..I)
    """
    ensure_dir(outdir)
    if df.empty:
        return
    final = select_final_step(df, by=["config", "eval_stem", "eval_template"])
    if final.empty:
        return
    # Average duplicate eval files mapping to the same (config, template).
    final2 = final.groupby(["config", "eval_template"], dropna=False)[["accuracy", "invalid_rate"]].mean().reset_index()
    # Row order: BASE first, then P1.., R1.., A1..; unknown templates sink last.
    final2["template_sort"] = final2["eval_template"].map(
        lambda t: (0 if t == "BASE" else (100 if t.startswith("P") else 200 if t.startswith("R") else 300 if t.startswith("A") else 999))
        + (int(t[1:]) if len(t) > 1 and t[1:].isdigit() else 0)
    )
    final2 = final2.sort_values("template_sort")
    templates = final2["eval_template"].unique().tolist()
    configs = sorted(final2["config"].unique().tolist())

    def heatmap(metric: str, title: str, fname: str) -> None:
        # One annotated imshow heatmap of `metric`, templates x configs.
        pivot = final2.pivot(index="eval_template", columns="config", values=metric).reindex(index=templates, columns=configs)
        mat = pivot.to_numpy()
        fig, ax = plt.subplots(figsize=(10.5, max(4.5, 0.35 * len(templates))))
        im = ax.imshow(mat, aspect="auto")
        ax.set_title(title)
        ax.set_xlabel("Config")
        ax.set_ylabel("Eval template")
        ax.set_xticks(np.arange(len(configs)))
        # single-line labels are much easier to read on heatmaps
        labels = [
            f"{c} ({CONFIG_META.get(c,{}).get('hno','?')},{CONFIG_META.get(c,{}).get('variant','?')})"
            for c in configs
        ]
        ax.set_xticklabels(labels, rotation=30, ha="right", rotation_mode="anchor")
        ax.tick_params(axis="x", labelsize=8)
        ax.set_yticks(np.arange(len(templates)))
        ax.set_yticklabels(templates)
        cbar = fig.colorbar(im, ax=ax)
        cbar.set_label(metric)
        # Annotate every finite cell with its value for readability.
        for i in range(mat.shape[0]):
            for j in range(mat.shape[1]):
                v = mat[i, j]
                if np.isfinite(v):
                    ax.text(j, i, f"{v:.2f}", ha="center", va="center", fontsize=8)
        savefig(fig, outdir / fname)

    heatmap("accuracy", "Final-step accuracy by template and config", "heatmap_final_accuracy_templates_x_configs.png")
    heatmap("invalid_rate", "Final-step invalid-output rate by template and config", "heatmap_final_invalid_rate_templates_x_configs.png")
def plot_template_difficulty_bars(df: pd.DataFrame, outdir: Path) -> None:
    """
    Bar chart: mean final-step accuracy across configs for each template,
    with std-dev error bars across configs.
    """
    ensure_dir(outdir)
    if df.empty:
        return
    final = select_final_step(df, by=["config", "eval_stem", "eval_template"])
    if final.empty:
        return
    final2 = final.groupby(["config", "eval_template"], dropna=False)[["accuracy"]].mean().reset_index()
    stats = final2.groupby("eval_template")["accuracy"].agg(["mean", "std", "count"]).reset_index()
    # Standard error of the mean across configs (count clipped to avoid /0).
    stats["stderr"] = stats["std"] / np.sqrt(stats["count"].clip(lower=1))

    def sort_key(t: str) -> int:
        # BASE first, then P/R/A by index; unknown templates last.
        if t == "BASE":
            return 0
        m = re.match(r"([PRA])(\d+)$", t)
        if not m:
            return 9999
        kind, idx = m.group(1), int(m.group(2))
        base = {"P": 100, "R": 200, "A": 300}.get(kind, 1000)
        return base + idx

    stats["sort"] = stats["eval_template"].map(sort_key)
    # Primary sort is mean accuracy (hardness ranking); template order breaks ties.
    stats = stats.sort_values(["mean", "sort"]).reset_index(drop=True)
    fig, ax = plt.subplots(figsize=(9.5, max(4.0, 0.35 * len(stats))))
    y = np.arange(len(stats))
    ax.barh(y, stats["mean"].to_numpy(), xerr=stats["stderr"].to_numpy(), capsize=3)
    ax.set_yticks(y)
    ax.set_yticklabels(stats["eval_template"].tolist())
    ax.invert_yaxis()
    ax.set_xlabel("Final-step accuracy (mean across configs)")
    ax.set_title("Evaluation template hardness (lower accuracy = harder)")
    ax.set_xlim(0.0, 1.02)
    savefig(fig, outdir / "template_hardness_final_accuracy_barh.png")
def plot_paraphrase_reverse_aggregate_indices(df: pd.DataFrame, outdir: Path) -> None:
    """
    For each entropy group (HNO1/2/3), plot final accuracy as a function of:
    - paraphrase index (P1..P5)
    - reverse index (R1..R3)
    - aggregate index (A1..A4)
    """
    ensure_dir(outdir)
    if df.empty:
        return
    final = select_final_step(df, by=["config", "eval_stem", "eval_template"])
    if final.empty:
        return
    # Collapse duplicate eval files per (config, template) before plotting.
    final = final.groupby(["config", "train_hno", "train_variant", "eval_template"], dropna=False)["accuracy"].mean().reset_index()

    def plot_family(kind: str, max_idx: int, title: str, fname: str) -> None:
        # One figure per HNO: accuracy vs template index, one line per variant.
        fam_templates = [f"{kind}{i}" for i in range(1, max_idx + 1)]
        sub = final[final["eval_template"].isin(fam_templates)].copy()
        if sub.empty:
            return
        sub["idx"] = sub["eval_template"].str.replace(kind, "", regex=False).astype(int)
        for hno in ["HNO1", "HNO2", "HNO3"]:
            hsub = sub[sub["train_hno"] == hno]
            if hsub.empty:
                continue
            fig, ax = plt.subplots(figsize=(8.0, 4.6))
            for variant in ["0-shot", "CoT", "Fake CoT"]:
                vsub = hsub[hsub["train_variant"] == variant]
                if vsub.empty:
                    continue
                # reindex fills missing indices with NaN so gaps stay visible.
                curve = vsub.groupby("idx")["accuracy"].mean().reindex(range(1, max_idx + 1)).reset_index()
                lineplot(ax, curve["idx"].to_numpy(), curve["accuracy"].to_numpy(), label=variant, marker="o")
            ax.set_title(f"{hno}: {title}")
            ax.set_xlabel(f"{kind}-template index")
            ax.set_ylabel("Final-step accuracy")
            ax.set_xticks(range(1, max_idx + 1))
            ax.set_ylim(0.0, 1.02)
            ax.legend(loc="lower right")
            savefig(fig, outdir / f"{fname}_{hno}.png")

    plot_family("P", 5, "Paraphrase hardness curve (P1..P5)", "paraphrase_hardness_curve")
    plot_family("R", 3, "Reverse hardness curve (R1..R3)", "reverse_hardness_curve")
    plot_family("A", 4, "Aggregate hardness curve (A1..A4)", "aggregate_hardness_curve")
def plot_generalization_gaps(gaps_df: pd.DataFrame, outdir: Path) -> None:
    """
    Generalization gaps vs step: base - {paraphrase, reverse, aggregate}.
    Also renders a final-step grouped bar chart of all three gaps per config.
    """
    ensure_dir(outdir)
    if gaps_df.empty:
        return
    for hno in ["HNO1", "HNO2", "HNO3"]:
        sub = gaps_df[gaps_df["train_hno"] == hno].copy()
        if sub.empty:
            continue
        for gap_col, nice in [
            ("gap_paraphrase", "Gap: ID - Paraphrase"),
            ("gap_reverse", "Gap: ID - Reverse"),
            ("gap_aggregate", "Gap: ID - Aggregate"),
        ]:
            fig, ax = plt.subplots(figsize=(8.0, 4.6))
            for variant in ["0-shot", "CoT", "Fake CoT"]:
                s2 = sub[sub["train_variant"] == variant].sort_values("step")
                if s2.empty:
                    continue
                s3 = s2.groupby("step")[gap_col].mean().reset_index()
                lineplot(ax, s3["step"].to_numpy(), s3[gap_col].to_numpy(), label=variant)
            ax.set_title(f"{hno}: {nice} vs optimization steps")
            ax.set_xlabel("Optimization step (checkpoint)")
            ax.set_ylabel("Accuracy gap")
            # Zero line: points above it mean the harder family is worse than ID.
            ax.axhline(0.0, linewidth=1)
            ax.legend(loc="upper right")
            savefig(fig, outdir / f"generalization_gap_{gap_col}_{hno}.png")
    # Final-step snapshot: three gap bars per config, side by side.
    final = select_final_step(gaps_df, by=["config"])
    if not final.empty:
        fig, ax = plt.subplots(figsize=(10.0, 4.6))
        x = np.arange(len(final))
        width = 0.25
        ax.bar(x - width, final["gap_paraphrase"].to_numpy(), width=width, label="ID - Paraphrase")
        ax.bar(x, final["gap_reverse"].to_numpy(), width=width, label="ID - Reverse")
        ax.bar(x + width, final["gap_aggregate"].to_numpy(), width=width, label="ID - Aggregate")
        ax.set_xticks(x)
        labels = [short_cfg_label(c) for c in final["config"].tolist()]
        ax.set_xticklabels(labels, rotation=30, ha="right", rotation_mode="anchor")
        ax.tick_params(axis="x", labelsize=10)
        fig.subplots_adjust(bottom=0.22)  # extra room for rotated labels
        ax.set_ylabel("Final-step accuracy gap")
        ax.set_title("Final-step generalization gaps (ID minus harder eval families)")
        ax.axhline(0.0, linewidth=1)
        ax.legend(loc="upper right")
        savefig(fig, outdir / "generalization_gaps_final_by_config.png")
def fit_power_law(steps: np.ndarray, errors: np.ndarray) -> Tuple[float, float, float]:
    """
    Least-squares fit of error ≈ a * step**(-b) in log10-log10 space.

    Errors are clipped to [1e-6, 1.0] so their logarithm stays finite.
    Returns (a, b, r2), where r2 is the coefficient of determination of the
    fitted log-log line (NaN when the log-errors are constant).
    """
    floor = 1e-6
    log_x = np.log10(steps.astype(float))
    log_y = np.log10(np.clip(errors.astype(float), floor, 1.0))
    slope, intercept = np.polyfit(log_x, log_y, 1)
    residuals = log_y - (slope * log_x + intercept)
    ss_res = float(np.dot(residuals, residuals))
    centered = log_y - log_y.mean()
    ss_tot = float(np.dot(centered, centered))
    r2 = 1.0 - ss_res / ss_tot if ss_tot > 0 else float("nan")
    return float(10 ** intercept), float(-slope), float(r2)
def plot_scaling_law(fam_df: pd.DataFrame, outdir: Path) -> pd.DataFrame:
    """
    Scaling-law-style analysis:
    - For each (config, eval_family_agg), fit error = 1-accuracy as a power law in step.
    - Plot log-log error curves per family
    - Plot exponent and R² summaries
    Returns a DataFrame of fit rows (config, train meta, eval_family, a, b, r2);
    empty when no (config, family) series has enough points to fit.
    """
    # NOTE(review): this nested helper shadows the module-level short_cfg_label
    # and takes hno/variant explicitly instead of reading CONFIG_META.
    def short_cfg_label(cfg: str, hno: str, variant: str) -> str:
        # Compact "A(H3,0)"-style label for bar-chart ticks.
        h = str(hno).strip()
        v = str(variant).strip().lower()
        hshort = f"H{h[-1]}" if h.startswith("HNO") and h[-1].isdigit() else h
        if "0-shot" in v or "0shot" in v:
            vshort = "0"
        elif "fake" in v:
            vshort = "F"
        elif "cot" in v or "reason" in v:
            vshort = "C"
        else:
            vshort = variant
        return f"{cfg}({hshort},{vshort})"

    ensure_dir(outdir)
    if fam_df.empty:
        return pd.DataFrame()
    # ---- Fit err = a * step^-b per (config, family), steps >= 1000 only ----
    fit_rows = []
    for (cfg, fam), sub in fam_df.groupby(["config", "eval_family_agg"]):
        sub = sub.sort_values("step")
        if sub["step"].nunique() < 3:
            continue  # need >= 3 distinct steps for a meaningful fit
        steps = sub["step"].to_numpy(dtype=float)
        acc = sub["accuracy"].to_numpy(dtype=float)
        err = 1.0 - acc
        # Early (warmup) checkpoints below step 1000 are excluded from the fit.
        mask = np.isfinite(err) & np.isfinite(steps) & (steps >= 1000)
        if mask.sum() < 3:
            continue
        a, b, r2 = fit_power_law(steps[mask], err[mask])
        fit_rows.append(
            {
                "config": cfg,
                "train_hno": CONFIG_META.get(cfg, {}).get("hno", "UNKNOWN"),
                "train_variant": CONFIG_META.get(cfg, {}).get("variant", "UNKNOWN"),
                "eval_family": str(fam),
                "a": a,
                "b": b,
                "r2": r2,
            }
        )
    fits = pd.DataFrame(fit_rows)
    if fits.empty:
        return fits
    # Lookup of fitted (a, b) used to overlay dashed fit lines below.
    fit_map = {}
    for r in fits.itertuples(index=False):
        # r.eval_family is categorical/string; normalize to str
        fit_map[(r.config, str(r.eval_family))] = (float(r.a), float(r.b))
    # ---- Log-log error curves with fitted overlays, one figure per family ----
    for fam in FAMILY_ORDER:
        sub = fam_df[fam_df["eval_family_agg"] == fam].copy()
        if sub.empty:
            continue
        fig, ax = plt.subplots(figsize=(8.2, 5.0))
        for cfg, csub in sub.groupby("config"):
            csub = csub.sort_values("step")
            steps = csub["step"].to_numpy(dtype=float)
            err = 1.0 - csub["accuracy"].to_numpy(dtype=float)
            err = np.clip(err, 1e-6, 1.0)  # keep the log scale finite
            line, = ax.plot(
                steps,
                err,
                marker="o",
                linestyle="-",
                linewidth=2,
                markersize=4,
                label=cfg,
            )
            # Overlay fitted power-law line if available: err_hat = a * step^(-b)
            ab = fit_map.get((cfg, str(fam)))
            if ab is not None:
                a, b = ab
                # Use a smooth step grid spanning the observed range (>=1000)
                smin = max(1000.0, float(np.nanmin(steps)))
                smax = float(np.nanmax(steps))
                if np.isfinite(smin) and np.isfinite(smax) and smax > smin:
                    grid = np.logspace(np.log10(smin), np.log10(smax), 100)
                    err_hat = np.clip(a * (grid ** (-b)), 1e-6, 1.0)
                    # Match the empirical curve's color (derived from the line we just drew)
                    ax.plot(
                        grid,
                        err_hat,
                        linestyle="--",
                        linewidth=1.6,
                        color=line.get_color(),
                    )
        ax.set_xscale("log")
        ax.set_yscale("log")
        ax.set_title(f"Scaling view: error vs step (log-log), family = {FAMILY_TO_NICE.get(fam,fam)}")
        ax.set_xlabel("Optimization step (log scale)")
        ax.set_ylabel("Error = 1 - accuracy (log scale)")
        ax.legend(title="Config", loc="upper right", ncol=3)
        savefig(fig, outdir / f"scaling_loglog_error_family_{fam}.png")
    # ---- Bar charts of exponent b and fit quality R², one pair per family ----
    fits["eval_family"] = pd.Categorical(fits["eval_family"], categories=FAMILY_ORDER, ordered=True)
    for fam in FAMILY_ORDER:
        s = fits[fits["eval_family"] == fam].sort_values(["train_hno", "train_variant", "config"])
        if s.empty:
            continue
        fig, ax = plt.subplots(figsize=(10.0, 4.6))
        x = np.arange(len(s))
        ax.bar(x, s["b"].to_numpy())
        ax.set_xticks(x)
        labels = [short_cfg_label(c, h, v) for c, h, v in zip(s["config"], s["train_hno"], s["train_variant"])]
        ax.set_xticklabels(labels, rotation=30, ha="right", rotation_mode="anchor")
        ax.tick_params(axis="x", labelsize=9)
        ax.set_ylabel("Power-law exponent b (higher = faster error decay)")
        ax.set_title(f"Fitted scaling exponent by config (family = {FAMILY_TO_NICE.get(fam,fam)})")
        savefig(fig, outdir / f"scaling_exponent_b_family_{fam}.png")
    for fam in FAMILY_ORDER:
        s = fits[fits["eval_family"] == fam].sort_values(["train_hno", "train_variant", "config"])
        if s.empty:
            continue
        fig, ax = plt.subplots(figsize=(10.0, 4.0))
        x = np.arange(len(s))
        ax.bar(x, s["r2"].to_numpy())
        ax.set_xticks(x)
        ax.set_xticklabels([f"{c}" for c in s["config"]])
        ax.set_ylim(0.0, 1.02)
        ax.set_ylabel("R² (log-log fit)")
        ax.set_title(f"Power-law fit quality (family = {FAMILY_TO_NICE.get(fam,fam)})")
        savefig(fig, outdir / f"scaling_fit_r2_family_{fam}.png")
    return fits
def plot_format_compliance(fam_df: pd.DataFrame, outdir: Path) -> None:
    """
    Plot output-format metrics vs step on the base (ID) eval family.
    One figure per metric, one line per config.
    """
    ensure_dir(outdir)
    if fam_df.empty:
        return
    base = fam_df[fam_df["eval_family_agg"] == "base"].copy()
    if base.empty:
        return
    for metric, ylabel in [
        ("strict_binary_rate", "Rate (output exactly 'Yes' or 'No')"),
        ("has_reasoning_rate", "Rate (output contains 'Reasoning')"),
        ("has_answer_tag_rate", "Rate (output contains 'Answer')"),
        ("mean_output_chars", "Mean output length (characters)"),
    ]:
        fig, ax = plt.subplots(figsize=(9.0, 4.8))
        for cfg, sub in base.groupby("config"):
            sub = sub.sort_values("step")
            ax.plot(sub["step"], sub[metric], marker="o", linewidth=2, markersize=4, label=cfg)
        ax.set_title(f"Base eval: {metric} vs optimization step (train-aligned)")
        ax.set_xlabel("Optimization step (checkpoint)")
        ax.set_ylabel(ylabel)
        # Rates live in [0, 1]; output length needs an automatic y-range.
        if metric != "mean_output_chars":
            ax.set_ylim(0.0, 1.02)
        ax.legend(title="Config", loc="best", ncol=3)
        savefig(fig, outdir / f"format_{metric}_vs_step_all_configs.png")
def plot_family_summary_bars(fam_df: pd.DataFrame, outdir: Path) -> None:
    """
    Final-step grouped bar chart: for each config, accuracy by eval family.
    Robustness:
    - Uses the final checkpoint *per config* (not per (config,family) group).
    - If a family is missing at that final step for a config, it falls back to the latest
      available step for that (config,family).
    - Avoids "blank plot" when the pivot ends up all-NaN.
    """
    ensure_dir(outdir)
    if fam_df.empty:
        return
    # 1) "Final step" per config (what you usually mean by "final checkpoint")
    final_step = fam_df.groupby("config", dropna=False)["step"].max().rename("final_step").reset_index()
    # Try to take each (config,family) at config's final step
    merged = fam_df.merge(final_step, on="config", how="left")
    at_final = merged[merged["step"] == merged["final_step"]].copy()
    # 2) Fallback: if a (config,family) is missing at final_step, use its latest available step
    fallback = select_final_step(fam_df, by=["config", "eval_family_agg"])
    have_pairs = set(zip(at_final["config"], at_final["eval_family_agg"]))
    # Keep only fallback rows for (config, family) pairs not already covered at the final step.
    need = fallback[~fallback.apply(lambda r: (r["config"], r["eval_family_agg"]) in have_pairs, axis=1)].copy()
    final = pd.concat([at_final, need], ignore_index=True)
    if final.empty:
        return
    # Pivot to configs x families
    pivot = final.pivot_table(
        index="config",
        columns="eval_family_agg",
        values="accuracy",
        aggfunc="mean",
    ).reindex(columns=FAMILY_ORDER)
    # If everything is NaN, that's the "blank" figure symptom
    if np.all(~np.isfinite(pivot.to_numpy(dtype=float))):
        # Save a diagnostic figure instead of a blank one
        fig, ax = plt.subplots(figsize=(11.0, 4.8))
        ax.axis("off")
        ax.text(
            0.01,
            0.5,
            "No finite accuracy values available for final-step family summary.\n"
            "Check that *_results.json contains per-step 'accuracy' fields and that filtering didn't drop everything.",
            fontsize=12,
            va="center",
        )
        savefig(fig, outdir / "final_accuracy_by_config_and_family_grouped_bars.png")
        return
    configs = pivot.index.tolist()
    x = np.arange(len(configs))
    width = 0.2
    fig, ax = plt.subplots(figsize=(11.8, 5.2))
    for i, fam in enumerate(FAMILY_ORDER):
        y = pivot[fam].to_numpy(dtype=float)
        # Offset each family's bars symmetrically around the config's tick position.
        ax.bar(
            x + (i - (len(FAMILY_ORDER) - 1) / 2) * width,
            y,
            width=width,
            label=FAMILY_TO_NICE.get(fam, fam),
        )
    ax.set_xticks(x)
    # Tick labels carry the config letter plus its (HNO, variant) training metadata.
    ax.set_xticklabels(
        [f"{c}\n({CONFIG_META.get(c,{}).get('hno','?')},{CONFIG_META.get(c,{}).get('variant','?')})" for c in configs],
        rotation=30,
        ha="right",
    )
    ax.set_ylabel("Final-step accuracy")
    ax.set_ylim(0.0, 1.02)
    ax.set_title("Final-step accuracy by config and evaluation family (train-aligned)")
    ax.legend(loc="lower right", ncol=2)
    savefig(fig, outdir / "final_accuracy_by_config_and_family_grouped_bars.png")
def plot_id_vs_ood_scatter(fam_df: pd.DataFrame, outdir: Path) -> None:
    """
    Scatter plots (final checkpoint):
      x-axis: ID (base) accuracy
      y-axis: {Paraphrase, Reverse, Aggregate} accuracy
    This is a compact way to visualize "learning vs performance" and whether improvements
    in train-like accuracy translate to harder evaluation performance.
    """
    ensure_dir(outdir)
    if fam_df.empty:
        return
    final = select_final_step(fam_df, by=["config", "eval_family_agg"])
    if final.empty:
        return
    pivot = final.pivot(index="config", columns="eval_family_agg", values="accuracy").reindex(columns=FAMILY_ORDER)
    if "base" not in pivot.columns:
        return
    base_acc = pivot["base"].to_numpy(dtype=float)
    cfg_names = pivot.index.tolist()
    for fam in ("paraphrase", "reverse", "aggregate"):
        if fam not in pivot.columns:
            continue
        fam_acc = pivot[fam].to_numpy(dtype=float)
        nice = FAMILY_TO_NICE.get(fam, fam)
        fig, ax = plt.subplots(figsize=(6.8, 5.2))
        ax.scatter(base_acc, fam_acc)
        # Label each point with its config letter.
        for cfg, px, py in zip(cfg_names, base_acc, fam_acc):
            ax.text(px, py, f" {cfg}", va="center", fontsize=10)
        ax.set_title(f"Final checkpoint: ID vs {nice}")
        ax.set_xlabel("ID accuracy (BASE)")
        ax.set_ylabel(f"{nice} accuracy")
        ax.set_xlim(0.0, 1.02)
        ax.set_ylim(0.0, 1.02)
        ax.plot([0, 1], [0, 1], linewidth=1)  # y = x reference line
        savefig(fig, outdir / f"scatter_final_ID_vs_{fam}.png")
def compute_time_to_fraction_of_final(fam_df: pd.DataFrame, fraction: float = 0.9) -> pd.DataFrame:
    """
    For each (config, family): earliest step where accuracy reaches `fraction * final_accuracy`.

    If the threshold is never reached before the last checkpoint, the last
    checkpoint's step is reported instead.
    """
    records: List[Dict[str, Any]] = []
    for (cfg, fam), grp in fam_df.groupby(["config", "eval_family_agg"]):
        grp = grp.sort_values("step")
        if grp.empty:
            continue
        meta = CONFIG_META.get(cfg, {})
        # Accuracy at the largest step = "final" accuracy for this pair.
        final_acc = float(grp.loc[grp["step"].idxmax(), "accuracy"])
        threshold = fraction * final_acc
        hits = grp[grp["accuracy"] >= threshold]
        first_step = int(hits["step"].min()) if not hits.empty else int(grp["step"].max())
        records.append(
            {
                "config": cfg,
                "train_hno": meta.get("hno", "UNKNOWN"),
                "train_variant": meta.get("variant", "UNKNOWN"),
                "eval_family": str(fam),
                "final_accuracy": final_acc,
                "target_fraction": fraction,
                "target_accuracy": threshold,
                "step_to_reach": first_step,
            }
        )
    return pd.DataFrame(records)
def plot_time_to_fraction(time_df: pd.DataFrame, outdir: Path) -> None:
    """
    Bar charts (one per eval family) of the step needed to reach the target
    fraction of final accuracy, as produced by compute_time_to_fraction_of_final.

    Parameters
    ----------
    time_df : DataFrame with columns config/train_hno/train_variant/eval_family/
        step_to_reach (and optionally target_fraction).
    outdir : directory for the generated PNGs (created if missing).
    """
    ensure_dir(outdir)
    if time_df.empty:
        return
    # Fix: work on a copy so we don't mutate the caller's DataFrame in place
    # (the Categorical assignment below would otherwise leak back to the caller).
    time_df = time_df.copy()
    time_df["eval_family"] = pd.Categorical(time_df["eval_family"], categories=FAMILY_ORDER, ordered=True)
    # All rows share the same fraction; fall back to 0.9 if the column is absent.
    frac = float(time_df["target_fraction"].iloc[0]) if "target_fraction" in time_df.columns else 0.9
    frac_pct = int(round(frac * 100))
    for fam in FAMILY_ORDER:
        sub = time_df[time_df["eval_family"] == fam].sort_values(["train_hno", "train_variant", "config"])
        if sub.empty:
            continue
        fig, ax = plt.subplots(figsize=(10.0, 4.5))
        x = np.arange(len(sub))
        ax.bar(x, sub["step_to_reach"].to_numpy())
        ax.set_xticks(x)
        ax.set_xticklabels([f"{c}\n({h},{v})" for c, h, v in zip(sub["config"], sub["train_hno"], sub["train_variant"])])
        ax.set_ylabel(f"Step to reach {frac_pct}% of final accuracy")
        ax.set_title(f"Optimization speed / sample efficiency (family = {FAMILY_TO_NICE.get(fam,fam)})")
        savefig(fig, outdir / f"time_to_{frac_pct}pct_family_{fam}.png")
# ----------------------------
# Main
# ----------------------------
def main() -> None:
    """
    CLI entry point: load per-checkpoint metrics from --root, optionally filter to
    train-aligned eval sets, aggregate into family averages, and render all figures
    (plus tidy CSV snapshots when --export_csv is set).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--root", type=str, default="/workspace/v121rc_exp1", help="Root containing A..I result dirs.")
    parser.add_argument("--outdir", type=str, default="/workspace/v121rc_exp1/FIGURES", help="Where to save figures.")
    parser.add_argument(
        "--configs",
        type=str,
        default="",
        help="Comma-separated configs to include (default: auto-discover). Example: A,B,C",
    )
    parser.add_argument(
        "--export_csv",
        action="store_true",
        help="Also export tidy CSVs used for plotting (recommended).",
    )
    parser.add_argument(
        "--use_all_eval_files",
        action="store_true",
        help="If set, do NOT filter to train-aligned eval sets (keeps all eval files found).",
    )
    parser.add_argument(
        "--time_fraction",
        type=float,
        default=0.9,
        help="Fraction of final accuracy to measure time-to-threshold (Q4).",
    )
    args = parser.parse_args()
    root = Path(args.root)
    outdir = Path(args.outdir)
    # Empty --configs string means "auto-discover"; pass None downstream in that case.
    configs = [c.strip() for c in args.configs.split(",") if c.strip()] or None
    set_matplotlib_style()
    df_all = load_all_metrics(root, configs=configs)
    if df_all.empty:
        print("No metrics found. Did you point --root to a directory containing *_results.json files?")
        return
    # Filter to train-aligned eval sets by default
    df = df_all if args.use_all_eval_files else filter_train_aligned(df_all)
    if df.empty:
        print("No train-aligned metrics found (after filtering).")
        print("Try --use_all_eval_files if you intentionally ran cross-evals.")
        return
    # Family-averaged curves and derived generalization-gap table feed most figures.
    fam_df = compute_family_averages(df)
    gaps_df = add_generalization_gaps(fam_df)
    # Export CSV snapshots
    if args.export_csv:
        ensure_dir(outdir)
        df_all.to_csv(outdir / "metrics_long_per_template_ALL.csv", index=False)
        df.to_csv(outdir / "metrics_long_per_template_TRAIN_ALIGNED.csv", index=False)
        fam_df.to_csv(outdir / "metrics_long_family_averages_TRAIN_ALIGNED.csv", index=False)
        gaps_df.to_csv(outdir / "metrics_generalization_gaps_TRAIN_ALIGNED.csv", index=False)
        final_template = select_final_step(df, by=["config", "eval_stem", "eval_template"])
        final_template.to_csv(outdir / "metrics_final_per_template_TRAIN_ALIGNED.csv", index=False)
        final_family = select_final_step(fam_df, by=["config", "eval_family_agg"])
        final_family.to_csv(outdir / "metrics_final_family_averages_TRAIN_ALIGNED.csv", index=False)
    # ----------------------------
    # Generate figures
    # ----------------------------
    # Subdirectory names encode which research question (Q1..Q4) each figure set answers.
    plot_learning_curves_by_hno(fam_df, outdir / "Q34_learning_curves_by_hno")
    plot_entropy_comparison_by_variant(fam_df, outdir / "Q1_entropy_comparisons_by_variant")
    plot_family_curves_per_config(fam_df, outdir / "Q234_family_curves_per_config")
    plot_eval_hardness_heatmaps(df, outdir / "Q2_hardness_heatmaps")
    plot_template_difficulty_bars(df, outdir / "Q2_template_difficulty")
    plot_paraphrase_reverse_aggregate_indices(df, outdir / "Q2_family_indices")
    plot_family_summary_bars(fam_df, outdir / "Q23_final_summaries")
    plot_generalization_gaps(gaps_df, outdir / "Q24_generalization_gaps")
    plot_id_vs_ood_scatter(fam_df, outdir / "Q24_learning_vs_performance")
    fits = plot_scaling_law(fam_df, outdir / "Q4_scaling_law")
    if args.export_csv and not fits.empty:
        fits.to_csv(outdir / "scaling_powerlaw_fits_TRAIN_ALIGNED.csv", index=False)
    time_df = compute_time_to_fraction_of_final(fam_df, fraction=args.time_fraction)
    if args.export_csv and not time_df.empty:
        frac_pct = int(round(args.time_fraction * 100))
        time_df.to_csv(outdir / f"time_to_{frac_pct}pct_TRAIN_ALIGNED.csv", index=False)
    plot_time_to_fraction(time_df, outdir / "Q4_time_to_threshold")
    plot_format_compliance(fam_df, outdir / "Q3_format_compliance")
    print(f"Done. Figures saved under: {outdir}")
# Standard script entry guard: run only when executed directly, not on import.
if __name__ == "__main__":
    main()