"""
Final analysis: aggregate sweep log, plot curves, produce Markdown report.
"""
import sys
import argparse
import json
from pathlib import Path

# Make the repository root importable so `configs` and `src` resolve when this
# file is executed directly as a script.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

import numpy as np

from configs.paths import (
    ensure_dirs, LOGS_DIR, RESULTS_DIR,
    SWEEP_RR_JSON, SWEEP_PQS_JSON, SWEEP_CURVES_FIG,
    DOWNSTREAM_ACC_JSON, FINAL_REPORT,
)
from configs.model import ALPHA_SWEEP, DIRECTION_VERSIONS
from src.utils import setup_logger, read_jsonl, read_json, write_json

SWEEP_LOG = RESULTS_DIR / "sweep_log.jsonl"

# Semantics / decision thresholds shared by the table, diagnosis and Go/No-Go.
_BASELINE_ALPHA = 1.0        # alpha value that means "no steering"
_ALPHA_EPS = 1e-6            # tolerance when comparing an alpha to the baseline
_RHO_THRESHOLD = -0.5        # Spearman rho(alpha, RR) at or below this = monotonic
_RR_THRESHOLD = 0.3          # minimum max-RR for a dimension to count as working
_PQS_DROP_THRESHOLD = -0.05  # delta-PQS must be below this to count as a drop


def _present(recs, key):
    """Return ``r[key]`` for every record where the key exists and is not None."""
    return [r[key] for r in recs if r.get(key) is not None]


def _mean_or_zero(values):
    """Return the mean of ``values`` as a float, or 0.0 for an empty list."""
    return float(np.mean(values)) if values else 0.0


def _summarize_group(recs):
    """Collapse the records of one (dim, version, alpha) cell into summary stats."""
    rrs = _present(recs, "rr")
    # Delta is computed per record over records carrying BOTH scores, so a
    # record missing one score cannot skew the difference of two means taken
    # over different subsets.
    paired_deltas = [
        r["steered_pqs"] - r["base_pqs"]
        for r in recs
        if r.get("steered_pqs") is not None and r.get("base_pqs") is not None
    ]
    return {
        "n": len(recs),
        "rr_mean": _mean_or_zero(rrs),
        "rr_std": float(np.std(rrs)) if rrs else 0.0,
        "pqs_steered_mean": _mean_or_zero(_present(recs, "steered_pqs")),
        "pqs_base_mean": _mean_or_zero(_present(recs, "base_pqs")),
        "pqs_delta_mean": _mean_or_zero(paired_deltas),
        "collapse_rate": _mean_or_zero(_present(recs, "collapsed")),
        "steered_plan_mean": _mean_or_zero(_present(recs, "steered_plan")),
        "steered_mon_mean": _mean_or_zero(_present(recs, "steered_mon")),
    }


def aggregate_sweep(records):
    """
    Aggregate per-record sweep log into a nested summary dict:
        {dim: {version: {alpha_key: {
            "n", "rr_mean", "rr_std",
            "pqs_steered_mean", "pqs_base_mean", "pqs_delta_mean",
            "collapse_rate", "steered_plan_mean", "steered_mon_mean"}}}}

    Records without a "dim" or "version" are skipped. The alpha key is
    ``str(alpha)``; records whose alpha is None (the force_* versions) are
    grouped under the key 'None', which is skipped in the sort-by-alpha
    analyses but still listed in the report.

    Missing or None fields are ignored per statistic; a statistic with no
    usable values is reported as 0.0.
    """
    groups = {}
    for r in records:
        dim = r.get("dim")
        ver = r.get("version")
        if dim is None or ver is None:
            continue
        alpha = r.get("alpha")
        # Use a canonical key. For force_* (alpha=None) we still group it.
        a_key = "None" if alpha is None else str(alpha)
        groups.setdefault(dim, {}).setdefault(ver, {}).setdefault(a_key, []).append(r)

    return {
        dim: {
            ver: {a: _summarize_group(recs) for a, recs in alphas.items()}
            for ver, alphas in versions.items()
        }
        for dim, versions in groups.items()
    }


def _numeric_alphas(per_a):
    """Return the alpha-keys of ``per_a`` that parse as floats, sorted ascending."""
    parsed = []
    for a in per_a:
        try:
            parsed.append((a, float(a)))
        except (ValueError, TypeError):
            # e.g. the 'None' key used by force_* versions
            continue
    parsed.sort(key=lambda pair: pair[1])
    return [a for a, _ in parsed]


