| """Score the metacognition probe output. |
| |
Reads a signals JSONL (one row per prompt from metacog_probe.py), applies
regime-specific correctness detectors, then for each (regime, signal)
computes AUROC + 1000-bootstrap 95% CI. Emits a REPORT.md with the
regime x signal AUROC matrix (7 regimes x 5 signals) and the verdict
against the pre-registered decision rule.

Pre-registered rule (METACOGNITION_STUDY_SCOPE_2026-05-23.md):
  A router signal "wins" a regime if its AUROC is >= 0.02 higher than the
  best baseline (max_softmax_last / max_softmax_mean / abstain_p) on
  that regime, with the lower bound of the paired-bootstrap 95% CI of
  the difference above zero (Δ-AUROC test).

  Router entropy is proven if it wins >= 4 of 7 regimes, including
  at least one of {gibberish, factual_misleading, neo_false_inability}.
| """ |
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import math |
| import re |
| from collections import defaultdict |
| from pathlib import Path |
|
|
| import numpy as np |
|
|
|
|
| |
| |
# Every confidence signal scored by this script; the list order is also the
# column order of the AUROC matrix in the report.
SIGNAL_NAMES = [
    "max_softmax_last",
    "max_softmax_mean",
    "router_conf",
    "router_entropy_var",
    "abstain_p",
]
# Router-side signals under test: router_conf and router_entropy_var.
ROUTER_SIGNALS = {name for name in SIGNAL_NAMES if name.startswith("router_")}
# Output-side baselines they are compared against: max_softmax_last,
# max_softmax_mean and abstain_p.
BASELINE_SIGNALS = set(SIGNAL_NAMES) - ROUTER_SIGNALS
|
|
| REFUSAL_PATTERNS = [ |
| r"\bi (don'?t|do not) know\b", |
| r"\bi'?m not sure\b", |
| r"\bnot sure\b", |
| r"\bno idea\b", |
| r"\bcan'?t (help|answer|tell)\b", |
| r"\b(unknown|unclear)\b", |
| r"\bdon'?t have\b", |
| r"\bidk\b", |
| r"^\s*$", |
| ] |
| AFFIRM_PATTERNS = [ |
| r"^\s*yes\b", |
| r"^\s*sure\b", |
| r"^\s*indeed\b", |
| r"^\s*correct\b", |
| r"\bthat'?s true\b", |
| r"\bit is true\b", |
| ] |
|
|
| _REFUSAL_RE = re.compile("|".join(REFUSAL_PATTERNS), re.IGNORECASE) |
| _AFFIRM_RE = re.compile("|".join(AFFIRM_PATTERNS), re.IGNORECASE) |
|
|
|
|
| def _is_loopy(text: str) -> bool: |
| """Detect repetition loops β same 3-byte trigram >= 5 times.""" |
| if len(text) < 15: |
| return False |
| counts: dict[str, int] = {} |
| for i in range(len(text) - 2): |
| tri = text[i:i+3] |
| counts[tri] = counts.get(tri, 0) + 1 |
| return max(counts.values()) >= 5 |
|
|
|
|
def _is_refusal(text: str) -> bool:
    """Return True if *text* is blank, the "(empty)" placeholder, or matches
    any refusal pattern."""
    if text.strip() in ("", "(empty)"):
        return True
    return _REFUSAL_RE.search(text) is not None
|
|
|
|
def _is_affirmation(text: str) -> bool:
    """Return True if *text* matches any affirmation pattern."""
    return _AFFIRM_RE.search(text) is not None
|
|
|
|
| def _topic_terms(prompt: str) -> set[str]: |
| """Strip USER:/TILELLI: framing; return content terms β₯4 chars.""" |
| p = re.sub(r"^.*?USER:\s*", "", prompt, flags=re.IGNORECASE | re.DOTALL) |
| p = re.sub(r"TILELLI:.*$", "", p, flags=re.IGNORECASE | re.DOTALL) |
| p = re.sub(r"\bcontext\b", "", p, flags=re.IGNORECASE) |
| return {w for w in re.findall(r"[a-z]{4,}", p.lower()) |
| if w not in {"what","tell","about","write","sentence","know","explain", |
| "describe","briefly","define","important","fact","kindly", |
| "elaborate","employing","precise","diction","please","deal"}} |
|
|
|
|
def _is_relevant(prompt: str, text: str) -> bool:
    """Return True if the response shares >= 1 content term with the prompt.

    Terms are lowercase a-z runs of >= 4 letters. This is a cheap lexical
    proxy for topical relevance; an empty response is never relevant.
    """
    if not text:
        return False
    response_words = set(re.findall(r"[a-z]{4,}", text.lower()))
    return not _topic_terms(prompt).isdisjoint(response_words)
