# v2 / scripts /13_analyze_and_report.py
# JulianHJR's picture
# Upload folder using huggingface_hub
# e53f10b verified
"""
Final analysis: aggregate sweep log, plot curves, produce Markdown report.
"""
import sys
import argparse
import json
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
import numpy as np
from configs.paths import (
ensure_dirs, LOGS_DIR, RESULTS_DIR,
SWEEP_RR_JSON, SWEEP_PQS_JSON, SWEEP_CURVES_FIG,
DOWNSTREAM_ACC_JSON, FINAL_REPORT,
)
from configs.model import ALPHA_SWEEP, DIRECTION_VERSIONS
from src.utils import setup_logger, read_jsonl, read_json, write_json
# Per-record steering sweep log (JSON Lines); main() loads it with read_jsonl.
SWEEP_LOG = RESULTS_DIR / "sweep_log.jsonl"
def _present(recs, key):
    """Return the non-None values stored under *key* across *recs*."""
    return [r[key] for r in recs if r.get(key) is not None]


def _mean_or_zero(values):
    """Return the mean of *values* as a float, or 0.0 when the list is empty."""
    return float(np.mean(values)) if values else 0.0


def aggregate_sweep(records):
    """
    Aggregate per-record sweep log entries into nested summary dicts.

    Records are grouped by their "dim", "version" and "alpha" fields; records
    missing "dim" or "version" are skipped. The result has the shape
        {dim: {version: {alpha_key: summary}}}
    where each summary contains:
        "n"                  number of records in the group
        "rr_mean", "rr_std"  mean / std of "rr"
        "pqs_steered_mean"   mean of "steered_pqs"
        "pqs_base_mean"      mean of "base_pqs"
        "pqs_delta_mean"     mean(steered_pqs) - mean(base_pqs)
        "collapse_rate"      mean of "collapsed"
        "steered_plan_mean"  mean of "steered_plan"
        "steered_mon_mean"   mean of "steered_mon"

    Every metric ignores records where its field is absent or None, and falls
    back to 0.0 when no record in the group supplies it.

    Note: alpha is stored as a string key (str(alpha)). Records with
    alpha=None (the force_* versions) are grouped under the key 'None'
    (skipped in monotonicity / sort-by-alpha analysis but still listed in tables).
    """
    groups = {}
    for r in records:
        dim = r.get("dim")
        ver = r.get("version")
        if dim is None or ver is None:
            continue
        alpha = r.get("alpha")
        # Use a canonical key. For force_* (alpha=None) we still group it.
        a_key = "None" if alpha is None else str(alpha)
        groups.setdefault(dim, {}).setdefault(ver, {}).setdefault(a_key, []).append(r)

    agg = {}
    for dim, versions in groups.items():
        agg[dim] = {}
        for ver, alphas in versions.items():
            agg[dim][ver] = {}
            for a_key, recs in alphas.items():
                rrs = _present(recs, "rr")
                pqs_steered = _present(recs, "steered_pqs")
                pqs_base = _present(recs, "base_pqs")
                # Read these the same tolerant way as the fields above so a
                # single incomplete record cannot abort the whole aggregation.
                collapses = _present(recs, "collapsed")
                plan_counts = _present(recs, "steered_plan")
                mon_counts = _present(recs, "steered_mon")
                agg[dim][ver][a_key] = {
                    "n": len(recs),
                    "rr_mean": _mean_or_zero(rrs),
                    "rr_std": float(np.std(rrs)) if rrs else 0.0,
                    "pqs_steered_mean": _mean_or_zero(pqs_steered),
                    "pqs_base_mean": _mean_or_zero(pqs_base),
                    "pqs_delta_mean": (float(np.mean(pqs_steered) - np.mean(pqs_base))
                                       if pqs_steered and pqs_base else 0.0),
                    "collapse_rate": _mean_or_zero(collapses),
                    "steered_plan_mean": _mean_or_zero(plan_counts),
                    "steered_mon_mean": _mean_or_zero(mon_counts),
                }
    return agg
def _numeric_alphas(per_a):
    """List the keys of *per_a* that float() accepts, ordered by that float value.

    Keys that float() rejects (for example the 'None' key used for force_*
    versions) are left out. The original key objects are returned, not the
    parsed numbers.
    """
    parsed = {}
    for key in per_a:
        try:
            parsed[key] = float(key)
        except (ValueError, TypeError):
            # Not a numeric alpha; leave it out of the ordering.
            continue
    return sorted(parsed, key=parsed.get)
