| """Auto-push new pilot artifacts to HF as they appear. |
| |
| Designed to run as a long-lived (~8 h) nohup'd daemon on the training box. |
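| Token is read from the BLT_HF_TOKEN env var (removed from the environment once read) or, if that is unset/empty, from stdin; it is held only in process memory. |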
| |
| Polls every 2 min for: |
| - new local ckpts named in `--ckpts_to_watch` (default: ckpt-step10000, |
| ckpt-step12000): once all artifacts are present, pushes each as `ckpts/<name>/` under the repo. |
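| - `final/` directory + `final/ablation_n200.json` (the auto-eval result), checked only after the |
| training PID has exited: pushes `final/` (which contains the ablation JSON) plus the run logs, then regenerates the README with the full ablation table. |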
| |
| Exits after final/ is pushed, or after `--max_hours` (default 8) regardless. |
| """ |
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import os |
| import shutil |
| import sys |
| import time |
| from pathlib import Path |
|
|
|
|
def push_folder(api, folder, path_in_repo, repo, msg):
    """Upload the local directory ``folder`` into ``path_in_repo`` of model repo ``repo``.

    Thin wrapper around ``api.upload_folder``: ``folder`` may be a ``Path`` or a
    string (it is stringified), and ``msg`` becomes the commit message.
    """
    upload_kwargs = dict(
        folder_path=str(folder),
        path_in_repo=path_in_repo,
        repo_id=repo,
        repo_type="model",
        commit_message=msg,
    )
    api.upload_folder(**upload_kwargs)
|
|
|
|
def push_file(api, fpath, path_in_repo, repo, msg):
    """Upload the single local file ``fpath`` to ``path_in_repo`` of model repo ``repo``.

    Thin wrapper around ``api.upload_file``: ``fpath`` may be a ``Path`` or a
    string (it is stringified), and ``msg`` becomes the commit message.
    """
    upload_kwargs = dict(
        path_or_fileobj=str(fpath),
        path_in_repo=path_in_repo,
        repo_id=repo,
        repo_type="model",
        commit_message=msg,
    )
    api.upload_file(**upload_kwargs)
|
|
|
|
def ckpt_complete(ckpt_dir: Path) -> bool:
    """Return True iff all 3 expected checkpoint artifacts exist with nonzero size.

    The expected artifacts (relative to ``ckpt_dir``) are
    ``model/adapter_model.safetensors``, ``projector.pt`` and ``head.pt``.

    A single ``stat()`` per file replaces the original ``exists()``-then-``stat()``
    pair. A file that disappears or cannot be stat'ed counts as "not complete"
    instead of raising, so a filesystem race (e.g. the file being deleted between
    polls) cannot crash the long-running poll loop.

    Args:
        ckpt_dir: checkpoint directory; a ``str`` is accepted as well.
    """
    ckpt_dir = Path(ckpt_dir)
    for rel in ("model/adapter_model.safetensors", "projector.pt", "head.pt"):
        try:
            size = (ckpt_dir / rel).stat().st_size
        except OSError:  # missing, vanished mid-check, or unreadable
            return False
        if size == 0:
            return False
    return True
|
|
|
|
def regen_readme(pilot_dir: Path, code_dir: Path, repo: str) -> str:
    """Build README text with ``build_readme`` from the local checkout's upload helper.

    ``code_dir.parent`` is placed at the front of ``sys.path`` so that
    ``experiments.blt_reasoner.scripts.hf_upload_pilot`` resolves from the local
    code. The entry is only inserted when it is not already first, so repeated
    calls no longer pile up duplicate ``sys.path`` entries. Python caches the
    imported module, so later calls reuse the first-loaded ``build_readme``
    rather than re-importing it.

    Returns:
        Whatever ``build_readme(pilot_dir, code_dir, repo)`` returns (the README text).
    """
    import_root = str(code_dir.parent)
    if sys.path[:1] != [import_root]:
        sys.path.insert(0, import_root)
    from experiments.blt_reasoner.scripts.hf_upload_pilot import build_readme
    return build_readme(pilot_dir, code_dir, repo)
|
|
|
|
def _read_token() -> str:
    """Return the HF token from $BLT_HF_TOKEN (popped from the env) or, failing that, stdin."""
    token = os.environ.pop("BLT_HF_TOKEN", "").strip()
    if token:
        return token
    try:
        return sys.stdin.read().strip()
    except Exception:
        # e.g. stdin redirected from an unreadable file by nohup
        return ""


def _pid_alive(pid: int) -> bool:
    """Probe ``pid`` with signal 0; PermissionError still means the process exists."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True
    return True


def _push_final(api, pilot: Path, code: Path, repo: str, log) -> None:
    """Push final/, refresh logs/, and regenerate the README.

    Raises whatever ``push_folder`` raises for ``final/`` (the caller retries).
    Failures pushing individual logs or the README are only logged as warnings.
    """
    import tempfile

    log("pushing final/ (ckpt + ablation_n200.json)")
    push_folder(api, pilot / "final", "final", repo,
                "Add final ckpt + n=200 pre-registered z-ablation")

    for name in ("auto_eval.log", "run.log", "metrics.jsonl"):
        f = pilot / name
        if f.exists():
            try:
                push_file(api, f, f"logs/{name}", repo,
                          f"Refresh logs/{name} at end of pilot")
            except Exception as e:
                log(f" warn: log push {name}: {e!r}")

    try:
        readme = regen_readme(pilot, code, repo)
        # Private temp dir instead of a fixed shared /tmp path; it is removed
        # even when the upload fails. UTF-8 so non-ASCII README content does
        # not depend on the box's locale.
        with tempfile.TemporaryDirectory(prefix="blt_readme_") as tmp_dir:
            tmp = Path(tmp_dir) / "README.md"
            tmp.write_text(readme, encoding="utf-8")
            push_file(api, tmp, "README.md", repo,
                      "Regenerate README with final ablation results")
    except Exception as e:
        log(f" warn: README regen: {e!r}")


def _run_daemon(args, log) -> None:
    """Poll loop: push watched ckpts as they complete, then final/ once training exits.

    Exits via ``sys.exit(2)`` if no ``hf_`` token is available. Otherwise returns
    after final/ is pushed or when ``--max_hours`` elapses.
    """
    token = _read_token()
    if not token.startswith("hf_"):
        log("ERROR: no hf_ token in BLT_HF_TOKEN env or stdin; aborting")
        sys.exit(2)

    from huggingface_hub import HfApi
    api = HfApi(token=token)

    pilot = Path(args.pilot_dir)
    code = Path(args.code_dir)
    watchlist = [c.strip() for c in args.ckpts_to_watch.split(",") if c.strip()]
    pushed = set()

    log(f"daemon start: repo={args.repo} pilot={pilot} train_pid={args.train_pid} watching={watchlist}")

    deadline = time.time() + args.max_hours * 3600
    final_pushed = False

    while time.time() < deadline and not final_pushed:
        for ckpt_name in watchlist:
            if ckpt_name in pushed:
                continue
            ckpt = pilot / ckpt_name
            if not (ckpt.exists() and ckpt_complete(ckpt)):
                continue
            # Grace period so the trainer can finish flushing the ckpt files.
            time.sleep(15)
            try:
                log(f"pushing {ckpt_name}")
                push_folder(api, ckpt, f"ckpts/{ckpt_name}", args.repo,
                            f"Add {ckpt_name}")
                pushed.add(ckpt_name)
                log(f" ok: {ckpt_name}")
            except Exception as e:
                log(f" ERROR pushing {ckpt_name}: {e!r}; will retry")

        # final/ is only considered once the training process is gone.
        if not _pid_alive(args.train_pid):
            final_dir = pilot / "final"
            if final_dir.exists() and (final_dir / "ablation_n200.json").exists():
                # Grace period for the auto-eval to finish writing.
                time.sleep(30)
                try:
                    _push_final(api, pilot, code, args.repo, log)
                    final_pushed = True
                    log("DONE: final pushed; daemon exiting")
                except Exception as e:
                    log(f" ERROR pushing final: {e!r}; will retry")
            else:
                log("train PID gone but final/ablation_n200.json not yet present; waiting")

        if not final_pushed:
            # Poll every 2 min, but never sleep past the --max_hours deadline.
            time.sleep(max(0.0, min(120.0, deadline - time.time())))

    if not final_pushed:
        log(f"deadline reached or daemon exiting without final push (pushed={pushed})")


def main():
    """Parse CLI args, open the append-mode log, and run the push daemon.

    The log file is opened in a ``with`` block, so it is closed on every exit
    path: normal completion, the ``sys.exit(2)`` for a missing token, and any
    unexpected exception escaping the poll loop.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--repo", required=True)
    p.add_argument("--pilot_dir", required=True)
    p.add_argument("--code_dir", required=True)
    p.add_argument("--train_pid", type=int, required=True)
    p.add_argument("--max_hours", type=float, default=8.0)
    p.add_argument("--log", default=None)
    p.add_argument("--ckpts_to_watch", default="ckpt-step10000,ckpt-step12000")
    args = p.parse_args()

    log_path = args.log or os.path.join(args.pilot_dir, "auto_push.log")
    with open(log_path, "a", buffering=1, encoding="utf-8") as log_f:

        def log(m):
            """Print a timestamped line to stdout and append it to the log file."""
            line = f"[{time.strftime('%H:%M:%S')}] {m}"
            print(line, flush=True)
            log_f.write(line + "\n")

        _run_daemon(args, log)
|
|
|
|
if "__main__" == __name__:
    # Start the daemon only when executed as a script, never on import.
    main()
|
|