|
|
|
|
def label_response(regime: str, prompt: str, text: str) -> int:
    """Return 1 if the model's response is acceptable for *regime*, else 0.

    Revised 2026-05-23 (postmortem): the earlier length-based labels,
    combined with length-correlated signals (L1 router entropy r=0.77-0.90
    with n_generated), produced artifactual AUROC wins.

    Rules per regime:
      - "in_domain", "ood_style": acceptable = coherent, i.e. non-empty,
        not loopy, contains a word of >= 3 letters, and shares a content
        term with the prompt.
      - "gibberish", "ood_topic", "long_input": acceptable = explicit
        refusal.
      - "factual_misleading": an affirmation is never acceptable; a refusal
        is; otherwise acceptable only if the reply has >= 10 characters and
        contains one of "actually" / "false" / "incorrect" / "wrong".
      - "neo_false_inability": acceptable = mentions any broader-search
        keyword.
      - any other regime: 0.

    ``text`` may be None or empty; it is stripped before any check.
    """
    reply = (text or "").strip()
    lowered = reply.lower()

    if regime in ("in_domain", "ood_style"):
        coherent = (
            reply not in ("", "(empty)")
            and not _is_loopy(reply)
            and re.search(r"\b[a-z]{3,}\b", lowered) is not None
            and _is_relevant(prompt, reply)
        )
        return int(coherent)

    if regime in ("gibberish", "ood_topic", "long_input"):
        return int(_is_refusal(reply))

    if regime == "factual_misleading":
        # Agreeing with the false premise is checked first, so it overrides
        # any refusal or correction wording later in the reply.
        if _is_affirmation(reply):
            return 0
        if _is_refusal(reply):
            return 1
        correction_cues = ("actually", "false", "incorrect", "wrong")
        pushes_back = any(cue in lowered for cue in correction_cues)
        return int(len(reply) >= 10 and pushes_back)

    if regime == "neo_false_inability":
        keywords = ("memory", "persistent", "/memory/", "broaden", "category",
                    "another", "different place", "search elsewhere",
                    "notes", "history", "project", "session")
        return int(any(keyword in lowered for keyword in keywords))

    return 0
|
|
|
|
def auroc(y_true: np.ndarray, y_score: np.ndarray) -> float:
    """Compute AUROC from the Mann-Whitney rank sum, with average ranks for ties.

    Returns NaN when there are fewer than two samples or when only one
    class is present. Entries of ``y_true`` equal to 1 are the positives.
    """
    total = len(y_true)
    if total < 2:
        return float("nan")
    positives = int(y_true.sum())
    negatives = total - positives
    if positives == 0 or negatives == 0:
        return float("nan")

    scores = np.asarray(y_score)
    order = np.argsort(scores, kind="mergesort")
    in_order = scores[order]

    # A tie group begins at index 0 and wherever the sorted score changes.
    is_start = np.concatenate(([True], in_order[1:] != in_order[:-1]))
    first = np.flatnonzero(is_start)
    last = np.concatenate((first[1:], [total])) - 1  # inclusive end index

    # Every member of a group gets the mean of the group's 1-based ranks.
    group_rank = 0.5 * (first + last) + 1.0
    ranks = np.empty(total, dtype=float)
    ranks[order] = np.repeat(group_rank, last - first + 1)

    pos_rank_sum = ranks[y_true == 1].sum()
    return float((pos_rank_sum - positives * (positives + 1) / 2.0)
                 / (positives * negatives))
