feather-a10-runtime / overlay /scripts /cycle_executor.py
Jackoatmon's picture
Update Feather training runtime image
951f760 verified
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import importlib.util
import importlib
import json
import os
import subprocess
import sys
from pathlib import Path
from typing import Any
from scripts.benchmark_preflight import build_readiness_report
from scripts.hf_routing import resolve_routing
# Repository root: two levels above this file (the file sits in <root>/scripts/).
REPO_ROOT = Path(__file__).resolve().parents[1]
# Frozen Cycle 1 configuration (variants, seeds) loaded by default in main().
FREEZE_PATH = REPO_ROOT.joinpath("artifacts", "cycle_1_execution_freeze.json")
# Benchmark runner script launched as a subprocess for every planned run.
RUNNER_PATH = REPO_ROOT.joinpath("scripts", "benchmark_runner.py")
def active_hf_token() -> str | None:
    """Return the Hugging Face token to use, or ``None`` when none is available.

    A non-empty ``HF_TOKEN`` environment variable wins. Otherwise the token
    reported by ``huggingface_hub.utils.get_token`` is returned as-is. Any
    failure while importing or calling it yields ``None``.
    """

    def _hub_token() -> str | None:
        # Best effort: a missing or broken huggingface_hub just means "no token".
        try:
            from huggingface_hub.utils import get_token
        except Exception:
            return None
        try:
            return get_token()
        except Exception:
            return None

    return os.environ.get("HF_TOKEN") or _hub_token()
def missing_benchmark_dependencies() -> list[str]:
    """List which of the benchmark's required modules cannot be found or imported.

    Each of ``mamba_ssm`` and ``transformers`` is first located with
    ``importlib.util.find_spec``. Only if no spec is found (or the lookup
    raises ``ImportError``/``ValueError``) is an actual import attempted. The
    result keeps the order of the required list.
    """

    def _is_available(module_name: str) -> bool:
        try:
            located = importlib.util.find_spec(module_name)
        except (ImportError, ValueError):
            located = None
        if located is not None:
            return True
        # Fallback: the module may still be importable (e.g. already present in
        # sys.modules without a usable __spec__).
        try:
            importlib.import_module(module_name)
        except Exception:
            return False
        return True

    return [module for module in ("mamba_ssm", "transformers") if not _is_available(module)]
def load_cycle_freeze(path: Path) -> dict[str, Any]:
    """Decode the frozen cycle configuration stored as UTF-8 JSON at *path*."""
    with path.open("r", encoding="utf-8") as handle:
        return json.load(handle)
def load_cycle_benchmarks(path: Path) -> list[str]:
    """Return the benchmark names listed in a Cycle 1 benchmark-suite JSON file.

    Names are read from the ``fast_iteration`` and ``milestone`` slots of the
    ``coding_benchmarks`` and ``reasoning_benchmarks`` sections, in that order.

    The following are skipped:
      * sections or slots that are absent, ``null``, or not JSON objects;
      * slot entries without a truthy ``name``.

    A name that appears in more than one slot is returned once (first
    occurrence wins), so the same benchmark is not planned and run twice.
    """
    payload = json.loads(path.read_text(encoding="utf-8"))
    names: list[str] = []
    for section in ("coding_benchmarks", "reasoning_benchmarks"):
        section_cfg = payload.get(section)
        # A null/non-object section would otherwise crash on `.get`.
        if not isinstance(section_cfg, dict):
            continue
        for slot in ("fast_iteration", "milestone"):
            entry = section_cfg.get(slot)
            if isinstance(entry, dict) and entry.get("name"):
                name = str(entry["name"])
                if name not in names:
                    names.append(name)
    return names
def build_preflight_report(
    *,
    cache_dir: Path,
    output_repo: str | None = None,
    tokenizer_repo: str | None = None,
) -> dict[str, object]:
    """Assemble the benchmark readiness report for this host.

    Args:
        cache_dir: Local cache directory passed through to the readiness check.
        output_repo: Optional repo id passed through to the readiness check.
        tokenizer_repo: Optional repo id passed through to the readiness check.

    Returns:
        The report produced by ``build_readiness_report``.
    """
    # Probe dependencies once. The probe may import modules, and a single result
    # keeps `dependencies_present` consistent with `missing_dependencies`.
    missing = missing_benchmark_dependencies()
    return build_readiness_report(
        cache_dir=cache_dir,
        hf_token_present=bool(active_hf_token()),
        dependencies_present=not missing,
        missing_dependencies=missing,
        output_repo=output_repo,
        tokenizer_repo=tokenizer_repo,
    )