def plot_sweep_curves(agg, save_path):
    """Plot alpha vs RR and alpha vs PQS-delta for each (dim, version).

    One column of two stacked panels is drawn per dimension in *agg* (RR on
    top, ΔPQS below) and the figure is written to *save_path*. Versions whose
    name starts with "force_" are not plotted. When *agg* is empty there is
    nothing to draw, so the function returns without creating a file.

    Under NEW SEMANTICS:
    - alpha=1.0 is the baseline (no steering); RR and ΔPQS should be ≈ 0 there.
    - Lower alpha = stronger suppression. Successful steering produces:
      RR going UP as alpha goes DOWN (negative correlation).
      ΔPQS going DOWN as alpha goes DOWN (planning quality drops).
    - alpha > 1 = amplification (overthinker direction).
    """
    dims = sorted(agg.keys())
    if not dims:
        # plt.subplots(2, 0) would raise ValueError; there is nothing to plot.
        return

    import matplotlib.pyplot as plt

    n = len(dims)
    # squeeze=False always yields a 2-D axes array, including when n == 1.
    fig, axes = plt.subplots(2, n, figsize=(7 * n, 10), squeeze=False)
    try:
        colors = {"v1_raw": "#1f77b4", "v_pca_subspace": "#d62728"}
        for ci, dim in enumerate(dims):
            ax_rr = axes[0, ci]
            ax_pqs = axes[1, ci]
            for ver in agg[dim]:
                if ver.startswith("force_"):
                    continue  # force-prompt has alpha=None, plot separately
                color = colors.get(ver, "gray")
                sorted_alphas = _numeric_alphas(agg[dim][ver])
                xs = [float(a) for a in sorted_alphas]
                rrs = [agg[dim][ver][a]["rr_mean"] for a in sorted_alphas]
                pqs_deltas = [agg[dim][ver][a]["pqs_delta_mean"] for a in sorted_alphas]
                ax_rr.plot(xs, rrs, "o-", label=ver, color=color)
                ax_pqs.plot(xs, pqs_deltas, "o-", label=ver, color=color)
            # Mark baseline at alpha=1 (NEW SEMANTICS)
            ax_rr.axvline(1.0, color="black", linestyle="--", alpha=0.5, label="baseline (α=1)")
            ax_pqs.axvline(1.0, color="black", linestyle="--", alpha=0.5)
            ax_rr.axhline(0, color="gray", linestyle=":")
            ax_pqs.axhline(0, color="gray", linestyle=":")
            ax_rr.set_xlabel("α (1=baseline, 0=full suppress, >1=amplify)")
            ax_rr.set_ylabel("RR (reduction rate)")
            ax_rr.set_title(f"{dim} — trigger count change\n(higher RR at α<1 = good)")
            ax_rr.legend(fontsize=8)
            ax_rr.grid(alpha=0.3)
            ax_pqs.set_xlabel("α (1=baseline, 0=full suppress, >1=amplify)")
            ax_pqs.set_ylabel("Δ PQS (steered − base)")
            ax_pqs.set_title(f"{dim} — planning quality change\n(negative ΔPQS at α<1 = capability loss)")
            # The ΔPQS panel has no labelled baseline line, so it only has
            # legend entries when at least one version curve was drawn.
            if ax_pqs.get_legend_handles_labels()[0]:
                ax_pqs.legend(fontsize=8)
            ax_pqs.grid(alpha=0.3)
        fig.tight_layout()
        fig.savefig(save_path, dpi=120)
    finally:
        # Release this figure even if drawing or saving failed.
        plt.close(fig)
def spearman_monotonic(xs, ys):
    """Return the Spearman rank correlation of *xs* and *ys* as a plain float.

    Yields 0.0 when fewer than three x-values are given, or when the
    computed coefficient is None or NaN.
    """
    from scipy.stats import spearmanr

    if len(xs) < 3:
        return 0.0
    coefficient, _p_value = spearmanr(xs, ys)
    usable = coefficient is not None and not np.isnan(coefficient)
    return float(coefficient) if usable else 0.0