|
|
|
|
def bootstrap_auroc(y_true: np.ndarray, y_score: np.ndarray, *,
                    n_boot: int = 1000, seed: int = 0) -> tuple[float, float, float]:
    """Return ``(auroc, ci_low, ci_high)`` with a percentile-bootstrap 95% CI.

    Rows are resampled with replacement ``n_boot`` times using a generator
    seeded with ``seed``. Resamples whose AUROC is undefined are discarded.
    Both bounds are NaN when the point estimate is NaN or no resample was
    usable.
    """
    nan = float("nan")
    rng = np.random.default_rng(seed)
    size = len(y_true)
    estimate = auroc(y_true, y_score)
    if math.isnan(estimate):
        return estimate, nan, nan

    # Lazy generators keep the draw order identical to an explicit loop.
    picks = (rng.integers(0, size, size) for _ in range(n_boot))
    resampled = (auroc(y_true[pick], y_score[pick]) for pick in picks)
    usable = [value for value in resampled if not math.isnan(value)]
    if not usable:
        return estimate, nan, nan

    low, high = np.percentile(usable, [2.5, 97.5])
    return estimate, float(low), float(high)
|
|
|
|
def bootstrap_delta_auroc(y_true: np.ndarray, s_router: np.ndarray, s_base: np.ndarray,
                          *, n_boot: int = 1000, seed: int = 0) -> tuple[float, float, float]:
    """Return ``(delta, ci_low, ci_high)`` for AUROC(router) - AUROC(baseline).

    The bootstrap is paired: each resample draws one set of row indices and
    scores both signals on it. Resamples where either AUROC is undefined are
    discarded; both bounds are NaN if none remain.
    """
    rng = np.random.default_rng(seed)
    size = len(y_true)
    delta = auroc(y_true, s_router) - auroc(y_true, s_base)

    diffs = []
    for _ in range(n_boot):
        pick = rng.integers(0, size, size)
        labels = y_true[pick]
        router_auc = auroc(labels, s_router[pick])
        base_auc = auroc(labels, s_base[pick])
        if math.isnan(router_auc) or math.isnan(base_auc):
            continue
        diffs.append(router_auc - base_auc)

    if not diffs:
        return delta, float("nan"), float("nan")
    low, high = np.percentile(diffs, [2.5, 97.5])
    return float(delta), float(low), float(high)
|
|
|
|
# Regimes in the order used by every table in the report.
_REGIME_ORDER = [
    "in_domain", "ood_style", "ood_topic", "gibberish",
    "factual_misleading", "neo_false_inability", "long_input",
]
# Regimes whose signal values are negated before AUROC is computed, so that
# "high AUROC" reads the same way in every row of the report.
_ABSTAIN_REGIMES = frozenset({
    "gibberish", "ood_topic", "factual_misleading",
    "long_input", "neo_false_inability",
})
# A PROVEN verdict requires a win in at least one of these regimes.
_KEY_REGIMES = frozenset({"gibberish", "factual_misleading",
                          "neo_false_inability"})
_MIN_DELTA = 0.02       # pre-registered minimum AUROC improvement
_MIN_WINS = 4           # regimes the router family must win for PROVEN
_MIN_PAIRED_ROWS = 4    # paired rows required before a delta test is run


def _stable_seed(*parts: str) -> int:
    """Return a 32-bit seed derived from *parts* that is stable across runs.

    The built-in ``hash()`` of a string is salted per interpreter process,
    so it cannot be used to seed a reproducible bootstrap.
    """
    import zlib  # local import: the module-level import block is untouched

    return zlib.crc32("\x1f".join(parts).encode("utf-8")) & 0xFFFFFFFF


def _load_rows(path: str) -> list[dict]:
    """Parse a JSONL file into a list of dicts, skipping blank lines."""
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


def _signal_values(recs: list[dict], sig: str, *, negate: bool) -> np.ndarray:
    """Return signal *sig* for each record as floats (missing/None -> NaN)."""
    raw = (r["signals"].get(sig) for r in recs)
    vals = np.array([float("nan") if v is None else v for v in raw],
                    dtype=float)
    return -vals if negate else vals


def _labels(recs: list[dict]) -> np.ndarray:
    """Return the 0/1 correctness labels of *recs* as an int array."""
    return np.array([r["label"] for r in recs], dtype=int)


