Spaces:
Running
Running
File size: 9,481 Bytes
"""Per-pack triage report — answers `model error vs design vs engine?`
and `fog/modality parity?` from the data we already have.
Three signals fold into each pack's status:
  * stall must LOSE — `scripts/audit_scenarios.py` (the no-cheat bar).
  * intended must WIN — proxied by the presence of a dedicated
    `tests/test_<pack>.py` (every such test is in the full suite
    and the full suite is green → that pack's intended policy wins
    against the current engine).
  * model run data — when one or more `run_eval --out` reports are
    passed in, the script computes per-pack empirical model coverage
    (any model wins? all lose?) and modality/fog parity (does fog
    discriminate? do channels diverge?).
Run from the repo root:
python scripts/triage.py [report.json ...]
"""
from __future__ import annotations
import json
import sys
from collections import defaultdict
from pathlib import Path
# Two directories above this file's resolved location; the module
# docstring runs the script as `scripts/triage.py` from the repo root,
# so this is that root.
ROOT = Path(__file__).resolve().parent.parent
# Put the root first on sys.path before the project imports below
# (hence the E402 suppressions on the late imports).
sys.path.insert(0, str(ROOT))
from openra_bench.scenarios import load_pack  # noqa: E402
from openra_bench.scenarios.loader import PACKS_DIR  # noqa: E402
# Reuse the audit's exempt list — load by path, since `scripts/` isn't
# a package.
import importlib.util  # noqa: E402
_spec = importlib.util.spec_from_file_location(
    "_audit", ROOT / "scripts" / "audit_scenarios.py"
)
_audit = importlib.util.module_from_spec(_spec)
# Executes audit_scenarios.py at import time of this script; any
# top-level side effects of that file run here too.
_spec.loader.exec_module(_audit)
# Pack stems tested with `in` by `_classify` to label a pack "EXEMPT".
STALL_WINS_BY_DESIGN = _audit.STALL_WINS_BY_DESIGN
# Directory searched by `_test_file_for` for `test_<pack>.py` files.
TESTS_DIR = ROOT / "tests"
# A pack tagged with one of these heuristic engine-footgun markers
# in its YAML or test file is worth manual review even if other
# signals look clean. See CLAUDE.md's Engine Facts list.
# NOTE(review): nothing in the visible part of this file reads this
# tuple — confirm whether a consumer exists elsewhere or is still to
# be written.
ENGINE_FOOTGUN_HINTS = (
    "has_building",  # cumulative semantics — easy to misuse
    "stance:3",  # post-CLAUDE.md hunt behavior shifts outcomes
)
def _active_packs() -> list[tuple[str, dict]]:
    """Collect every loadable, non-quarantined pack under ``PACKS_DIR``.

    Returns:
        ``(stem, pack)`` pairs in sorted path order, where ``pack`` is
        whatever ``load_pack`` returned for that YAML file.

    Files whose name starts with ``_`` or ``TEMPLATE`` are ignored, a
    pack that fails to load is dropped without comment (best effort),
    and a pack whose ``meta.status`` equals ``"quarantine"`` is left
    out. A pack with no ``status`` attribute is treated as active.
    """
    active = []
    for yaml_path in sorted(PACKS_DIR.glob("*.yaml")):
        filename = yaml_path.name
        if filename.startswith("_") or filename.startswith("TEMPLATE"):
            continue
        try:
            pack = load_pack(yaml_path)
        except Exception:  # noqa: BLE001
            # Deliberately tolerant: an unloadable pack is skipped.
            continue
        status = getattr(pack.meta, "status", "active")
        if status != "quarantine":
            active.append((yaml_path.stem, pack))
    return active
def _test_file_for(stem: str) -> Path | None:
    """Return the dedicated test file for a pack, if one exists.

    The file name is ``test_<stem>.py`` inside ``TESTS_DIR``, with every
    hyphen in ``stem`` turned into an underscore.

    Returns:
        The path when it exists on disk, otherwise ``None``.
    """
    module_name = "test_" + stem.replace("-", "_") + ".py"
    candidate = TESTS_DIR / module_name
    if candidate.exists():
        return candidate
    return None
