#!/usr/bin/env python3
"""Audit mirrored H200 overlay files against root source-of-truth paths.

Files under ``<repo_root>/<include_path>`` are compared byte-for-byte with
their counterparts under ``<repo_root>/hf_jobs/feather_h200_image/overlay``
and the outcome is printed as a JSON report with four buckets:
``identical``, ``root_ahead``, ``overlay_only`` and ``missing_overlay``.
"""
from __future__ import annotations

import argparse
import json
from pathlib import Path

# Paths (relative to the repository root) audited when no --include-path
# option is given on the command line.
DEFAULT_INCLUDE_PATHS = [
    "hydra",
    "subsystems",
    "scripts",
    "htm_rust",
    "harness",
    "configs",
    "prepare.py",
    "prepare_nemotron.py",
    "train.py",
    "pyproject.toml",
    "uv.lock",
]

# Read size used when comparing two files, so large files are never loaded
# into memory in one piece.
_COMPARE_CHUNK_SIZE = 1024 * 1024


def _iter_files(path: Path) -> list[Path]:
    """Return the regular files at or below *path*, sorted.

    A missing path yields an empty list, a regular file yields a one-element
    list, and a directory is walked recursively.
    """
    if not path.exists():
        return []
    if path.is_file():
        return [path]
    return sorted(p for p in path.rglob("*") if p.is_file())


def _files_by_relative_path(directory: Path) -> dict[str, Path]:
    """Map POSIX-style relative paths to the files found below *directory*.

    Anything that is not an existing directory yields an empty mapping.
    """
    if not directory.is_dir():
        return {}
    return {p.relative_to(directory).as_posix(): p for p in _iter_files(directory)}


def _same_content(left: Path, right: Path) -> bool:
    """Return True when both regular files hold exactly the same bytes."""
    # Files of different sizes can never be equal; skip reading them.
    if left.stat().st_size != right.stat().st_size:
        return False
    with left.open("rb") as left_handle, right.open("rb") as right_handle:
        while True:
            left_chunk = left_handle.read(_COMPARE_CHUNK_SIZE)
            right_chunk = right_handle.read(_COMPARE_CHUNK_SIZE)
            if left_chunk != right_chunk:
                return False
            if not left_chunk:
                # Both streams hit end-of-file at the same point.
                return True


def classify_overlay_pairs(*, repo_root: Path, include_paths: list[str]) -> dict[str, list[str]]:
    """Classify every audited file by how root and overlay copies relate.

    Args:
        repo_root: Repository root; the overlay tree is expected at
            ``repo_root / "hf_jobs" / "feather_h200_image" / "overlay"``.
        include_paths: Files or directories, relative to *repo_root*, to audit.

    Returns:
        A dict with four sorted lists of forward-slash relative paths:

        * ``identical`` - present on both sides with equal bytes.
        * ``root_ahead`` - present on both sides but different. This also
          covers an include path that is a file on one side and a directory
          on the other.
        * ``overlay_only`` - present only in the overlay.
        * ``missing_overlay`` - present only in the root tree.
    """
    overlay_root = repo_root / "hf_jobs" / "feather_h200_image" / "overlay"
    identical: list[str] = []
    root_ahead: list[str] = []
    overlay_only: list[str] = []
    missing_overlay: list[str] = []

    for rel in include_paths:
        root_path = repo_root / rel
        overlay_path = overlay_root / rel

        if root_path.is_file() or overlay_path.is_file():
            # The include path names a single file on at least one side.
            rel_name = rel.replace("\\", "/")
            if root_path.exists() and overlay_path.exists():
                # If only one side is a regular file the two cannot match;
                # report the difference rather than trying to read a
                # directory as bytes.
                both_files = root_path.is_file() and overlay_path.is_file()
                if both_files and _same_content(root_path, overlay_path):
                    identical.append(rel_name)
                else:
                    root_ahead.append(rel_name)
            elif root_path.exists():
                missing_overlay.append(rel_name)
            else:
                # Root is absent, so the overlay side is the regular file.
                overlay_only.append(rel_name)
            continue

        root_files = _files_by_relative_path(root_path)
        overlay_files = _files_by_relative_path(overlay_path)

        for subrel, root_file in root_files.items():
            rel_name = f"{rel}/{subrel}".replace("\\", "/")
            overlay_file = overlay_files.get(subrel)
            if overlay_file is None:
                missing_overlay.append(rel_name)
            elif _same_content(root_file, overlay_file):
                identical.append(rel_name)
            else:
                root_ahead.append(rel_name)

        for subrel in overlay_files:
            if subrel not in root_files:
                overlay_only.append(f"{rel}/{subrel}".replace("\\", "/"))

    for bucket in (identical, root_ahead, overlay_only, missing_overlay):
        bucket.sort()

    return {
        "identical": identical,
        "root_ahead": root_ahead,
        "overlay_only": overlay_only,
        "missing_overlay": missing_overlay,
    }


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options.

    ``--repo-root`` defaults to the parent of the directory containing this
    script; ``--include-path`` may be repeated and defaults to an empty list.
    """
    parser = argparse.ArgumentParser(
        description="Audit mirrored H200 overlay files against root source-of-truth paths"
    )
    parser.add_argument("--repo-root", type=Path, default=Path(__file__).resolve().parents[1])
    parser.add_argument("--include-path", action="append", default=[])
    return parser.parse_args(argv)


def main(argv: list[str] | None = None) -> int:
    """Run the audit, print the JSON report to stdout and return exit code 0."""
    args = parse_args(argv)
    # Fall back to the default set when no --include-path was supplied.
    include_paths = args.include_path or DEFAULT_INCLUDE_PATHS
    payload = classify_overlay_pairs(repo_root=args.repo_root, include_paths=include_paths)
    print(json.dumps(payload, indent=2, sort_keys=True))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())