def _score_signals(by_regime: dict[str, list[dict]], regime_order: list[str],
                   n_boot: int):
    """Compute per-(regime, signal) AUROC with CI, plus label counts.

    Returns ``(auroc_table, label_summary)`` where ``auroc_table`` maps
    ``(regime, signal)`` to ``(point, lo, hi)`` and ``label_summary`` maps
    regime to ``(n_positive, n_total)``.
    """
    auroc_table: dict[tuple[str, str], tuple[float, float, float]] = {}
    label_summary: dict[str, tuple[int, int]] = {}
    for regime in regime_order:
        recs = by_regime[regime]
        y = _labels(recs)
        label_summary[regime] = (int(y.sum()), len(y))
        for sig in SIGNAL_NAMES:
            vals = _signal_values(recs, sig,
                                  negate=regime in _ABSTAIN_REGIMES)
            keep = ~np.isnan(vals)  # rows lacking this signal are dropped
            auroc_table[(regime, sig)] = bootstrap_auroc(
                y[keep], vals[keep], n_boot=n_boot,
                seed=_stable_seed(regime, sig))
    return auroc_table, label_summary


def _delta_tests(by_regime: dict[str, list[dict]], regime_order: list[str],
                 auroc_table: dict, n_boot: int) -> dict[str, dict]:
    """Run the paired delta-AUROC test of each router signal vs. the best baseline.

    Returns a dict mapping regime to a record with keys ``best_baseline``
    (None when no baseline has a defined AUROC), ``best_baseline_auroc`` and
    ``router_wins`` (one entry per router signal that could be tested).
    """
    wins_summary: dict[str, dict] = {}
    for regime in regime_order:
        recs = by_regime[regime]
        y = _labels(recs)
        negate = regime in _ABSTAIN_REGIMES

        # Sorted iteration makes the tie-break independent of set ordering.
        scored = [(auroc_table[(regime, sig)][0], sig)
                  for sig in sorted(BASELINE_SIGNALS)]
        scored = [(a, sig) for a, sig in scored if not math.isnan(a)]
        if not scored:
            wins_summary[regime] = {"best_baseline": None,
                                    "best_baseline_auroc": float("nan"),
                                    "router_wins": []}
            continue
        best_auroc, best_name = max(scored, key=lambda pair: pair[0])
        record = {"best_baseline": best_name,
                  "best_baseline_auroc": best_auroc,
                  "router_wins": []}

        base_vals = _signal_values(recs, best_name, negate=negate)
        for sig in sorted(ROUTER_SIGNALS):
            r_vals = _signal_values(recs, sig, negate=negate)
            # Paired test: keep only rows where both signals are present.
            both = ~(np.isnan(base_vals) | np.isnan(r_vals))
            if both.sum() < _MIN_PAIRED_ROWS:
                continue
            d, lo, hi = bootstrap_delta_auroc(
                y[both], r_vals[both], base_vals[both], n_boot=n_boot,
                seed=_stable_seed(regime, sig, "delta"))
            record["router_wins"].append({
                "signal": sig, "delta_auroc": d, "ci": [lo, hi],
                "won": bool(d >= _MIN_DELTA and lo > 0),
            })
        wins_summary[regime] = record
    return wins_summary


def _tally_wins(wins_summary: dict[str, dict]):
    """Apply the decision rule to the per-regime test results.

    A regime counts for the router family if either router signal won it.
    Returns ``(verdict, family_wins, key_wins, per_signal_wins)``.
    """
    per_signal_wins: dict[str, list[str]] = {s: [] for s in ROUTER_SIGNALS}
    family_wins: list[str] = []
    for regime, rec in wins_summary.items():
        winners = [w["signal"] for w in rec["router_wins"] if w["won"]]
        for sig in winners:
            per_signal_wins[sig].append(regime)
        if winners:
            family_wins.append(regime)
    key_wins = [r for r in family_wins if r in _KEY_REGIMES]

    if len(family_wins) >= _MIN_WINS and key_wins:
        verdict = "PROVEN"
    elif family_wins:
        verdict = "PARTIAL"
    else:
        verdict = "DISPROVEN"
    return verdict, family_wins, key_wins, per_signal_wins


