# NOTE(review): stray terminal output ("Spaces:", "Restarting") was pasted
# above the module docstring; removed so the file parses as Python.
"""
benchmark_loader.py — Load and sample from the Phase B benchmark prompt pool.

The pool (benchmark_pool.yaml) contains 17 categories × 4 chains × 3 layers
= 204 prompts, with 8 conceptual threads woven through in a near-uniform
bipartite pattern (each thread spans 7-10 categories). Subversion is in
priority_categories so it's force-included in every per-run sample.

This loader:
1. Loads the pool YAML
2. Builds (Q1, Q2, Q3) chain triples — each Q2 and Q3 references the
   same parent Q1 by `parent` field (Q3 is a sibling to Q2 under Q1,
   not a Q1→Q2→Q3 lineage)
3. Samples N chains per run with stratification discipline:
   - Force-include 1 chain from each priority category (subversion)
   - At most 2 chains per category (prevents category dominance)
   - At least 3 threads with 2+ representatives (cross-category co-firing)
   - Multi-complexity coverage across all 3 layers
4. Returns interleaved Q1/Q2/Q3 turn sequence:
   turns 0..N-1: Q1s (one per sampled chain, in chain order)
   turns N..2N-1: matching Q2s (same chain order)
   turns 2N..3N-1: matching Q3s (same chain order)
5. Returns same-cat pair indices for the heatmap math. Phase A semantics
   preserved: pairs are Q1↔Q2 only, `[(i, i+N) for i in range(N)]`.
   Q3 turns contribute to substrate but aren't part of the strict
   same-cat-reselect calculation. Future work (Option B) can add Q1↔Q3
   and Q2↔Q3 pairings.

# ---- Changelog ----
# [2026-05-10] Claude Opus 4.7 — Phase A loader (Q1/Q2 pairs, 10 cats)
# [2026-05-11] Claude Opus 4.7 — Phase B loader (Q1/Q2/Q3 chains, 17 cats,
#              priority_categories). Function renamed sample_pairs →
#              sample_chains. Returns 24-turn interleave (3 layers × 8
#              chains). Subversion is forced in every sample to give
#              substrate consistent expectation-subverting content
#              exposure for the surprise-axis hypothesis test.
# -------------------
"""
| from __future__ import annotations | |
| import os | |
| import random | |
| from collections import Counter | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import yaml | |
| _DEFAULT_POOL_PATH = os.path.join( | |
| os.path.dirname(os.path.abspath(__file__)), | |
| "benchmark_pool.yaml", | |
| ) | |
def load_pool(path: Optional[str] = None) -> Dict[str, Any]:
    """Load the benchmark pool YAML.

    Args:
        path: Path to the pool YAML. None loads the default file next to
            this module (``benchmark_pool.yaml``).

    Returns a dict with keys:
        threads: list of thread names (8 entries)
        complexity_levels: list of complexity tags (6 entries)
        priority_categories: list of categories that must appear in every
            per-run sample (typically just ["subversion"])
        q1_layer: list of 68 Q1 dicts (id, category, thread,
            complexity, text)
        q2_layer: list of 68 Q2 dicts (adds: parent -> Q1 id)
        q3_layer: list of 68 Q3 dicts (parent -> Q1 id; Q3 is
            sibling to Q2 under Q1)
    """
    p = path or _DEFAULT_POOL_PATH
    # Fix: read as UTF-8 explicitly. The pool contains prose prompts that
    # may hold non-ASCII characters; the platform-default codec (e.g.
    # cp1252 on Windows) could otherwise raise or mis-decode.
    with open(p, encoding="utf-8") as f:
        return yaml.safe_load(f)