def _empirical(reports: list[Path]) -> dict:
    """Per-pack model data from one or more run_eval `--out` reports.

    Args:
        reports: Paths of JSON report files. Each report is expected to
            be an object with an optional ``model`` name and an
            ``episodes`` list; every episode carries ``cell``
            (``pack:level[:mode]``), ``outcome`` and ``composite``.

    Returns:
        pack -> {
            models: {model_name: [composite per cell]},
            outcomes: {model_name: {cell: outcome}},
            cells_seen: set of cell-mode keys,
        }

        For each model the composite list is index-aligned with the
        insertion order of the outcomes dict: the i-th composite belongs
        to the i-th cell key. When the same cell shows up again for the
        same model, the later episode replaces both the outcome and the
        composite, so the list never grows past one entry per cell.

    Loading is best effort. A report that cannot be read or parsed, or
    whose top level is not a JSON object, is skipped with a note on
    stderr. Episodes whose outcome is not win/loss/draw, or that lack a
    string ``cell`` or a numeric ``composite``, are ignored.
    """
    valid_outcomes = {"win", "loss", "draw"}
    per_pack: dict[str, dict] = defaultdict(lambda: {
        "models": defaultdict(list),
        "outcomes": defaultdict(dict),
        "cells_seen": set(),
    })
    for raw_path in reports:
        path = Path(raw_path)
        try:
            # JSON is UTF-8 by specification; do not depend on the
            # platform's default encoding.
            rep = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and decode errors.
            print(f"triage: skipping unreadable report {path}: {exc}",
                  file=sys.stderr)
            continue
        if not isinstance(rep, dict):
            print(f"triage: skipping report {path}: top level is not an object",
                  file=sys.stderr)
            continue

        model = rep.get("model") or path.stem
        episodes = rep.get("episodes")
        if not isinstance(episodes, list):
            continue

        for episode in episodes:
            if not isinstance(episode, dict):
                continue
            outcome = episode.get("outcome")
            if outcome not in valid_outcomes:
                continue
            cell = episode.get("cell")  # pack:level[:mode]
            composite = episode.get("composite")
            if not isinstance(cell, str) or not isinstance(composite, (int, float)):
                continue

            entry = per_pack[cell.split(":")[0]]
            outcomes = entry["outcomes"][model]
            composites = entry["models"][model]
            if cell in outcomes:
                # Re-assigning a dict key keeps its position, so overwrite
                # the composite in the matching slot instead of appending.
                composites[list(outcomes).index(cell)] = composite
            else:
                composites.append(composite)
            outcomes[cell] = outcome
            entry["cells_seen"].add(cell)
    return per_pack
def _parity(emp_entry: dict) -> dict | None:
    """Per-pack fog/channel parity from model composites — only
    meaningful if the perception sweep was run on this pack.

    Args:
        emp_entry: One pack's entry with ``outcomes``
            (``{model: {cell: outcome}}``) and ``models``
            (``{model: [composite, ...]}``). Composites are paired with
            cells by position, so this assumes each model's composite
            list is ordered like the keys of its outcomes dict.

    Returns:
        ``None`` when the entry is empty or holds neither ``structured``
        nor ``vision`` cells. Otherwise a dict with:

        * ``fog_pen``: mean of the per-mode averages over the ``*-clear``
          modes minus the same over the fogged modes, or ``None`` when
          either side has no data.
        * ``chan_spread``: max minus min of the fogged per-mode averages,
          or ``None`` when fewer than two fogged modes have data.
    """
    if not emp_entry:
        return None

    fog_modes = ("structured", "vision", "image")
    clear_modes = ("structured-clear", "vision-clear", "image-clear")
    known_modes = frozenset(fog_modes + clear_modes)

    by_mode: dict[str, list[float]] = defaultdict(list)
    for model, cells in emp_entry["outcomes"].items():
        composites = emp_entry["models"][model]
        # Positional pairing of cell keys with composites. zip stops at
        # the shorter side, so a cell without a composite is left out.
        for cell, composite in zip(cells, composites):
            # Mode is the suffix of a `pack:level:mode` label; a label
            # with no colon, or an unrecognised suffix, is not counted.
            _head, sep, mode = cell.rpartition(":")
            if sep and mode in known_modes:
                by_mode[mode].append(composite)

    if "structured" not in by_mode and "vision" not in by_mode:
        return None

    def _mean(values: list[float]) -> float:
        return sum(values) / len(values)

    # One average per mode that actually has samples; .get() avoids
    # creating empty entries in the defaultdict.
    fogged = [_mean(by_mode[m]) for m in fog_modes if by_mode.get(m)]
    clear = [_mean(by_mode[m]) for m in clear_modes if by_mode.get(m)]

    fog_pen = _mean(clear) - _mean(fogged) if fogged and clear else None
    chan_spread = max(fogged) - min(fogged) if len(fogged) >= 2 else None
    return {"fog_pen": fog_pen, "chan_spread": chan_spread}