def plot_sweep_curves(agg, save_path):
    """Plot alpha vs RR and alpha vs PQS-delta for each (dim, version).

    One column of two axes is drawn per dimension (RR on top, delta-PQS
    below) and the figure is written to ``save_path``. Versions whose name
    starts with "force_" are not plotted. If ``agg`` is empty nothing is
    drawn and no file is written.

    Under NEW SEMANTICS:
      - alpha=1.0 is the baseline (no steering); RR and ΔPQS should be ≈ 0 there.
      - Lower alpha = stronger suppression. Successful steering produces:
          RR going UP as alpha goes DOWN (negative correlation).
          ΔPQS going DOWN as alpha goes DOWN (planning quality drops).
      - alpha > 1 = amplification (overthinker direction).
    """
    dims = sorted(agg.keys())
    if not dims:
        # plt.subplots(2, 0) raises; there is nothing to draw anyway.
        return

    import matplotlib.pyplot as plt

    n = len(dims)
    # squeeze=False keeps `axes` 2-D even when there is a single dimension.
    fig, axes = plt.subplots(2, n, figsize=(7 * n, 10), squeeze=False)

    colors = {"v1_raw": "#1f77b4", "v_pca_subspace": "#d62728"}

    for ci, dim in enumerate(dims):
        ax_rr = axes[0, ci]
        ax_pqs = axes[1, ci]

        for ver, per_a in agg[dim].items():
            if ver.startswith("force_"):
                continue  # force-prompt has alpha=None, plot separately
            color = colors.get(ver, "gray")
            sorted_alphas = _numeric_alphas(per_a)
            xs = [float(a) for a in sorted_alphas]
            rrs = [per_a[a]["rr_mean"] for a in sorted_alphas]
            pqs_deltas = [per_a[a]["pqs_delta_mean"] for a in sorted_alphas]
            ax_rr.plot(xs, rrs, "o-", label=ver, color=color)
            ax_pqs.plot(xs, pqs_deltas, "o-", label=ver, color=color)

        # Mark baseline at alpha=1 (NEW SEMANTICS)
        ax_rr.axvline(_BASELINE_ALPHA, color="black", linestyle="--", alpha=0.5,
                      label="baseline (α=1)")
        ax_pqs.axvline(_BASELINE_ALPHA, color="black", linestyle="--", alpha=0.5)
        ax_rr.axhline(0, color="gray", linestyle=":")
        ax_pqs.axhline(0, color="gray", linestyle=":")

        ax_rr.set_xlabel("α (1=baseline, 0=full suppress, >1=amplify)")
        ax_rr.set_ylabel("RR (reduction rate)")
        ax_rr.set_title(f"{dim} — trigger count change\n(higher RR at α<1 = good)")
        ax_rr.legend(fontsize=8)
        ax_rr.grid(alpha=0.3)

        ax_pqs.set_xlabel("α (1=baseline, 0=full suppress, >1=amplify)")
        ax_pqs.set_ylabel("Δ PQS (steered − base)")
        ax_pqs.set_title(
            f"{dim} — planning quality change\n(negative ΔPQS at α<1 = capability loss)"
        )
        ax_pqs.legend(fontsize=8)
        ax_pqs.grid(alpha=0.3)

    fig.tight_layout()
    fig.savefig(save_path, dpi=120)
    plt.close(fig)


def spearman_monotonic(xs, ys):
    """Return Spearman's rho between ``xs`` and ``ys``, or 0.0 when undefined.

    0.0 is returned for fewer than three points and whenever scipy reports
    no usable coefficient (None or NaN, e.g. for a constant series).
    """
    from scipy.stats import spearmanr

    if len(xs) < 3:
        return 0.0
    rho, _ = spearmanr(xs, ys)
    if rho is None or np.isnan(rho):
        return 0.0
    return float(rho)


def _interaction_section(interaction_summary):
    """Build the '2. Dimension Interaction' lines; empty list if no summary."""
    if not interaction_summary:
        return []
    lines = ["## 2. Dimension Interaction\n"]
    lines.append(f"- Jaccard overlap of top-K experts: "
                 f"**{interaction_summary.get('jaccard_overlap', 0):.3f}**")
    lines.append(f"- # PMI pairs (same-layer plan-mon): "
                 f"{interaction_summary.get('n_pmi_pairs', 0)}")
    if "pmi_stats" in interaction_summary:
        s = interaction_summary["pmi_stats"]
        lines.append(f"- PMI mean: {s['mean']:+.3f} (positive = experts tend to co-activate)")
    lines.append("")
    return lines