def _write_json(path: Path, payload: Any) -> None:
    """Write *payload* as indented, key-sorted UTF-8 JSON to *path*, creating parent dirs."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8")


def write_preflight_report(path: Path, payload: dict[str, object]) -> None:
    """Persist a preflight readiness report as JSON at *path*."""
    _write_json(path, payload)


def write_cycle_summary(path: Path, payload: list[dict[str, Any]]) -> None:
    """Persist a cycle summary (a list of per-run or blocked entries) as JSON at *path*."""
    _write_json(path, payload)
def build_remote_checkpoint_report(output_repo: str, token: str | None) -> dict[str, Any]:
    """List the files of the Hub model repo *output_repo* and summarize checkpoint candidates.

    The file listing is delegated to ``HfApi.list_repo_files``. The summary
    comes from ``build_checkpoint_report``. Both imports are deferred to call
    time.
    """
    from huggingface_hub import HfApi
    from scripts.benchmark_checkpoint_report import build_checkpoint_report

    hub = HfApi(token=token)
    repo_files = hub.list_repo_files(repo_id=output_repo, repo_type="model", token=token)
    return build_checkpoint_report(repo_files)
def ensure_benchmark_assets(
    *,
    cache_dir: Path,
    output_repo: str,
    tokenizer_repo: str,
    token: str | None,
    hydrate: bool,
) -> dict[str, str] | None:
    """Hydrate benchmark assets into *cache_dir* when *hydrate* is true.

    Returns whatever ``hydrate_benchmark_assets`` returns, or ``None`` when
    hydration is not requested. Exceptions from the hydration helper
    propagate.
    """
    if hydrate:
        # Imported lazily: the helper module is loaded only when hydration is requested.
        from scripts.benchmark_assets import hydrate_benchmark_assets

        return hydrate_benchmark_assets(
            cache_dir=cache_dir,
            output_repo=output_repo,
            tokenizer_repo=tokenizer_repo,
            token=token,
        )
    return None
def build_benchmark_command(
    freeze: dict[str, Any],
    *,
    benchmark: str,
    variant: str,
    seed: int,
    out_dir: Path,
) -> tuple[list[str], dict[str, str]]:
    """Build the argv and environment for one benchmark-runner invocation.

    Environment: a copy of the current process environment, updated with the
    variant's ``env`` overrides (keys and values stringified), then
    ``HYDRA_SEED``.

    Output paths: *out_dir* is created if needed. The runner is told to write
    ``<out_dir>/<benchmark lowercased>_<variant>_seed<seed>.json`` and to use
    ``<out_dir>/benchmark_ledger.json`` as its ledger.

    Raises:
        KeyError: if ``freeze`` has no ``variants`` entry, or *variant* is not
            one of its keys. The latter message lists the defined variants.
    """
    variants = freeze["variants"]
    try:
        variant_cfg = variants[variant]
    except KeyError:
        known = ", ".join(sorted(map(str, variants))) or "<none>"
        raise KeyError(f"unknown variant {variant!r}; freeze defines: {known}") from None
    env = os.environ.copy()
    # `env` may be omitted or null in the freeze file; both mean "no overrides".
    overrides = variant_cfg.get("env") or {}
    env.update({str(key): str(value) for key, value in overrides.items()})
    # Applied after the overrides so the explicit seed always wins.
    env["HYDRA_SEED"] = str(seed)
    out_dir.mkdir(parents=True, exist_ok=True)
    result_path = out_dir / f"{benchmark.lower()}_{variant}_seed{seed}.json"
    ledger_path = out_dir / "benchmark_ledger.json"
    cmd = [
        sys.executable,
        str(RUNNER_PATH),
        "--benchmark",
        benchmark,
        "--generator-mode",
        "hydra",
        "--out",
        str(result_path),
        "--ledger",
        str(ledger_path),
        "--variant",
        variant,
        "--seed",
        str(seed),
    ]
    return cmd, env
def build_cycle_plan(freeze: dict[str, Any], *, benchmark: str, out_dir: Path) -> list[dict[str, Any]]:
    """Expand *benchmark* into one planned run per runnable variant and frozen seed.

    A variant is runnable when its freeze entry is a mapping whose ``status``
    is ``"runnable_now"``. Runs are ordered variant-major: all seeds of the
    first runnable variant, then the next variant. Each entry holds the
    benchmark, variant, seed, and the ``command``/``env`` pair produced by
    ``build_benchmark_command``.
    """

    def _is_runnable(cfg: Any) -> bool:
        return isinstance(cfg, dict) and cfg.get("status") == "runnable_now"

    variant_names = [name for name, cfg in freeze.get("variants", {}).items() if _is_runnable(cfg)]
    seed_values = list(map(int, freeze.get("seeds", [])))

    planned: list[dict[str, Any]] = []
    for variant_name in variant_names:
        for seed_value in seed_values:
            command, child_env = build_benchmark_command(
                freeze,
                benchmark=benchmark,
                variant=variant_name,
                seed=seed_value,
                out_dir=out_dir,
            )
            planned.append(
                dict(
                    benchmark=benchmark,
                    variant=variant_name,
                    seed=seed_value,
                    command=command,
                    env=child_env,
                )
            )
    return planned
def execute_cycle_plan(plan: list[dict[str, Any]], *, repo_root: Path) -> list[dict[str, Any]]:
    """Run every planned command in order from *repo_root* and collect exit codes.

    Each entry is executed even when an earlier one fails. The result for each
    entry echoes its benchmark/variant/seed together with the subprocess
    ``returncode``.
    """
    workdir = str(repo_root)

    def _run(entry: dict[str, Any]) -> dict[str, Any]:
        completed = subprocess.run(entry["command"], cwd=workdir, env=entry["env"])
        return {
            "benchmark": entry["benchmark"],
            "variant": entry["variant"],
            "seed": entry["seed"],
            "returncode": completed.returncode,
        }

    return [_run(entry) for entry in plan]
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options for a Cycle 1 benchmark run.

    The run-selection options are required only when they are used:
      * ``--variant`` and ``--seed`` are ignored with ``--all-runnable``
        (every runnable variant and frozen seed is executed), so they are
        required only without it;
      * ``--benchmark`` is ignored when ``--all-runnable`` is combined with
        ``--all-benchmarks``, so it is required otherwise.

    Missing required options are reported through ``parser.error``, i.e. the
    same usage error and exit status argparse uses for required arguments.
    """
    parser = argparse.ArgumentParser(description="Execute a frozen Cycle 1 benchmark run")
    parser.add_argument("--freeze", type=Path, default=FREEZE_PATH,
                        help="Frozen cycle configuration JSON.")
    parser.add_argument("--suite", type=Path, default=REPO_ROOT / "artifacts" / "benchmark_suite.cycle1.json",
                        help="Benchmark suite JSON used with --all-benchmarks.")
    parser.add_argument("--benchmark",
                        help="Benchmark to run (required unless --all-runnable --all-benchmarks).")
    parser.add_argument("--variant",
                        help="Variant from the freeze file (required unless --all-runnable).")
    parser.add_argument("--seed", type=int,
                        help="Seed for a single run (required unless --all-runnable).")
    parser.add_argument("--out-dir", type=Path, default=REPO_ROOT / "artifacts" / "runs",
                        help="Directory for per-run result JSON files and the benchmark ledger.")
    parser.add_argument("--preflight-out", type=Path,
                        help="Write the preflight readiness report to this path.")
    parser.add_argument("--summary-out", type=Path,
                        help="Write a JSON summary of run results (or the blocking reason) to this path.")
    parser.add_argument("--hydrate-assets", action="store_true",
                        help="Hydrate benchmark assets into the local cache before running.")
    parser.add_argument("--all-runnable", action="store_true",
                        help="Run every runnable variant with every frozen seed.")
    parser.add_argument("--all-benchmarks", action="store_true",
                        help="With --all-runnable, run every benchmark listed in --suite.")
    parser.add_argument("--require-ready", action="store_true",
                        help="Exit with status 2 unless the preflight report is ready.")
    parser.add_argument("--output-repo",
                        help="Output model repo (defaults to the routed output repo).")
    parser.add_argument("--tokenizer-repo",
                        help="Tokenizer repo (defaults to the routed output repo).")
    args = parser.parse_args(argv)

    missing: list[str] = []
    if args.benchmark is None and not (args.all_runnable and args.all_benchmarks):
        missing.append("--benchmark")
    if not args.all_runnable:
        if args.variant is None:
            missing.append("--variant")
        if args.seed is None:
            missing.append("--seed")
    if missing:
        parser.error("the following arguments are required: " + ", ".join(missing))
    return args
