| """Gate metric computation + machine-readable report emission. |
| |
| Phase 0 implements Gate 0 (reproducibility precondition). Later phases extend this module |
| with Gates 1-6. Each gate runner returns a report dict matching IMPLEMENTATION_SPEC §8 and |
| the agent HALTS after writing it; `human_signoff` is left null for a human to set GO. |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import os |
| from pathlib import Path |
|
|
| import torch |
|
|
| ROOT = Path(__file__).resolve().parents[1] |
|
|
|
|
| class _DataUnavailable(Exception): |
| """Raised when CT pixel data for the token bank is not accessible (known gap).""" |
|
|
|
|
| def _load_job_metrics(tcfg: dict) -> dict | None: |
| """Load the HF-Job token-bank metrics JSON (local copy, else fetch from bucket).""" |
| local = tcfg.get("job_metrics_local") |
| if local and not os.path.isabs(local): |
| local = str(ROOT / local) |
| if local and os.path.exists(local): |
| with open(local) as f: |
| return json.load(f) |
| bucket = tcfg.get("job_metrics_bucket") |
| if bucket and local: |
| import subprocess |
| os.makedirs(os.path.dirname(local), exist_ok=True) |
| r = subprocess.run(["hf", "buckets", "cp", bucket, local], |
| capture_output=True, text=True) |
| if r.returncode == 0 and os.path.exists(local): |
| with open(local) as f: |
| return json.load(f) |
| return None |
|
|
|
|
| def _deterministic_ct_batch(n: int, image_size: int, seed: int) -> torch.Tensor: |
| """A fixed, seeded CT-like input batch. Reproducibility is input-agnostic, so a |
| deterministic synthetic batch validates the two-run extraction equality even when |
| real CT pixels are unavailable.""" |
| g = torch.Generator().manual_seed(seed) |
| return torch.randn(n, 3, image_size, image_size, generator=g) |
|
|
|
|
| def run_gate0(cfg: dict) -> dict: |
| from backbone.meddino import MedDINOv3Backbone |
| from data.ct_bank import build_token_bank, build_token_bank_from_tree |
|
|
| bcfg, dcfg, tcfg, rcfg = cfg["backbone"], cfg["data"], cfg["token_bank"], cfg["reproducibility"] |
| image_size = int(bcfg.get("image_size", 224)) |
| atol = float(rcfg.get("atol", 1e-4)) |
| seed = int(rcfg.get("seed", 0)) |
| target_tokens = int(tcfg.get("target_tokens", 2_000_000)) |
|
|
| metrics: list[dict] = [] |
| data_gaps: list[str] = [] |
|
|
| |
| backbone = MedDINOv3Backbone( |
| checkpoint=bcfg["checkpoint"], |
| device=rcfg.get("device", "auto"), |
| n_storage_tokens=int(bcfg.get("n_storage_tokens", 4)), |
| layerscale_init=float(bcfg.get("layerscale_init", 1e-5)), |
| qkv_bias=bool(bcfg.get("qkv_bias", False)), |
| mask_k_bias=bool(bcfg.get("mask_k_bias", True)), |
| ) |
| frozen = all(not p.requires_grad for p in backbone.model.parameters()) |
| metrics.append({ |
| "name": "backbone_loads_frozen", |
| "modality": "CT", "budget": None, |
| "value": 1.0, "ci95": None, "test": "state_dict_load_exact", |
| "threshold": 1.0, "threshold_status": "FIXED", |
| "passed": bool(frozen), |
| "detail": f"0 missing / 0 unexpected keys; frozen={frozen}; " |
| f"device={backbone.device.type}", |
| }) |
|
|
| |
| x = _deterministic_ct_batch(4, image_size, seed) |
| z1 = backbone.extract_patch_tokens(x) |
| z2 = backbone.extract_patch_tokens(x) |
| max_abs = float((z1 - z2).abs().max()) |
| metrics.append({ |
| "name": "feature_extraction_reproducible", |
| "modality": "CT", "budget": None, |
| "value": max_abs, "ci95": None, "test": "two_run_max_abs_diff", |
| "threshold": atol, "threshold_status": "FIXED", |
| "passed": bool(max_abs <= atol), |
| "detail": f"max|z1-z2|={max_abs:.3e} over shape {tuple(z1.shape)}; atol={atol:g}", |
| }) |
|
|
| |
| bank_passed = False |
| bank_detail = "" |
| image_root = dcfg.get("image_root") |
| if image_root and not os.path.isabs(image_root): |
| image_root = str(ROOT / image_root) |
| splits_local = dcfg.get("splits_local") |
| if splits_local and not os.path.isabs(splits_local): |
| splits_local = str(ROOT / splits_local) |
| have_tree = bool(image_root and os.path.isdir(image_root) and splits_local |
| and os.path.exists(splits_local)) |
|
|
| |
| |
| job_metrics = _load_job_metrics(tcfg) |
| try: |
| if job_metrics is not None: |
| n_tok = int(job_metrics.get("n_tokens", 0)) |
| bank_passed = n_tok >= target_tokens and bool( |
| job_metrics.get("backbone_loads_frozen", False)) |
| bank_value = float(n_tok) |
| bank_detail = ( |
| f"{n_tok} tokens (fp16) from {job_metrics.get('n_slices')} held-out " |
| f"'{job_metrics.get('held_out_split')}' slices " |
| f"({job_metrics.get('scans_used')} scans, dim={job_metrics.get('dim')}); " |
| f"built on HF Job [{job_metrics.get('device')}], " |
| f"bank at {job_metrics.get('bank_path')}" |
| ) |
| elif have_tree: |
| |
| res = build_token_bank_from_tree( |
| backbone=backbone, |
| image_root=image_root, |
| splits_json_path=splits_local, |
| out_path=str(ROOT / tcfg["out_path"]), |
| target_tokens=target_tokens, |
| held_out_split=tcfg.get("held_out_split", "train"), |
| image_size=image_size, |
| ) |
| if res.data_gap: |
| data_gaps.append(res.gap_reason or "token bank: no held-out slices") |
| bank_detail = res.gap_reason or "" |
| else: |
| bank_passed = res.n_tokens >= target_tokens |
| bank_detail = (f"{res.n_tokens} tokens from {res.n_slices} held-out " |
| f"'{res.meta.get('held_out_split')}' slices " |
| f"({res.meta.get('scans_used')} scans, dim={res.dim})") |
| bank_value = float(res.n_tokens) |
| elif not image_root: |
| |
| |
| raise _DataUnavailable( |
| f"No CT slice pixels accessible. configs:data.image_root is unset and " |
| f"no local LIDC mirror is present. Sync " |
| f"hf://buckets/Chucks90/eryon-datasets/raw/lidc to data.image_root " |
| f"(+ data.splits_local) to build the >=2e6-token bank." |
| ) |
| else: |
| |
| from data.loaders import download_manifest |
| cache = str(ROOT / "covtoken_cache") |
| os.makedirs(cache, exist_ok=True) |
| manifest_local = download_manifest( |
| dcfg["manifest_repo"], dcfg["manifest_path"], cache) |
| res = build_token_bank( |
| backbone=backbone, |
| manifest_local_path=manifest_local, |
| image_root=image_root, |
| out_path=str(ROOT / tcfg["out_path"]), |
| target_tokens=target_tokens, |
| held_out_split=tcfg.get("held_out_split", "train"), |
| image_size=image_size, |
| ) |
| if res.data_gap: |
| data_gaps.append(res.gap_reason or "token bank: pixels unavailable") |
| bank_detail = res.gap_reason or "" |
| else: |
| bank_passed = res.n_tokens >= target_tokens |
| bank_detail = (f"{res.n_tokens} tokens from {res.n_slices} slices " |
| f"(dim={res.dim})") |
| bank_value = float(res.n_tokens) |
| except _DataUnavailable as e: |
| data_gaps.append(str(e)) |
| bank_value = 0.0 |
| bank_detail = str(e) |
| except Exception as e: |
| data_gaps.append(f"token bank build error: {type(e).__name__}: {e}") |
| bank_value = 0.0 |
| bank_detail = f"errored: {e}" |
|
|
| metrics.append({ |
| "name": "token_bank_size", |
| "modality": "CT", "budget": None, |
| "value": bank_value, "ci95": None, "test": "count", |
| "threshold": float(target_tokens), "threshold_status": "FIXED", |
| "passed": bool(bank_passed), |
| "detail": bank_detail, |
| }) |
|
|
| |
| repro_ok = all(m["passed"] for m in metrics if m["name"] != "token_bank_size") |
| if repro_ok and bank_passed: |
| status = "PASS" |
| elif repro_ok and data_gaps: |
| |
| status = "FALLBACK" |
| else: |
| status = "FAIL" |
|
|
| report = { |
| "gate": 0, |
| "phase": "Phase 0 - Scaffolding + reproducibility", |
| "status": status, |
| "fallback_path": ( |
| "Reproducibility (backbone load + deterministic extraction) PASSES. " |
| "Token bank >= 2e6 is BLOCKED on CT pixel access (interim bucket " |
| "hf://buckets/Chucks90/eryon-datasets unreadable with provided token). " |
| "Provide a local LIDC slice mirror via configs/phase0.yaml:data.image_root, " |
| "then re-run to clear the bank criterion." |
| if status == "FALLBACK" else None |
| ), |
| "metrics": metrics, |
| "thresholds_locked_ref": None, |
| "seeds": [seed], |
| "data_gaps": data_gaps, |
| "decision_rule": ( |
| "PASS iff backbone loads frozen AND two-run max|dz|<=atol AND " |
| "token_bank>=2e6. FALLBACK iff reproducibility holds but bank is blocked " |
| "only by the known CT-pixel data-access gap." |
| ), |
| "human_signoff": None, |
| } |
| return report |
|
|
|
|
| def write_report(report: dict, path: str) -> str: |
| full = path if os.path.isabs(path) else str(ROOT / path) |
| os.makedirs(os.path.dirname(full), exist_ok=True) |
| with open(full, "w") as f: |
| json.dump(report, f, indent=2) |
| return full |
|
|