Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python3 | |
| from __future__ import annotations | |
| import argparse | |
| import importlib.util | |
| import importlib | |
| import json | |
| import os | |
| import subprocess | |
| import sys | |
| from pathlib import Path | |
| from typing import Any | |
| from scripts.benchmark_preflight import build_readiness_report | |
| from scripts.hf_routing import resolve_routing | |
| REPO_ROOT = Path(__file__).resolve().parents[1] | |
| FREEZE_PATH = REPO_ROOT / "artifacts" / "cycle_1_execution_freeze.json" | |
| RUNNER_PATH = REPO_ROOT / "scripts" / "benchmark_runner.py" | |
| def active_hf_token() -> str | None: | |
| token = os.environ.get("HF_TOKEN") | |
| if token: | |
| return token | |
| try: | |
| from huggingface_hub.utils import get_token | |
| return get_token() | |
| except Exception: | |
| return None | |
| def missing_benchmark_dependencies() -> list[str]: | |
| required = ["mamba_ssm", "transformers"] | |
| missing: list[str] = [] | |
| for name in required: | |
| try: | |
| spec = importlib.util.find_spec(name) | |
| except (ImportError, ValueError): | |
| spec = None | |
| if spec is None: | |
| try: | |
| importlib.import_module(name) | |
| except Exception: | |
| missing.append(name) | |
| return missing | |
| def load_cycle_freeze(path: Path) -> dict[str, Any]: | |
| return json.loads(path.read_text(encoding="utf-8")) | |
| def load_cycle_benchmarks(path: Path) -> list[str]: | |
| payload = json.loads(path.read_text(encoding="utf-8")) | |
| out: list[str] = [] | |
| for section in ("coding_benchmarks", "reasoning_benchmarks"): | |
| for slot in ("fast_iteration", "milestone"): | |
| entry = payload.get(section, {}).get(slot) | |
| if isinstance(entry, dict) and entry.get("name"): | |
| out.append(str(entry["name"])) | |
| return out | |
| def build_preflight_report( | |
| *, | |
| cache_dir: Path, | |
| output_repo: str | None = None, | |
| tokenizer_repo: str | None = None, | |
| ) -> dict[str, object]: | |
| return build_readiness_report( | |
| cache_dir=cache_dir, | |
| hf_token_present=bool(active_hf_token()), | |
| dependencies_present=not bool(missing_benchmark_dependencies()), | |
| missing_dependencies=missing_benchmark_dependencies(), | |
| output_repo=output_repo, | |
| tokenizer_repo=tokenizer_repo, | |
| ) | |
| def write_preflight_report(path: Path, payload: dict[str, object]) -> None: | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8") | |
| def write_cycle_summary(path: Path, payload: list[dict[str, Any]]) -> None: | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8") | |
| def build_remote_checkpoint_report(output_repo: str, token: str | None) -> dict[str, Any]: | |
| from huggingface_hub import HfApi | |
| from scripts.benchmark_checkpoint_report import build_checkpoint_report | |
| files = HfApi(token=token).list_repo_files(repo_id=output_repo, repo_type="model", token=token) | |
| return build_checkpoint_report(files) | |
| def ensure_benchmark_assets( | |
| *, | |
| cache_dir: Path, | |
| output_repo: str, | |
| tokenizer_repo: str, | |
| token: str | None, | |
| hydrate: bool, | |
| ) -> dict[str, str] | None: | |
| if not hydrate: | |
| return None | |
| from scripts.benchmark_assets import hydrate_benchmark_assets | |
| return hydrate_benchmark_assets( | |
| cache_dir=cache_dir, | |
| output_repo=output_repo, | |
| tokenizer_repo=tokenizer_repo, | |
| token=token, | |
| ) | |
| def build_benchmark_command( | |
| freeze: dict[str, Any], | |
| *, | |
| benchmark: str, | |
| variant: str, | |
| seed: int, | |
| out_dir: Path, | |
| ) -> tuple[list[str], dict[str, str]]: | |
| variant_cfg = freeze["variants"][variant] | |
| env = os.environ.copy() | |
| env.update({str(k): str(v) for k, v in variant_cfg.get("env", {}).items()}) | |
| env["HYDRA_SEED"] = str(seed) | |
| out_dir.mkdir(parents=True, exist_ok=True) | |
| result_path = out_dir / f"{benchmark.lower()}_{variant}_seed{seed}.json" | |
| ledger_path = out_dir / "benchmark_ledger.json" | |
| cmd = [ | |
| sys.executable, | |
| str(RUNNER_PATH), | |
| "--benchmark", | |
| benchmark, | |
| "--generator-mode", | |
| "hydra", | |
| "--out", | |
| str(result_path), | |
| "--ledger", | |
| str(ledger_path), | |
| "--variant", | |
| variant, | |
| "--seed", | |
| str(seed), | |
| ] | |
| return cmd, env | |
| def build_cycle_plan(freeze: dict[str, Any], *, benchmark: str, out_dir: Path) -> list[dict[str, Any]]: | |
| runnable_variants = [ | |
| name for name, cfg in freeze.get("variants", {}).items() | |
| if isinstance(cfg, dict) and cfg.get("status") == "runnable_now" | |
| ] | |
| seeds = [int(seed) for seed in freeze.get("seeds", [])] | |
| plan: list[dict[str, Any]] = [] | |
| for variant in runnable_variants: | |
| for seed in seeds: | |
| cmd, env = build_benchmark_command( | |
| freeze, | |
| benchmark=benchmark, | |
| variant=variant, | |
| seed=seed, | |
| out_dir=out_dir, | |
| ) | |
| plan.append({ | |
| "benchmark": benchmark, | |
| "variant": variant, | |
| "seed": seed, | |
| "command": cmd, | |
| "env": env, | |
| }) | |
| return plan | |
| def execute_cycle_plan(plan: list[dict[str, Any]], *, repo_root: Path) -> list[dict[str, Any]]: | |
| results: list[dict[str, Any]] = [] | |
| for item in plan: | |
| proc = subprocess.run(item["command"], cwd=str(repo_root), env=item["env"]) | |
| results.append( | |
| { | |
| "benchmark": item["benchmark"], | |
| "variant": item["variant"], | |
| "seed": item["seed"], | |
| "returncode": proc.returncode, | |
| } | |
| ) | |
| return results | |
| def parse_args(argv: list[str] | None = None) -> argparse.Namespace: | |
| parser = argparse.ArgumentParser(description="Execute a frozen Cycle 1 benchmark run") | |
| parser.add_argument("--freeze", type=Path, default=FREEZE_PATH) | |
| parser.add_argument("--suite", type=Path, default=REPO_ROOT / "artifacts" / "benchmark_suite.cycle1.json") | |
| parser.add_argument("--benchmark", required=True) | |
| parser.add_argument("--variant", required=True) | |
| parser.add_argument("--seed", type=int, required=True) | |
| parser.add_argument("--out-dir", type=Path, default=REPO_ROOT / "artifacts" / "runs") | |
| parser.add_argument("--preflight-out", type=Path) | |
| parser.add_argument("--summary-out", type=Path) | |
| parser.add_argument("--hydrate-assets", action="store_true") | |
| parser.add_argument("--all-runnable", action="store_true") | |
| parser.add_argument("--all-benchmarks", action="store_true") | |
| parser.add_argument("--require-ready", action="store_true") | |
| parser.add_argument("--output-repo") | |
| parser.add_argument("--tokenizer-repo") | |
| return parser.parse_args(argv) | |
| def main(argv: list[str] | None = None) -> int: | |
| args = parse_args(argv) | |
| cache_dir = Path(os.path.expanduser("~/.cache/autoresearch")) | |
| report = None | |
| token = active_hf_token() | |
| routing = resolve_routing(token=token) | |
| output_repo = args.output_repo or routing.output_repo | |
| tokenizer_repo = args.tokenizer_repo or routing.output_repo | |
| if args.hydrate_assets: | |
| try: | |
| ensure_benchmark_assets( | |
| cache_dir=cache_dir, | |
| output_repo=output_repo, | |
| tokenizer_repo=tokenizer_repo, | |
| token=token, | |
| hydrate=True, | |
| ) | |
| except FileNotFoundError as exc: | |
| checkpoint_report = None | |
| try: | |
| checkpoint_report = build_remote_checkpoint_report(output_repo, token) | |
| except Exception: | |
| checkpoint_report = None | |
| if args.summary_out is not None: | |
| write_cycle_summary( | |
| args.summary_out, | |
| [{ | |
| "status": "blocked", | |
| "reason": "asset_hydration_failed", | |
| "error": str(exc), | |
| "checkpoint_candidates": checkpoint_report, | |
| }], | |
| ) | |
| return 3 | |
| if args.preflight_out is not None: | |
| report = build_preflight_report( | |
| cache_dir=cache_dir, | |
| output_repo=output_repo, | |
| tokenizer_repo=tokenizer_repo, | |
| ) | |
| write_preflight_report(args.preflight_out, report) | |
| if args.require_ready: | |
| if report is None: | |
| report = build_preflight_report( | |
| cache_dir=cache_dir, | |
| output_repo=output_repo, | |
| tokenizer_repo=tokenizer_repo, | |
| ) | |
| if not bool(report.get("ready_for_hydra_benchmarks")): | |
| checkpoint_report = None | |
| try: | |
| checkpoint_report = build_remote_checkpoint_report(output_repo, token) | |
| except Exception: | |
| checkpoint_report = None | |
| if args.summary_out is not None: | |
| write_cycle_summary( | |
| args.summary_out, | |
| [{ | |
| "status": "blocked", | |
| "reason": "preflight_not_ready", | |
| "preflight": report, | |
| "checkpoint_candidates": checkpoint_report, | |
| }], | |
| ) | |
| return 2 | |
| freeze = load_cycle_freeze(args.freeze) | |
| if args.all_runnable: | |
| benchmarks = load_cycle_benchmarks(args.suite) if args.all_benchmarks else [args.benchmark] | |
| plan = [] | |
| for benchmark in benchmarks: | |
| plan.extend(build_cycle_plan(freeze, benchmark=benchmark, out_dir=args.out_dir)) | |
| results = execute_cycle_plan(plan, repo_root=REPO_ROOT) | |
| if args.summary_out is not None: | |
| write_cycle_summary(args.summary_out, results) | |
| return 0 if all(item["returncode"] == 0 for item in results) else 1 | |
| cmd, env = build_benchmark_command( | |
| freeze, | |
| benchmark=args.benchmark, | |
| variant=args.variant, | |
| seed=args.seed, | |
| out_dir=args.out_dir, | |
| ) | |
| proc = subprocess.run(cmd, cwd=str(REPO_ROOT), env=env) | |
| if args.summary_out is not None: | |
| write_cycle_summary( | |
| args.summary_out, | |
| [{ | |
| "benchmark": args.benchmark, | |
| "variant": args.variant, | |
| "seed": args.seed, | |
| "returncode": proc.returncode, | |
| }], | |
| ) | |
| return proc.returncode | |
| if __name__ == "__main__": | |
| raise SystemExit(main()) | |