#!/usr/bin/env python3 from __future__ import annotations import argparse import importlib.util import importlib import json import os import subprocess import sys from pathlib import Path from typing import Any from scripts.benchmark_preflight import build_readiness_report from scripts.hf_routing import resolve_routing REPO_ROOT = Path(__file__).resolve().parents[1] FREEZE_PATH = REPO_ROOT / "artifacts" / "cycle_1_execution_freeze.json" RUNNER_PATH = REPO_ROOT / "scripts" / "benchmark_runner.py" def active_hf_token() -> str | None: token = os.environ.get("HF_TOKEN") if token: return token try: from huggingface_hub.utils import get_token return get_token() except Exception: return None def missing_benchmark_dependencies() -> list[str]: required = ["mamba_ssm", "transformers"] missing: list[str] = [] for name in required: try: spec = importlib.util.find_spec(name) except (ImportError, ValueError): spec = None if spec is None: try: importlib.import_module(name) except Exception: missing.append(name) return missing def load_cycle_freeze(path: Path) -> dict[str, Any]: return json.loads(path.read_text(encoding="utf-8")) def load_cycle_benchmarks(path: Path) -> list[str]: payload = json.loads(path.read_text(encoding="utf-8")) out: list[str] = [] for section in ("coding_benchmarks", "reasoning_benchmarks"): for slot in ("fast_iteration", "milestone"): entry = payload.get(section, {}).get(slot) if isinstance(entry, dict) and entry.get("name"): out.append(str(entry["name"])) return out def build_preflight_report( *, cache_dir: Path, output_repo: str | None = None, tokenizer_repo: str | None = None, ) -> dict[str, object]: return build_readiness_report( cache_dir=cache_dir, hf_token_present=bool(active_hf_token()), dependencies_present=not bool(missing_benchmark_dependencies()), missing_dependencies=missing_benchmark_dependencies(), output_repo=output_repo, tokenizer_repo=tokenizer_repo, ) def write_preflight_report(path: Path, payload: dict[str, object]) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8") def write_cycle_summary(path: Path, payload: list[dict[str, Any]]) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8") def build_remote_checkpoint_report(output_repo: str, token: str | None) -> dict[str, Any]: from huggingface_hub import HfApi from scripts.benchmark_checkpoint_report import build_checkpoint_report files = HfApi(token=token).list_repo_files(repo_id=output_repo, repo_type="model", token=token) return build_checkpoint_report(files) def ensure_benchmark_assets( *, cache_dir: Path, output_repo: str, tokenizer_repo: str, token: str | None, hydrate: bool, ) -> dict[str, str] | None: if not hydrate: return None from scripts.benchmark_assets import hydrate_benchmark_assets return hydrate_benchmark_assets( cache_dir=cache_dir, output_repo=output_repo, tokenizer_repo=tokenizer_repo, token=token, ) def build_benchmark_command( freeze: dict[str, Any], *, benchmark: str, variant: str, seed: int, out_dir: Path, ) -> tuple[list[str], dict[str, str]]: variant_cfg = freeze["variants"][variant] env = os.environ.copy() env.update({str(k): str(v) for k, v in variant_cfg.get("env", {}).items()}) env["HYDRA_SEED"] = str(seed) out_dir.mkdir(parents=True, exist_ok=True) result_path = out_dir / f"{benchmark.lower()}_{variant}_seed{seed}.json" ledger_path = out_dir / "benchmark_ledger.json" cmd = [ sys.executable, str(RUNNER_PATH), "--benchmark", benchmark, "--generator-mode", "hydra", "--out", str(result_path), "--ledger", str(ledger_path), "--variant", variant, "--seed", str(seed), ] return cmd, env def build_cycle_plan(freeze: dict[str, Any], *, benchmark: str, out_dir: Path) -> list[dict[str, Any]]: runnable_variants = [ name for name, cfg in freeze.get("variants", {}).items() if isinstance(cfg, dict) and cfg.get("status") == "runnable_now" ] seeds = [int(seed) for seed in freeze.get("seeds", [])] plan: list[dict[str, Any]] = [] for variant in runnable_variants: for seed in seeds: cmd, env = build_benchmark_command( freeze, benchmark=benchmark, variant=variant, seed=seed, out_dir=out_dir, ) plan.append({ "benchmark": benchmark, "variant": variant, "seed": seed, "command": cmd, "env": env, }) return plan def execute_cycle_plan(plan: list[dict[str, Any]], *, repo_root: Path) -> list[dict[str, Any]]: results: list[dict[str, Any]] = [] for item in plan: proc = subprocess.run(item["command"], cwd=str(repo_root), env=item["env"]) results.append( { "benchmark": item["benchmark"], "variant": item["variant"], "seed": item["seed"], "returncode": proc.returncode, } ) return results def parse_args(argv: list[str] | None = None) -> argparse.Namespace: parser = argparse.ArgumentParser(description="Execute a frozen Cycle 1 benchmark run") parser.add_argument("--freeze", type=Path, default=FREEZE_PATH) parser.add_argument("--suite", type=Path, default=REPO_ROOT / "artifacts" / "benchmark_suite.cycle1.json") parser.add_argument("--benchmark", required=True) parser.add_argument("--variant", required=True) parser.add_argument("--seed", type=int, required=True) parser.add_argument("--out-dir", type=Path, default=REPO_ROOT / "artifacts" / "runs") parser.add_argument("--preflight-out", type=Path) parser.add_argument("--summary-out", type=Path) parser.add_argument("--hydrate-assets", action="store_true") parser.add_argument("--all-runnable", action="store_true") parser.add_argument("--all-benchmarks", action="store_true") parser.add_argument("--require-ready", action="store_true") parser.add_argument("--output-repo") parser.add_argument("--tokenizer-repo") return parser.parse_args(argv) def main(argv: list[str] | None = None) -> int: args = parse_args(argv) cache_dir = Path(os.path.expanduser("~/.cache/autoresearch")) report = None token = active_hf_token() routing = resolve_routing(token=token) output_repo = args.output_repo or routing.output_repo tokenizer_repo = args.tokenizer_repo or routing.output_repo if args.hydrate_assets: try: ensure_benchmark_assets( cache_dir=cache_dir, output_repo=output_repo, tokenizer_repo=tokenizer_repo, token=token, hydrate=True, ) except FileNotFoundError as exc: checkpoint_report = None try: checkpoint_report = build_remote_checkpoint_report(output_repo, token) except Exception: checkpoint_report = None if args.summary_out is not None: write_cycle_summary( args.summary_out, [{ "status": "blocked", "reason": "asset_hydration_failed", "error": str(exc), "checkpoint_candidates": checkpoint_report, }], ) return 3 if args.preflight_out is not None: report = build_preflight_report( cache_dir=cache_dir, output_repo=output_repo, tokenizer_repo=tokenizer_repo, ) write_preflight_report(args.preflight_out, report) if args.require_ready: if report is None: report = build_preflight_report( cache_dir=cache_dir, output_repo=output_repo, tokenizer_repo=tokenizer_repo, ) if not bool(report.get("ready_for_hydra_benchmarks")): checkpoint_report = None try: checkpoint_report = build_remote_checkpoint_report(output_repo, token) except Exception: checkpoint_report = None if args.summary_out is not None: write_cycle_summary( args.summary_out, [{ "status": "blocked", "reason": "preflight_not_ready", "preflight": report, "checkpoint_candidates": checkpoint_report, }], ) return 2 freeze = load_cycle_freeze(args.freeze) if args.all_runnable: benchmarks = load_cycle_benchmarks(args.suite) if args.all_benchmarks else [args.benchmark] plan = [] for benchmark in benchmarks: plan.extend(build_cycle_plan(freeze, benchmark=benchmark, out_dir=args.out_dir)) results = execute_cycle_plan(plan, repo_root=REPO_ROOT) if args.summary_out is not None: write_cycle_summary(args.summary_out, results) return 0 if all(item["returncode"] == 0 for item in results) else 1 cmd, env = build_benchmark_command( freeze, benchmark=args.benchmark, variant=args.variant, seed=args.seed, out_dir=args.out_dir, ) proc = subprocess.run(cmd, cwd=str(REPO_ROOT), env=env) if args.summary_out is not None: write_cycle_summary( args.summary_out, [{ "benchmark": args.benchmark, "variant": args.variant, "seed": args.seed, "returncode": proc.returncode, }], ) return proc.returncode if __name__ == "__main__": raise SystemExit(main())