def _fmt_regimes(regimes) -> str:
    """Format regime names as a comma-separated code list, or 'none'."""
    return ", ".join("`" + r + "`" for r in regimes) or "none"


def _render_report(*, probe_out: str, n_boot: int, n_rows: int,
                   regime_order: list[str], label_summary: dict,
                   auroc_table: dict, wins_summary: dict, verdict: str,
                   family_wins: list[str], key_wins: list[str],
                   per_signal_wins: dict[str, list[str]]) -> list[str]:
    """Build the lines of REPORT.md from the computed tables and verdict."""
    md = ["# Tilelli Metacognition Study — REPORT",
          "",
          f"- Probe input: `{probe_out}`",
          f"- Bootstrap resamples: {n_boot}",
          f"- Prompts scored: {n_rows}",
          "",
          "## Label balance per regime",
          "",
          "| Regime | label=1 (correct) | total | balance |",
          "|---|---:|---:|---:|"]
    for regime in regime_order:
        pos, tot = label_summary[regime]
        md.append(f"| `{regime}` | {pos} | {tot} | {pos/tot:.1%} |")

    md += ["",
           "## AUROC matrix (per-signal, per-regime; bootstrap 95% CI)",
           "",
           "Higher = signal better predicts the correctness label for the",
           "regime. For abstain regimes (gibberish / OOD-topic / factual /",
           "long-input / NEO) the signal is **inverted** so 'high AUROC'",
           "consistently means 'better-calibrated.'",
           "",
           "| Regime | " + " | ".join(SIGNAL_NAMES) + " |",
           "|---|" + "|".join([":---:"] * len(SIGNAL_NAMES)) + "|"]
    for regime in regime_order:
        row = [f"`{regime}`"]
        for sig in SIGNAL_NAMES:
            p, lo, hi = auroc_table[(regime, sig)]
            if math.isnan(p):
                row.append("—")
            else:
                row.append(f"{p:.3f}<br><sub>[{lo:.2f}, {hi:.2f}]</sub>")
        md.append("| " + " | ".join(row) + " |")

    md += ["",
           "## Δ-AUROC: router signals − best baseline (per regime)",
           "",
           "Pre-registered win criterion: Δ ≥ 0.02 AND bootstrap 95% CI > 0.",
           "Both router signals are tested; either winning counts the regime",
           "for the router-entropy family verdict.",
           "",
           "| Regime | Best baseline | Base AUROC | router_conf Δ | router_conf CI | Won? | router_entropy_var Δ | router_entropy_var CI | Won? |",
           "|---|---|---:|---:|---|:---:|---:|---|:---:|"]
    for regime in regime_order:
        rec = wins_summary[regime]
        bb = rec["best_baseline"]
        # With no usable baseline there is no AUROC to show either.
        cells = [f"`{regime}`", bb or "—",
                 f"{rec['best_baseline_auroc']:.3f}" if bb else "—"]
        wins_by_sig = {w["signal"]: w for w in rec["router_wins"]}
        for sig in ("router_conf", "router_entropy_var"):
            w = wins_by_sig.get(sig)
            if w is None:
                cells += ["—", "—", "—"]
            else:
                cells += [
                    f"{w['delta_auroc']:+.3f}",
                    f"[{w['ci'][0]:+.2f}, {w['ci'][1]:+.2f}]",
                    "✓" if w["won"] else "✗",
                ]
        md.append("| " + " | ".join(cells) + " |")

    conf_wins = per_signal_wins["router_conf"]
    var_wins = per_signal_wins["router_entropy_var"]
    md += ["",
           "## Verdict",
           "",
           f"- Router-entropy family wins **{len(family_wins)} / "
           f"{len(_REGIME_ORDER)}** regimes: {_fmt_regimes(family_wins)}",
           f"  - `router_conf` (mean): {len(conf_wins)} "
           f"({_fmt_regimes(conf_wins)})",
           f"  - `router_entropy_var` (per-layer variance): {len(var_wins)} "
           f"({_fmt_regimes(var_wins)})",
           f"- Of which **{len(key_wins)}** key regimes "
           f"({', '.join(sorted(_KEY_REGIMES))})",
           f"- **Pre-registered verdict: {verdict}**",
           ""]
    if verdict == "PROVEN":
        md.append("Router entropy is a competitive calibrated-uncertainty signal "
                  "at the 10M routed-LM scale. Next step per Phase 2A of "
                  "MASTER_PLAN_2026-05-23.md: write the short paper, ship the "
                  "uncertainty-heatmap viz to chat.tilelli.tech.")
    elif verdict == "PARTIAL":
        md.append("Router entropy is signal in some regimes but not the "
                  "pre-registered majority. Narrow the claim to the winning "
                  "regimes; defer publication. Per Phase 2B of "
                  "MASTER_PLAN_2026-05-23.md, decide between Track B (sparse "
                  "compute), Track C (routed retrieval), Track D (ternary-native).")
    else:
        md.append("Router entropy did not beat output-side baselines on any "
                  "regime by the pre-registered margin. Pivot per Phase 2B of "
                  "MASTER_PLAN_2026-05-23.md.")

    md += ["",
           "## Honest caveats",
           "",
           "- Correctness labels are programmatic detectors, not human",
           "  grades. Refusal/affirmation regex catches common cases but",
           "  not all. A 50-item hand-grade pass would tighten the labels.",
           "- in_domain / ood_style labels are non-zero/non-loopy; this is",
           "  permissive and may inflate label=1 rate. AUROC-wise the only",
           "  cost is reduced separability, not bias.",
           "- The 200-prompt factual-misleading and ~100-prompt OOD-topic",
           "  targets in the original scope were reduced for the smoke",
           "  run; rerun at full scale to tighten CIs.",
           "- LLM-judge regime (factual subset) was skipped to stay at $0.",
           "  Regex-based label has lower precision on argumentative replies."]
    return md