def _model_status(emp_entry: dict | None, levels: int = 3) -> str:
    """Summarise how the models fared on one pack.

    Args:
        emp_entry: The pack's entry; only its ``outcomes`` mapping
            (``{model: {cell: outcome}}``) is read. May be ``None``.
        levels: Accepted for interface compatibility; not used.

    Returns:
        ``"no-model-data"`` when there is no entry or no outcome at all,
        ``"every-model-wins"`` when at least one win and no loss was
        recorded, ``"every-model-loses"`` for the reverse, and
        ``"discriminative"`` otherwise. Draws count for neither side, so
        a pack with only draws also reports ``"discriminative"``.
    """
    if not emp_entry:
        return "no-model-data"

    recorded = [
        outcome
        for per_cell in emp_entry["outcomes"].values()
        for outcome in per_cell.values()
    ]
    if not recorded:
        return "no-model-data"

    saw_win = "win" in recorded
    saw_loss = "loss" in recorded
    if saw_win == saw_loss:
        # Either both kinds occurred or neither did (draws only).
        return "discriminative"
    return "every-model-wins" if saw_win else "every-model-loses"
def _classify(stem: str, has_test: bool, defect_set: set[str],
              empirical_status: str) -> str:
    """Fold the triage signals for one pack into a status label.

    Args:
        stem: Pack name (YAML file stem).
        has_test: Whether a dedicated test file exists for the pack.
        defect_set: Packs currently known to be stall defects.
        empirical_status: Model summary string for the pack.

    Returns:
        ``"EXEMPT"`` for packs in ``STALL_WINS_BY_DESIGN``, then
        ``"STALL-DEFECT"`` for packs in ``defect_set``; otherwise a label
        chosen by ``has_test`` and ``empirical_status``. An unrecognised
        ``empirical_status`` falls through to the "no model data" label
        of the matching family.
    """
    if stem in STALL_WINS_BY_DESIGN:
        return "EXEMPT"
    if stem in defect_set:
        # Should not occur after the defect-fix wave.
        return "STALL-DEFECT"

    if has_test:
        labels = {
            "discriminative": "VERIFIED + DISCRIMINATIVE",
            "every-model-wins": "VERIFIED but TOO-EASY",
            "every-model-loses":
                "TEST-WINS but ALL-MODELS-LOSE (model-weak / suspect)",
        }
        # Test passes, but there is no empirical signal.
        fallback = "VERIFIED (no model data)"
    else:
        # No dedicated test: only the stall signal backs this pack.
        labels = {
            "every-model-loses":
                "STALL-ONLY + ALL-MODELS-LOSE (design / engine suspect)",
            "every-model-wins": "STALL-ONLY + TOO-EASY",
            "discriminative": "STALL-ONLY + MODEL-DISCRIMINATIVE",
        }
        fallback = "STALL-ONLY (unattested intended)"
    return labels.get(empirical_status, fallback)