def _interaction_section(interaction_summary):
    """Build the '2. Dimension Interaction' lines; empty list if no summary."""
    if not interaction_summary:
        return []
    lines = ["## 2. Dimension Interaction\n"]
    lines.append(f"- Jaccard overlap of top-K experts: **{interaction_summary.get('jaccard_overlap', 0):.3f}**")
    lines.append(f"- # PMI pairs (same-layer plan-mon): {interaction_summary.get('n_pmi_pairs', 0)}")
    if "pmi_stats" in interaction_summary:
        s = interaction_summary["pmi_stats"]
        lines.append(f"- PMI mean: {s['mean']:+.3f} (positive = experts tend to co-activate)")
    lines.append("")
    return lines


def _sweep_section(agg):
    """Build the '3. Steering Sweep Results' lines: one table per dimension."""
    lines = [
        "## 3. Steering Sweep Results\n",
        "> **NEW SEMANTICS**: α=1.0 is baseline (no steering). "
        "Lower α = stronger suppression. α>1 = amplification.\n",
        "> A WORKING steering shows: max RR at LOW α (e.g. α=0), "
        "and Spearman ρ(α, RR) **negative** (≤ -0.5).\n",
    ]
    # Number sub-sections by position so they are sequential and unique for
    # any set of dimension names.
    for idx, dim in enumerate(sorted(agg.keys()), start=1):
        lines.append(f"### 3.{idx} {dim.capitalize()}\n")
        lines.append("| version | best α (max RR) | max RR | ΔPQS at best α | Spearman ρ(α, RR) | monotonic? | collapse% |")
        lines.append("|---|---|---|---|---|---|---|")
        for ver in sorted(agg[dim].keys()):
            if ver.startswith("force_"):
                continue
            per_a = agg[dim][ver]
            sorted_a = _numeric_alphas(per_a)
            if not sorted_a:
                continue
            xs = [float(a) for a in sorted_a]
            rrs = [per_a[a]["rr_mean"] for a in sorted_a]
            pqss = [per_a[a]["pqs_delta_mean"] for a in sorted_a]
            best_i = int(np.argmax(rrs))
            best_a = xs[best_i]
            rho = spearman_monotonic(xs, rrs)
            # Under new semantics, working steering produces NEGATIVE Spearman
            is_monotonic = rho <= -0.5
            collapse = per_a[sorted_a[best_i]]["collapse_rate"]
            lines.append(f"| {ver} | {best_a:+.2f} | {rrs[best_i]:+.3f} | "
                         f"{pqss[best_i]:+.3f} | {rho:+.3f} | "
                         f"{'YES' if is_monotonic else 'no'} | {collapse:.1%} |")
        # Force prompt
        force_versions = [v for v in agg[dim] if v.startswith("force_")]
        if force_versions:
            lines.append("\n**Force-prompt baseline**:")
            for v in force_versions:
                # force has alpha=None, so report its first (only expected) entry
                per_a = agg[dim][v]
                keys = list(per_a.keys())
                if keys:
                    r = per_a[keys[0]]
                    lines.append(f"- {v}: RR={r['rr_mean']:+.3f}, ΔPQS={r['pqs_delta_mean']:+.3f}")
        lines.append("")
    return lines


def _diagnosis_section(agg):
    """Build the '4. Diagnosis' lines: per-α RR/ΔPQS table for planning / v_pca_subspace."""
    lines = [
        "## 4. Diagnosis: RR vs PQS (Planning, v_pca_subspace)\n",
        "> Under NEW semantics: α=1 is baseline. Steering effects "
        "should appear as α decreases below 1.\n",
    ]
    if "planning" in agg and "v_pca_subspace" in agg["planning"]:
        per_a = agg["planning"]["v_pca_subspace"]
        lines.append("| α | meaning | RR | ΔPQS | Interpretation |")
        lines.append("|---|---|---|---|---|")
        for a in _numeric_alphas(per_a):
            r = per_a[a]
            rr = r["rr_mean"]
            dp = r["pqs_delta_mean"]
            af = float(a)
            is_baseline = abs(af - 1.0) < 1e-6
            if is_baseline:
                meaning = "baseline"
            elif 0.0 <= af < 1.0:
                meaning = f"{(1.0-af)*100:.0f}% suppression"
            elif af < 0.0:
                meaning = "over-suppression"
            else:
                meaning = "amplification"
            # Diagnosis: only meaningful for non-baseline
            if is_baseline:
                interp = "(reference)"
            elif rr > 0.2 and dp < -0.05:
                interp = "✅ capability suppression"
            elif rr > 0.2 and abs(dp) < 0.05:
                interp = "⚠️ surface-only (RR drops but PQS unchanged)"
            elif abs(rr) < 0.1:
                interp = "— no effect"
            else:
                interp = "?"
            lines.append(f"| {af:+.2f} | {meaning} | {rr:+.3f} | {dp:+.3f} | {interp} |")
        lines.append("")
    return lines


def _downstream_section(downstream):
    """Build the '5. Downstream Accuracy' lines; empty list if no downstream data."""
    if not downstream:
        return []
    lines = ["## 5. Downstream Accuracy\n"]
    # Columns follow the test sets present under the "baseline" config.
    ts_names = list(downstream.get("baseline", {}).keys())
    # 5a: Raw accuracies (pivot view)
    lines.append("### 5.1 Raw accuracies\n")
    lines.append("| config | " + " | ".join(ts_names) + " |")
    lines.append("|---|" + "---|" * len(ts_names))
    for cfg in downstream:
        row = [cfg]
        for ts in ts_names:
            if ts in downstream[cfg]:
                a = downstream[cfg][ts]["accuracy"]
                row.append(f"{a:.3f}")
            else:
                row.append("-")
        lines.append("| " + " | ".join(row) + " |")
    lines.append("")
    # 5b: Drop vs baseline (the answer to: "did steering hurt accuracy?")
    non_base_cfgs = [c for c in downstream if c != "baseline"]
    if non_base_cfgs:
        lines.append("### 5.2 Accuracy drop vs baseline\n")
        lines.append("> **absolute drop** = baseline_acc − steered_acc (positive = WORSE under steering)\n"
                     "> **relative drop** = absolute_drop / baseline_acc\n"
                     "> **McNemar p** = paired-test p-value on per-problem correctness\n"
                     "> **regr/rec** = #problems where baseline was right→steered wrong / vice versa\n")
        lines.append("| config | testset | baseline | steered | Δ abs | Δ rel | regr/rec | McNemar p | sig p<0.05 |")
        lines.append("|---|---|---|---|---|---|---|---|---|")
        for cfg in non_base_cfgs:
            for ts in ts_names:
                if ts not in downstream[cfg]:
                    continue
                rec = downstream[cfg][ts]
                vb = rec.get("vs_baseline")
                if not vb:
                    continue
                p = vb["mcnemar_p_value"]
                p_str = f"{p:.3g}" if p is not None else "n/a"
                sig = "✅" if vb["significant_at_0_05"] else "—"
                lines.append(
                    f"| {cfg} | {ts} | {vb['baseline_accuracy']:.3f} | "
                    f"{vb['steered_accuracy']:.3f} | "
                    f"{vb['absolute_drop']:+.3f} | {vb['relative_drop']:+.1%} | "
                    f"{vb['n_regressions']}/{vb['n_recoveries']} | "
                    f"{p_str} | {sig} |"
                )
        lines.append("")
    return lines


def _dimension_works(agg, dim, require_pqs_drop=False):
    """Return True if some non-force version of *dim* meets the Go/No-Go criteria.

    A version qualifies when it has at least three numeric alphas,
    Spearman ρ(α, RR) ≤ -0.5, and its highest mean RR among the suppression
    settings (α < 1) exceeds 0.3. With *require_pqs_drop*, ΔPQS at that same
    best α must also be below -0.05.
    """
    if dim not in agg:
        return False
    for ver, per_a in agg[dim].items():
        if ver.startswith("force_"):
            continue
        sorted_a = _numeric_alphas(per_a)
        if len(sorted_a) < 3:
            continue
        xs = [float(a) for a in sorted_a]
        rrs = [per_a[a]["rr_mean"] for a in sorted_a]
        if spearman_monotonic(xs, rrs) > -0.5:
            continue
        # The criterion is stated for α < 1, so baseline and amplification
        # settings must not be able to supply the winning RR.
        suppressed = [a for a, x in zip(sorted_a, xs) if 1.0 - x > 1e-6]
        if not suppressed:
            continue
        best = max(suppressed, key=lambda a: per_a[a]["rr_mean"])
        if per_a[best]["rr_mean"] <= 0.3:
            continue
        # ΔPQS is judged at the best α itself, not at the minimum over the sweep.
        if require_pqs_drop and per_a[best]["pqs_delta_mean"] >= -0.05:
            continue
        return True
    return False