def _sweep_section(agg):
    """Build the '3. Steering Sweep Results' lines: one table per dimension."""
    lines = ["## 3. Steering Sweep Results\n"]
    lines.append("> **NEW SEMANTICS**: α=1.0 is baseline (no steering). "
                 "Lower α = stronger suppression. α>1 = amplification.\n")
    lines.append("> A WORKING steering shows: max RR at LOW α (e.g. α=0), "
                 "and Spearman ρ(α, RR) **negative** (≤ -0.5).\n")

    # Number subsections by position so headings are sequential whatever the
    # dimension names are.
    for idx, dim in enumerate(sorted(agg.keys()), start=1):
        lines.append(f"### 3.{idx} {dim.capitalize()}\n")
        lines.append("| version | best α (max RR) | max RR | ΔPQS at best α | "
                     "Spearman ρ(α, RR) | monotonic? | collapse% |")
        lines.append("|---|---|---|---|---|---|---|")
        for ver in sorted(agg[dim].keys()):
            if ver.startswith("force_"):
                continue
            per_a = agg[dim][ver]
            sorted_a = _numeric_alphas(per_a)
            if not sorted_a:
                continue
            xs = [float(a) for a in sorted_a]
            rrs = [per_a[a]["rr_mean"] for a in sorted_a]
            pqss = [per_a[a]["pqs_delta_mean"] for a in sorted_a]
            best_i = int(np.argmax(rrs))
            best_a = xs[best_i]
            rho = spearman_monotonic(xs, rrs)
            # Under new semantics, working steering produces NEGATIVE Spearman
            is_monotonic = rho <= _RHO_THRESHOLD
            collapse = per_a[sorted_a[best_i]]["collapse_rate"]
            lines.append(f"| {ver} | {best_a:+.2f} | {rrs[best_i]:+.3f} | "
                         f"{pqss[best_i]:+.3f} | {rho:+.3f} | "
                         f"{'YES' if is_monotonic else 'no'} | {collapse:.1%} |")

        # Force prompt
        force_versions = [v for v in agg[dim] if v.startswith("force_")]
        if force_versions:
            lines.append("\n**Force-prompt baseline**:")
            for v in force_versions:
                # force has alpha=None, so only the first (single) cell is shown
                per_a = agg[dim][v]
                keys = list(per_a.keys())
                if keys:
                    r = per_a[keys[0]]
                    lines.append(f"- {v}: RR={r['rr_mean']:+.3f}, "
                                 f"ΔPQS={r['pqs_delta_mean']:+.3f}")
        lines.append("")
    return lines


def _alpha_meaning(af):
    """Return the human-readable meaning of a numeric alpha value."""
    if abs(af - _BASELINE_ALPHA) < _ALPHA_EPS:
        return "baseline"
    if 0.0 <= af < 1.0:
        return f"{(1.0-af)*100:.0f}% suppression"
    if af < 0.0:
        return "over-suppression"
    return "amplification"


def _interpret(af, rr, dp):
    """Return the diagnosis label for one (alpha, RR, delta-PQS) row."""
    # Diagnosis: only meaningful for non-baseline
    if abs(af - _BASELINE_ALPHA) < _ALPHA_EPS:
        return "(reference)"
    if rr > 0.2 and dp < -0.05:
        return "✅ capability suppression"
    if rr > 0.2 and abs(dp) < 0.05:
        return "⚠️ surface-only (RR drops but PQS unchanged)"
    if abs(rr) < 0.1:
        return "— no effect"
    return "?"


def _diagnosis_section(agg):
    """Build the '4. Diagnosis' lines for planning / v_pca_subspace."""
    lines = ["## 4. Diagnosis: RR vs PQS (Planning, v_pca_subspace)\n"]
    lines.append("> Under NEW semantics: α=1 is baseline. Steering effects "
                 "should appear as α decreases below 1.\n")
    if "planning" in agg and "v_pca_subspace" in agg["planning"]:
        per_a = agg["planning"]["v_pca_subspace"]
        lines.append("| α | meaning | RR | ΔPQS | Interpretation |")
        lines.append("|---|---|---|---|---|")
        for a in _numeric_alphas(per_a):
            r = per_a[a]
            rr = r["rr_mean"]
            dp = r["pqs_delta_mean"]
            af = float(a)
            lines.append(f"| {af:+.2f} | {_alpha_meaning(af)} | {rr:+.3f} | "
                         f"{dp:+.3f} | {_interpret(af, rr, dp)} |")
    lines.append("")
    return lines


def _downstream_section(downstream):
    """Build the '5. Downstream Accuracy' lines; empty list if no data."""
    if not downstream:
        return []
    lines = ["## 5. Downstream Accuracy\n"]

    # Column order: baseline's test sets first, then any test set that only
    # appears under another config, so nothing is silently dropped.
    ts_names = list(downstream.get("baseline", {}).keys())
    for cfg in downstream:
        for ts in downstream[cfg]:
            if ts not in ts_names:
                ts_names.append(ts)

    # 5a: Raw accuracies (pivot view)
    lines.append("### 5.1 Raw accuracies\n")
    lines.append("| config | " + " | ".join(ts_names) + " |")
    lines.append("|---|" + "---|" * len(ts_names))
    for cfg in downstream:
        row = [cfg]
        for ts in ts_names:
            acc = downstream[cfg].get(ts, {}).get("accuracy")
            row.append(f"{acc:.3f}" if acc is not None else "-")
        lines.append("| " + " | ".join(row) + " |")
    lines.append("")

    # 5b: Drop vs baseline (the answer to: "did steering hurt accuracy?")
    non_base_cfgs = [c for c in downstream if c != "baseline"]
    if non_base_cfgs:
        lines.append("### 5.2 Accuracy drop vs baseline\n")
        lines.append(
            "> **absolute drop** = baseline_acc − steered_acc (positive = WORSE under steering)\n"
            "> **relative drop** = absolute_drop / baseline_acc\n"
            "> **McNemar p** = paired-test p-value on per-problem correctness\n"
            "> **regr/rec** = #problems where baseline was right→steered wrong / vice versa\n"
        )
        lines.append("| config | testset | baseline | steered | Δ abs | Δ rel | "
                     "regr/rec | McNemar p | sig p<0.05 |")
        lines.append("|---|---|---|---|---|---|---|---|---|")
        for cfg in non_base_cfgs:
            for ts in ts_names:
                if ts not in downstream[cfg]:
                    continue
                vb = downstream[cfg][ts].get("vs_baseline")
                if not vb:
                    continue
                p = vb.get("mcnemar_p_value")
                p_str = f"{p:.3g}" if p is not None else "n/a"
                sig = "✅" if vb.get("significant_at_0_05") else "—"
                lines.append(
                    f"| {cfg} | {ts} | {vb['baseline_accuracy']:.3f} | "
                    f"{vb['steered_accuracy']:.3f} | "
                    f"{vb['absolute_drop']:+.3f} | {vb['relative_drop']:+.1%} | "
                    f"{vb['n_regressions']}/{vb['n_recoveries']} | "
                    f"{p_str} | {sig} |"
                )
        lines.append("")
    return lines


def _dim_is_working(agg, dim, require_pqs_drop=False):
    """Return True if some non-force version of ``dim`` meets the criteria.

    Criteria (as printed in the report): the maximum RR among alphas strictly
    below the baseline exceeds 0.3, Spearman rho(alpha, RR) over the whole
    sweep is <= -0.5, and, when ``require_pqs_drop`` is set, delta-PQS at
    that best alpha is below -0.05. Versions with fewer than three numeric
    alphas are ignored.
    """
    for ver, per_a in agg.get(dim, {}).items():
        if ver.startswith("force_"):
            continue
        sorted_a = _numeric_alphas(per_a)
        if len(sorted_a) < 3:
            continue
        xs = [float(a) for a in sorted_a]
        rrs = [per_a[a]["rr_mean"] for a in sorted_a]
        if spearman_monotonic(xs, rrs) > _RHO_THRESHOLD:
            continue
        # Only suppression settings (alpha < baseline) may supply the max RR;
        # a large RR under amplification is not evidence of working steering.
        suppress_idx = [i for i, x in enumerate(xs) if x < _BASELINE_ALPHA - _ALPHA_EPS]
        if not suppress_idx:
            continue
        best_i = max(suppress_idx, key=lambda i: rrs[i])
        if rrs[best_i] <= _RR_THRESHOLD:
            continue
        if require_pqs_drop:
            pqs_at_best = per_a[sorted_a[best_i]]["pqs_delta_mean"]
            if pqs_at_best >= _PQS_DROP_THRESHOLD:
                continue
        return True
    return False