def _print_summary(rows: list[tuple], status_counts: dict[str, int],
                   n_reports: int) -> None:
    """Print the report banner and the per-status totals, largest first."""
    total = len(rows)
    tested = sum(1 for row in rows if row[1])
    print("=" * 72)
    print(f"TRIAGE REPORT — {total} active packs"
          f" ({tested} have dedicated tests)")
    if n_reports:
        with_data = sum(1 for row in rows if row[2] != "no-model-data")
        print(f" empirical layer: {n_reports} report(s),"
              f" {with_data} packs with"
              f" model data")
    print("=" * 72)
    # With no rows there are no statuses, so the division is never reached.
    for status, count in sorted(status_counts.items(), key=lambda kv: -kv[1]):
        pct = 100 * count / total
        print(f" {status:<55} {count:>4} ({pct:>4.1f}%)")


def _print_per_pack(rows: list[tuple]) -> None:
    """Print one line per pack, ordered by status label and then name."""
    print()
    print("=" * 72)
    print("PER-PACK (sorted by status, then name)")
    print("=" * 72)
    print(f" {'pack':<46} {'test':>5} {'model':<20} status")
    for stem, has_test, model_status, status, _parity_info in sorted(
        rows, key=lambda row: (row[3], row[0])
    ):
        test_mark = "yes" if has_test else "-"
        print(f" {stem:<46} {test_mark:>5} {model_status:<20} {status}")


def _print_parity(rows: list[tuple]) -> None:
    """Print the fog/channel parity table for packs that have parity data."""
    parity_rows = [(row[0], row[4]) for row in rows if row[4]]
    if not parity_rows:
        return
    print()
    print("=" * 72)
    print(f"PARITY — fog signal + channel spread "
          f"({len(parity_rows)} packs with perception-sweep data)")
    print("=" * 72)
    print(f" {'pack':<46}{'fog-pen':>9}{'chan-spread':>13} note")
    for stem, parity in sorted(parity_rows, key=lambda item: item[0]):
        fog_pen = parity.get("fog_pen")
        chan_spread = parity.get("chan_spread")
        notes = []
        # Flag packs where clearing the fog moves the score by under 0.05.
        if fog_pen is not None and abs(fog_pen) < 0.05:
            notes.append("FOG-DEAD")
        # Flag packs whose fogged channels differ by more than 0.15.
        if chan_spread is not None and chan_spread > 0.15:
            notes.append("CHANNEL-DIVERGENT")
        fog_col = f"{fog_pen:>+9.3f}" if fog_pen is not None else f"{'n/a':>9}"
        spread_col = (f"{chan_spread:>13.3f}" if chan_spread is not None
                      else f"{'n/a':>13}")
        print(f" {stem:<46}{fog_col}{spread_col} {', '.join(notes) or 'ok'}")


def main(argv: list[str]) -> int:
    """Build and print the triage report.

    Args:
        argv: Full argument vector; every element after the first is
            taken as the path of a run_eval `--out` report.

    Returns:
        Always ``0``.
    """
    reports = [Path(p) for p in argv[1:]]
    packs = _active_packs()
    emp = _empirical(reports) if reports else {}

    # The stall-defect set is not recomputed here: the script trusts the
    # post-fix state (0 defects), so the STALL-DEFECT label only appears
    # if this set is populated.
    defects: set[str] = set()

    # Each row: (stem, has_test, model_status, status, parity-or-None).
    rows = []
    status_counts: dict[str, int] = defaultdict(int)
    for stem, _pack in packs:
        has_test = _test_file_for(stem) is not None
        entry = emp.get(stem)
        model_status = _model_status(entry)
        status = _classify(stem, has_test, defects, model_status)
        # _parity already returns None for a missing or empty entry.
        rows.append((stem, has_test, model_status, status, _parity(entry)))
        status_counts[status] += 1

    _print_summary(rows, status_counts, len(reports))
    _print_per_pack(rows)
    _print_parity(rows)
    return 0
# Script entry point: run the report and use main()'s return value as
# the process exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv))
|