feather-a10-runtime / overlay /scripts /audit_overlay_sync.py
Jackoatmon's picture
Update Feather training runtime image
951f760 verified
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
from pathlib import Path
# Relative paths audited when the caller supplies no --include-path option.
# Each entry is resolved against both the repo root and the overlay root and
# may name either a directory (compared file by file) or a single file.
DEFAULT_INCLUDE_PATHS = [
    "hydra",
    "subsystems",
    "scripts",
    "htm_rust",
    "harness",
    "configs",
    "prepare.py",
    "prepare_nemotron.py",
    "train.py",
    "pyproject.toml",
    "uv.lock",
]
def _iter_files(path: Path) -> list[Path]:
    """Return the regular files found at or beneath *path*.

    A path that is itself a file yields a one-element list, a path that does
    not exist yields an empty list, and anything else is walked recursively
    with the discovered files returned in sorted order.
    """
    if path.is_file():
        return [path]
    if not path.exists():
        return []
    found = [entry for entry in path.rglob("*") if entry.is_file()]
    found.sort()
    return found
def _same_file_content(left: Path, right: Path) -> bool:
    """Return True only when both paths are regular files with equal bytes."""
    # A directory (or anything else that is not a regular file) can never
    # match a file; answering False here avoids calling read_bytes() on it.
    if not (left.is_file() and right.is_file()):
        return False
    # Files of different size cannot be equal, so skip reading them.
    if left.stat().st_size != right.stat().st_size:
        return False
    return left.read_bytes() == right.read_bytes()


def classify_overlay_pairs(*, repo_root: Path, include_paths: list[str]) -> dict[str, list[str]]:
    """Compare root paths with their mirrored copies under the overlay tree.

    Every entry of *include_paths* is resolved against *repo_root* and against
    ``repo_root / "hf_jobs" / "feather_h200_image" / "overlay"``. An entry may
    name a single file or a directory; directories are compared file by file.

    Args:
        repo_root: Directory holding the source-of-truth paths.
        include_paths: Relative paths (files or directories) to audit.

    Returns:
        A dict with four sorted lists of forward-slash relative names:

        * ``identical``: present on both sides with equal bytes.
        * ``root_ahead``: present on both sides but different. The comparison
          cannot tell which side changed; any difference lands here. A
          file on one side paired with a directory on the other is reported
          here as a single entry as well.
        * ``overlay_only``: present only under the overlay.
        * ``missing_overlay``: present only under the root.

        Entries that exist on neither side contribute nothing.
    """
    overlay_root = repo_root / "hf_jobs" / "feather_h200_image" / "overlay"
    identical: list[str] = []
    root_ahead: list[str] = []
    overlay_only: list[str] = []
    missing_overlay: list[str] = []

    for rel in include_paths:
        root_path = repo_root / rel
        overlay_path = overlay_root / rel
        # Normalise separators and drop a trailing slash so that reported
        # names never contain "//" (e.g. for an include path of "hydra/").
        rel_name = rel.replace("\\", "/").rstrip("/")

        # Single-file entry: at least one side is a regular file.
        if root_path.is_file() or overlay_path.is_file():
            if root_path.exists() and overlay_path.exists():
                if _same_file_content(root_path, overlay_path):
                    identical.append(rel_name)
                else:
                    # Differing bytes, or a file/directory kind mismatch.
                    root_ahead.append(rel_name)
            elif root_path.exists():
                missing_overlay.append(rel_name)
            else:
                overlay_only.append(rel_name)
            continue

        # Directory entry: index each side by path relative to the entry.
        # A side that is not a directory (i.e. missing) has no files.
        root_files = (
            {p.relative_to(root_path).as_posix(): p for p in _iter_files(root_path)}
            if root_path.is_dir()
            else {}
        )
        overlay_files = (
            {p.relative_to(overlay_path).as_posix(): p for p in _iter_files(overlay_path)}
            if overlay_path.is_dir()
            else {}
        )

        for subrel, root_file in root_files.items():
            name = f"{rel_name}/{subrel}".replace("\\", "/")
            overlay_file = overlay_files.get(subrel)
            if overlay_file is None:
                missing_overlay.append(name)
            elif _same_file_content(root_file, overlay_file):
                identical.append(name)
            else:
                root_ahead.append(name)

        overlay_only.extend(
            f"{rel_name}/{subrel}".replace("\\", "/")
            for subrel in overlay_files
            if subrel not in root_files
        )

    for bucket in (identical, root_ahead, overlay_only, missing_overlay):
        bucket.sort()
    return {
        "identical": identical,
        "root_ahead": root_ahead,
        "overlay_only": overlay_only,
        "missing_overlay": missing_overlay,
    }
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Audit mirrored H200 overlay files against root source-of-truth paths")
parser.add_argument("--repo-root", type=Path, default=Path(__file__).resolve().parents[1])
parser.add_argument("--include-path", action="append", default=[])
return parser.parse_args(argv)
def main(argv: list[str] | None = None) -> int:
    """Run the overlay audit and print the classification as JSON.

    Args:
        argv: Argument strings forwarded to ``parse_args``.

    Returns:
        Always ``0``; the report is written to standard output as indented
        JSON with sorted keys.
    """
    options = parse_args(argv)
    # An empty --include-path list falls back to the built-in defaults.
    selected = options.include_path if options.include_path else DEFAULT_INCLUDE_PATHS
    report = classify_overlay_pairs(repo_root=options.repo_root, include_paths=selected)
    rendered = json.dumps(report, indent=2, sort_keys=True)
    print(rendered)
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())