def _report_blocked(
    summary_out: Path | None,
    *,
    reason: str,
    details: dict[str, Any],
    output_repo: str,
    token: str | None,
) -> None:
    """Report that the cycle was blocked before any benchmark ran.

    The reason (plus any ``error`` detail) is always printed to stderr, so the
    non-zero exit is explained even without ``--summary-out``.

    When *summary_out* is given, a one-entry summary is written containing
    *reason*, *details*, and a best-effort checkpoint report for *output_repo*.
    """
    message = f"cycle_executor: blocked ({reason})"
    if "error" in details:
        message += f": {details['error']}"
    print(message, file=sys.stderr)
    if summary_out is None:
        # Nothing would record the checkpoint report, so skip the remote query.
        return
    try:
        checkpoint_report = build_remote_checkpoint_report(output_repo, token)
    except Exception:
        # Diagnostic only: a Hub/network failure must not mask the block reason.
        checkpoint_report = None
    write_cycle_summary(
        summary_out,
        [{
            "status": "blocked",
            "reason": reason,
            **details,
            "checkpoint_candidates": checkpoint_report,
        }],
    )


def main(argv: list[str] | None = None) -> int:
    """Run the frozen Cycle 1 benchmark(s) selected by *argv* and return an exit status.

    Exit status:
      * 3 — ``--hydrate-assets`` failed with ``FileNotFoundError``;
      * 2 — ``--require-ready`` is set and the preflight report is not ready;
      * ``--all-runnable`` — 0 if every planned run exits 0, else 1;
      * single run — that run's own return code.
    """
    args = parse_args(argv)
    cache_dir = Path(os.path.expanduser("~/.cache/autoresearch"))
    token = active_hf_token()
    routing = resolve_routing(token=token)
    output_repo = args.output_repo or routing.output_repo
    # NOTE(review): the tokenizer repo falls back to the routed *output* repo;
    # confirm tokenizer files are expected to live there.
    tokenizer_repo = args.tokenizer_repo or routing.output_repo

    if args.hydrate_assets:
        try:
            ensure_benchmark_assets(
                cache_dir=cache_dir,
                output_repo=output_repo,
                tokenizer_repo=tokenizer_repo,
                token=token,
                hydrate=True,
            )
        except FileNotFoundError as exc:
            _report_blocked(
                args.summary_out,
                reason="asset_hydration_failed",
                details={"error": str(exc)},
                output_repo=output_repo,
                token=token,
            )
            return 3

    # Build the preflight report at most once, whether it is written, checked, or both.
    if args.preflight_out is not None or args.require_ready:
        report = build_preflight_report(
            cache_dir=cache_dir,
            output_repo=output_repo,
            tokenizer_repo=tokenizer_repo,
        )
        if args.preflight_out is not None:
            write_preflight_report(args.preflight_out, report)
        if args.require_ready and not report.get("ready_for_hydra_benchmarks"):
            _report_blocked(
                args.summary_out,
                reason="preflight_not_ready",
                details={"preflight": report},
                output_repo=output_repo,
                token=token,
            )
            return 2

    freeze = load_cycle_freeze(args.freeze)
    if args.all_runnable:
        benchmarks = load_cycle_benchmarks(args.suite) if args.all_benchmarks else [args.benchmark]
        plan = [
            item
            for benchmark in benchmarks
            for item in build_cycle_plan(freeze, benchmark=benchmark, out_dir=args.out_dir)
        ]
        if not plan:
            print(
                "cycle_executor: warning: execution plan is empty; no benchmarks were run",
                file=sys.stderr,
            )
    else:
        cmd, env = build_benchmark_command(
            freeze,
            benchmark=args.benchmark,
            variant=args.variant,
            seed=args.seed,
            out_dir=args.out_dir,
        )
        # Route the single run through the same executor as --all-runnable so
        # both modes launch and summarize runs identically.
        plan = [{
            "benchmark": args.benchmark,
            "variant": args.variant,
            "seed": args.seed,
            "command": cmd,
            "env": env,
        }]

    results = execute_cycle_plan(plan, repo_root=REPO_ROOT)
    if args.summary_out is not None:
        write_cycle_summary(args.summary_out, results)
    if not args.all_runnable:
        # A single run propagates the benchmark runner's own exit status.
        return results[0]["returncode"]
    return 0 if all(item["returncode"] == 0 for item in results) else 1
if __name__ == "__main__":
    # Use main()'s integer result as the process exit status.
    sys.exit(main())