| def _build_chains( | |
| pool: Dict[str, Any], | |
| ) -> List[Tuple[Dict[str, Any], Dict[str, Any], Dict[str, Any]]]: | |
| """Build (Q1, Q2, Q3) chain triples from the pool. | |
| Each Q2's and Q3's `parent` field references its Q1's `id`. Chains | |
| without a complete (Q1, Q2, Q3) triple are skipped; Phase B | |
| discipline guarantees full triples but defensive code stays. | |
| """ | |
| q2_by_parent = {q["parent"]: q for q in pool.get("q2_layer", [])} | |
| q3_by_parent = {q["parent"]: q for q in pool.get("q3_layer", [])} | |
| chains: List[ | |
| Tuple[Dict[str, Any], Dict[str, Any], Dict[str, Any]] | |
| ] = [] | |
| for q1 in pool.get("q1_layer", []): | |
| q2 = q2_by_parent.get(q1["id"]) | |
| q3 = q3_by_parent.get(q1["id"]) | |
| if q2 is not None and q3 is not None: | |
| chains.append((q1, q2, q3)) | |
| return chains | |
| def _validate_sample( | |
| sample: List[Tuple[Dict[str, Any], Dict[str, Any], Dict[str, Any]]], | |
| max_per_category: int = 2, | |
| min_threads_with_dups: int = 3, | |
| min_distinct_complexity_levels: int = 3, | |
| ) -> bool: | |
| """Stratification discipline check. | |
| Returns True if the sample respects: | |
| - max `max_per_category` chains per category (default 2) | |
| - at least `min_threads_with_dups` threads with 2+ instances (3) | |
| - at least `min_distinct_complexity_levels` distinct complexity | |
| tags across the sample's combined Q1+Q2+Q3 levels (3) | |
| Counted across (Q1, Q2, Q3) chain triples. Each chain contributes | |
| its (single) thread once and contributes 3 complexity tags (one per | |
| layer). | |
| """ | |
| if not sample: | |
| return False | |
| cats = Counter(q1["category"] for q1, _q2, _q3 in sample) | |
| if any(count > max_per_category for count in cats.values()): | |
| return False | |
| threads = Counter(q1["thread"] for q1, _q2, _q3 in sample) | |
| threads_with_dups = sum( | |
| 1 for count in threads.values() if count >= 2 | |
| ) | |
| if threads_with_dups < min_threads_with_dups: | |
| return False | |
| complexities: set = set() | |
| for q1, q2, q3 in sample: | |
| complexities.add(q1["complexity"]) | |
| complexities.add(q2["complexity"]) | |
| complexities.add(q3["complexity"]) | |
| if len(complexities) < min_distinct_complexity_levels: | |
| return False | |
| return True | |
def sample_chains(
    pool: Optional[Dict[str, Any]] = None,
    n_chains: int = 8,
    seed: Optional[int] = None,
    max_attempts: int = 200,
) -> Tuple[
    List[Tuple[str, str]],
    List[Tuple[int, int]],
    List[Dict[str, Any]],
]:
    """Sample `n_chains` chains with stratification + priority discipline.

    Args:
        pool: Pre-loaded pool dict; None loads from the default path.
        n_chains: Number of (Q1, Q2, Q3) chains to draw. Each chain yields
            3 turns, so total turns = 3 * n_chains (Phase B default: 8
            chains -> 24 turns per run).
        seed: RNG seed for reproducibility; None is nondeterministic.
        max_attempts: Rejection-sampling retry budget for the non-priority
            portion of the sample.

    Returns:
        interleaved_questions: 3*n_chains (category, prompt_text) tuples,
            laid out layer-major — turns 0..n-1 are Q1s, n..2n-1 the
            matching Q2s, 2n..3n-1 the matching Q3s (same chain order).
        same_cat_pairs: n_chains (q1_turn_idx, q2_turn_idx) tuples; Phase A
            semantics, always [(i, i+n_chains) for i in range(n_chains)].
            Q3 turns aren't paired (Option A, 2026-05-11; Option B may add
            Q1<->Q3 and Q2<->Q3 pairs later).
        sample_meta: n_chains dicts with q1_id, q2_id, q3_id, category,
            thread, q1_complexity, q2_complexity, q3_complexity.

    One chain per category in pool["priority_categories"] is force-included
    before rejection sampling fills the open slots from the non-priority
    pool; stratification is validated on the COMBINED final sample, so
    forced chains count toward the thread/complexity constraints.

    Raises:
        ValueError: if the pool holds fewer than n_chains complete chains,
            or there are more priority categories than n_chains.
    """
    if pool is None:
        pool = load_pool()
    chains = _build_chains(pool)
    if len(chains) < n_chains:
        raise ValueError(
            f"Pool has {len(chains)} chains, cannot sample {n_chains}"
        )

    priority_cats: List[str] = pool.get("priority_categories", []) or []
    rng = random.Random(seed)

    # Step 1 — one forced chain per priority category. RNG draws happen in
    # priority-list order (this ordering is part of the seeded behavior).
    forced: List[
        Tuple[Dict[str, Any], Dict[str, Any], Dict[str, Any]]
    ] = []
    for cat in priority_cats:
        eligible = [chain for chain in chains if chain[0]["category"] == cat]
        if eligible:
            forced.append(rng.choice(eligible))

    n_remaining = n_chains - len(forced)
    if n_remaining < 0:
        raise ValueError(
            f"More priority categories ({len(forced)}) than n_chains "
            f"({n_chains}); reduce priority list or raise n_chains"
        )

    forced_ids = {chain[0]["id"] for chain in forced}
    open_pool = [chain for chain in chains if chain[0]["id"] not in forced_ids]

    # Step 2 — rejection-sample the open slots; the discipline check runs
    # on the combined (forced + drawn) candidate each attempt.
    selected: Optional[
        List[Tuple[Dict[str, Any], Dict[str, Any], Dict[str, Any]]]
    ] = None
    for _ in range(max_attempts):
        drawn = rng.sample(open_pool, n_remaining) if n_remaining > 0 else []
        candidate = forced + drawn
        if _validate_sample(candidate):
            selected = candidate
            break
    if selected is None:
        # Best-effort fallback rather than aborting: forced chains stay in;
        # the open slots get a plain random draw even though stratification
        # constraints were not met.
        if n_remaining > 0:
            selected = forced + rng.sample(open_pool, n_remaining)
        else:
            selected = list(forced)

    # Layer-major interleave: all Q1s, then all Q2s, then all Q3s, each in
    # the selected chain order.
    interleaved: List[Tuple[str, str]] = [
        (chain[layer]["category"], chain[layer]["text"])
        for layer in (0, 1, 2)
        for chain in selected
    ]
    same_cat_pairs: List[Tuple[int, int]] = [
        (idx, idx + n_chains) for idx in range(n_chains)
    ]
    sample_meta: List[Dict[str, Any]] = [
        {
            "q1_id": q1["id"],
            "q2_id": q2["id"],
            "q3_id": q3["id"],
            "category": q1["category"],
            "thread": q1["thread"],
            "q1_complexity": q1["complexity"],
            "q2_complexity": q2["complexity"],
            "q3_complexity": q3["complexity"],
        }
        for q1, q2, q3 in selected
    ]
    return interleaved, same_cat_pairs, sample_meta
def describe_sample(
    sample_meta: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Summarize a sample's composition for structured (JSON) logging.

    Surfaces what was actually drawn this run — category, thread, and
    complexity counts plus the chain id triples — so per-run substrate
    behavior can be correlated with the exercised content, and so it can
    be confirmed that priority_categories are being respected.
    """
    complexity_counts: Counter = Counter()
    for entry in sample_meta:
        complexity_counts.update((
            entry["q1_complexity"],
            entry["q2_complexity"],
            entry["q3_complexity"],
        ))
    return {
        "n_chains": len(sample_meta),
        "n_turns": 3 * len(sample_meta),
        "categories_sampled": dict(Counter(m["category"] for m in sample_meta)),
        "threads_sampled": dict(Counter(m["thread"] for m in sample_meta)),
        "complexity_distribution": dict(complexity_counts),
        "chain_ids": [
            (m["q1_id"], m["q2_id"], m["q3_id"]) for m in sample_meta
        ],
    }