def _decision_section(agg):
    """Build the '6. Go/No-Go Decision' lines."""
    lines = ["## 6. Go/No-Go Decision\n"]
    lines.append("> Decision criteria under NEW semantics:\n"
                 "> - WORKING: max RR > 0.3 AT α < 1 AND Spearman ρ(α, RR) ≤ -0.5\n"
                 "> - For planning, additionally need ΔPQS < -0.05 at the best α.\n")

    mon_ok = _dim_is_working(agg, "monitoring", require_pqs_drop=False)
    plan_ok = _dim_is_working(agg, "planning", require_pqs_drop=True)

    if mon_ok and plan_ok:
        lines.append("**[GO]** Both dimensions show monotonic α-RR sweeps + planning shows PQS decline. "
                     "Proceed to further student-simulation experiments.")
    elif mon_ok:
        lines.append("**[PARTIAL]** Monitoring dimension works (RR responds to α monotonically), "
                     "but planning either has no RR response or shows surface-only suppression "
                     "(RR drops without PQS drop). Focus paper on monitoring; investigate why "
                     "planning is distributed at the residual-stream level.")
    elif plan_ok:
        lines.append("**[UNEXPECTED]** Planning works but monitoring does not. "
                     "Review monitoring regex coverage and decision-point labeling.")
    else:
        lines.append("**[NO-GO]** Neither dimension shows clean steering. "
                     "Reconsider methodology or scale.")
    lines.append("")
    return lines


def generate_report(agg, downstream, interaction_summary):
    """Render the final Markdown report and return it as a single string.

    Args:
        agg: nested summary produced by ``aggregate_sweep``.
        downstream: mapping config -> test set -> result dict (reads
            "accuracy" and "vs_baseline"); falsy to omit section 5.
        interaction_summary: dict read for "jaccard_overlap", "n_pmi_pairs"
            and "pmi_stats"; falsy to omit section 2.

    Returns:
        The report text, sections joined by newlines.
    """
    lines = [
        "# Student Simulation — Final Report\n",
        "## 1. Overview\n",
        "Model: Qwen3-30B-A3B-Thinking-2507\n",
    ]
    lines += _interaction_section(interaction_summary)
    lines += _sweep_section(agg)
    lines += _diagnosis_section(agg)
    lines += _downstream_section(downstream)
    lines += _decision_section(agg)
    return "\n".join(lines)


def main():
    """Aggregate the sweep log, save summary + figure, and write the report.

    Returns:
        0 on success, 1 if the sweep log does not exist.
    """
    parser = argparse.ArgumentParser()
    parser.parse_args()  # no options yet; still provides --help / arg errors

    ensure_dirs()
    log = setup_logger("13_analyze", LOGS_DIR / "13_analyze.log")

    if not SWEEP_LOG.exists():
        log.error(f"No sweep log at {SWEEP_LOG}")
        return 1

    records = read_jsonl(SWEEP_LOG)
    log.info(f"Loaded {len(records)} sweep records")

    agg = aggregate_sweep(records)

    # Save aggregated (already plain dicts / str keys / floats)
    write_json(agg, SWEEP_RR_JSON)
    log.info(f"Saved aggregated sweep: {SWEEP_RR_JSON}")

    # Plot
    if agg:
        plot_sweep_curves(agg, SWEEP_CURVES_FIG)
        log.info(f"Saved curves: {SWEEP_CURVES_FIG}")
    else:
        log.warning("No usable sweep records; skipping curve plot")

    # Load downstream (if available)
    downstream = {}
    if DOWNSTREAM_ACC_JSON.exists():
        downstream = read_json(DOWNSTREAM_ACC_JSON)
        # Drop per_sample to keep report compact
        for cfg in downstream:
            for ts in downstream[cfg]:
                downstream[cfg][ts].pop("per_sample", None)

    # Interaction summary
    interaction = {}
    ip = RESULTS_DIR / "interaction_summary.json"
    if ip.exists():
        interaction = read_json(ip)

    # Generate report
    report = generate_report(agg, downstream, interaction)
    FINAL_REPORT.write_text(report, encoding="utf-8")
    log.info(f"Saved final report: {FINAL_REPORT}")
    return 0


if __name__ == "__main__":
    sys.exit(main())