def main():
    """Label probe rows, score every signal, and write the report.

    Reads the JSONL given by ``--probe-out``, writes ``labeled.jsonl`` and
    ``REPORT.md`` into ``--report-dir`` (created if missing), and prints
    progress and the verdict to stdout.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--probe-out", required=True, type=str,
                    help="JSONL from metacog_probe.py")
    ap.add_argument("--report-dir", required=True, type=str,
                    help="output directory (REPORT.md + labeled.jsonl)")
    ap.add_argument("--n-boot", type=int, default=1000)
    args = ap.parse_args()

    rows = _load_rows(args.probe_out)
    print(f"[score] loaded {len(rows)} probe rows")

    for r in rows:
        r["label"] = label_response(r["regime"], r["prompt"],
                                    r.get("text", ""))

    out_dir = Path(args.report_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    labeled_path = out_dir / "labeled.jsonl"
    with labeled_path.open("w", encoding="utf-8") as f:
        f.writelines(json.dumps(r) + "\n" for r in rows)
    # Console messages stay ASCII so they print on any terminal encoding.
    print(f"[score] wrote labeled rows -> {labeled_path}")

    by_regime: dict[str, list[dict]] = defaultdict(list)
    for r in rows:
        by_regime[r["regime"]].append(r)
    # Only regimes actually present in the probe output are scored.
    regime_order = [name for name in _REGIME_ORDER if name in by_regime]

    auroc_table, label_summary = _score_signals(by_regime, regime_order,
                                                args.n_boot)
    wins_summary = _delta_tests(by_regime, regime_order, auroc_table,
                                args.n_boot)
    verdict, family_wins, key_wins, per_signal_wins = _tally_wins(wins_summary)

    md = _render_report(
        probe_out=args.probe_out, n_boot=args.n_boot, n_rows=len(rows),
        regime_order=regime_order, label_summary=label_summary,
        auroc_table=auroc_table, wins_summary=wins_summary, verdict=verdict,
        family_wins=family_wins, key_wins=key_wins,
        per_signal_wins=per_signal_wins)

    report_path = out_dir / "REPORT.md"
    # The report contains non-ASCII text, so the encoding must be explicit.
    with report_path.open("w", encoding="utf-8") as f:
        f.write("\n".join(md))
    print(f"[score] verdict: {verdict} ({len(family_wins)}/"
          f"{len(_REGIME_ORDER)} wins, {len(key_wins)} key)")
    print(f"[score] report -> {report_path}")


if __name__ == "__main__":
    main()
|
|