Spaces:
Running
Running
| """Per-pack triage report — answers `model error vs design vs engine?` | |
| and `fog/modality parity?` from the data we already have. | |
| Three signals fold into each pack's status: | |
| * stall must LOSE — `scripts/audit_scenarios.py` (the no-cheat bar). | |
| * intended must WIN — proxied by the presence of a dedicated | |
| `tests/test_<pack>.py` (every such test is in the full suite | |
| and the full suite is green ⇒ that pack's intended policy wins | |
| against the current engine). | |
| * model run data — when one or more `run_eval --out` reports are | |
| passed in, the script computes per-pack empirical model coverage | |
| (any model wins? all lose?) and modality/fog parity (does fog | |
| discriminate? do channels diverge?). | |
| Run from the repo root: | |
| python scripts/triage.py [report.json ...] | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import sys | |
| from collections import defaultdict | |
| from pathlib import Path | |
# Repository root: two directory levels above this file (the module
# docstring runs it as `scripts/triage.py` from the repo root).
ROOT = Path(__file__).resolve().parent.parent
# Put the repo root at the front of the import path before the
# `openra_bench` imports below (presumably so the package resolves when
# this file is run as a plain script — confirm).
sys.path.insert(0, str(ROOT))
from openra_bench.scenarios import load_pack  # noqa: E402
from openra_bench.scenarios.loader import PACKS_DIR  # noqa: E402
# Reuse the audit's exempt list — load by path, since `scripts/` isn't
# a package.
import importlib.util  # noqa: E402
_spec = importlib.util.spec_from_file_location(
    "_audit", ROOT / "scripts" / "audit_scenarios.py"
)
_audit = importlib.util.module_from_spec(_spec)
# Runs audit_scenarios.py's module-level code as a side effect of
# importing this file.
_spec.loader.exec_module(_audit)
# Pack stems that `_classify` reports as "EXEMPT".
STALL_WINS_BY_DESIGN = _audit.STALL_WINS_BY_DESIGN
# Directory `_test_file_for` searches for `test_<pack>.py`.
TESTS_DIR = ROOT / "tests"
# A pack tagged with one of these heuristic engine-footgun markers
# in its YAML or test file is worth manual review even if other
# signals look clean. See CLAUDE.md's Engine Facts list.
# NOTE(review): nothing in the visible part of this file reads this
# tuple — confirm whether it is still used.
ENGINE_FOOTGUN_HINTS = (
    "has_building",  # cumulative semantics — easy to misuse
    "stance:3",  # post-CLAUDE.md hunt behavior shifts outcomes
)
def _active_packs() -> list[tuple[str, dict]]:
    """Load every active scenario pack found directly under ``PACKS_DIR``.

    A ``*.yaml`` file is skipped when:

    * its name starts with ``_`` or ``TEMPLATE``;
    * ``load_pack`` raises for it — the skip is best-effort, but a
      warning goes to stderr so the pack does not vanish silently from
      the report totals;
    * its ``meta.status`` is ``"quarantine"`` (a missing ``status`` is
      treated as ``"active"``).

    Returns:
        ``(stem, loaded_pack)`` pairs, ordered by sorted file path.
    """
    out = []
    for p in sorted(PACKS_DIR.glob("*.yaml")):
        if p.name.startswith(("_", "TEMPLATE")):
            continue
        try:
            d = load_pack(p)
        except Exception as exc:  # noqa: BLE001
            # Keep going, but say so: a dropped pack changes every
            # count and percentage printed by the report.
            print(
                f"triage: skipping {p.name}: failed to load ({exc!r})",
                file=sys.stderr,
            )
            continue
        if getattr(d.meta, "status", "active") == "quarantine":
            continue
        out.append((p.stem, d))
    return out
def _test_file_for(stem: str) -> Path | None:
    """Return the pack's dedicated test file, or ``None`` if there is none.

    The expected location is ``TESTS_DIR / "test_<stem>.py"`` with every
    ``-`` in the stem replaced by ``_``.
    """
    module_name = "test_" + stem.replace("-", "_") + ".py"
    candidate = TESTS_DIR / module_name
    if candidate.exists():
        return candidate
    return None
| def _empirical(reports: list[Path]) -> dict: | |
| """Per-pack model data from one or more run_eval `--out` reports. | |
| Returns: | |
| pack -> { | |
| models: {model_name: [composite per cell]}, | |
| outcomes: {model_name: {cell: outcome}}, | |
| cells_seen: set of cell-mode keys, | |
| } | |
| """ | |
| per_pack: dict[str, dict] = defaultdict(lambda: { | |
| "models": defaultdict(list), | |
| "outcomes": defaultdict(dict), | |
| "cells_seen": set(), | |
| }) | |
| for path in reports: | |
| try: | |
| rep = json.loads(Path(path).read_text()) | |
| except Exception: # noqa: BLE001 | |
| continue | |
| model = rep.get("model") or Path(path).stem | |
| for e in rep.get("episodes", []): | |
| if e.get("outcome") not in {"win", "loss", "draw"}: | |
| continue | |
| cell = e["cell"] # pack:level[:mode] | |
| pack = cell.split(":")[0] | |
| per_pack[pack]["models"][model].append(e["composite"]) | |
| per_pack[pack]["outcomes"][model][cell] = e["outcome"] | |
| per_pack[pack]["cells_seen"].add(cell) | |
| return per_pack | |
| def _parity(emp_entry: dict) -> dict | None: | |
| """Per-pack fog/channel parity from model composites — only | |
| meaningful if the perception sweep was run on this pack.""" | |
| if not emp_entry: | |
| return None | |
| by_mode: dict[str, list[float]] = defaultdict(list) | |
| for model, cells in emp_entry["outcomes"].items(): | |
| composites = emp_entry["models"][model] | |
| # group by mode suffix from cell label `pack:level:mode` | |
| for cell, _o in cells.items(): | |
| parts = cell.rsplit(":", 1) | |
| if len(parts) == 2 and parts[1] in { | |
| "structured", "structured-clear", "vision", | |
| "vision-clear", "image", "image-clear", | |
| }: | |
| # find the composite for this cell — use ordinal index | |
| idx = list(cells.keys()).index(cell) | |
| if idx < len(composites): | |
| by_mode[parts[1]].append(composites[idx]) | |
| if "structured" not in by_mode and "vision" not in by_mode: | |
| return None | |
| avg = lambda xs: sum(xs) / len(xs) if xs else None # noqa: E731 | |
| fogged = [avg(by_mode.get(m, [])) for m in ("structured", "vision", "image")] | |
| fogged = [x for x in fogged if x is not None] | |
| clear = [avg(by_mode.get(m, [])) | |
| for m in ("structured-clear", "vision-clear", "image-clear")] | |
| clear = [x for x in clear if x is not None] | |
| fog_pen = (sum(clear) / len(clear) - sum(fogged) / len(fogged)) \ | |
| if fogged and clear else None | |
| chan_spread = (max(fogged) - min(fogged)) if len(fogged) >= 2 else None | |
| return {"fog_pen": fog_pen, "chan_spread": chan_spread} | |
| def _model_status(emp_entry: dict | None, levels: int = 3) -> str: | |
| if not emp_entry: | |
| return "no-model-data" | |
| outcomes = [] | |
| for m, cells in emp_entry["outcomes"].items(): | |
| outcomes += list(cells.values()) | |
| if not outcomes: | |
| return "no-model-data" | |
| wins = sum(1 for o in outcomes if o == "win") | |
| losses = sum(1 for o in outcomes if o == "loss") | |
| if wins and not losses: | |
| return "every-model-wins" | |
| if losses and not wins: | |
| return "every-model-loses" | |
| return "discriminative" | |
def _classify(stem: str, has_test: bool, defect_set: set[str],
              empirical_status: str) -> str:
    """Fold a pack's signals into its triage status label.

    Precedence: a stem listed in ``STALL_WINS_BY_DESIGN`` is "EXEMPT";
    next, a stem in ``defect_set`` is "STALL-DEFECT"; otherwise the
    label is chosen by whether a dedicated test exists and by
    ``empirical_status`` (as returned by ``_model_status``).
    """
    if stem in STALL_WINS_BY_DESIGN:
        return "EXEMPT"
    if stem in defect_set:
        # should not occur after the defect-fix wave
        return "STALL-DEFECT"

    if has_test:
        by_status = {
            "discriminative": "VERIFIED + DISCRIMINATIVE",
            "every-model-wins": "VERIFIED but TOO-EASY",
            "every-model-loses":
                "TEST-WINS but ALL-MODELS-LOSE (model-weak / suspect)",
        }
        # test passes, no empirical
        fallback = "VERIFIED (no model data)"
    else:
        # no dedicated test
        by_status = {
            "every-model-loses":
                "STALL-ONLY + ALL-MODELS-LOSE (design / engine suspect)",
            "every-model-wins": "STALL-ONLY + TOO-EASY",
            "discriminative": "STALL-ONLY + MODEL-DISCRIMINATIVE",
        }
        fallback = "STALL-ONLY (unattested intended)"
    return by_status.get(empirical_status, fallback)
def main(argv: list[str]) -> int:
    """Print the triage report to stdout and return exit status 0.

    ``argv[1:]`` are paths of optional run_eval `--out` reports. The
    output has three parts: a count per status, one line per pack, and —
    only when some pack has parity data — a fog / channel parity table.
    """
    rule = "=" * 72
    report_paths = [Path(arg) for arg in argv[1:]]
    active = _active_packs()
    empirical = _empirical(report_paths) if report_paths else {}
    # The stall audit is not re-run here: the post-fix state (zero
    # stall defects) is trusted, so the defect set stays empty.
    known_defects: set[str] = set()

    rows = []
    tally: dict[str, int] = defaultdict(int)
    for stem, _pack in active:
        tested = _test_file_for(stem) is not None
        entry = empirical.get(stem)
        model_state = _model_status(entry)
        verdict = _classify(stem, tested, known_defects, model_state)
        parity = _parity(entry) if entry else None
        rows.append((stem, tested, model_state, verdict, parity))
        tally[verdict] += 1

    # --- summary -----------------------------------------------------
    total = len(rows)
    tested_total = sum(1 for row in rows if row[1])
    print(rule)
    print(f"TRIAGE REPORT — {total} active packs"
          f" ({tested_total} have dedicated tests)")
    if report_paths:
        with_data = sum(1 for row in rows if row[2] != 'no-model-data')
        print(f" empirical layer: {len(report_paths)} report(s),"
              f" {with_data} packs with"
              f" model data")
    print(rule)
    # Most frequent status first; ties keep first-seen order.
    for verdict, count in sorted(tally.items(), key=lambda item: -item[1]):
        share = 100 * count / total
        print(f" {verdict:<55} {count:>4} ({share:>4.1f}%)")

    # --- per-pack listing --------------------------------------------
    print()
    print(rule)
    print("PER-PACK (sorted by status, then name)")
    print(rule)
    print(f" {'pack':<46} {'test':>5} {'model':<20} status")
    for row in sorted(rows, key=lambda r: (r[3], r[0])):
        stem, tested, model_state, verdict = row[:4]
        flag = "yes" if tested else "-"
        print(f" {stem:<46} {flag:>5} {model_state:<20} {verdict}")

    # --- parity table (only when there is something to show) ----------
    parity_rows = [(row[0], row[4]) for row in rows if row[4]]
    if not parity_rows:
        return 0
    print()
    print(rule)
    print(f"PARITY — fog signal + channel spread "
          f"({len(parity_rows)} packs with perception-sweep data)")
    print(rule)
    print(f" {'pack':<46}{'fog-pen':>9}{'chan-spread':>13} note")
    for stem, parity in sorted(parity_rows, key=lambda pair: pair[0]):
        fog_pen = parity.get("fog_pen")
        spread = parity.get("chan_spread")
        notes = []
        if fog_pen is not None and abs(fog_pen) < 0.05:
            notes.append("FOG-DEAD")
        if spread is not None and spread > 0.15:
            notes.append("CHANNEL-DIVERGENT")
        if fog_pen is None:
            fog_col = f"{'n/a':>9}"
        else:
            fog_col = f"{fog_pen:>+9.3f}"
        if spread is None:
            spread_col = f"{'n/a':>13}"
        else:
            spread_col = f"{spread:>13.3f}"
        print(f" {stem:<46}{fog_col}{spread_col} {', '.join(notes) or 'ok'}")
    return 0
# Script entry point: run the report with the full argument vector and
# exit with main()'s return value as the process status.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv))