File size: 3,500 Bytes
951f760
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import json
from pathlib import Path


# Repo-relative paths (directories and single files) that are audited when no
# --include-path option is given on the command line. Each entry is resolved
# both under the repo root and under the overlay root.
DEFAULT_INCLUDE_PATHS = [
    "hydra",
    "subsystems",
    "scripts",
    "htm_rust",
    "harness",
    "configs",
    "prepare.py",
    "prepare_nemotron.py",
    "train.py",
    "pyproject.toml",
    "uv.lock",
]


def _iter_files(path: Path) -> list[Path]:
    """Return the regular files located at or below *path*.

    A regular file yields a one-element list containing *path* itself, a
    path that does not exist yields an empty list, and anything else is
    walked recursively with the files found returned in sorted order.
    """
    if path.is_file():
        return [path]
    if not path.exists():
        return []
    found = [entry for entry in path.rglob("*") if entry.is_file()]
    found.sort()
    return found


def _collect_entries(base: Path, rel: str) -> dict[str, Path]:
    """Index the files found at *base* by their report name.

    A regular file is reported under *rel* itself; a directory contributes
    one ``rel/<sub path>`` entry per file beneath it; anything else (missing
    path, broken link, ...) contributes nothing. Backslashes in names are
    normalised to forward slashes.
    """
    if base.is_file():
        return {rel.replace("\\", "/"): base}
    if base.is_dir():
        return {
            f"{rel}/{entry.relative_to(base).as_posix()}".replace("\\", "/"): entry
            for entry in base.rglob("*")
            if entry.is_file()
        }
    return {}


def classify_overlay_pairs(*, repo_root: Path, include_paths: list[str]) -> dict[str, list[str]]:
    """Compare root files against their mirrored copies in the H200 overlay.

    Every entry of *include_paths* is resolved under *repo_root* and under
    ``repo_root/hf_jobs/feather_h200_image/overlay``; the files found on each
    side are then matched by name and compared byte for byte.

    Each side is indexed independently, so a path that is a directory on one
    side and a regular file on the other no longer aborts the audit with an
    ``OSError`` from reading a directory: the unmatched names are simply
    reported as ``missing_overlay`` / ``overlay_only``.

    Args:
        repo_root: Directory that holds both the root sources and the overlay.
        include_paths: Repo-relative files or directories to audit.

    Returns:
        A dict with four sorted lists of forward-slash names:
        ``identical`` (same bytes on both sides), ``root_ahead`` (present on
        both sides with different bytes -- the comparison cannot tell which
        side is newer), ``overlay_only`` (present only in the overlay) and
        ``missing_overlay`` (present only under the root).
    """
    overlay_root = repo_root / "hf_jobs" / "feather_h200_image" / "overlay"
    identical: list[str] = []
    root_ahead: list[str] = []
    overlay_only: list[str] = []
    missing_overlay: list[str] = []

    for rel in include_paths:
        root_entries = _collect_entries(repo_root / rel, rel)
        overlay_entries = _collect_entries(overlay_root / rel, rel)

        for name, root_file in root_entries.items():
            overlay_file = overlay_entries.get(name)
            if overlay_file is None:
                missing_overlay.append(name)
            elif root_file.read_bytes() == overlay_file.read_bytes():
                identical.append(name)
            else:
                root_ahead.append(name)

        overlay_only.extend(name for name in overlay_entries if name not in root_entries)

    for bucket in (identical, root_ahead, overlay_only, missing_overlay):
        bucket.sort()

    return {
        "identical": identical,
        "root_ahead": root_ahead,
        "overlay_only": overlay_only,
        "missing_overlay": missing_overlay,
    }


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the overlay audit.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        Namespace with ``repo_root`` (a ``Path``, defaulting to the parent of
        the directory that contains this file) and ``include_path`` (list of
        every ``--include-path`` value given, empty when none was passed).
    """
    default_root = Path(__file__).resolve().parents[1]
    cli = argparse.ArgumentParser(
        description="Audit mirrored H200 overlay files against root source-of-truth paths",
    )
    cli.add_argument("--repo-root", type=Path, default=default_root)
    # Repeatable option: each occurrence appends one more path to the list.
    cli.add_argument("--include-path", action="append", default=[])
    namespace = cli.parse_args(argv)
    return namespace


def main(argv: list[str] | None = None) -> int:
    """Run the overlay audit and print the classification as JSON.

    Args:
        argv: Command-line arguments; ``None`` lets argparse read ``sys.argv``.

    Returns:
        Always ``0``, intended for use as the process exit status.
    """
    options = parse_args(argv)
    # Fall back to the built-in path list when no --include-path was supplied.
    selected = options.include_path if options.include_path else DEFAULT_INCLUDE_PATHS
    report = classify_overlay_pairs(repo_root=options.repo_root, include_paths=selected)
    rendered = json.dumps(report, indent=2, sort_keys=True)
    print(rendered)
    return 0


# Script entry point: run the audit and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())