def _decision_section(agg):
    """Build the '6. Go/No-Go Decision' lines from the monitoring and planning checks."""
    lines = [
        "## 6. Go/No-Go Decision\n",
        "> Decision criteria under NEW semantics:\n"
        "> - WORKING: max RR > 0.3 AT α < 1 AND Spearman ρ(α, RR) ≤ -0.5\n"
        "> - For planning, additionally need ΔPQS < -0.05 at the best α.\n",
    ]
    mon_ok = _dimension_works(agg, "monitoring", require_pqs_drop=False)
    plan_ok = _dimension_works(agg, "planning", require_pqs_drop=True)
    if mon_ok and plan_ok:
        lines.append("**[GO]** Both dimensions show monotonic α-RR sweeps + planning shows PQS decline. "
                     "Proceed to further student-simulation experiments.")
    elif mon_ok and not plan_ok:
        lines.append("**[PARTIAL]** Monitoring dimension works (RR responds to α monotonically), "
                     "but planning either has no RR response or shows surface-only suppression "
                     "(RR drops without PQS drop). Focus paper on monitoring; investigate why "
                     "planning is distributed at the residual-stream level.")
    elif not mon_ok and plan_ok:
        lines.append("**[UNEXPECTED]** Planning works but monitoring does not. "
                     "Review monitoring regex coverage and decision-point labeling.")
    else:
        lines.append("**[NO-GO]** Neither dimension shows clean steering. "
                     "Reconsider methodology or scale.")
    lines.append("")
    return lines


def generate_report(agg, downstream, interaction_summary):
    """Render the final Markdown report and return it as a single string.

    Args:
        agg: nested dict {dim: {version: {alpha_key: summary}}}; each summary
            is read for "rr_mean", "pqs_delta_mean" and "collapse_rate".
        downstream: {config: {testset: record}} accuracy results, or a falsy
            value to omit section 5. Records are read for "accuracy" and an
            optional "vs_baseline" comparison dict.
        interaction_summary: dict with optional "jaccard_overlap",
            "n_pmi_pairs" and "pmi_stats" entries, or a falsy value to omit
            section 2.

    Returns:
        The report lines joined with newlines.
    """
    lines = []
    lines.append("# Student Simulation — Final Report\n")
    lines.append("## 1. Overview\n")
    lines.append("Model: Qwen3-30B-A3B-Thinking-2507\n")
    lines.extend(_interaction_section(interaction_summary))
    lines.extend(_sweep_section(agg))
    lines.extend(_diagnosis_section(agg))
    lines.extend(_downstream_section(downstream))
    lines.extend(_decision_section(agg))
    return "\n".join(lines)
def main():
    """Aggregate the sweep log, plot the curves, and write the Markdown report.

    Logs an error and returns early when the sweep log file does not exist.
    Downstream accuracies and the interaction summary are optional inputs and
    default to empty dicts when their files are absent.
    """
    # No options are defined; parsing still serves --help and rejects stray arguments.
    argparse.ArgumentParser().parse_args()
    ensure_dirs()
    log = setup_logger("13_analyze", LOGS_DIR / "13_analyze.log")

    if not SWEEP_LOG.exists():
        log.error(f"No sweep log at {SWEEP_LOG}")
        return

    records = read_jsonl(SWEEP_LOG)
    log.info(f"Loaded {len(records)} sweep records")
    agg = aggregate_sweep(records)

    # Persist a copy of the aggregated table (same nesting, fresh dicts).
    snapshot = {}
    for dim, versions in agg.items():
        snapshot[dim] = {ver: dict(per_alpha) for ver, per_alpha in versions.items()}
    write_json(snapshot, SWEEP_RR_JSON)
    log.info(f"Saved aggregated sweep: {SWEEP_RR_JSON}")

    # Curves figure
    plot_sweep_curves(agg, SWEEP_CURVES_FIG)
    log.info(f"Saved curves: {SWEEP_CURVES_FIG}")

    # Optional downstream accuracies, with per_sample removed to keep the report compact
    downstream = read_json(DOWNSTREAM_ACC_JSON) if DOWNSTREAM_ACC_JSON.exists() else {}
    for per_testset in downstream.values():
        for entry in per_testset.values():
            entry.pop("per_sample", None)

    # Optional interaction summary
    interaction_path = RESULTS_DIR / "interaction_summary.json"
    interaction = read_json(interaction_path) if interaction_path.exists() else {}

    # Render and store the report
    FINAL_REPORT.write_text(generate_report(agg, downstream, interaction), encoding="utf-8")
    log.info(f"Saved final report: {FINAL_REPORT}")
# Run the analysis only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()